=pod

=head1 NAME

Bio::EnsEMBL::Hive::GuestProcess

=head1 SYNOPSIS

This is a variant of Bio::EnsEMBL::Hive::Process that forks into a wrapper that can itself
run jobs (runnables) written in a different language

=head1 DESCRIPTION

Upon initialisation, GuestProcess forks, and the child process executes the wrapper that
allows running Runnables written in the other language. The two processes communicate through
two pipes, in a way that is schematically similar to running "| wrapper |", except that
GuestProcess uses non-standard file descriptors, thus allowing the Runnable to still use
std{in,out,err}.
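
As a rough illustration of this two-pipe arrangement, the self-contained sketch below forks a
child that echoes one line back to its parent. It is not the GuestProcess code: the handle names
and the echo logic are assumptions made for the example, and the child runs Perl code directly
instead of exec()-ing a wrapper.

```perl
use strict;
use warnings;
use IO::Handle;

# Two pipes, one per direction (all handle names are hypothetical)
pipe(my $from_child,  my $to_parent) or die "pipe failed: $!";
pipe(my $from_parent, my $to_child)  or die "pipe failed: $!";

my $pid = fork();
die "cannot fork: $!" unless defined $pid;

if ($pid == 0) {
    # Child: keep only its own ends, leaving std{in,out,err} untouched
    close $from_child;
    close $to_child;
    $to_parent->autoflush(1);
    my $line = <$from_parent>;
    chomp $line;
    print {$to_parent} "echo: $line\n";
    exit 0;
}

# Parent: close the child's ends and talk over the two remaining handles
close $from_parent;
close $to_parent;
$to_child->autoflush(1);
print {$to_child} "hello\n";
my $reply = <$from_child>;
chomp $reply;
waitpid($pid, 0);
```

In GuestProcess the child additionally clears FD_CLOEXEC on its two handles so that they survive
the exec() of the wrapper.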

The wrapper receives the numbers of the two file descriptors that it is meant to use (one for
reading data from GuestProcess, and one for sending data to GuestProcess). All the messages are
passed around as single-line JSON structures. The protocol is described below using the convention:
    ---> represents a message sent to the child process,
    <--- represents a message sent by the child process

The initialisation (in the constructor) consists of checking that both sides speak the same
version of the protocol:
    <--- { "version": "XXX" }
    ---> "OK"
GuestProcess replies "NO" and bails out if the child's version is not compatible with its own.
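
The compatibility test is a prefix match on the major version. Below is a minimal standalone
sketch of that rule; the function name and the sample version strings are made up for the
example, and the actual check is check_version_compatibility() in this module.

```perl
use strict;
use warnings;

# Hypothetical standalone version of the check: the child's version must
# start with the parent's protocol version followed by a dot
sub versions_are_compatible {
    my ($parent_version, $child_version) = @_;
    return 0 unless defined $child_version;
    return ($child_version =~ /^\Q$parent_version\E\./) ? 1 : 0;
}

my $same_major  = versions_are_compatible('4', '4.0');    # same major version
my $other_major = versions_are_compatible('4', '40.1');   # "40" is not "4"
my $undefined   = versions_are_compatible('4', undef);    # the child sent nothing usable
```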

Then, the child process (i.e. the runnable) will send its default parameters to GuestProcess.
This fills the usual param_defaults() section of the Runnable:
    <--- { ... param_defaults ... }
    ---> "OK"

The child process then goes to sleep, waiting for jobs to be seeded. Meanwhile,
GuestProcess enters a number of life_cycle() executions (as triggered by Worker).
Each one first sends a JSON object to the child process to initialize the job parameters:
    ---> {
           "input_job": {
             "parameters": { ... the unsubstituted job parameters as compiled by Worker ... },
             // followed by several attributes of the job
             "input_id": { ...  },
             "dbID": XXX,
             "retry_count": XXX
           },
           "execute_writes": [1|0],
           "debug": XXX
         }
    <--- "OK"

From this point, GuestProcess acts as a server, listening to events sent by the child.
Events are JSON objects composed of an "event" field (the name of the event) and a
"content" field (the payload). Events can be of the following kinds (with the expected
response from GuestProcess):

    <--- JOB_STATUS_UPDATE
         // The content is one of "PRE_CLEANUP", "FETCH_INPUT", "RUN", "WRITE_OUTPUT", "POST_HEALTHCHECK", "POST_CLEANUP"
    ---> "OK"

    <--- WARNING
         // The content is a JSON object:
            {
              "message": "XXX",
              "is_error": [true|false]
            }
    ---> "OK"

    <--- DATAFLOW
         // The content is a JSON object:
            {
              "branch_name_or_code": XXX,
              "output_ids": an array or a hash,
              "params": {
                "substituted": { ... the parameters that are currently substituted ... },
                "unsubstituted": { ... the parameters that have not yet been substituted ... }
              }
            }
    ---> dbIDs of the jobs that have been created

    <--- WORKER_TEMP_DIRECTORY
         // No content needed (ignored)
    ---> returns the temporary directory of the worker

    <--- JOB_END
         // The content is a JSON object describing the final state of the job
            {
              "complete": [true|false],
              "job": {
                "autoflow": [true|false],
                "lethal_for_worker": [true|false],
                "transient_error": [true|false]
              },
              "params": {
                "substituted": { ... the parameters that are currently substituted ... },
                "unsubstituted": { ... the parameters that have not yet been substituted ... }
              }
            }
    ---> "OK"
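
To make the event format concrete, here is a small sketch that builds a WARNING event as a child
would, serialises it on a single line and decodes it again. It uses the core JSON::PP module only
to stay self-contained (this module itself uses JSON), and the message text is invented.

```perl
use strict;
use warnings;
use JSON::PP;

# canonical() sorts the keys so that the encoded text is reproducible
my $json = JSON::PP->new->canonical(1);

# An event is an object with an "event" name and a "content" payload
my %event = (
    event   => 'WARNING',
    content => { message => 'disk is almost full', is_error => JSON::PP::false() },
);

# One message per line: the encoded text must not contain a newline
my $line = $json->encode(\%event);
die "messages must fit on one line" if $line =~ /\n/;

# The receiving side decodes the line and maps the JSON boolean to a plain 1 / 0
my $decoded  = $json->decode($line);
my $is_error = $decoded->{content}{is_error} ? 1 : 0;
```

Converting the boolean with "? 1 : 0" mirrors what life_cycle() does for the JOB_END flags.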


=head1 LICENSE

Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2019] EMBL-European Bioinformatics Institute

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=cut


package Bio::EnsEMBL::Hive::GuestProcess;

use strict;
use warnings;

use JSON;
use IO::Handle;

use Data::Dumper;

use base ('Bio::EnsEMBL::Hive::Process');


# -------------------------------------- <versioning of the GuestProcess interface> -------------------------------------------------------

our $GUESTPROCESS_PROTOCOL_VERSION = '4';       # Make sure you change this number whenever an incompatible change is introduced


=head2 get_protocol_version

  Example     : print Bio::EnsEMBL::Hive::GuestProcess->get_protocol_version(), "\n";
  Description : Returns the version number of the communication protocol
  Returntype  : String

=cut

sub get_protocol_version {
    return $GUESTPROCESS_PROTOCOL_VERSION;
}

sub check_version_compatibility {
    my ($self, $other_version) = @_;

    my $gpv = $self->get_protocol_version();
#    warn "$self :  GPV='$gpv', MV='$other_version'\n";

    return ((defined $other_version) and ($other_version=~/^$gpv\./)) ? 1 : 0;
}

# -------------------------------------- </versioning of the GuestProcess interface> ------------------------------------------------------


=head2 new

  Arg[1]      : $debug: the debug level (values above 1 also print the protocol messages)
  Arg[2]      : $language: the programming language the external runnable is in
  Arg[3]      : $module: the name of the runnable (usually a package name)
  Example     : Bio::EnsEMBL::Hive::GuestProcess->new($debug, $language, $module);
  Description : Constructor
  Returntype  : Bio::EnsEMBL::Hive::GuestProcess
  Exceptions  : if $language or $module is not defined properly or if the pipes /
                child process could not be created

=cut

sub new {

    my ($class, $debug, $language, $module) = @_;

    die "GuestProcess must be told which language to interface with" unless $language;

    my $wrapper = _get_wrapper_for_language($language);
    die "GuestProcess must be told which module to run" unless $module;

    my ($PARENT_RDR, $PARENT_WTR, $CHILD_WTR,$CHILD_RDR);
    pipe($PARENT_RDR, $CHILD_WTR) or die 'Could not create a pipe to send data to the child !';
    pipe($CHILD_RDR,  $PARENT_WTR) or die 'Could not create a pipe to get data from the child !';

    my $protocol_debug = ($debug && ($debug > 1));  # Only advanced levels of debug will show the GuestProcess protocol messages
    if ($protocol_debug) {
        print "PARENT_RDR is ", fileno($PARENT_RDR), "\n";
        print "PARENT_WTR is ", fileno($PARENT_WTR), "\n";
        print "CHILD_RDR is ", fileno($CHILD_RDR), "\n";
        print "CHILD_WTR is ", fileno($CHILD_WTR), "\n";
    }

    my $pid;

    if ($pid = fork()) {
        # In the parent
        close $PARENT_RDR;
        close $PARENT_WTR;
        print "parent is PID $$\n" if $protocol_debug;
    } else {
        die "cannot fork: $!" unless defined $pid;
        # In the child
        close $CHILD_RDR;
        close $CHILD_WTR;
        print "child is PID $$\n" if $protocol_debug;

        # Do not close the non-standard file descriptors on exec(): the child process will need them !
        use Fcntl;
        my $flags = fcntl($PARENT_RDR, F_GETFD, 0);
        fcntl($PARENT_RDR, F_SETFD, $flags & ~FD_CLOEXEC);
        $flags = fcntl($PARENT_WTR, F_GETFD, 0);
        fcntl($PARENT_WTR, F_SETFD, $flags & ~FD_CLOEXEC);

        exec($wrapper, 'run', $module, fileno($PARENT_RDR), fileno($PARENT_WTR), $debug//0);
    }


    $CHILD_WTR->autoflush(1);

    my $self = bless {}, $class;

    $self->child_out($CHILD_RDR);
    $self->child_in($CHILD_WTR);
    $self->child_pid($pid);
    $self->json_formatter( JSON->new()->indent(0) );
    $self->{'_protocol_debug'} = $protocol_debug; # controls the GuestProcess protocol, not the worker

    $self->print_debug('CHECK VERSION NUMBER');
    my $other_version = $self->read_message()->{content};
    if (!$self->check_version_compatibility($other_version)) {
        $self->send_response('NO');
        die "eHive's protocol version is '".$self->get_protocol_version."' but the wrapper's is '$other_version'\n";
    } else {
        $self->send_response('OK');
    }

    $self->print_debug("BEFORE READ PARAM_DEFAULTS");
    $self->param_defaults( $self->read_message()->{content} );
    $self->send_response('OK');

    $self->print_debug("INIT DONE");

    return $self;
}


=head2 _get_wrapper_for_language

  Example     : Bio::EnsEMBL::Hive::GuestProcess::_get_wrapper_for_language('python3');
  Description : Finds the wrapper that understands the given language
  Returntype  : String
  Exceptions  : Can die if the wrapper doesn't exist
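
The lookup order can be sketched as follows. This is only an illustration: the helper name is
hypothetical, it takes the environment as a hash reference so that it can be tried without
touching %ENV, it skips the file tests, and the paths and languages are invented.

```perl
use strict;
use warnings;

# Hypothetical helper: an EHIVE_WRAPPER_<LANGUAGE> variable takes precedence
# over the wrapper shipped under $EHIVE_ROOT_DIR/wrappers/<language>/
sub resolve_wrapper_path {
    my ($language, $env) = @_;
    return $env->{'EHIVE_WRAPPER_' . uc($language)}
        || sprintf('%s/wrappers/%s/wrapper', $env->{'EHIVE_ROOT_DIR'}, $language);
}

my %env = (
    EHIVE_ROOT_DIR        => '/opt/ehive',
    EHIVE_WRAPPER_PYTHON3 => '/home/me/bin/my_python_wrapper',
);
my $overridden = resolve_wrapper_path('python3', \%env);   # taken from the variable
my $embedded   = resolve_wrapper_path('java',    \%env);   # falls back to the embedded path
```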

=cut

sub _get_wrapper_for_language {
    my ($language) = @_;

    my $wrapper = $ENV{'EHIVE_WRAPPER_'.(uc $language)} # User-overridden wrapper
                    || sprintf('%s/wrappers/%s/wrapper', $ENV{'EHIVE_ROOT_DIR'}, $language);  # Embedded wrapper
    if (not -e $wrapper) {
        die "$language is currently not supported\n";
    } elsif (not -s $wrapper) {
        die "The wrapper '$wrapper' is an empty file !\n";
    } elsif (not -x $wrapper) {
        die "No permissions to execute the wrapper '$wrapper'\n";
    }
    return $wrapper;
}


=head2 _get_all_registered_wrappers

  Example     : my $all_languages = Bio::EnsEMBL::Hive::GuestProcess::_get_all_registered_wrappers()
  Description : Lists all the languages and wrappers that are registered (either
                via an EHIVE_WRAPPER_* environment variable, or via a "wrapper"
                file under $EHIVE_ROOT_DIR/wrappers/).
  Returntype  : Hashref { String => String }
  Exceptions  : None

=cut

sub _get_all_registered_wrappers {
    my %all_found;
    foreach my $variable (keys %ENV) {
        if ($variable =~ /^EHIVE_WRAPPER_(.*)$/) {
            $all_found{lc $1} = $ENV{$variable};
        }
    }
    foreach my $wrapper (glob $ENV{'EHIVE_ROOT_DIR'}.'/wrappers/*/wrapper' ) {
        $wrapper =~ /\/wrappers\/(.*)\/wrapper$/;
        $all_found{$1} = $wrapper;
    }
    return \%all_found;
}


=head2 DESTROY

  Description : Destructor: tells the child to exit by sending an empty JSON object
  Returntype  : none

=cut

sub DESTROY {
    my $self = shift;
    $self->print_debug("DESTROY");
    $self->child_in->print("{}\n");
    #kill('KILL', $self->child_pid);
}


=head2 print_debug

  Example     : $process->print_debug("debug message");
  Description : Prints a message if $self->{'_protocol_debug'} is set
  Returntype  : none

=cut

sub print_debug {
    my ($self, $msg) = @_;
    print sprintf("PERL %d: %s\n", $self->child_pid, $msg) if $self->{'_protocol_debug'};
}

##############
# Attributes #
##############


=head2 child_in

  Example     : my $child_in = $process->child_in();
  Example     : $process->child_in(*CHILD_WTR);
  Description : Getter/Setter for the file handle that allows talking to the
                child process.
  Returntype  : IO::Handle
  Exceptions  : none

=cut

sub child_in {
    my $self = shift;
    $self->{'_child_in'} = shift if @_;
    return $self->{'_child_in'};
}

=head2 child_out

  Example     : my $child_out = $process->child_out();
  Example     : $process->child_out(*CHILD_RDR);
  Description : Getter/Setter for the file handle that allows receiving data
                from the child process.
  Returntype  : IO::Handle
  Exceptions  : none

=cut

sub child_out {
    my $self = shift;
    $self->{'_child_out'} = shift if @_;
    return $self->{'_child_out'};
}

=head2 child_pid

  Example     : my $child_pid = $process->child_pid();
  Example     : $process->child_pid($child_pid);
  Description : Getter/Setter for the process ID of the child
  Returntype  : integer
  Exceptions  : none

=cut

sub child_pid {
    my $self = shift;
    $self->{'_child_pid'} = shift if @_;
    return $self->{'_child_pid'};
}


=head2 json_formatter

  Example     : my $json_formatter = $object_name->json_formatter();
  Example     : $object_name->json_formatter($json_formatter);
  Description : Getter/Setter for the JSON formatter.
  Returntype  : instance of JSON
  Exceptions  : none

=cut

sub json_formatter {
    my $self = shift;
    $self->{'_json_formatter'} = shift if @_;
    return $self->{'_json_formatter'};
}


################################
# Communication with the child #
################################

=head2 send_message

  Example     : $process->send_message($perl_structure);
  Description : Send the Perl structure to the child process via the pipe (and
                serialized in JSON).
  Returntype  : none
  Exceptions  : raised by JSON / IO::Handle

=cut

sub send_message {
    my ($self, $struct) = @_;
    my $j = $self->json_formatter->encode($struct);
    $self->print_debug("send_message $j");
    $self->child_in->print($j."\n");
}


=head2 send_response

  Example     : $process->send_response('OK');
  Description : Wrapper around send_message to send a response to the child.
  Returntype  : none
  Exceptions  : raised by JSON / IO::Handle

=cut

sub send_response {
    my ($self, $response) = @_;
    return $self->send_message({'response' => $response});
}


=head2 read_message

  Example     : my $msg = $process->read_message();
  Description : Wait for and read the next message coming from the child.
                Again, the message itself is serialized and transmitted
                via the pipe
  Returntype  : Perl structure
  Exceptions  : raised by JSON / IO::Handle

=cut

sub read_message {
    my $self = shift;
    my $s = $self->child_out->getline();
    die "Did not receive any messages" unless defined $s;
    chomp $s;
    $self->print_debug("read_message: $s");
    return $self->json_formatter->decode($s);
}


=head2 wait_for_OK

  Example     : $process->wait_for_OK();
  Description : Wait for the child process to send the OK signal
  Returntype  : none
  Exceptions  : dies if the response is not OK, or anything raised by L<read_message()>
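
The same validation can be tried on already-decoded messages. The sketch below is a stand-in
under stated assumptions: the function name is hypothetical and it works on plain hashes instead
of reading from the pipe.

```perl
use strict;
use warnings;

# Hypothetical check mirroring wait_for_OK(): returns 1 on success, dies otherwise
sub assert_response_is_OK {
    my ($msg) = @_;
    die "Response message does not look like a response\n" if not exists $msg->{'response'};
    die "Received response is not OK\n" if ref($msg->{'response'}) or $msg->{'response'} ne 'OK';
    return 1;
}

# A plain "OK" string is accepted
my $accepted = eval { assert_response_is_OK({ response => 'OK' }) } ? 1 : 0;

# A structured response (e.g. a list of dbIDs) is not an acknowledgement
my $rejected = eval { assert_response_is_OK({ response => [ 4, 5 ] }) } ? 0 : 1;
my $error    = $@;
```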

=cut

sub wait_for_OK {
    my $self = shift;
    my $s = $self->read_message();
    die "Response message does not look like a response" if not exists $s->{'response'};
    die "Received response is not OK" if ref($s->{'response'}) or $s->{'response'} ne 'OK';
}


###########################
# Hive::Process interface #
###########################


=head2 param_defaults

  Example     : my $param_defaults = $runnable->param_defaults();
  Example     : $runnable->param_defaults($param_defaults);
  Description : Getter/Setter for the default parameters of this runnable.
                Hive only uses it as a getter, but here, we need a setter to
                define the parameters at the Perl layer once they've been
                retrieved from the child process.
  Returntype  : Hashref
  Exceptions  : none

=cut

sub param_defaults {
    my $self = shift;
    $self->{'_param_defaults'} = shift if @_;
    return $self->{'_param_defaults'};
}


=head2 life_cycle

  Example     : my $partial_timings = $runnable->life_cycle();
  Description : Runs the life-cycle of the input job and returns the timings
                of each Runnable method (fetch_input, run, etc).
                See the description of this module for details about the protocol
  Returntype  : Hashref
  Exceptions  : none

=cut

sub life_cycle {
    my $self = shift;

    $self->print_debug("LIFE_CYCLE");

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    my %struct = (
        input_job => {
            parameters => $job->{_unsubstituted_param_hash},
            input_id => $job->input_id,
            dbID => $job->dbID + 0,
            retry_count => $job->retry_count + 0,
        },
        execute_writes => $self->execute_writes || 0,
        debug => $self->debug || 0,
    );
    $self->print_debug("SEND JOB PARAM");
    $self->send_message(\%struct);
    $self->wait_for_OK();

    # A simple event loop
    while (1) {
        $self->print_debug("WAITING IN LOOP");

        my $msg = $self->read_message;
        my $event = $msg->{event};
        my $content = $msg->{content};
        $self->print_debug("processing event '$event'");

        if ($event eq 'JOB_STATUS_UPDATE') {
            $job_partial_timing{$job->status} = $partial_stopwatch->get_elapsed() if ($job->status ne 'READY') and ($job->status ne 'CLAIMED');
            $self->enter_status(uc $content);
            $partial_stopwatch->restart();
            $self->send_response('OK');

        } elsif ($event eq 'WARNING') {
            $self->warning($content->{message}, $content->{is_error}?'WORKER_ERROR':'INFO');
            $self->send_response('OK');

        } elsif ($event eq 'DATAFLOW') {
            $job->{_param_hash} = $content->{params}->{substituted};
            $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};
            my $d = $self->dataflow_output_id($content->{output_ids}, $content->{branch_name_or_code});
            $self->send_response($d);

        } elsif ($event eq 'WORKER_TEMP_DIRECTORY') {
            my $wtd = $self->worker_temp_directory;
            $self->send_response($wtd);

        } elsif ($event eq 'JOB_END') {
            # Especially here we need to be careful about boolean values
            # They are coded as JSON::true and JSON::false which have
            # different meanings in text / number contexts
            $job->autoflow($job->autoflow and $content->{job}->{autoflow});
            $job->lethal_for_worker($content->{job}->{lethal_for_worker}?1:0);
            $job->transient_error($content->{job}->{transient_error}?1:0);
            $job->{_param_hash} = $content->{params}->{substituted};
            $job->{_unsubstituted_param_hash} = $content->{params}->{unsubstituted};

            # This piece of code is duplicated from Process
            if ($content->{complete}) {
                if( $self->execute_writes and $job->autoflow ) {    # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
                    $self->say_with_header( ': AUTOFLOW input->output' );
                    $job->dataflow_output_id();
                }

                my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
                if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
                    $job->transient_error(0);
                    die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
                }
            } else {
                $job->died_somewhere(1);
            }
            $self->send_response('OK');
            return \%job_partial_timing;
        } else {
            die "Unknown event '$event' coming from the child";
        }
    }
}


### Summary of Process methods ###

## Have to be redefined
# life_cycle
# param_defaults

## Needed, can be reused from the base class
# worker_temp_directory
# input_job
# execute_writes
# debug
# dataflow_output_id
# enter_status -> worker / say_with_header
# warning
# cleanup_worker_temp_directory

## Invalid in this context
# strict_hash_format
# fetch_input
# run
# write_output
# db
# dbc
# data_dbc
# input_id
# complete_early
# throw


1;