Commit ef413bd4 authored by Leo Gordon
Browse files

First attempt to support PostgreSQL in eHive. Use with caution.

Known limitation: with PostgreSQL, rows are always written with a plain INSERT, even where INSERT_IGNORE or REPLACE was requested. The code may also contain other bugs.
parent e898fdaf
......@@ -104,7 +104,7 @@ sub CreateNewJob {
my $dba = $analysis->adaptor->db;
my $dbc = $dba->dbc;
my $insertion_method = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
my $insertion_method = { 'mysql' => 'INSERT IGNORE', 'sqlite' => 'INSERT OR IGNORE', 'pgsql' => 'INSERT' }->{ $dbc->driver };
my $job_status = ($semaphore_count>0) ? 'SEMAPHORED' : 'READY';
my $analysis_id = $analysis->dbID();
......@@ -137,10 +137,10 @@ sub CreateNewJob {
.(($job_status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).qq{
,status = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
WHERE analysis_id=$analysis_id
});
).(($dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=$analysis_id ");
}
} else { # if we got 0E0, it means "ignored insert collision" (job created previously), so we simply return an undef and deal with it outside
}
......@@ -373,9 +373,11 @@ sub decrease_semaphore_count_for_jobid { # used in semaphore annihilation or
# NB: BOTH THE ORDER OF UPDATES AND EXACT WORDING IS ESSENTIAL FOR SYNCHRONOUS ATOMIC OPERATION,
# otherwise the same command tends to behave differently on MySQL and SQLite (at least)
#
my $sql = qq{
UPDATE job
SET status=(CASE WHEN semaphore_count>1 THEN 'SEMAPHORED' ELSE 'READY' END),
my $sql = "UPDATE job "
.( ($self->dbc->driver eq 'pgsql')
? "SET status = CAST(CASE WHEN semaphore_count>1 THEN 'SEMAPHORED' ELSE 'READY' END AS jw_status), "
: "SET status = CASE WHEN semaphore_count>1 THEN 'SEMAPHORED' ELSE 'READY' END, "
).qq{
semaphore_count=semaphore_count-?
WHERE job_id=? AND status='SEMAPHORED'
};
......@@ -478,7 +480,7 @@ sub reset_or_grab_job_by_dbID {
# Note: the order of the fields being updated is critical!
my $sql = qq{
UPDATE job
SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END)
SET retry_count = CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
, status=?
, worker_id=?
WHERE job_id=?
......@@ -524,15 +526,15 @@ sub grab_jobs_for_worker {
my $virgin_selection_sql = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch";
my $any_selection_sql = $selection_start_sql . " LIMIT $how_many_this_batch";
if($self->dbc->driver eq 'sqlite') {
if($self->dbc->driver eq 'mysql') {
# we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
if( (my $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $virgin_selection_sql) AND status='READY'" )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $any_selection_sql) AND status='READY'" );
if( (my $claim_count = $self->dbc->do( $update_sql . $virgin_selection_sql )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . $any_selection_sql );
}
} else {
# we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
if( (my $claim_count = $self->dbc->do( $update_sql . $virgin_selection_sql )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . $any_selection_sql );
if( (my $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $virgin_selection_sql) AND status='READY'" )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $any_selection_sql) AND status='READY'" );
}
}
......@@ -621,9 +623,12 @@ sub release_and_age_job {
#
# FIXME: would it be possible to retain worker_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
#
$self->dbc->do( qq{
UPDATE job
SET status=(CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END),
$self->dbc->do(
"UPDATE job "
.( ($self->dbc->driver eq 'pgsql')
? "SET status = CAST(CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS jw_status), "
: "SET status = CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, "
).qq{
retry_count=retry_count+1,
runtime_msec=$runtime_msec
WHERE job_id=$job_id
......
......@@ -92,7 +92,7 @@ sub fetch_all_by_suitability_rc_id_meadow_type {
my $primary_results = $self->_generic_fetch(
"ast.num_required_workers>0 AND ast.status in ('READY', 'WORKING')" ,
$join ,
'ORDER BY a.priority DESC, ' . ( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' ),
'ORDER BY a.priority DESC, ' . ( ($self->dbc->driver eq 'mysql') ? 'RAND()' : 'RANDOM()' ),
);
# the ones that may have work to do after a sync:
......@@ -425,10 +425,13 @@ sub _columns {
ast.sync_lock
);
push @columns , ($self->dbc->driver eq 'sqlite')
? "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update "
: "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
return @columns;
push @columns, {
'mysql' => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ",
'sqlite' => "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ",
'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - ast.last_update) seconds_since_last_update ",
}->{ $self->dbc->driver };
return @columns;
}
sub _objs_from_sth {
......
......@@ -113,7 +113,8 @@ sub _table_info_loader {
}
$sth->finish;
if(($driver eq 'sqlite') and scalar(@primary_key)==1 and (uc($name2type{$primary_key[0]}) eq 'INTEGER') ) {
if( ($driver ne 'mysql')
and scalar(@primary_key)==1 and (uc($name2type{$primary_key[0]}) eq 'INTEGER') ) {
$autoinc_id = $primary_key[0];
}
......@@ -291,13 +292,15 @@ sub store {
return unless(scalar(@$objects));
my $table_name = $self->table_name();
my $all_storable_columns = [ keys %{ $self->column_set() } ];
my $autoinc_id = $self->autoinc_id();
my $all_storable_columns = [ grep { $_ ne $autoinc_id } keys %{ $self->column_set() } ];
my $driver = $self->dbc->driver();
my $insertion_method = $self->insertion_method; # INSERT, INSERT_IGNORE or REPLACE
$insertion_method =~ s/_/ /g;
if($driver eq 'sqlite') {
$insertion_method =~ s/INSERT IGNORE/INSERT OR IGNORE/ig;
} elsif($driver eq 'pgsql') { # FIXME! temporary hack
$insertion_method = 'INSERT';
}
my %hashed_sth = (); # do not prepare statements until there is a real need
......@@ -309,6 +312,7 @@ sub store {
$self->mark_stored($object, $present);
} else {
my ($columns_being_stored, $column_key) = (ref($object) eq 'HASH') ? ( [ sort keys %$object ], join(', ', sort keys %$object) ) : ($all_storable_columns, '*all*');
# print "COLUMN_KEY='$column_key'\n";
my $this_sth;
......@@ -328,7 +332,8 @@ sub store {
# using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
or die "Could not store fields\n\t{$column_key}\nwith data:\n\t(".join(',', @$values_being_stored).')';
if($return_code > 0) { # <--- for the same reason we have to be explicitly numeric here
$self->mark_stored($object, $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id) );
my $liid = $autoinc_id && $self->dbc->db_handle->last_insert_id(undef, undef, $table_name, $autoinc_id);
$self->mark_stored($object, $liid );
++$stored_this_time;
}
}
......
......@@ -107,26 +107,34 @@ sub pipeline_create_commands {
my $self = shift @_;
my $db_conn = shift @_ || 'pipeline_db';
return ($self->o($db_conn, '-driver') eq 'sqlite')
? [
$self->o('hive_force_init') ? ( 'rm -f '.$self->o('pipeline_db', '-dbname') ) : (),
return {
'sqlite' => [
$self->o('hive_force_init') ? ( 'rm -f '.$self->o($db_conn, '-dbname') ) : (),
# standard eHive tables, triggers and procedures:
$self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/tables.sqlite',
$self->o('hive_use_triggers') ? ( $self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/triggers.sqlite' ) : (),
$self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/procedures.sqlite',
]
: [
$self->o('hive_force_init') ? ( 'mysql '.$self->dbconn_2_mysql($db_conn, 0)." -e 'DROP DATABASE IF EXISTS `".$self->o('pipeline_db', '-dbname')."`'" ) : (),
],
'mysql' => [
$self->o('hive_force_init') ? ( 'mysql '.$self->dbconn_2_mysql($db_conn, 0)." -e 'DROP DATABASE IF EXISTS `".$self->o($db_conn, '-dbname')."`'" ) : (),
'mysql '.$self->dbconn_2_mysql($db_conn, 0)." -e 'CREATE DATABASE `".$self->o('pipeline_db', '-dbname')."`'",
'mysql '.$self->dbconn_2_mysql($db_conn, 0)." -e 'CREATE DATABASE `".$self->o($db_conn, '-dbname')."`'",
# standard eHive tables, triggers, foreign_keys and procedures:
$self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/tables.sql',
$self->o('hive_use_triggers') ? ( $self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/triggers.mysql' ) : (),
$self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/foreign_keys.mysql',
$self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/procedures.mysql',
];
],
'pgsql' => [
$self->o('hive_force_init') ? ( $self->db_execute_command($db_conn, 'DROP DATABASE IF EXISTS '.$self->o($db_conn, '-dbname'), 0 ) ) : (),
$self->db_execute_command($db_conn, 'CREATE DATABASE '.$self->o($db_conn, '-dbname'), 0 ),
$self->db_connect_command($db_conn).' <'.$self->o('hive_root_dir').'/sql/tables.pgsql',
],
}->{ $self->o($db_conn, '-driver') };
}
......@@ -235,9 +243,11 @@ sub dbconn_2_mysql { # will save you a lot of typing
sub db_connect_command {
my ($self, $db_conn) = @_;
return ($self->o($db_conn, '-driver') eq 'sqlite')
? 'sqlite3 '.$self->o($db_conn, '-dbname')
: 'mysql '.$self->dbconn_2_mysql($db_conn, 1);
return {
'sqlite' => 'sqlite3 '.$self->o($db_conn, '-dbname'),
'mysql' => 'mysql '.$self->dbconn_2_mysql($db_conn, 1),
'pgsql' => 'psql '.$self->o($db_conn, '-dbname'),
}->{ $self->o($db_conn, '-driver') };
}
......@@ -248,11 +258,15 @@ sub db_connect_command {
=cut
sub db_execute_command {
my ($self, $db_conn, $sql_command) = @_;
my ($self, $db_conn, $sql_command, $with_db) = @_;
return ($self->o($db_conn, '-driver') eq 'sqlite')
? 'sqlite3 '.$self->o($db_conn, '-dbname')." '$sql_command'"
: 'mysql '.$self->dbconn_2_mysql($db_conn, 1)." -e '$sql_command'";
$with_db = 1 unless(defined($with_db));
return {
'sqlite' => 'sqlite3 '.$self->o($db_conn, '-dbname')." '$sql_command'", # can't imagine an sqlite3 cmd without dbname
'mysql' => 'mysql '.$self->dbconn_2_mysql($db_conn, $with_db)." -e '$sql_command'",
'pgsql' => "psql --command='$sql_command' ".($with_db ? $self->o($db_conn, '-dbname') : ''),
}->{ $self->o($db_conn, '-driver') };
}
......
......@@ -466,10 +466,12 @@ sub fetch_overdue_workers {
$overdue_secs = 3600 unless(defined($overdue_secs));
my $constraint = "status!='DEAD' AND ".
( ($self->dbc->driver eq 'sqlite')
? "(strftime('%s','now')-strftime('%s',last_check_in))>$overdue_secs"
: "(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(last_check_in))>$overdue_secs");
my $constraint = "status!='DEAD' AND ".{
'mysql' => "(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(last_check_in)) > $overdue_secs",
'sqlite' => "(strftime('%s','now')-strftime('%s',last_check_in)) > $overdue_secs",
'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - last_check_in) > $overdue_secs",
}->{ $self->dbc->driver };
return $self->fetch_all( $constraint );
}
......@@ -744,9 +746,9 @@ sub print_running_worker_counts {
my $sql = qq{
SELECT logic_name, count(*)
FROM worker w
JOIN analysis_base USING(analysis_id)
JOIN analysis_base a USING(analysis_id)
WHERE w.status!='DEAD'
GROUP BY analysis_id
GROUP BY a.analysis_id
};
my $total_workers = 0;
......@@ -775,23 +777,23 @@ sub print_running_worker_counts {
=cut
sub monitor {
my $self = shift;
my $sql = qq{
INSERT INTO monitor
SELECT
CURRENT_TIMESTAMP,
count(*),
}. ( ($self->dbc->driver eq 'sqlite')
? qq{ sum(work_done/(strftime('%s','now')-strftime('%s',born))),
sum(work_done/(strftime('%s','now')-strftime('%s',born)))/count(*), }
: qq{ sum(work_done/(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(born))),
sum(work_done/(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(born)))/count(*), }
). qq{
group_concat(DISTINCT logic_name)
FROM worker w
LEFT JOIN analysis_base USING (analysis_id)
WHERE w.status!='DEAD'
};
my $self = shift;
my $sql = qq{
INSERT INTO monitor
SELECT CURRENT_TIMESTAMP, count(*),
} . {
'mysql' => qq{ sum(work_done/(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(born))),
sum(work_done/(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(born)))/count(*), },
'sqlite' => qq{ sum(work_done/(strftime('%s','now')-strftime('%s',born))),
sum(work_done/(strftime('%s','now')-strftime('%s',born)))/count(*), },
'pgsql' => qq{ sum(work_done/(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - born))),
sum(work_done/(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - born)))/count(*), },
}->{ $self->dbc->driver }. qq{
group_concat(DISTINCT logic_name)
FROM worker w
LEFT JOIN analysis_base USING (analysis_id)
WHERE w.status!='DEAD'
};
my $sth = $self->prepare($sql);
$sth->execute();
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment