Commit 3e6477b1 authored by Leo Gordon
Browse files

separating create_new_worker() from specialize_new_worker()

parent 8d873af3
......@@ -88,39 +88,103 @@ sub object_class {
############################
#
# PUBLIC API for Workers
# PUBLIC API
#
############################
=head2 create_new_worker
Arg [1] : $analysis_id (optional)
Example :
Description: If analysis_id is specified it will try to create a worker based
on that analysis. If not specified the queen will analyze the hive
and pick the analysis that has the most amount of work to be done.
It creates an entry in the worker table, and returns a Worker object
based on that insert. This guarantees that each worker registered
in this queens hive is properly registered.
Description: Creates an entry in the worker table,
populates some non-storable attributes
and returns a Worker object based on that insert.
This guarantees that each worker registered in this Queen's hive is properly registered.
Returntype : Bio::EnsEMBL::Hive::Worker
Exceptions :
Caller :
Caller : runWorker.pl
=cut
# Registers a new, not-yet-specialized Worker in the 'worker' table and returns
# the corresponding Worker object with its run-time options applied.
#
# Named arguments (parsed by rearrange()):
#   -meadow_type, -meadow_name, -process_id, -exec_host
#                           values stored in the new 'worker' row
#   -resource_class_id      stored in the new row; defaults to 1 when false
#   -resource_class_name    when given, is looked up in the database and its
#                           dbID overrides -resource_class_id
#   -worker_log_dir, -hive_log_dir
#                           when either is set, a log directory is created and
#                           recorded in the worker's row
#   -job_limit, -life_span, -no_write, -no_cleanup, -debug,
#   -retry_throwing_jobs, -compile_module_once
#                           non-stored attributes set on the returned object
#
# Returns the Worker object fetched back by its new dbID.
# Dies if the resource class cannot be fetched, if no worker_id is obtained
# after the insert, if the log directory cannot be created, or if the freshly
# inserted worker cannot be fetched.
sub create_new_worker {
    my ($self, @args) = @_;

    my ($meadow_type, $meadow_name, $process_id, $exec_host, $resource_class_id, $resource_class_name,
        $no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $compile_module_once) =
     rearrange([qw(meadow_type meadow_name process_id exec_host resource_class_id resource_class_name
                   no_write debug worker_log_dir hive_log_dir job_limit life_span no_cleanup retry_throwing_jobs compile_module_once) ], @args);

    # A resource class given by name takes precedence over one given by id.
    if($resource_class_name) {
        my $rc = $self->db->get_ResourceClassAdaptor->fetch_by_name($resource_class_name)
            or die "resource_class with name='$resource_class_name' could not be fetched from the database";

        $resource_class_id = $rc->dbID;
    }
    $resource_class_id ||= 1;   # fall back to resource_class_id 1 when none was specified

    my $sql = q{INSERT INTO worker (born, last_check_in, meadow_type, meadow_name, host, process_id, resource_class_id)
                VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?, ?, ?, ?, ?)};

    my $sth = $self->prepare($sql);
    $sth->execute($meadow_type, $meadow_name, $exec_host, $process_id, $resource_class_id);
    my $worker_id = $self->dbc->db_handle->last_insert_id(undef, undef, 'worker', 'worker_id')
        or die "Could not create a new worker";
    $sth->finish;

    if($hive_log_dir or $worker_log_dir) {
        my $dir_revhash = dir_revhash($worker_id);
        # An explicitly given worker_log_dir wins; otherwise derive one under hive_log_dir.
        $worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') .'worker_id_'.$worker_id;

            # Note: the following die-message will not reach the log files for circular reason!
            # List-form system() bypasses the shell, so paths containing spaces or
            # shell metacharacters are passed to mkdir verbatim.
        system('mkdir', '-p', $worker_log_dir) && die "Could not create '$worker_log_dir' because: $!";

        my $sth_add_log = $self->prepare( "UPDATE worker SET log_dir=? WHERE worker_id=?" );
        $sth_add_log->execute($worker_log_dir, $worker_id);
        $sth_add_log->finish;
    }

    my $worker = $self->fetch_by_dbID($worker_id)
        or die "Could not fetch worker with dbID=$worker_id";

    $worker->init;

    # A job limit resets life_span to 0; an explicit -life_span (below) still overrides that.
    if($job_limit) {
        $worker->job_limit($job_limit);
        $worker->life_span(0);
    }

    # presumably -life_span is given in minutes and stored in seconds -- confirm against Worker::life_span
    $worker->life_span($life_span * 60)                 if($life_span);

    $worker->execute_writes(0)                          if($no_write);

    $worker->perform_cleanup(0)                         if($no_cleanup);

    $worker->debug($debug)                              if($debug);

    $worker->retry_throwing_jobs($retry_throwing_jobs)  if(defined $retry_throwing_jobs);

    $worker->compile_module_once($compile_module_once)  if(defined $compile_module_once);

    return $worker;
}
=head2 specialize_new_worker
rearrange([qw(meadow_type meadow_name process_id exec_host
rc_id rc_name analysis_id logic_name job_id
no_write debug worker_log_dir hive_log_dir job_limit life_span no_cleanup retry_throwing_jobs compile_module_once force) ], @args);
Description: If analysis_id or logic_name is specified it will try to specialize the Worker into this analysis.
If not specified the Queen will analyze the hive and pick the most suitable analysis.
Returntype : Bio::EnsEMBL::Hive::Worker
Caller : runWorker.pl
=cut
sub specialize_new_worker {
my ($self, $worker, @args) = @_;
my ($analysis_id, $logic_name, $job_id, $force) =
rearrange([qw(analysis_id logic_name job_id force) ], @args);
if( scalar( grep {defined($_)} ($rc_id, $rc_name, $analysis_id, $logic_name, $job_id) ) > 1) {
die "At most one of the options {-rc_id, -rc_name, -analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker";
if( scalar( grep {defined($_)} ($analysis_id, $logic_name, $job_id) ) > 1) {
die "At most one of the options {-analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker";
}
my ($job, $analysis, $stats);
......@@ -151,6 +215,10 @@ sub create_new_worker {
or die "analysis with dbID='$analysis_id' could not be fetched from the database";
}
if($worker->resource_class_id != $analysis->resource_class_id) {
die "resource_class of analysis ".$analysis->logic_name." is incompatible with this Worker's resource_class";
}
$stats = $analysis_stats_adaptor->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($stats);
......@@ -171,30 +239,22 @@ sub create_new_worker {
} else { # probably scheduled by beekeeper.pl
if($rc_name) {
my $rc = $self->db->get_ResourceClassAdaptor->fetch_by_name($rc_name)
or die "resource_class with name='$rc_name' could not be fetched from the database";
$rc_id = $rc->dbID;
}
$stats = $self->specialize_new_worker($rc_id)
$stats = $self->suggest_analysis_to_specialize_by_rc_id($worker->resource_class_id)
or die "Scheduler failed to pick an analysis for the worker";
print "Scheduler picked analysis with dbID=".$stats->analysis_id." for the worker\n";
$analysis_id = $stats->analysis_id;
}
my $sql = q{INSERT INTO worker (born, last_check_in, meadow_type, meadow_name, process_id, host, analysis_id)
VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?,?,?,?,?)};
# now set it in the $worker:
my $sth = $self->prepare($sql);
$sth->execute($meadow_type, $meadow_name, $process_id, $exec_host, $stats->analysis_id);
my $worker_id = $self->dbc->db_handle->last_insert_id(undef, undef, 'worker', 'worker_id')
or die "Could not store worker";
$sth->finish;
$worker->analysis_id( $analysis_id );
unless($job) { # count it as autonomous worker sharing the load of that analysis:
$analysis_stats_adaptor->decrease_required_workers($stats->analysis_id);
if($job) {
$worker->_specific_job($job);
} else { # count it as autonomous worker sharing the load of that analysis:
$analysis_stats_adaptor->decrease_required_workers($worker->analysis_id);
}
# The following increment used to be done only when no specific task was given to the worker,
......@@ -204,49 +264,12 @@ sub create_new_worker {
# so I am (temporarily?) simplifying the accounting algorithm.
#
unless( $self->db->hive_use_triggers() ) {
$analysis_stats_adaptor->increase_running_workers($stats->analysis_id);
$analysis_stats_adaptor->increase_running_workers($worker->analysis_id);
}
if($hive_log_dir or $worker_log_dir) {
my $dir_revhash = dir_revhash($worker_id);
$worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') .'worker_id_'.$worker_id;
# Note: the following die-message will not reach the log files for circular reason!
system("mkdir -p $worker_log_dir") && die "Could not create '$worker_log_dir' because: $!";
my $sql_add_log = "UPDATE worker SET log_dir=? WHERE worker_id=?";
my $sth_add_log = $self->prepare($sql_add_log);
$sth_add_log->execute($worker_log_dir, $worker_id);
$sth_add_log->finish;
}
my $worker = $self->fetch_by_dbID($worker_id)
or die "Could not fetch worker with dbID=$worker_id";
$worker->init;
$worker->_specific_job($job) if($job);
$worker->execute_writes(0) if($no_write);
$worker->debug($debug) if($debug);
if($job_limit) {
$worker->job_limit($job_limit);
$worker->life_span(0);
}
if($life_span) {
$worker->life_span($life_span * 60);
}
if($no_cleanup) {
$worker->perform_cleanup(0);
}
if(defined $retry_throwing_jobs) {
$worker->retry_throwing_jobs($retry_throwing_jobs);
}
if(defined $compile_module_once) {
$worker->compile_module_once($compile_module_once);
}
my $sth_update_analysis_id = $self->prepare( "UPDATE worker SET analysis_id=? WHERE worker_id=?" );
$sth_update_analysis_id->execute($worker->analysis_id, $worker->dbID);
$sth_update_analysis_id->finish;
$stats->update_status('WORKING');
......@@ -270,7 +293,7 @@ sub register_worker_death {
$self->dbc->do( $sql );
unless( $self->db->hive_use_triggers() ) {
$worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id);
$worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis_id);
}
if($cod eq 'NO_WORK') {
......@@ -911,9 +934,9 @@ sub register_all_workers_dead {
}
sub specialize_new_worker {
sub suggest_analysis_to_specialize_by_rc_id {
my $self = shift;
my $rc_id = shift; # this parameter will percolate into fetch
my $rc_id = shift;
my @suitable_analyses = @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id( $rc_id ) };
......
......@@ -87,21 +87,22 @@ sub new {
my $self = $class->SUPER::new( @_ ); # deal with Storable stuff
my($analysis_id, $meadow_type, $meadow_name, $host, $process_id, $work_done, $status, $born, $last_check_in, $died, $cause_of_death, $log_dir) =
rearrange([qw(analysis_id meadow_type meadow_name host process_id work_done status born last_check_in died cause_of_death log_dir) ], @_);
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->meadow_type($meadow_type) if(defined($meadow_type));
$self->meadow_name($meadow_name) if(defined($meadow_name));
$self->host($host) if(defined($host));
$self->process_id($process_id) if(defined($process_id));
$self->work_done($work_done) if(defined($work_done));
$self->status($status) if(defined($status));
$self->born($born) if(defined($born));
$self->last_check_in($last_check_in) if(defined($last_check_in));
$self->died($died) if(defined($died));
$self->cause_of_death($cause_of_death) if(defined($cause_of_death));
$self->log_dir($log_dir) if(defined($log_dir));
my($analysis_id, $meadow_type, $meadow_name, $host, $process_id, $resource_class_id, $work_done, $status, $born, $last_check_in, $died, $cause_of_death, $log_dir) =
rearrange([qw(analysis_id meadow_type meadow_name host process_id resource_class_id work_done status born last_check_in died cause_of_death log_dir) ], @_);
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->meadow_type($meadow_type) if(defined($meadow_type));
$self->meadow_name($meadow_name) if(defined($meadow_name));
$self->host($host) if(defined($host));
$self->process_id($process_id) if(defined($process_id));
$self->resource_class_id($resource_class_id) if(defined($resource_class_id));
$self->work_done($work_done) if(defined($work_done));
$self->status($status) if(defined($status));
$self->born($born) if(defined($born));
$self->last_check_in($last_check_in) if(defined($last_check_in));
$self->died($died) if(defined($died));
$self->cause_of_death($cause_of_death) if(defined($cause_of_death));
$self->log_dir($log_dir) if(defined($log_dir));
return $self;
}
......@@ -157,6 +158,13 @@ sub process_id {
}
# Combined getter/setter for the worker's resource_class_id.
# Called with an argument (even an undefined one), it stores that value;
# in every case it returns the value currently held by the object.
sub resource_class_id {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_resource_class_id'} = $new_value[0];
    }

    return $self->{'_resource_class_id'};
}
sub work_done {
my $self = shift;
$self->{'_work_done'} = shift if(@_);
......
......@@ -100,13 +100,8 @@ eval {
-meadow_name => $meadow_name,
-process_id => $process_id,
-exec_host => $exec_host,
# Task specification:
-rc_id => $rc_id,
-rc_name => $rc_name,
-analysis_id => $analysis_id,
-logic_name => $logic_name,
-job_id => $job_id,
# Worker control parameters:
-job_limit => $job_limit,
......@@ -117,18 +112,23 @@ eval {
-hive_log_dir => $hive_log_dir,
-retry_throwing_jobs => $retry_throwing_jobs,
-compile_module_once => $compile_module_once,
-force => $force,
# Other parameters:
-debug => $debug,
);
$queen->specialize_new_worker( $worker,
-analysis_id => $analysis_id,
-logic_name => $logic_name,
-job_id => $job_id,
-force => $force,
);
};
my $msg_thrown = $@;
if($worker) {
$worker->run();
# warn "Not really running the worker (debug)";
} else {
......
......@@ -18,5 +18,6 @@ ALTER TABLE job_file ADD FOREIGN KEY (job_id)
ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE analysis_base ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE worker ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE dataflow_rule ADD FOREIGN KEY (funnel_dataflow_rule_id) REFERENCES dataflow_rule(dataflow_rule_id);
# allow the analysis_id to be NULL before specialization:
ALTER TABLE worker MODIFY COLUMN analysis_id int(10) unsigned DEFAULT NULL;
# since it is a life-long property of the worker, let's keep it in the DB:
ALTER TABLE worker ADD COLUMN resource_class_id int(10) unsigned NOT NULL;
......@@ -71,22 +71,24 @@ CREATE TABLE analysis_base (
--
CREATE TABLE worker (
worker_id int(10) unsigned NOT NULL AUTO_INCREMENT,
analysis_id int(10) unsigned NOT NULL,
meadow_type varchar(40) NOT NULL,
meadow_name varchar(40) DEFAULT NULL,
host varchar(40) DEFAULT NULL,
process_id varchar(40) DEFAULT NULL,
work_done int(11) DEFAULT '0' NOT NULL,
status enum('READY','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DEAD') DEFAULT 'READY' NOT NULL,
born timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_check_in datetime NOT NULL,
died datetime DEFAULT NULL,
cause_of_death enum('NO_WORK', 'JOB_LIMIT', 'HIVE_OVERLOAD', 'LIFESPAN', 'CONTAMINATED', 'KILLED_BY_USER', 'MEMLIMIT', 'RUNLIMIT', 'UNKNOWN') DEFAULT NULL,
log_dir varchar(80) DEFAULT NULL,
PRIMARY KEY (worker_id),
INDEX analysis_status (analysis_id, status)
worker_id int(10) unsigned NOT NULL AUTO_INCREMENT,
meadow_type varchar(40) NOT NULL,
meadow_name varchar(40) NOT NULL,
host varchar(40) NOT NULL,
process_id varchar(40) NOT NULL,
resource_class_id int(10) unsigned NOT NULL,
analysis_id int(10) unsigned DEFAULT NULL,
work_done int(11) DEFAULT '0' NOT NULL,
status enum('READY','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DEAD') DEFAULT 'READY' NOT NULL,
born timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_check_in datetime NOT NULL,
died datetime DEFAULT NULL,
cause_of_death enum('NO_WORK', 'JOB_LIMIT', 'HIVE_OVERLOAD', 'LIFESPAN', 'CONTAMINATED', 'KILLED_BY_USER', 'MEMLIMIT', 'RUNLIMIT', 'UNKNOWN') DEFAULT NULL,
log_dir varchar(80) DEFAULT NULL,
PRIMARY KEY (worker_id),
INDEX analysis_status (analysis_id, status)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......
......@@ -64,12 +64,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS logic_name_idx ON analysis_base (logic_name);
--
CREATE TABLE worker (
worker_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
analysis_id INTEGER NOT NULL,
meadow_type TEXT NOT NULL, /* enum('LSF', 'LOCAL') NOT NULL, */
meadow_name varchar(40) DEFAULT NULL,
host varchar(40) DEFAULT NULL,
process_id varchar(40) DEFAULT NULL,
worker_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
meadow_type varchar(40) NOT NULL,
meadow_name varchar(40) NOT NULL,
host varchar(40) NOT NULL,
process_id varchar(40) NOT NULL,
resource_class_id INTEGER NOT NULL,
analysis_id INTEGER DEFAULT NULL,
work_done int(11) DEFAULT '0' NOT NULL,
status TEXT DEFAULT 'READY' NOT NULL, /* enum('READY','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DEAD') DEFAULT 'READY' NOT NULL, */
born timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
......@@ -285,7 +287,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS resource_class_name_idx ON resource_class (nam
CREATE TABLE resource_description (
resource_class_id INTEGER NOT NULL,
meadow_type TEXT, /* enum('LSF', 'LOCAL') DEFAULT 'LSF' NOT NULL, */
meadow_type varchar(40) NOT NULL,
parameters varchar(255) DEFAULT '' NOT NULL,
PRIMARY KEY(resource_class_id, meadow_type)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment