Commit cc6a319a authored by Leo Gordon's avatar Leo Gordon
Browse files

EXPERIMENTAL sqlite support in eHive

parent fac7096e
......@@ -36,7 +36,7 @@ sub fetch_by_logic_name_or_url {
my $self = shift @_;
my $logic_name_or_url = shift @_;
if($logic_name_or_url =~ m{^\w+://}) {
if($logic_name_or_url =~ m{^\w*://}) {
return Bio::EnsEMBL::Hive::URLFactory->fetch($logic_name_or_url, $self->db);
} else {
return $self->fetch_by_logic_name($logic_name_or_url);
......
......@@ -71,7 +71,7 @@ sub store {
my $sth2 = $self->prepare("INSERT INTO analysis_data (data) VALUES (?)");
$sth2->execute($data);
$data_id = $sth2->{'mysql_insertid'};
$data_id = ($self->dbc->driver eq 'sqlite') ? $self->dbc->db_handle->func('last_insert_rowid') : $sth->{'mysql_insertid'};
$sth2->finish;
return $data_id;
......
......@@ -99,16 +99,20 @@ sub CreateNewJob {
$input_id = "_ext_input_analysis_data_id $input_data_id";
}
my $sql = q{INSERT ignore into job
my $dbc = $analysis->adaptor->db->dbc;
my $insertion_method = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
$insertion_method = 'INSERT'; # we are expecting this to fire
my $status = $blocked ? 'BLOCKED' : 'READY';
my $sql = qq{$insertion_method INTO job
(input_id, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
VALUES (?,?,?,?,?,?)};
my $status = $blocked ? 'BLOCKED' : 'READY';
my $dbc = $analysis->adaptor->db->dbc;
my $sth = $dbc->prepare($sql);
$sth->execute($input_id, $prev_job_id, $analysis->dbID, $status, $semaphore_count, $semaphored_job_id);
my $job_id = $sth->{'mysql_insertid'};
$sth->execute($input_id, $prev_job_id, $analysis->dbID, $status, $semaphore_count || 0, $semaphored_job_id);
my $job_id = ($dbc->driver eq 'sqlite') ? $dbc->db_handle->func('last_insert_rowid') : $sth->{'mysql_insertid'};
$sth->finish;
$dbc->do("UPDATE analysis_stats SET ".
......@@ -399,11 +403,11 @@ sub update_status {
my $sql = "UPDATE job SET status='".$job->status."' ";
if($job->status eq 'DONE') {
$sql .= ",completed=now()";
$sql .= ",completed=CURRENT_TIMESTAMP";
$sql .= ",runtime_msec=".$job->runtime_msec;
$sql .= ",query_count=".$job->query_count;
} elsif($job->status eq 'PASSED_ON') {
$sql .= ", completed=now()";
$sql .= ", completed=CURRENT_TIMESTAMP";
} elsif($job->status eq 'READY') {
}
......@@ -450,7 +454,9 @@ sub store_out_files {
$self->dbc->do($sql);
return unless($job->stdout_file or $job->stderr_file);
$sql = "INSERT ignore INTO job_file (job_id, worker_id, retry, type, path) VALUES ";
my $insertion_method = ($self->dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
$sql = "$insertion_method INTO job_file (job_id, worker_id, retry, type, path) VALUES ";
if($job->stdout_file) {
$sql .= sprintf("(%d,%d,%d,'STDOUT','%s')", $job->dbID, $job->worker_id,
$job->retry_count, $job->stdout_file);
......@@ -485,23 +491,22 @@ sub grab_jobs_for_worker {
my $analysis_id = $worker->analysis->dbID();
my $worker_id = $worker->dbID();
my $batch_size = $worker->batch_size();
my $sql_base = qq{
UPDATE job
SET worker_id='$worker_id', status='CLAIMED'
WHERE analysis_id='$analysis_id' AND status='READY' AND semaphore_count<=0
};
my $update_sql = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'";
my $selection_start_sql = " WHERE analysis_id='$analysis_id' AND status='READY' AND semaphore_count<=0";
my $sql_virgin = $sql_base .
" AND retry_count=0".
" LIMIT " . $worker->batch_size;
my $virgin_selection_sql = $selection_start_sql . " AND retry_count=0 LIMIT $batch_size";
my $any_selection_sql = $selection_start_sql . " LIMIT $batch_size";
my $sql_any = $sql_base .
" LIMIT " . $worker->batch_size;
my $claim_count = $self->dbc->do($sql_virgin);
if($claim_count == 0) {
$claim_count = $self->dbc->do($sql_any);
if($self->dbc->driver eq 'sqlite') {
unless(my $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $virgin_selection_sql) AND status='READY'" )) {
$claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $any_selection_sql) AND status='READY'" );
}
} else {
unless(my $claim_count = $self->dbc->do( $update_sql . $virgin_selection_sql )) {
$claim_count = $self->dbc->do( $update_sql . $any_selection_sql );
}
}
my $constraint = "j.analysis_id='$analysis_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'";
......@@ -607,7 +612,7 @@ sub release_and_age_job {
#
$self->dbc->do( qq{
UPDATE job
SET status=IF( $may_retry AND (retry_count<$max_retry_count), 'READY', 'FAILED'), retry_count=retry_count+1
SET status=(CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END), retry_count=retry_count+1
WHERE job_id=$job_id
AND status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
} );
......
......@@ -209,7 +209,7 @@ sub update {
$sql .= ",failed_job_tolerance=" . $stats->failed_job_tolerance();
$sql .= ",num_running_workers=" . $stats->num_running_workers();
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=NOW()";
$sql .= ",last_update=CURRENT_TIMESTAMP";
$sql .= ",sync_lock='0'";
$sql .= ",rc_id=". $stats->rc_id();
$sql .= ",can_be_empty=". $stats->can_be_empty();
......@@ -218,7 +218,7 @@ sub update {
my $sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
$sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT now(), analysis_stats.* from analysis_stats WHERE analysis_id = ".$stats->analysis_id);
$sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT CURRENT_TIMESTAMP, analysis_stats.* from analysis_stats WHERE analysis_id = ".$stats->analysis_id);
$sth->execute();
$sth->finish;
$stats->seconds_since_last_update(0); #not exact but good enough :)
......@@ -285,7 +285,7 @@ sub decrease_hive_capacity
my $sql = "UPDATE analysis_stats ".
" SET hive_capacity = hive_capacity - 1, ".
" num_required_workers = IF(num_required_workers > 0, num_required_workers - 1, 0) ".
" num_required_workers = (CASE WHEN num_required_workers > 0 THEN num_required_workers - 1 ELSE 0 END) ".
" WHERE analysis_id='$analysis_id' and hive_capacity > 1";
$self->dbc->do($sql);
......@@ -469,7 +469,10 @@ sub _columns {
ast.rc_id
ast.can_be_empty
);
push @columns , "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
push @columns , ($self->dbc->driver eq 'sqlite')
? "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update "
: "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
return @columns;
}
......@@ -534,7 +537,9 @@ sub _create_new_for_analysis_id {
my $sql;
$sql = "INSERT ignore INTO analysis_stats (analysis_id) VALUES ($analysis_id)";
my $insertion_method = ($self->dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
$sql = "$insertion_method INTO analysis_stats (analysis_id) VALUES ($analysis_id)";
#print("$sql\n");
my $sth = $self->prepare($sql);
$sth->execute();
......
......@@ -90,26 +90,37 @@ sub _table_info_loader {
my $dbc = $self->dbc();
my $dbname = $dbc->dbname();
my $driver = $dbc->driver();
my $table_name = $self->table_name();
my %column_set = ();
my @primary_key = ();
my $autoinc_id = '';
my $sql = "SELECT column_name,column_key,extra FROM information_schema.columns WHERE table_schema='$dbname' and table_name='$table_name'";
my $sql = {
'mysql' => "SELECT column_name AS name, column_key='PRI' AS pk, extra='auto_increment' AS ai FROM information_schema.columns WHERE table_schema='$dbname' and table_name='$table_name'",
'sqlite'=> "PRAGMA table_info('$table_name')",
}->{$driver} or die "could not find column info for driver='$driver'";
my $sth = $self->prepare($sql);
$sth->execute;
while(my ($column_name, $column_key, $extra) = $sth->fetchrow ) {
$sth->execute;
while(my $row = $sth->fetchrow_hashref ) {
my $column_name = $row->{'name'};
$column_set{$column_name} = 1;
if($column_key eq 'PRI') {
if($row->{'pk'}) {
push @primary_key, $column_name;
if($extra eq 'auto_increment') {
if($row->{'ai'}) {
$autoinc_id = $column_name;
}
}
}
$sth->finish;
if(($driver eq 'sqlite') and scalar(@primary_key)==1) {
$autoinc_id = $primary_key[0];
}
$self->column_set( \%column_set );
$self->primary_key( \@primary_key );
$self->autoinc_id( $autoinc_id );
......@@ -275,8 +286,12 @@ sub store {
my $table_name = $self->table_name();
my $column_set = $self->column_set();
my $autoinc_id = $self->autoinc_id();
my $driver = $self->dbc->driver();
my $insertion_method = $self->insertion_method; # INSERT, INSERT_IGNORE or REPLACE
$insertion_method =~ s/_/ /g;
if($driver eq 'sqlite') {
$insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig;
}
# NB: here we assume all hashes will have the same keys:
my $non_autoinc_columns = [ grep { $_ ne $autoinc_id } keys %$column_set ];
......@@ -289,7 +304,7 @@ sub store {
if($check_presence_in_db_first and my $present = $self->check_object_present_in_db($object)) {
$self->mark_stored($object, $present);
} else {
#print "STORE: $sql\n";
# print "STORE: $sql\n";
$sth ||= $self->prepare( $sql ); # only prepare (once) if we get here
#print "NON_AUTOINC_COLUMNS: ".join(', ', @$non_autoinc_columns)."\n";
......@@ -300,7 +315,7 @@ sub store {
# using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
or die "Could not perform\n\t$sql\nwith data:\n\t(".join(',', @$non_autoinc_values).')';
if($return_code > 0) { # <--- for the same reason we have to be expliticly numeric here
$self->mark_stored($object, $sth->{'mysql_insertid'});
$self->mark_stored($object, ($driver eq 'sqlite') ? $self->dbc->db_handle->func('last_insert_rowid') : $sth->{'mysql_insertid'});
}
}
}
......
......@@ -90,17 +90,26 @@ sub Bio::EnsEMBL::Analysis::process {
=cut
sub Bio::EnsEMBL::DBSQL::DBConnection::url
{
sub Bio::EnsEMBL::DBSQL::DBConnection::url {
my $self = shift;
return undef unless($self->host and $self->port and $self->dbname);
my $url = "mysql://";
return undef unless($self->driver and $self->dbname);
my $url = $self->driver . '://';
if($self->username) {
$url .= $self->username;
$url .= ":".$self->password if($self->password);
$url .= "@";
}
$url .= $self->host .":". $self->port ."/" . $self->dbname;
if($self->host) {
$url .= $self->host;
if($self->port) {
$url .= ':'.$self->port;
}
}
$url .= '/' . $self->dbname;
return $url;
}
......@@ -118,15 +127,12 @@ sub Bio::EnsEMBL::DBSQL::DBConnection::url
=cut
sub Bio::EnsEMBL::Analysis::url
{
sub Bio::EnsEMBL::Analysis::url {
my $self = shift;
my $url;
return undef unless($self->adaptor);
$url = $self->adaptor->db->dbc->url;
$url .= "/analysis?logic_name=" . $self->logic_name;
return $url;
return $self->adaptor->db->dbc->url . '/analysis?logic_name=' . $self->logic_name;
}
......
......@@ -69,7 +69,7 @@ sub url {
my $ref_dba = shift @_; # if reference dba is the same as 'our' dba, a shorter url can be generated
if(my $adaptor = $self->adaptor) {
my $conn_prefix = ($adaptor->db == $ref_dba) ? 'mysql:///' : $adaptor->db->dbc->url();
my $conn_prefix = ($adaptor->db == $ref_dba) ? ':///' : $adaptor->db->dbc->url();
return $conn_prefix .'/'. $self->table_name() . '?insertion_method=' . $self->insertion_method();
} else {
return;
......
......@@ -52,8 +52,12 @@ use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
our ($hive_default_driver) = 'mysql';
# ---------------------------[the following methods will be overridden by specific pipelines]-------------------------
=head2 default_options
Description : Interface method that should return a hash of option_name->default_option_value pairs.
......@@ -74,10 +78,12 @@ sub default_options {
-user => 'ensadmin',
-pass => $self->o('password'),
-dbname => $ENV{'USER'}.'_'.$self->o('pipeline_name'), # example of a linked definition (resolved via saturation)
-driver => $hive_default_driver,
},
};
}
=head2 pipeline_create_commands
Description : Interface method that should return a list of command lines to be run in order to create and set up the pipeline database.
......@@ -86,17 +92,24 @@ sub default_options {
=cut
sub pipeline_create_commands {
my ($self) = @_;
return [
'mysql '.$self->dbconn_2_mysql('pipeline_db', 0)." -e 'CREATE DATABASE ".$self->o('pipeline_db', '-dbname')."'",
my $self = shift @_;
# standard eHive tables, foreign_keys and procedures:
'mysql '.$self->dbconn_2_mysql('pipeline_db', 1).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/tables.sql',
'mysql '.$self->dbconn_2_mysql('pipeline_db', 1).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/foreign_keys.sql',
'mysql '.$self->dbconn_2_mysql('pipeline_db', 1).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/procedures.sql',
];
return ($hive_default_driver eq 'sqlite')
? [
# standard eHive tables and unique/non-unique indices:
$self->db_connect_command('pipeline_db').' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/tables.sqlite',
]
: [
'mysql '.$self->dbconn_2_mysql('pipeline_db', 0)." -e 'CREATE DATABASE ".$self->o('pipeline_db', '-dbname')."'",
# standard eHive tables, foreign_keys and procedures:
$self->db_connect_command('pipeline_db').' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/tables.sql',
$self->db_connect_command('pipeline_db').' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/foreign_keys.sql',
$self->db_connect_command('pipeline_db').' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/procedures.sql',
];
}
=head2 pipeline_wide_parameters
Description : Interface method that should return a hash of pipeline_wide_parameter_name->pipeline_wide_parameter_value pairs.
......@@ -112,6 +125,7 @@ sub pipeline_wide_parameters {
};
}
=head2 resource_classes
Description : Interface method that should return a hash of resource_description_id->resource_description_hash.
......@@ -127,6 +141,7 @@ sub resource_classes {
};
}
=head2 pipeline_analyses
Description : Interface method that should return a list of hashes that define analysis bundled with corresponding jobs, dataflow and analysis_ctrl rules and resource_id.
......@@ -160,6 +175,7 @@ sub new {
return $self;
}
=head2 o
Description : This is the method you call in the interface methods when you need to substitute an option: $self->o('password') .
......@@ -187,6 +203,7 @@ sub o { # descends the option hash structure (vivifying all enco
return $value;
}
=head2 dbconn_2_mysql
Description : A convenience method used to stringify a connection-parameters hash into a parameter string that both mysql and beekeeper.pl can understand
......@@ -203,6 +220,37 @@ sub dbconn_2_mysql { # will save you a lot of typing
.($with_db ? ($self->o($db_conn,'-dbname').' ') : '');
}
=head2 db_connect_command
Description : A convenience method used to stringify a command to connect to the db OR pipe an sql file into it.
=cut
sub db_connect_command {
my ($self, $db_conn) = @_;
return ($hive_default_driver eq 'sqlite')
? 'sqlite3 '.$self->o($db_conn, '-dbname')
: 'mysql '.$self->dbconn_2_mysql($db_conn, 1);
}
=head2 db_execute_command
Description : A convenience method used to stringify a command to connect to the db OR pipe an sql file into it.
=cut
sub db_execute_command {
my ($self, $db_conn, $sql_command) = @_;
return ($hive_default_driver eq 'sqlite')
? 'sqlite3 '.$self->o($db_conn, '-dbname')." '$sql_command'"
: 'mysql '.$self->dbconn_2_mysql($db_conn, 1)." -e '$sql_command'";
}
=head2 dbconn_2_url
Description : A convenience method used to stringify a connection-parameters hash into a 'url' that beekeeper.pl will undestand
......@@ -212,9 +260,12 @@ sub dbconn_2_mysql { # will save you a lot of typing
sub dbconn_2_url {
my ($self, $db_conn) = @_;
return 'mysql://'.$self->o($db_conn,'-user').':'.$self->o($db_conn,'-pass').'@'.$self->o($db_conn,'-host').':'.$self->o($db_conn,'-port').'/'.$self->o($db_conn,'-dbname');
return ($hive_default_driver eq 'sqlite')
? $self->o($db_conn, '-driver').':///'.$self->o($db_conn,'-dbname')
: $self->o($db_conn, '-driver').'://'.$self->o($db_conn,'-user').':'.$self->o($db_conn,'-pass').'@'.$self->o($db_conn,'-host').':'.$self->o($db_conn,'-port').'/'.$self->o($db_conn,'-dbname');
}
=head2 process_options
Description : The method that does all the parameter parsing magic.
......@@ -460,7 +511,7 @@ sub run {
print " beekeeper.pl -url $url -run\t\t# (run one step of the pipeline - useful for debugging/learning)\n";
print "\n\n\tTo connect to your pipeline database use the following line:\n\n";
print " mysql ".$self->dbconn_2_mysql('pipeline_db',1)."\n\n";
print " ".$self->db_connect_command('pipeline_db')."\n\n";
}
......
......@@ -57,6 +57,11 @@ use warnings;
use base ('Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf'); # All Hive databases configuration files should inherit from HiveGeneric, directly or indirectly
# EXPERIMENTAL: choose either 'mysql' or 'sqlite' and do not forget to provide the correct connection parameters:
$Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf::hive_default_driver = 'mysql';
=head2 default_options
Description : Implements default_options() interface method of Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf that is used to initialize default options.
......@@ -72,6 +77,7 @@ sub default_options {
'pipeline_name' => 'long_mult', # name used by the beekeeper to prefix job names on the farm
'pipeline_db' => { # connection parameters
-driver => $Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf::hive_default_driver,
-host => 'compara2',
-port => 3306,
-user => 'ensadmin',
......@@ -98,8 +104,8 @@ sub pipeline_create_commands {
@{$self->SUPER::pipeline_create_commands}, # inheriting database and hive tables' creation
# additional tables needed for long multiplication pipeline's operation:
'mysql '.$self->dbconn_2_mysql('pipeline_db', 1)." -e 'CREATE TABLE intermediate_result (a_multiplier char(40) NOT NULL, digit tinyint NOT NULL, result char(41) NOT NULL, PRIMARY KEY (a_multiplier, digit))'",
'mysql '.$self->dbconn_2_mysql('pipeline_db', 1)." -e 'CREATE TABLE final_result (a_multiplier char(40) NOT NULL, b_multiplier char(40) NOT NULL, result char(80) NOT NULL, PRIMARY KEY (a_multiplier, b_multiplier))'",
$self->db_execute_command('pipeline_db', 'CREATE TABLE intermediate_result (a_multiplier char(40) NOT NULL, digit tinyint NOT NULL, result char(41) NOT NULL, PRIMARY KEY (a_multiplier, digit))'),
$self->db_execute_command('pipeline_db', 'CREATE TABLE final_result (a_multiplier char(40) NOT NULL, b_multiplier char(40) NOT NULL, result char(80) NOT NULL, PRIMARY KEY (a_multiplier, b_multiplier))'),
];
}
......@@ -143,7 +149,7 @@ sub pipeline_analyses {
# (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above)
],
-flow_into => {
'MAIN' => [ 'mysql:////intermediate_result' ],
'MAIN' => [ ':////intermediate_result' ],
},
},
......@@ -155,7 +161,7 @@ sub pipeline_analyses {
],
-wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
-flow_into => {
'MAIN' => [ 'mysql:////final_result' ],
'MAIN' => [ ':////final_result' ],
},
},
];
......
......@@ -182,11 +182,11 @@ sub create_new_worker {
my $sql = q{INSERT INTO worker
(born, last_check_in, meadow_type, process_id, host, analysis_id)
VALUES (NOW(), NOW(), ?,?,?,?)};
VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?,?,?,?)};
my $sth = $self->prepare($sql);
$sth->execute($meadow_type, $process_id, $exec_host, $analysisStats->analysis_id);
my $worker_id = $sth->{'mysql_insertid'};
my $worker_id = ($self->dbc->driver eq 'sqlite') ? $self->dbc->db_handle->func('last_insert_rowid') : $sth->{'mysql_insertid'};
$sth->finish;
my $worker = $self->fetch_by_dbID($worker_id);
......@@ -242,7 +242,7 @@ sub register_worker_death {
$worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id);
}
my $sql = "UPDATE worker SET died=now(), last_check_in=now()";
my $sql = "UPDATE worker SET died=CURRENT_TIMESTAMP, last_check_in=CURRENT_TIMESTAMP";
$sql .= " ,status='DEAD'";
$sql .= " ,work_done='" . $worker->work_done . "'";
$sql .= " ,cause_of_death='$cod'";
......@@ -332,7 +332,7 @@ sub worker_check_in {
my ($self, $worker) = @_;
return unless($worker);
my $sql = "UPDATE worker SET last_check_in=now()";
my $sql = "UPDATE worker SET last_check_in=CURRENT_TIMESTAMP";
$sql .= " ,work_done='" . $worker->work_done . "'";
$sql .= " WHERE worker_id='" . $worker->dbID ."'";
......@@ -391,8 +391,10 @@ sub fetch_overdue_workers {
$overdue_secs = 3600 unless(defined($overdue_secs));
my $constraint = "w.cause_of_death='' ".
"AND (UNIX_TIMESTAMP()-UNIX_TIMESTAMP(w.last_check_in))>$overdue_secs";
my $constraint = "w.cause_of_death='' AND ".
( ($self->dbc->driver eq 'sqlite')
? "(strftime('%s','now')-strftime('%s',w.last_check_in))>$overdue_secs"
: "(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(w.last_check_in))>$overdue_secs");
return $self->_generic_fetch($constraint);
}
......@@ -803,10 +805,10 @@ sub monitor
my $sql = qq{
INSERT INTO monitor
SELECT
now(),
CURRENT_TIMESTAMP,
count(*),
sum(work_done/TIME_TO_SEC(TIMEDIFF(now(),born))),
sum(work_done/TIME_TO_SEC(TIMEDIFF(now(),born)))/count(*),
sum(work_done/TIME_TO_SEC(TIMEDIFF(CURRENT_TIMESTAMP,born))),
sum(work_done/TIME_TO_SEC(TIMEDIFF(CURRENT_TIMESTAMP,born)))/count(*),
group_concat(DISTINCT logic_name)
FROM worker left join analysis USING (analysis_id)
WHERE cause_of_death = ""};
......
......@@ -179,7 +179,9 @@ sub _make_list_from_query {
my $dbc;
if(my $db_conn = $self->param('db_conn')) {
$dbc = DBI->connect("DBI:mysql:$db_conn->{-dbname}:$db_conn->{-host}:$db_conn->{-port}", $db_conn->{-user}, $db_conn->{-pass}, { RaiseError => 1 });
$db_conn->{-driver} ||= 'mysql';
$dbc = DBI->connect("DBI:$db_conn->{-driver}:$db_conn->{-dbname}:$db_conn->{-host}:$db_conn->{-port}", $db_conn->{-user}, $db_conn->{-pass}, { RaiseError => 1 });
} else {
$dbc = $self->db->dbc;
}
......
......@@ -86,8 +86,9 @@ sub fetch_input {
# Use connection parameters to another database if supplied, otherwise use the current database as default:
#
if(my $db_conn = $self->param('db_conn')) {
$db_conn->{-driver} ||= 'mysql';
$self->param('dbh', DBI->connect("DBI:mysql:$db_conn->{-dbname}:$db_conn->{-host}:$db_conn->{-port}", $db_conn->{-user}, $db_conn->{-pass}, { RaiseError => 1 }) );
$self->param('dbh', DBI->connect("DBI:$db_conn->{-driver}:$db_conn->{-dbname}:$db_conn->{-host}:$db_conn->{-port}", $db_conn->{-user}, $db_conn->{-pass}, { RaiseError => 1 }) );
} else {
$self->param('dbh', $self->db->dbc->db_handle );
}
......@@ -119,7 +120,7 @@ sub run {
$dbh->do( $sql );
my $insert_id_name = '_insert_id_'.$counter++;
my $insert_id_value = $dbh->{'mysql_insertid'};
my $insert_id_value = ($dbh->driver eq 'sqlite') ? $dbh->func('last_insert_rowid') : $dbh->{'mysql_insertid'};
$output_id{$insert_id_name} = $insert_id_value;
$self->