Commit 0cea89c2 authored by Leo Gordon's avatar Leo Gordon
Browse files

code cleanup and runWorker script sterilization

parent e108b22e
......@@ -27,17 +27,33 @@
package Bio::EnsEMBL::Hive::AnalysisJob;
use strict;
#use Bio::EnsEMBL::Analysis;
#use Bio::EnsEMBL::DBSQL::DBAdaptor;
#use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Utils::Argument; # import 'rearrange()'
use base ('Bio::EnsEMBL::Hive::Params');
sub new {
# NOTE(review): this span is a rendered diff hunk with no +/- markers.
# The next three lines are the OLD constructor body that this commit
# removes; everything from the second 'my $class' line onwards is the
# NEW rearrange()-based constructor. As printed, the early 'return'
# makes the new body unreachable — only one version exists in the
# committed file.
my ($class,@args) = @_;
my $self = bless {}, $class;
return $self;
# NEW constructor: bless an empty hash, then populate it from the
# caller's named arguments (-DBID, -ANALYSIS_ID, -INPUT_ID, ...).
my $class = shift @_;
my $self = bless {}, $class;
# Unpack every recognised named parameter in a single rearrange() call.
my($dbID, $analysis_id, $input_id, $job_claim, $worker_id, $status, $retry_count, $completed, $runtime_msec, $query_count, $semaphore_count, $semaphored_job_id, $adaptor) =
rearrange([qw(dbID analysis_id input_id job_claim worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id adaptor) ], @_);
# Each setter runs only when the caller actually supplied a value, so
# omitted parameters keep whatever default the accessor provides.
$self->dbID($dbID) if(defined($dbID));
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->input_id($input_id) if(defined($input_id));
$self->job_claim($job_claim) if(defined($job_claim));
$self->worker_id($worker_id) if(defined($worker_id));
$self->status($status) if(defined($status));
$self->retry_count($retry_count) if(defined($retry_count));
$self->completed($completed) if(defined($completed));
$self->runtime_msec($runtime_msec) if(defined($runtime_msec));
$self->query_count($query_count) if(defined($query_count));
$self->semaphore_count($semaphore_count) if(defined($semaphore_count));
$self->semaphored_job_id($semaphored_job_id) if(defined($semaphored_job_id));
$self->adaptor($adaptor) if(defined($adaptor));
return $self;
}
sub adaptor {
......@@ -141,7 +157,6 @@ sub stderr_file {
return $self->{'_stderr_file'};
}
=head2 autoflow
Title : autoflow
......
......@@ -320,26 +320,26 @@ sub _objs_from_sth {
my @jobs = ();
while ($sth->fetch()) {
my $job = new Bio::EnsEMBL::Hive::AnalysisJob;
$job->dbID($column{'analysis_job_id'});
$job->analysis_id($column{'analysis_id'});
$job->input_id($column{'input_id'});
$job->job_claim($column{'job_claim'});
$job->worker_id($column{'worker_id'});
$job->status($column{'status'});
$job->retry_count($column{'retry_count'});
$job->completed($column{'completed'});
$job->runtime_msec($column{'runtime_msec'});
$job->query_count($column{'query_count'});
$job->semaphore_count($column{'semaphore_count'});
$job->semaphored_job_id($column{'semaphored_job_id'});
$job->adaptor($self);
if($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/) {
#print("input_id was too big so stored in analysis_data table as dbID $1 -- fetching now\n");
$job->input_id($self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1));
}
my $input_id = ($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/)
? $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1)
: $column{'input_id'};
my $job = Bio::EnsEMBL::Hive::AnalysisJob->new(
-DBID => $column{'analysis_job_id'},
-ANALYSIS_ID => $column{'analysis_id'},
-INPUT_ID => $input_id,
-JOB_CLAIM => $column{'job_claim'},
-WORKER_ID => $column{'worker_id'},
-STATUS => $column{'status'},
-RETRY_COUNT => $column{'retry_count'},
-COMPLETED => $column{'completed'},
-RUNTIME_MSEC => $column{'runtime_msec'},
-QUERY_COUNT => $column{'query_count'},
-SEMAPHORE_COUNT => $column{'semaphore_count'},
-SEMAPHORED_JOB_ID => $column{'semaphored_job_id'},
-ADAPTOR => $self,
);
push @jobs, $job;
}
......
......@@ -4,14 +4,40 @@
package Bio::EnsEMBL::Hive::Meadow;
use Sys::Hostname;
use Bio::EnsEMBL::Hive::Meadow::LSF;
use Bio::EnsEMBL::Hive::Meadow::LOCAL;
use strict;
sub new {
    # Construct a Meadow object of the requested (sub)class.
    # A class name that does not contain '::' is interpreted as being
    # relative to this namespace and is prefixed accordingly.
    my $class = shift @_;

    $class = 'Bio::EnsEMBL::Hive::Meadow'.$class unless $class =~ /::/;

    # Remaining arguments are key/value pairs seeding the object's hash.
    my $self = bless { @_ }, $class;

    return $self;
}
sub guess_current_type_pid_exechost {
    # Work out which meadow this process is running under ('LSF' or
    # 'LOCAL'), together with the worker's process id and the name of
    # the execution host.
    my $self = shift @_;

    my ($type, $pid);

    # Probe LSF first: its pid lookup dies when we are not running
    # under LSF, in which case we fall back to the LOCAL meadow.
    eval {
        $pid  = Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id();
        $type = 'LSF';
    };
    if($@) {
        $pid  = Bio::EnsEMBL::Hive::Meadow::LOCAL->get_current_worker_process_id();
        $type = 'LOCAL';
    }

    my $exechost = hostname();   # from Sys::Hostname, imported at file level

    return ($type, $pid, $exechost);
}
sub type { # should return 'LOCAL' or 'LSF'
    # The meadow type is simply the last '::'-separated component of
    # the object's class name.
    my $self = shift @_;
    return (split(/::/, ref($self)))[-1];
}
......
......@@ -61,11 +61,11 @@ package Bio::EnsEMBL::Hive::Queen;
use strict;
use POSIX;
use Sys::Hostname;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::Utils 'destringify'; # import 'destringify()'
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;
......@@ -97,16 +97,64 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
sub create_new_worker {
my ($self, @args) = @_;
my ($rc_id, $analysis_id, $beekeeper ,$process_id, $job, $no_write, $debug, $worker_output_dir, $hive_output_dir, $batch_size, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
rearrange([qw(rc_id analysis_id beekeeper process_id job no_write debug worker_output_dir hive_output_dir batch_size job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
my ( $meadow_type, $process_id, $exec_host,
$rc_id, $logic_name, $analysis_id, $input_id, $job_id,
$no_write, $debug, $worker_output_dir, $hive_output_dir, $batch_size, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor or return undef;
rearrange([qw(meadow_type process_id exec_host
rc_id logic_name analysis_id input_id job_id
no_write debug worker_output_dir hive_output_dir batch_size job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
if($logic_name) {
if($analysis_id) {
die "You should either define -analysis_id or -logic_name, but not both\n";
}
if(my $analysis = $self->db->get_AnalysisAdaptor->fetch_by_logic_name($logic_name)) {
$analysis_id = $analysis->dbID;
} else {
die "logic_name '$logic_name' could not be fetched from the database\n";
}
}
my $job;
if($input_id) {
if($job_id) {
die "You should either define -input_id or -job_id, but not both\n";
} elsif($analysis_id) {
$job = Bio::EnsEMBL::Hive::AnalysisJob->new(
-INPUT_ID => $input_id,
-ANALYSIS_ID => $analysis_id,
-DBID => -1,
);
print "creating a job outside the database\n";
$job->print_job;
$debug=1 unless(defined($debug));
$hive_output_dir='' unless(defined($hive_output_dir)); # make it defined but empty/false
} else {
die "For creating a job outside the database either -analysis_id or -logic_name must also be defined\n";
}
}
if($job_id) {
if($analysis_id) {
die "When you specify -job_id, please omit both -logic_name and -analysis_id to avoid confusion\n";
} else {
print "fetching job for job_id '$job_id'\n";
if($job = $self->reset_and_fetch_job_by_dbID($job_id)) {
$analysis_id = $job->analysis_id;
} else {
die "job_id '$job_id' could not be fetched from the database\n";
}
}
}
$analysis_id = $job->analysis_id if(defined($job));
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor or return undef;
my $analysisStats;
if($analysis_id) {
$analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
$analysisStats = $analysis_stats_adaptor->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($analysisStats);
#return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
} else {
......@@ -118,8 +166,8 @@ sub create_new_worker {
#go into autonomous mode
return undef if($self->get_hive_current_load() >= 1.1);
$analStatsDBA->decrease_needed_workers($analysisStats->analysis_id);
$analStatsDBA->increase_running_workers($analysisStats->analysis_id);
$analysis_stats_adaptor->decrease_needed_workers($analysisStats->analysis_id);
$analysis_stats_adaptor->increase_running_workers($analysisStats->analysis_id);
$analysisStats->print_stats;
if($analysisStats->status eq 'BLOCKED') {
......@@ -132,16 +180,12 @@ sub create_new_worker {
}
}
my $host = hostname;
$process_id ||= $$;
$beekeeper = '' unless($beekeeper);
my $sql = q{INSERT INTO hive
(born, last_check_in, process_id, analysis_id, beekeeper, host)
(born, last_check_in, beekeeper, process_id, host, analysis_id)
VALUES (NOW(), NOW(), ?,?,?,?)};
my $sth = $self->prepare($sql);
$sth->execute($process_id, $analysisStats->analysis_id, $beekeeper, $host);
$sth->execute($meadow_type, $process_id, $exec_host, $analysisStats->analysis_id);
my $worker_id = $sth->{'mysql_insertid'};
$sth->finish;
......
......@@ -4,23 +4,14 @@ use strict;
use warnings;
use DBI;
use Getopt::Long;
use Bio::EnsEMBL::Registry;
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::Queen;
use Bio::EnsEMBL::Registry;
use Bio::EnsEMBL::Hive::Meadow::LSF;
use Bio::EnsEMBL::Hive::Meadow::LOCAL;
use Bio::EnsEMBL::Hive::Meadow;
Bio::EnsEMBL::Registry->no_version_check(1);
# I'm going to pretend I've got an object here
# by creating a blessed hash ref and passing it around like an object
# this is to avoid using global variables in functions, and to consolidate
# the globals into a nice '$self' package
my $self = bless {};
$self->{'db_conf'} = {
my $db_conf = {
-host => '',
-port => 3306,
-user => 'ensro',
......@@ -28,78 +19,63 @@ $self->{'db_conf'} = {
-dbname => '',
};
$self->{'job_id'} = undef; # most specific specialization
$self->{'analysis_id'} = undef; # less specific specialization
$self->{'logic_name'} = undef; # (---------,,---------------)
$self->{'rc_id'} = undef; # least specific specialization
$self->{'hive_output_dir'} = undef;
$self->{'worker_output_dir'} = undef; # will take precedence over hive_output_dir if set
$self->{'beekeeper'} = undef;
$self->{'process_id'} = undef;
$self->{'debug'} = undef;
$self->{'no_write'} = undef;
$self->{'maximise_concurrency'} = undef;
$self->{'retry_throwing_jobs'} = undef;
my $conf_file;
my ($help, $adaptor, $url);
my $reg_conf = undef;
my $reg_alias = 'hive';
my ($conf_file, $reg_conf, $reg_alias, $url); # Connection parameters
my ($rc_id, $logic_name, $analysis_id, $input_id, $job_id); # Task specification parameters
my ($batch_size, $job_limit, $life_span, $no_cleanup, $no_write, $hive_output_dir, $worker_output_dir, $retry_throwing_jobs); # Worker control parameters
my ($help, $debug, $show_analysis_stats, $maximise_concurrency);
GetOptions(
# Connection parameters:
'conf=s' => \$conf_file,
'regfile=s' => \$reg_conf,
'regname=s' => \$reg_alias,
'url=s' => \$url,
'host|dbhost=s' => \$self->{'db_conf'}->{'-host'},
'port|dbport=i' => \$self->{'db_conf'}->{'-port'},
'user|dbuser=s' => \$self->{'db_conf'}->{'-user'},
'password|dbpass=s' => \$self->{'db_conf'}->{'-pass'},
'database|dbname=s' => \$self->{'db_conf'}->{'-dbname'},
# Job/Analysis control parameters:
'job_id=i' => \$self->{'job_id'},
'analysis_id=i' => \$self->{'analysis_id'},
'rc_id=i' => \$self->{'rc_id'},
'logic_name=s' => \$self->{'logic_name'},
'batch_size=i' => \$self->{'batch_size'},
'job_limit|limit=i' => \$self->{'job_limit'},
'life_span|lifespan=i' => \$self->{'life_span'},
'hive_output_dir|outdir=s' => \$self->{'hive_output_dir'}, # keep compatibility with the old name
'worker_output_dir=s' => \$self->{'worker_output_dir'}, # will take precedence over hive_output_dir if set
'input_id=s' => \$self->{'input_id'},
'no_cleanup' => \$self->{'no_cleanup'},
'analysis_stats' => \$self->{'show_analysis_stats'},
'no_write' => \$self->{'no_write'},
'nowrite' => \$self->{'no_write'},
'maximise_concurrency=i'=> \$self->{'maximise_concurrency'},
'retry_throwing_jobs=i' => \$self->{'retry_throwing_jobs'},
'conf=s' => \$conf_file,
'reg_conf|regfile=s' => \$reg_conf,
'reg_alias|regname=s' => \$reg_alias,
'url=s' => \$url,
'host|dbhost=s' => \$db_conf->{'-host'},
'port|dbport=i' => \$db_conf->{'-port'},
'user|dbuser=s' => \$db_conf->{'-user'},
'password|dbpass=s' => \$db_conf->{'-pass'},
'database|dbname=s' => \$db_conf->{'-dbname'},
# Task specification parameters:
'rc_id=i' => \$rc_id,
'logic_name=s' => \$logic_name,
'analysis_id=i' => \$analysis_id,
'input_id=s' => \$input_id,
'job_id=i' => \$job_id,
# Worker control parameters:
'batch_size=i' => \$batch_size,
'job_limit|limit=i' => \$job_limit,
'life_span|lifespan=i' => \$life_span,
'no_cleanup' => \$no_cleanup,
'no_write|nowrite' => \$no_write,
'hive_output_dir|outdir=s' => \$hive_output_dir, # keep compatibility with the old name
'worker_output_dir=s' => \$worker_output_dir, # will take precedence over hive_output_dir if set
'retry_throwing_jobs=i' => \$retry_throwing_jobs,
# Other commands
'h|help' => \$help,
'debug=i' => \$self->{'debug'},
'h|help' => \$help,
'debug=i' => \$debug,
'analysis_stats' => \$show_analysis_stats,
'maximise_concurrency=i' => \$maximise_concurrency,
# loose arguments interpreted as database name (for compatibility with mysql[dump])
'<>', sub { $self->{'db_conf'}->{'-dbname'} = shift @_; },
'<>', sub { $db_conf->{'-dbname'} = shift @_; },
);
$self->{'analysis_id'} = shift if(@_);
if ($help) { usage(0); }
parse_conf($self, $conf_file);
parse_conf($conf_file);
my $DBA;
if($reg_conf) {
Bio::EnsEMBL::Registry->load_all($reg_conf);
$DBA = Bio::EnsEMBL::Registry->get_DBAdaptor($reg_alias, 'hive');
$DBA = Bio::EnsEMBL::Registry->get_DBAdaptor($reg_alias || 'hive', 'hive');
} elsif($url) {
$DBA = Bio::EnsEMBL::Hive::URLFactory->fetch($url) or die "Unable to connect to '$url'\n";
} elsif ($self->{'db_conf'}->{'-host'} and $self->{'db_conf'}->{'-user'} and $self->{'db_conf'}->{'-dbname'}) {
$DBA = new Bio::EnsEMBL::Hive::DBSQL::DBAdaptor(%{$self->{'db_conf'}});
} elsif ($db_conf->{'-host'} and $db_conf->{'-user'} and $db_conf->{'-dbname'}) {
$DBA = new Bio::EnsEMBL::Hive::DBSQL::DBAdaptor(%$db_conf);
} else {
print "\nERROR : Connection parameters (regfile+regname, url or dbhost+dbuser+dbname) need to be specified\n\n";
usage(1);
......@@ -111,74 +87,60 @@ unless($DBA and $DBA->isa("Bio::EnsEMBL::Hive::DBSQL::DBAdaptor")) {
}
my $queen = $DBA->get_Queen();
$queen->{maximise_concurrency} = 1 if ($self->{maximise_concurrency});
$queen->{maximise_concurrency} = 1 if ($maximise_concurrency);
eval {
$self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id();
};
if($@) {
$self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->get_current_worker_process_id();
$self->{'beekeeper'} = 'LOCAL';
} else {
$self->{'beekeeper'} = 'LSF';
}
my ($meadow_type, $process_id, $exec_host) = Bio::EnsEMBL::Hive::Meadow->guess_current_type_pid_exechost();
print("process_id = ", $self->{'process_id'}, "\n") if($self->{'process_id'});
print "runWorker(-MeadowType => $meadow_type, -ProcessId => $process_id, -ExecHost => $exec_host)\n";
if($self->{'logic_name'}) {
my $analysis = $queen->db->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'});
unless($analysis) {
printf("logic_name: '%s' does not exist in database\n\n", $self->{'logic_name'});
usage(1);
}
$self->{'analysis_id'} = $analysis->dbID;
}
my $worker;
$self->{'analysis_job'} = undef;
if($self->{'analysis_id'} and $self->{'input_id'}) {
$self->{'analysis_job'} = new Bio::EnsEMBL::Hive::AnalysisJob;
$self->{'analysis_job'}->input_id($self->{'input_id'});
$self->{'analysis_job'}->analysis_id($self->{'analysis_id'});
$self->{'analysis_job'}->dbID(-1);
print("creating job outside database\n");
$self->{'analysis_job'}->print_job;
$self->{'debug'}=1 unless(defined($self->{'debug'}));
$self->{'hive_output_dir'}='' unless(defined($self->{'hive_output_dir'})); # make it defined but empty/false
}
if($self->{'job_id'}) {
printf("fetching job for id %i\n", $self->{'job_id'});
$self->{'analysis_job'} = $queen->reset_and_fetch_job_by_dbID($self->{'job_id'});
$self->{'analysis_id'} = $self->{'analysis_job'}->analysis_id if($self->{'analysis_job'});
}
eval {
$worker = $queen->create_new_worker(
# Worker identity:
-meadow_type => $meadow_type,
-process_id => $process_id,
-exec_host => $exec_host,
# Task specification:
-rc_id => $rc_id,
-logic_name => $logic_name,
-analysis_id => $analysis_id,
-input_id => $input_id,
-job_id => $job_id,
# Worker control parameters:
-batch_size => $batch_size,
-job_limit => $job_limit,
-life_span => $life_span,
-no_cleanup => $no_cleanup,
-no_write => $no_write,
-worker_output_dir => $worker_output_dir,
-hive_output_dir => $hive_output_dir,
-retry_throwing_jobs => $retry_throwing_jobs,
# Other parameters:
-debug => $debug,
);
};
my $msg_thrown = $@;
my $worker = $queen->create_new_worker(
-rc_id => $self->{'rc_id'},
-analysis_id => $self->{'analysis_id'},
-beekeeper => $self->{'beekeeper'},
-process_id => $self->{'process_id'},
-job => $self->{'analysis_job'},
-no_write => $self->{'no_write'},
-debug => $self->{'debug'},
-batch_size => $self->{'batch_size'},
-job_limit => $self->{'job_limit'},
-life_span => $self->{'life_span'},
-no_cleanup => $self->{'no_cleanup'},
-worker_output_dir => $self->{'worker_output_dir'},
-hive_output_dir => $self->{'hive_output_dir'},
-retry_throwing_jobs => $self->{'retry_throwing_jobs'},
);
unless($worker) {
$queen->print_analysis_status if($self->{'show_analysis_stats'});
print("\n=== COULDN'T CREATE WORKER ===\n");
exit(1);
$queen->print_analysis_status if($show_analysis_stats);
print "\n=== COULDN'T CREATE WORKER ===\n";
if($msg_thrown) {
print "$msg_thrown\n";
usage(1);
} else {
exit(1);
}
}
$worker->print_worker();
$worker->run();
if($self->{'show_analysis_stats'}) {
if($show_analysis_stats) {
$queen->print_analysis_status;
$queen->get_num_needed_workers(); # apparently run not for the return value, but for the side-effects
}
......@@ -208,7 +170,6 @@ sub usage {
}
sub parse_conf {
my $self = shift;
my $conf_file = shift;
if($conf_file and (-e $conf_file)) {
......@@ -218,7 +179,7 @@ sub parse_conf {
foreach my $confPtr (@conf_list) {
#print("HANDLE type " . $confPtr->{TYPE} . "\n");
if(($confPtr->{TYPE} eq 'COMPARA') or ($confPtr->{TYPE} eq 'DATABASE')) {
$self->{'db_conf'} = $confPtr;
$db_conf = $confPtr;
}
}
}
......@@ -251,12 +212,15 @@ __DATA__
# Run one local worker process in ehive_dbname and specify the logic_name
runWorker.pl -url mysql://username:secret@hostname:port/ehive_dbname -logic_name fast_blast
# Run a specific job (by a local worker process):
runWorker.pl -url mysql://username:secret@hostname:port/ehive_dbname -job_id 123456
# Create a job outside the eHive to test the specified input_id
runWorker.pl -url mysql://username:secret@hostname:port/ehive_dbname -logic_name fast_blast -input_id '{ "foo" => 1500 }'
=head1 OPTIONS
=head2 Connection parameters
=head2 Connection parameters:
-conf <path> : config file describing db connection
-regfile <path> : path to a Registry configuration file
......@@ -268,26 +232,31 @@ __DATA__
-password <pass> : mysql connection password
[-database] <name> : mysql database <name>
=head2 Job/Analysis control parameters:
=head2 Task specification parameters:
-rc_id <id> : resource class id
-logic_name <string> : pre-specify this worker in a particular analysis defined by name
-analysis_id <id> : pre-specify this worker in a particular analysis defined by database id
-input_id <string> : test this input_id on specified analysis (defined either by analysis_id or logic_name)
-job_id <id> : run a specific job defined by its database id
=head2 Worker control parameters:
-analysis_id <id> : analysis_id in db
-logic_name <string> : logic_name of analysis to make this worker
-batch_size <num> : #jobs to claim at a time
-job_limit <num> : #jobs to run before worker can die naturally
-life_span <num> : number of minutes this worker is allowed to run
-hive_output_dir <path> : directory where stdout/stderr of the whole hive of workers is redirected
-worker_output_dir <path> : directory where stdout/stderr of this particular worker is redirected
-input_id <string> : test input_id on specified analysis (analysis_id or logic_name)
-job_id <id> : run specific job defined by analysis_job_id
-analysis_stats : show status of each analysis in hive
-no_cleanup : don't perform temp directory cleanup when worker exits
-no_write : don't write_output or auto_dataflow input_job
-retry_throwing_jobs 0|1 : if a job dies *knowingly*, should we retry it by default?
-hive_output_dir <path> : directory where stdout/stderr of the whole hive of workers is redirected
-worker_output_dir <path> : directory where stdout/stderr of this particular worker is redirected
-retry_throwing_jobs <0|1> : if a job dies *knowingly*, should we retry it by default?
=head2 Other options:
-help : print this help
-debug <level> : turn on debug messages at <level>
-help : print this help
-debug <level> : turn on debug messages at <level>
-analysis_stats : show status of each analysis in hive
-maximise_concurrency <0|1> : different scheduling strategies of analysis self-assignment
=head1 CONTACT
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment