=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Queen

=head1 DESCRIPTION

    The Queen of the Hive-based job control system is responsible for 'birthing' the
    correct number of workers of the right type so that they can find jobs to do.
    It will also free up jobs of Workers that died unexpectedly so that other workers
    can claim and finish them.

    Hive-based processing is a concept based on a more controlled version
    of an autonomous agent type system.  Each worker is not told what to do
    (as in a centralized control system, like the current pipeline system)
    but rather queries a central database for jobs (give me jobs).

    Each worker is linked to an analysis_id, registers itself on creation
    into the Hive, creates a RunnableDB instance of the Analysis->module,
    gets $analysis->stats->batch_size jobs from the job table, does its work,
    creates the next layer of job entries by interfacing with
    the DataflowRuleAdaptor to determine the analyses it needs to pass its
    output data to, and creates jobs on the next analysis database.
    It repeats this cycle until it has lived its lifetime or until there are no
    more jobs left.
    The lifetime limit is just a safety limit to prevent these from 'infecting'
    a system.

    The Queen's job is simply to birth Workers of the correct analysis_id to get the
    work done.  The only other thing the Queen does is free up jobs that were
    claimed by Workers that died unexpectedly so that other workers can take
    over the work.
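
    As a rough orientation, here is a hedged usage sketch (not part of the original documentation;
    it assumes a hive database reachable via a URL and that the DBAdaptor exposes the Queen through
    the usual get_Queen adaptor call, so please check both against your eHive installation):

        use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;

        my $hive_dba = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new( -url => 'mysql://user@host:3306/my_hive_db' );
        my $queen    = $hive_dba->get_Queen;                    # the Queen is itself an adaptor of the hive database

        my $overdue  = $queen->fetch_overdue_workers( 3600 );   # non-DEAD workers that have not checked in for an hour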

    The Beekeeper is in charge of interfacing between the Queen and a compute resource
    or 'compute farm'.  Its job is to query Queens if they need any workers and to
    send the requested number of workers to open machines via the runWorker.pl script.
    It is also responsible for interfacing with the Queen to identify workers which died
    unexpectedly.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2017] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut

package Bio::EnsEMBL::Hive::Queen;

use strict;
use warnings;
use File::Path 'make_path';
use List::Util qw(max);

use Bio::EnsEMBL::Hive::Utils ('destringify', 'dir_revhash');  # NB: needed by invisible code
use Bio::EnsEMBL::Hive::Role;
use Bio::EnsEMBL::Hive::Scheduler;
use Bio::EnsEMBL::Hive::Worker;

use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');


sub default_table_name {
    return 'worker';
}


sub default_insertion_method {
    return 'INSERT';
}


sub object_class {
    return 'Bio::EnsEMBL::Hive::Worker';
}


############################
#
# PUBLIC API
#
############################

=head2 create_new_worker

  Description: Creates an entry in the worker table,
               populates some non-storable attributes
               and returns a Worker object based on that insert.
               This guarantees that each worker registered in this Queen's hive is properly registered.
  Returntype : Bio::EnsEMBL::Hive::Worker
  Caller     : runWorker.pl
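  Example    : a hedged illustration only; real flag values are normally assembled by runWorker.pl:

                   my $worker = $queen->create_new_worker(
                       -meadow_type => 'LOCAL',
                       -meadow_name => 'my_host',
                       -process_id  => $$,
                       -job_limit   => 10,
                   );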

=cut

sub create_new_worker {
    my $self    = shift @_;
    my %flags   = @_;

    my ($meadow_type, $meadow_name, $process_id, $meadow_host, $meadow_user, $resource_class_id, $resource_class_name,
        $no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize)
     = @flags{qw(-meadow_type -meadow_name -process_id -meadow_host -meadow_user -resource_class_id -resource_class_name
            -no_write -debug -worker_log_dir -hive_log_dir -job_limit -life_span -no_cleanup -retry_throwing_jobs -can_respecialize)};

    foreach my $prev_worker_incarnation (@{ $self->fetch_all( "status!='DEAD' AND meadow_type='$meadow_type' AND meadow_name='$meadow_name' AND process_id='$process_id'" ) }) {
            # so far 'RELOCATED events' has been detected on LSF 9.0 in response to sending signal #99 or #100
            # Since I don't know how to avoid them, I am trying to register them when they happen.
            # The following snippet buries the previous incarnation of the Worker before starting a new one.
            #
            # FIXME: if GarbageCollector (beekeeper -dead) gets to these processes first, it will register them as DEAD/UNKNOWN.
            #       LSF 9.0 does not report "rescheduling" events in the output of 'bacct', but does mention them in 'bhist'.
            #       So parsing 'bhist' output would probably yield the most accurate & confident registration of these events.
        $prev_worker_incarnation->cause_of_death( 'RELOCATED' );
        $self->register_worker_death( $prev_worker_incarnation );
    }

    my $resource_class;

    if( defined($resource_class_name) ) {
        $resource_class = $self->db->get_ResourceClassAdaptor->fetch_by_name($resource_class_name)
            or die "resource_class with name='$resource_class_name' could not be fetched from the database";

    } elsif( defined($resource_class_id) ) {
        $resource_class = $self->db->get_ResourceClassAdaptor->fetch_by_dbID($resource_class_id)
            or die "resource_class with dbID='$resource_class_id' could not be fetched from the database";
    }

    my $worker = Bio::EnsEMBL::Hive::Worker->new(
        'meadow_type'       => $meadow_type,
        'meadow_name'       => $meadow_name,
        'meadow_host'       => $meadow_host,
        'meadow_user'       => $meadow_user,
        'process_id'        => $process_id,
        'resource_class'    => $resource_class,
    );
    $self->store( $worker );
    my $worker_id = $worker->dbID;

    $worker = $self->fetch_by_dbID( $worker_id )    # refresh the object to get the fields initialized at SQL level (timestamps in this case)
        or die "Could not fetch worker with dbID=$worker_id";

    if($hive_log_dir or $worker_log_dir) {
        my $dir_revhash = dir_revhash($worker_id);
        $worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') .'worker_id_'.$worker_id;

        eval {
            make_path( $worker_log_dir );
            1;
        } or die "Could not create '$worker_log_dir' directory : $@";

        $worker->log_dir( $worker_log_dir );
        $self->update_log_dir( $worker );   # autoloaded
    }

    $worker->init;

    if(defined($job_limit)) {
      $worker->job_limiter($job_limit);
      $worker->life_span(0);
    }

    $worker->life_span($life_span * 60)                 if($life_span); # $life_span min -> sec

    $worker->execute_writes(0)                          if($no_write);

    $worker->perform_cleanup(0)                         if($no_cleanup);

    $worker->debug($debug)                              if($debug);

    $worker->retry_throwing_jobs($retry_throwing_jobs)  if(defined $retry_throwing_jobs);

    $worker->can_respecialize($can_respecialize)        if(defined $can_respecialize);

    return $worker;
}


=head2 specialize_worker

  Description: If analysis_id or logic_name is specified it will try to specialize the Worker into this analysis.
               If not specified the Queen will analyze the hive and pick the most suitable analysis.
  Caller     : Bio::EnsEMBL::Hive::Worker
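  Example    : an illustrative sketch only (in the normal life cycle the Worker asks for this itself):

                   $queen->specialize_worker( $worker, { -analyses_pattern => 'blast%' } );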

=cut

sub specialize_worker {
    my $self    = shift @_;
    my $worker  = shift @_;
    my $flags   = shift @_;

    my ($analyses_pattern, $job_id, $force)
     = @$flags{qw(-analyses_pattern -job_id -force)};

    if( $analyses_pattern and $job_id ) {
        die "At most one of the options {-analyses_pattern, -job_id} can be set to pre-specialize a Worker";
    }

    my $analysis;

    if( $job_id ) {

        warn "resetting and fetching job for job_id '$job_id'\n";

        my $job_adaptor = $self->db->get_AnalysisJobAdaptor;

        my $job = $job_adaptor->fetch_by_dbID( $job_id )
            or die "Could not fetch job with dbID='$job_id'";
        my $job_status = $job->status();

        if($job_status =~/(CLAIMED|PRE_CLEANUP|FETCH_INPUT|RUN|WRITE_OUTPUT|POST_CLEANUP)/ ) {
            die "Job with dbID='$job_id' is already in progress, cannot run";   # FIXME: try GC first, then complain
        } elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
            die "Job with dbID='$job_id' is $job_status, please use -force 1 to override";
        }

        $analysis = $job->analysis;
        if(($analysis->stats->status eq 'BLOCKED') and !$force) {
            die "Analysis is BLOCKED, can't specialize a worker. Please use -force 1 to override";
        }

        if(($job_status eq 'DONE') and $job->semaphored_job_id) {
            warn "Increasing the semaphore count of the dependent job";
            $job_adaptor->increase_semaphore_count_for_jobid( $job->semaphored_job_id );
        }

        my %status2counter = ('FAILED' => 'failed_job_count', 'READY' => 'ready_job_count', 'DONE' => 'done_job_count', 'PASSED_ON' => 'done_job_count', 'SEMAPHORED' => 'semaphored_job_count');
        $analysis->stats->adaptor->increment_a_counter( $status2counter{$job->status}, -1, $job->analysis_id );

    } else {

        $analyses_pattern //= '%';  # for printing
        my $analyses_matching_pattern   = Bio::EnsEMBL::Hive::Analysis->collection()->find_all_by_pattern( $analyses_pattern );

            # refresh the stats of matching analyses before re-specialization:
        foreach my $analysis ( @$analyses_matching_pattern ) {
            $analysis->stats->refresh();
        }

        $analysis = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_matching_pattern, $analyses_pattern);

        unless( ref($analysis) ) {

            $worker->cause_of_death('NO_ROLE');

            my $msg = $analysis // "No analysis suitable for the worker was found";
            die "$msg\n";
        }
    }

    my $new_role = Bio::EnsEMBL::Hive::Role->new(
        'worker'        => $worker,
        'analysis'      => $analysis,
    );
    $self->db->get_RoleAdaptor->store( $new_role );
    $worker->current_role( $new_role );

    my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;

    if($job_id) {
        my $role_id = $new_role->dbID;
        if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {

            $worker->special_batch( [ $job ] );
        } else {
            die "Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
        }

    } else {    # Note: special batch Workers should avoid flipping the status to 'WORKING' in case the analysis is still 'BLOCKED'

        $analysis_stats_adaptor->update_status($analysis->dbID, 'WORKING');
    }

        # The following increment used to be done only when no specific task was given to the worker,
        # thereby excluding such "special task" workers from being counted in num_running_workers.
        #
        # However this may be tricky to emulate by triggers that know nothing about "special tasks",
        # so I am (temporarily?) simplifying the accounting algorithm.
        #
    $analysis_stats_adaptor->increment_a_counter( 'num_running_workers', 1, $analysis->dbID );
}

sub register_worker_death {
    my ($self, $worker, $update_when_checked_in) = @_;

    my $worker_id       = $worker->dbID;
    my $work_done       = $worker->work_done;
    my $cause_of_death  = $worker->cause_of_death || 'UNKNOWN';    # make sure we do not attempt to insert a void
    my $worker_died     = $worker->when_died;

    my $current_role    = $worker->current_role;

    unless( $current_role ) {
        $worker->current_role( $current_role = $self->db->get_RoleAdaptor->fetch_last_unfinished_by_worker_id( $worker_id ) );
    }

    if( $current_role and !$current_role->when_finished() ) {
        # List of cause_of_death:
        # only happen before or after a batch: 'NO_ROLE','NO_WORK','JOB_LIMIT','HIVE_OVERLOAD','LIFESPAN','SEE_MSG'
        # can happen whilst the worker is running a batch: 'CONTAMINATED','RELOCATED','KILLED_BY_USER','MEMLIMIT','RUNLIMIT','SEE_MSG','UNKNOWN'
        my $release_undone_jobs = ($cause_of_death =~ /^(CONTAMINATED|RELOCATED|KILLED_BY_USER|MEMLIMIT|RUNLIMIT|SEE_MSG|UNKNOWN)$/);
        $current_role->worker($worker); # So that release_undone_jobs_from_role() has the correct cause_of_death and work_done
        $current_role->when_finished( $worker_died );
        $self->db->get_RoleAdaptor->finalize_role( $current_role, $release_undone_jobs );
    }

    my $sql = "UPDATE worker SET status='DEAD', work_done='$work_done', cause_of_death='$cause_of_death'"
            . ( $update_when_checked_in ? ', when_checked_in=CURRENT_TIMESTAMP ' : '' )
            . ( $worker_died ? ", when_died='$worker_died'" : ', when_died=CURRENT_TIMESTAMP' )
            . " WHERE worker_id='$worker_id' ";

    $self->dbc->protected_prepare_execute( [ $sql ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "register_worker_death".$after, 0 ); }
    );
}


sub meadow_type_2_name_2_users_of_running_workers {
    my $self = shift @_;

    return $self->count_all("status!='DEAD'", ['meadow_type', 'meadow_name', 'meadow_user']);
}


sub check_for_dead_workers {    # scans the whole Valley for lost Workers (but ignores unreachable ones)
    my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;

    my $last_few_seconds            = 5;    # FIXME: It is probably a good idea to expose this parameter for easier tuning.

    warn "GarbageCollector:\tChecking for lost Workers...\n";

    my $meadow_type_2_name_2_users      = $self->meadow_type_2_name_2_users_of_running_workers();
    my %signature_and_pid_to_worker_status = ();

    while(my ($meadow_type, $level2) = each %$meadow_type_2_name_2_users) {

        if(my $meadow = $valley->available_meadow_hash->{$meadow_type}) {   # if this Valley supports $meadow_type at all...
            while(my ($meadow_name, $level3) = each %$level2) {

                if($meadow->cached_name eq $meadow_name) {  # and we can reach the same $meadow_name from this Valley...
                    my $meadow_users_of_interest    = [ keys %$level3 ];
                    my $meadow_signature            = $meadow_type.'/'.$meadow_name;
                    $signature_and_pid_to_worker_status{$meadow_signature} ||= $meadow->status_of_all_our_workers( $meadow_users_of_interest );
                }
            }
        }
    }

    my $queen_overdue_workers       = $self->fetch_overdue_workers( $last_few_seconds );    # check the workers we have not seen active during the $last_few_seconds
    warn "GarbageCollector:\t[Queen:] out of ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n";

    my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id=?";
    my $update_when_seen_sth;

    my %meadow_status_counts        = ();
    my %mt_and_pid_to_lost_worker   = ();
    foreach my $worker (@$queen_overdue_workers) {
        my $meadow_signature    = $worker->meadow_type.'/'.$worker->meadow_name;
        if(my $pid_to_worker_status = $signature_and_pid_to_worker_status{$meadow_signature}) {   # the whole Meadow subhash is either present or the Meadow is unreachable
            my $meadow_type = $worker->meadow_type;
            my $process_id  = $worker->process_id;
            my $status = $pid_to_worker_status->{$process_id};

            if($bury_unkwn_workers and ($status eq 'UNKWN')) {
                if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
                    if($meadow->can('kill_worker')) {
                        if($worker->meadow_user eq $ENV{'USER'}) {  # if I'm actually allowed to kill the worker...
                            warn "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id";

                            $meadow->kill_worker($worker, 1);
                            $status = ''; # make it look like LOST
                        }
                    }
                }
            }

            if($status) {  # can be RUN|PEND|xSUSP
                $meadow_status_counts{$meadow_signature}{$status}++;

                    # only prepare once at most:
                $update_when_seen_sth ||= $self->prepare( $update_when_seen_sql );

                $update_when_seen_sth->execute( $worker->dbID );
            } else {
                $meadow_status_counts{$meadow_signature}{'LOST'}++;

                $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;
            }
        } else {
            $meadow_status_counts{$meadow_signature}{'UNREACHABLE'}++;   # Worker is unreachable from this Valley
        }
    }

    $update_when_seen_sth->finish() if $update_when_seen_sth;

        # print a quick summary report:
    while(my ($meadow_signature, $status_count) = each %meadow_status_counts) {
        warn "GarbageCollector:\t[$meadow_signature Meadow:]\t".join(', ', map { "$_:$status_count->{$_}" } keys %$status_count )."\n\n";
    }

    while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
        my $this_meadow = $valley->available_meadow_hash->{$meadow_type};

        if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
            warn "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n";

            my $report_entries;

            if($this_meadow->can('find_out_causes')) {
                die "Your Meadow::$meadow_type driver now has to support get_report_entries_for_process_ids() method instead of find_out_causes(). Please update it.\n";

            } else {
                if ($report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) {
                    my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
                    warn "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n";
                }
            }

            warn "GarbageCollector:\tReleasing the jobs\n";
            while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
                $worker->when_died(         $report_entries->{$process_id}{'when_died'} );
                $worker->cause_of_death(    $report_entries->{$process_id}{'cause_of_death'} );
                $self->register_worker_death( $worker );
            }

            if( %$report_entries ) {    # use the opportunity to also store resource usage of the buried workers:
                my $processid_2_workerid = { map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
                $self->store_resource_usage( $report_entries, $processid_2_workerid );
            }
        }
    }

        # the following bit is completely Meadow-agnostic and only restores database integrity:
    if($check_buried_in_haste) {
        warn "GarbageCollector:\tChecking for orphan roles...\n";
        my $orphan_roles = $self->db->get_RoleAdaptor->fetch_all_unfinished_roles_of_dead_workers();
        if(my $orphan_role_number = scalar @$orphan_roles) {
            warn "GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
            foreach my $orphan_role (@$orphan_roles) {
                $self->db->get_RoleAdaptor->finalize_role( $orphan_role );
            }
        }

        warn "GarbageCollector:\tChecking for orphan jobs...\n";
        my $buried_in_haste_roles = $self->db->get_RoleAdaptor->fetch_all_finished_roles_with_unfinished_jobs();
        if(my $bih_number = scalar @$buried_in_haste_roles) {
            warn "GarbageCollector:\tfound $bih_number buried roles with orphan jobs, reclaiming.\n\n";
            my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
            foreach my $role (@$buried_in_haste_roles) {
                $job_adaptor->release_undone_jobs_from_role( $role );
            }
        } else {
            warn "GarbageCollector:\tfound none\n";
        }
    }
}


    # a new version that both checks in and updates the status
sub check_in_worker {
    my ($self, $worker) = @_;

    my $sql = "UPDATE worker SET when_checked_in=CURRENT_TIMESTAMP, status='".$worker->status."', work_done='".$worker->work_done."' WHERE worker_id='".$worker->dbID."'";

    $self->dbc->protected_prepare_execute( [ $sql ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $worker, "check_in_worker".$after, 0 ); }
    );
}


=head2 reset_job_by_dbID_and_sync

  Arg [1]    : int $job_id
  Example    :
    $queen->reset_job_by_dbID_and_sync($job_id);
  Description:
    For the specified job_id it will fetch just that job,
    reset it completely as if it had never run,
    and re-synchronize the stats of its analysis.
    Specifying a specific job bypasses the safety checks,
    thus multiple workers could be running the
    same job simultaneously (use only for debugging).
  Returntype : none
  Exceptions :
  Caller     : beekeeper.pl

=cut

sub reset_job_by_dbID_and_sync {
    my ($self, $job_id) = @_;

    my $job     = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id);

    my $stats   = $job->analysis->stats;

    $self->synchronize_AnalysisStats($stats);
}


######################################
#
# Public API interface for beekeeper
#
######################################

    # Note: asking for Queen->fetch_overdue_workers(0) essentially means
    #       "fetch all workers known to the Queen not to be officially dead"
    #
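    # e.g. an illustrative sketch:  my $possibly_lost = $queen->fetch_overdue_workers( 5 );
    #      would return the non-DEAD workers that have not checked in for more than 5 seconds
    #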
sub fetch_overdue_workers {
    my ($self,$overdue_secs) = @_;

    $overdue_secs = 3600 unless(defined($overdue_secs));

    my $constraint = "status!='DEAD' AND ".{
            'mysql'     =>  "(UNIX_TIMESTAMP()-UNIX_TIMESTAMP(when_checked_in)) > $overdue_secs",
            'sqlite'    =>  "(strftime('%s','now')-strftime('%s',when_checked_in)) > $overdue_secs",
            'pgsql'     =>  "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - when_checked_in) > $overdue_secs",
        }->{ $self->dbc->driver };

    return $self->fetch_all( $constraint );
}

=head2 synchronize_hive

  Arg [1]    : $list_of_analyses
  Example    : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] );
  Description: Runs through all analyses in the given list and synchronizes
              the analysis_stats summary with the states in the job and worker tables.
              Then follows by checking all the blocking rules and blocks/unblocks analyses as needed.
  Exceptions : none
  Caller     : general

=cut

sub synchronize_hive {
    my ($self, $list_of_analyses) = @_;

    my $start_time = time();

    print STDERR "\nSynchronizing the hive (".scalar(@$list_of_analyses)." analyses this time):\n";
    foreach my $analysis (@$list_of_analyses) {
        $self->synchronize_AnalysisStats($analysis->stats);
        print STDERR ( ($analysis->stats()->status eq 'BLOCKED') ? 'x' : 'o');
    }
    print STDERR "\n";

    print STDERR ''.((time() - $start_time))." seconds to synchronize_hive\n\n";
}


=head2 safe_synchronize_AnalysisStats

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $self->safe_synchronize_AnalysisStats($stats);
  Description: Prewrapper around synchronize_AnalysisStats that does
               checks and grabs sync_lock before proceeding with sync.
               Used by distributed worker sync system to avoid contention.
               Returns 1 on success and 0 if the lock could not be obtained,
               and so no sync was attempted.
  Returntype : boolean
  Caller     : general

=cut

sub safe_synchronize_AnalysisStats {
    my ($self, $stats) = @_;

    $stats->refresh();

    my $max_refresh_attempts = 5;
    while($stats->sync_lock and $max_refresh_attempts--) {   # another Worker/Beekeeper is synching this analysis right now
            # ToDo: it would be nice to report the detected collision
        sleep(1);
        $stats->refresh();  # just try to avoid collision
    }

    unless( ($stats->status eq 'DONE')
         or ( ($stats->status eq 'WORKING') and defined($stats->seconds_since_when_updated) and ($stats->seconds_since_when_updated < 3*60) ) ) {

        my $sql = "UPDATE analysis_stats SET status='SYNCHING', sync_lock=1 ".
                  "WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;

        my $row_count = $self->dbc->do($sql);   # try to claim the sync_lock

        if( $row_count == 1 ) {     # if we managed to obtain the lock, let's go and perform the sync:
            $self->synchronize_AnalysisStats($stats);   
            return 1;
        } # otherwise assume it's locked and just return un-updated
    }

    return 0;
}


=head2 synchronize_AnalysisStats

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $self->synchronize_AnalysisStats( $stats );
  Description: Queries the job and worker tables to get summary counts
               and rebuilds the AnalysisStats object.
               Then updates the analysis_stats table with the new summary info.
  Exceptions : none
  Caller     : general

=cut

sub synchronize_AnalysisStats {
    my ($self, $stats) = @_;

    if( $stats and $stats->analysis_id ) {

        $stats->refresh(); ## Need to get the new hive_capacity for dynamic analyses

        my $job_counts = $self->db->hive_use_triggers() ? undef : $self->db->get_AnalysisJobAdaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id );

        $stats->recalculate_from_job_counts( $job_counts );

        # $stats->sync_lock(0); ## do we perhaps need it here?
        $stats->update;  #update and release sync_lock
    }
}


=head2 check_nothing_to_run_but_semaphored

  Arg [1]    : $list_of_analyses
  Example    : $self->check_nothing_to_run_but_semaphored( [ $analysis_A, $analysis_B ] );
  Description: Checks whether the only jobs left to do in the given analyses are semaphored ones
               (returns true if there is at least one semaphored job and nothing else left to run).
  Exceptions : none
  Caller     : Scheduler

=cut

sub check_nothing_to_run_but_semaphored {   # make sure it is run after a recent sync
    my ($self, $list_of_analyses) = @_;

    my $only_semaphored_jobs_to_run = 1;
    my $total_semaphored_job_count  = 0;

    foreach my $analysis (@$list_of_analyses) {
        my $stats = $analysis->stats;

        $only_semaphored_jobs_to_run = 0 if( $stats->total_job_count != $stats->done_job_count + $stats->failed_job_count + $stats->semaphored_job_count );
        $total_semaphored_job_count += $stats->semaphored_job_count;
    }

    return ( $total_semaphored_job_count && $only_semaphored_jobs_to_run );
}


=head2 print_status_and_return_reasons_to_exit

  Arg [1]    : $list_of_analyses
  Example    : my $reasons_to_exit = $queen->print_status_and_return_reasons_to_exit( [ $analysis_A, $analysis_B ] );
  Description: Runs through all analyses in the given list, reports failed analyses, computes some totals, prints a combined status line
                and returns a string listing the reasons to exit (failed analyses and/or no jobs left to do), empty if there is no reason to exit yet.
  Exceptions : none
  Caller     : beekeeper.pl

=cut

sub print_status_and_return_reasons_to_exit {
    my ($self, $list_of_analyses) = @_;

    my ($total_done_jobs, $total_failed_jobs, $total_jobs, $cpumsec_to_do) = (0) x 4;
    my $reasons_to_exit = '';

    my $max_logic_name_length = max(map {length($_->logic_name)} @$list_of_analyses);

    foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) {
        my $stats               = $analysis->stats;
        my $failed_job_count    = $stats->failed_job_count;

        print $stats->toString($max_logic_name_length) . "\n";

        if( $stats->status eq 'FAILED') {
            my $logic_name    = $analysis->logic_name;
            my $tolerance     = $analysis->failed_job_tolerance;
            $reasons_to_exit .= "### Analysis '$logic_name' has FAILED  (failed Jobs: $failed_job_count, tolerance: $tolerance\%) ###\n";
        }

        $total_done_jobs    += $stats->done_job_count;
        $total_failed_jobs  += $failed_job_count;
        $total_jobs         += $stats->total_job_count;
        $cpumsec_to_do      += $stats->ready_job_count * $stats->avg_msec_per_job;
    }

    my $total_jobs_to_do        = $total_jobs - $total_done_jobs - $total_failed_jobs;         # includes SEMAPHORED, READY, CLAIMED, INPROGRESS
    my $cpuhrs_to_do            = $cpumsec_to_do / (1000.0*60*60);
    my $percentage_completed    = $total_jobs
                                    ? (($total_done_jobs+$total_failed_jobs)*100.0/$total_jobs)
                                    : 0.0;

    printf("total over %d analyses : %6.2f%% complete (< %.2f CPU_hrs) (%d to_do + %d done + %d failed = %d total)\n",
                scalar(@$list_of_analyses), $percentage_completed, $cpuhrs_to_do, $total_jobs_to_do, $total_done_jobs, $total_failed_jobs, $total_jobs);

    unless( $total_jobs_to_do ) {
        $reasons_to_exit .= "### No jobs left to do ###\n";
    }

    return $reasons_to_exit;
}


=head2 register_all_workers_dead

  Example    : $queen->register_all_workers_dead();
  Description: Registers all workers which are not already DEAD as dead
  Exceptions : none
  Caller     : beekeeper.pl

=cut

sub register_all_workers_dead {
    my $self = shift;

    my $all_workers_considered_alive = $self->fetch_all( "status!='DEAD'" );
    foreach my $worker (@{$all_workers_considered_alive}) {
        $self->register_worker_death( $worker );
    }
}

sub interval_workers_with_unknown_usage {
    my $self = shift @_;

    my %meadow_to_interval = ();

    my $sql_times = qq{
        SELECT meadow_type, meadow_name, min(when_born), max(when_died), count(*)
        FROM worker w
        LEFT JOIN worker_resource_usage u USING(worker_id)
        WHERE u.worker_id IS NULL
        GROUP BY meadow_type, meadow_name
    };
    my $sth_times = $self->prepare( $sql_times );
    $sth_times->execute();
    while( my ($meadow_type, $meadow_name, $min_born, $max_died, $workers_count) = $sth_times->fetchrow_array() ) {
        $meadow_to_interval{$meadow_type}{$meadow_name} = {
            'min_born'      => $min_born,
            'max_died'      => $max_died,
            'workers_count' => $workers_count,
        };
    }
    $sth_times->finish();

    return \%meadow_to_interval;
}


sub store_resource_usage {
    my ($self, $report_entries, $processid_2_workerid) = @_;

    # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.
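    #        For reference, an illustrative sketch (not used here) of what such an UPSERT could look like in PostgreSQL 9.5+ syntax,
    #        assuming worker_id is the unique key of worker_resource_usage:
    #            INSERT INTO worker_resource_usage (worker_id, exit_status, ...) VALUES (?, ?, ...)
    #                ON CONFLICT (worker_id) DO UPDATE SET exit_status=EXCLUDED.exit_status, ...;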

    my $sql_delete = 'DELETE FROM worker_resource_usage WHERE worker_id=?';
    my $sth_delete = $self->prepare( $sql_delete );

    my $sql_insert = 'INSERT INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)';
    my $sth_insert = $self->prepare( $sql_insert );

    my @not_ours = ();

    while( my ($process_id, $report_entry) = each %$report_entries ) {

        if( my $worker_id = $processid_2_workerid->{$process_id} ) {
            $sth_delete->execute( $worker_id );

            eval {
                $sth_insert->execute( $worker_id, @$report_entry{'exit_status', 'mem_megs', 'swap_megs', 'pending_sec', 'cpu_sec', 'lifespan_sec', 'exception_status'} );  # slicing hashref
                1;
            } or do {
                if($@ =~ /execute failed: Duplicate entry/s) {     # ignore the collision with another parallel beekeeper
                    $self->db->get_LogMessageAdaptor()->store_worker_message($worker_id, "Collision detected when storing resource_usage", 0 );
                } else {
                    die $@;
                }
            };
        } else {
            push @not_ours, $process_id;
            #warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n";
        }
    }
    $sth_delete->finish();
    $sth_insert->finish();
}


1;