Commit efb2f8b3 authored by Leo Gordon

optional (cmdline-controlled, off-by-default) support of job-counting triggers

parent d4d8b410
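The diff below wires the new behaviour through the Perl layer; the triggers themselves are loaded from sql/triggers.mysql / sql/triggers.sqlite (referenced in the pipeline_create_commands hunk further down but not included in this commit view). Purely as an illustration of what such a job-counting trigger could look like — the trigger name and body here are assumptions, not the contents of those files — a minimal MySQL sketch mirroring the manual UPDATE that CreateNewJob performs when triggers are off:

    -- hypothetical sketch only; the real definitions live in sql/triggers.mysql
    CREATE TRIGGER add_job AFTER INSERT ON job
    FOR EACH ROW
        UPDATE analysis_stats SET
            total_job_count     = total_job_count + 1,
            unclaimed_job_count = unclaimed_job_count + 1,
            status              = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
        WHERE analysis_id = NEW.analysis_id;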
@@ -99,7 +99,8 @@ sub CreateNewJob {
$input_id = "_ext_input_analysis_data_id $input_data_id";
}
my $dbc = $analysis->adaptor->db->dbc;
my $dba = $analysis->adaptor->db;
my $dbc = $dba->dbc;
my $insertion_method = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
my $status = $blocked ? 'BLOCKED' : 'READY';
my $analysis_id = $analysis->dbID();
@@ -114,13 +115,15 @@ sub CreateNewJob {
my $job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
$sth->finish;
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
,unclaimed_job_count=unclaimed_job_count+1
,status = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
WHERE analysis_id=$analysis_id
});
unless($dba->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
,unclaimed_job_count=unclaimed_job_count+1
,status = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
WHERE analysis_id=$analysis_id
});
}
return $job_id;
}
@@ -260,13 +260,22 @@ sub interval_update_work_done {
$weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm
my $sql = qq{
my $sql = $self->db->hive_use_triggers()
? qq{
UPDATE analysis_stats SET
avg_msec_per_job = (((done_job_count*avg_msec_per_job)/$weight_factor + $interval_msec) / (done_job_count/$weight_factor + $job_count)),
avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/$weight_factor + $fetching_msec) / (done_job_count/$weight_factor + $job_count)),
avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/$weight_factor + $running_msec) / (done_job_count/$weight_factor + $job_count)),
avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/$weight_factor + $writing_msec) / (done_job_count/$weight_factor + $job_count))
WHERE analysis_id= $analysis_id
}
: qq{
UPDATE analysis_stats SET
unclaimed_job_count = unclaimed_job_count - $job_count,
avg_msec_per_job = (((done_job_count*avg_msec_per_job)/$weight_factor + $interval_msec) / (done_job_count/$weight_factor + $job_count)),
avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/$weight_factor + $fetching_msec) / (done_job_count/$weight_factor + $job_count)),
avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/$weight_factor + $running_msec) / (done_job_count/$weight_factor + $job_count)),
avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/$weight_factor + $writing_msec) / (done_job_count/$weight_factor + $job_count)),
unclaimed_job_count = unclaimed_job_count - $job_count,
done_job_count = done_job_count + $job_count
WHERE analysis_id= $analysis_id
};
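(With hive_use_triggers() on, the first branch leaves unclaimed_job_count and done_job_count untouched — presumably the job-table triggers keep those counters current — so the statement only refreshes the running averages; the fallback branch keeps updating the counters in SQL as before.)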
@@ -39,6 +39,18 @@ use strict;
use base ('Bio::EnsEMBL::DBSQL::DBAdaptor');
sub hive_use_triggers { # getter only, not setter
my $self = shift @_;
unless( defined($self->{'_hive_use_triggers'}) ) {
my $arrRef = $self->get_MetaContainer->list_value_by_key( 'hive_use_triggers' );
$self->{'_hive_use_triggers'} = @$arrRef ? $arrRef->[0] : 0;
}
return $self->{'_hive_use_triggers'};
}
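hive_use_triggers() therefore just reads the 'hive_use_triggers' key back from the hive database's meta table and caches it, defaulting to 0 when the key is absent; the value is written at pipeline-initialisation time (see the run() hunk further down, where the pipeline-wide 'hive_use_triggers' parameter is stored), so every adaptor sees the same setting.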
sub get_available_adaptors {
my %pairs = (
@@ -93,17 +93,21 @@ sub pipeline_create_commands {
my $self = shift @_;
my $db_conn = shift @_ || 'pipeline_db';
my $hive_use_triggers = $self->{'_extra_options'}{'hive_use_triggers'};
return ($self->o($db_conn, '-driver') eq 'sqlite')
? [
# standard eHive tables and unique/non-unique indices:
# standard eHive tables, triggers and procedures:
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/tables.sqlite',
$hive_use_triggers ? ( $self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/triggers.sqlite' ) : (),
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/procedures.sqlite',
]
: [
'mysql '.$self->dbconn_2_mysql($db_conn, 0)." -e 'CREATE DATABASE ".$self->o('pipeline_db', '-dbname')."'",
# standard eHive tables, foreign_keys and procedures:
# standard eHive tables, triggers, foreign_keys and procedures:
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/tables.sql',
$hive_use_triggers ? ( $self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/triggers.mysql' ) : (),
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/foreign_keys.mysql',
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/procedures.mysql',
];
@@ -167,6 +171,7 @@ sub pre_options {
'job_topup!' => '',
'analysis_topup!' => '',
'hive_driver' => '',
'hive_use_triggers' => '',
};
}
@@ -294,6 +299,8 @@ sub run {
warn "Loading pipeline-wide parameters ...\n";
my $pipeline_wide_parameters = $self->pipeline_wide_parameters;
$pipeline_wide_parameters->{'hive_use_triggers'} = $self->{'_extra_options'}{'hive_use_triggers'} || 0; # pass it into the database
while( my($meta_key, $meta_value) = each %$pipeline_wide_parameters ) {
if($analysis_topup) {
$meta_container->delete_key($meta_key);
@@ -517,55 +517,73 @@ sub synchronize_AnalysisStats {
return $analysisStats unless($analysisStats->analysis_id);
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor or return undef;
$analysisStats->refresh(); ## Need to get the new hive_capacity for dynamic analyses
$analysisStats->total_job_count(0);
$analysisStats->unclaimed_job_count(0);
$analysisStats->done_job_count(0);
$analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0);
my $sql = "SELECT status, count(*), semaphore_count FROM job ".
"WHERE analysis_id=? GROUP BY status, semaphore_count";
my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
$analysisStats->refresh(); ## Need to get the new hive_capacity for dynamic analyses
my $hive_capacity = $analysisStats->hive_capacity;
my $done_here = 0;
my $done_elsewhere = 0;
while (my ($status, $job_count, $semaphore_count)=$sth->fetchrow_array()) {
# print STDERR "$status: $job_count\n";
if($self->db->hive_use_triggers()) {
my $job_count = $analysisStats->unclaimed_job_count();
my $required_workers = POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() );
my $curr_total = $analysisStats->total_job_count();
$analysisStats->total_job_count($curr_total + $job_count);
# adjust_stats_for_living_workers:
if($hive_capacity > 0) {
my $capacity_allows_to_add = $hive_capacity - $analysis_stats_adaptor->get_running_worker_count($analysisStats);
if($capacity_allows_to_add < $required_workers ) {
$required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
}
}
$analysisStats->num_required_workers( $required_workers );
} else {
$analysisStats->total_job_count(0);
$analysisStats->unclaimed_job_count(0);
$analysisStats->done_job_count(0);
$analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0);
if(($status eq 'READY') and ($semaphore_count<=0)) {
$analysisStats->unclaimed_job_count($job_count);
my $sql = "SELECT status, count(*), semaphore_count FROM job ".
"WHERE analysis_id=? GROUP BY status, semaphore_count";
my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
my $required_workers = POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() );
# adjust_stats_for_living_workers:
if($hive_capacity > 0) {
my $capacity_allows_to_add = $hive_capacity - $analysis_stats_adaptor->get_running_worker_count($analysisStats);
my $done_here = 0;
my $done_elsewhere = 0;
while (my ($status, $job_count, $semaphore_count)=$sth->fetchrow_array()) {
# print STDERR "$status: $job_count\n";
if($capacity_allows_to_add < $required_workers ) {
$required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
my $curr_total = $analysisStats->total_job_count();
$analysisStats->total_job_count($curr_total + $job_count);
if(($status eq 'READY') and ($semaphore_count<=0)) {
$analysisStats->unclaimed_job_count($job_count);
my $required_workers = POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() );
# adjust_stats_for_living_workers:
if($hive_capacity > 0) {
my $capacity_allows_to_add = $hive_capacity - $analysis_stats_adaptor->get_running_worker_count($analysisStats);
if($capacity_allows_to_add < $required_workers ) {
$required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
}
}
$analysisStats->num_required_workers( $required_workers );
} elsif($status eq 'DONE' and $semaphore_count<=0) {
$done_here = $job_count;
} elsif($status eq 'PASSED_ON' and $semaphore_count<=0) {
$done_elsewhere = $job_count;
} elsif ($status eq 'FAILED') {
$analysisStats->failed_job_count($job_count);
}
$analysisStats->num_required_workers( $required_workers );
} elsif($status eq 'DONE' and $semaphore_count<=0) {
$done_here = $job_count;
} elsif($status eq 'PASSED_ON' and $semaphore_count<=0) {
$done_elsewhere = $job_count;
} elsif ($status eq 'FAILED') {
$analysisStats->failed_job_count($job_count);
}
}
$sth->finish;
}
$sth->finish;
$analysisStats->done_job_count($done_here + $done_elsewhere);
$analysisStats->done_job_count($done_here + $done_elsewhere);
} # /unless $self->{'_hive_use_triggers'}
$analysisStats->check_blocking_control_rules();