#!/usr/local/ensembl/bin/perl -w

use strict;
use DBI;
use Getopt::Long;
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::Queen;
use Bio::EnsEMBL::Hive::URLFactory;
use Sys::Hostname;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;

# This is admittedly a hack: a blessed hash ref is created and passed around
# as if it were an object.  It keeps the subroutines below from reaching for
# scattered global variables and gathers the script state in a single '$self'.
my $self = bless {};

# default connection parameters; they can be replaced by a config file
# (see parse_conf) or overridden by the -db* command-line options
$self->{'db_conf'} = {};
$self->{'db_conf'}->{'-user'} = 'ensro';
$self->{'db_conf'}->{'-port'} = 3306;

# command-line option targets (filled in by GetOptions)
my $conf_file;
my ($help, $host, $user, $pass, $dbname, $port, $adaptor, $url);
my ($limit, $batch_size);
my $loopit=0;

# Parse the command line.  GetOptions returns false when it meets an unknown
# or malformed option; in that case show the help text and stop instead of
# silently carrying on with a partially parsed command line.
GetOptions('help'           => \$help,
           'url=s'          => \$url,
           'conf=s'         => \$conf_file,
           'dbhost=s'       => \$host,
           'dbport=i'       => \$port,
           'dbuser=s'       => \$user,
           'dbpass=s'       => \$pass,
           'dbname=s'       => \$dbname,
           'dead'           => \$self->{'check_for_dead'},
           'alldead'        => \$self->{'all_dead'},
           'run'            => \$self->{'run'},
           'limit=i'        => \$limit,
           'batch_size=i'   => \$batch_size,
           'loop'           => \$loopit
          ) or usage();

if ($help) { usage(); }

# a config file (if given) replaces the default connection parameters
parse_conf($self, $conf_file);

my $DBA;

if($url) {
  # a URL fully describes the hive database location
  $DBA = Bio::EnsEMBL::Hive::URLFactory->fetch($url);
  die("Unable to connect to $url\n") unless($DBA);
} else {
  # explicit command-line values take precedence over config file / defaults
  $self->{'db_conf'}->{'-host'}   = $host   if($host);
  $self->{'db_conf'}->{'-port'}   = $port   if($port);
  $self->{'db_conf'}->{'-dbname'} = $dbname if($dbname);
  $self->{'db_conf'}->{'-user'}   = $user   if($user);
  $self->{'db_conf'}->{'-pass'}   = $pass   if($pass);

  unless(defined($self->{'db_conf'}->{'-host'})
         and defined($self->{'db_conf'}->{'-user'})
         and defined($self->{'db_conf'}->{'-dbname'}))
  {
    print "\nERROR : must specify host, user, and database to connect\n\n";
    usage();
  }

  # connect to database specified (direct method call rather than the
  # ambiguous indirect-object form 'new Class(...)')
  $DBA = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new(%{$self->{'db_conf'}});
  die("Unable to connect to database '" . $self->{'db_conf'}->{'-dbname'}
      . "' on host '" . $self->{'db_conf'}->{'-host'} . "'\n") unless($DBA);
}

my $queen = $DBA->get_Queen;

# -alldead : register every outstanding worker as dead before doing anything else
if($self->{'all_dead'}) { register_all_workers_dead($self, $queen); }

if($loopit) {
  # -loop : keep checking and submitting workers until the hive is idle
  run_autonomously($self, $queen);
} else {
  # single pass
  if($self->{'check_for_dead'}) { check_for_dead_workers($self, $queen); }

  $queen->update_analysis_stats();
  $queen->check_blocking_control_rules;
  $queen->print_hive_status;

  # NOTE(review): the return value is discarded here; presumably the call is
  # made for its side effects -- confirm against the Queen implementation
  $queen->get_num_needed_workers();

  run_next_worker_clutch($self, $queen);
}

Bio::EnsEMBL::Hive::URLFactory->cleanup;
exit(0);


#######################
#
# subroutines
#
#######################

# usage
#   Prints the command-line help text to STDOUT and terminates the script
#   with exit status 1.  Never returns to the caller.
sub usage {
  print(
    "lsf_beekeeper.pl [options]\n",
    "  -help                  : print this help\n",
    "  -url <url string>      : url defining where hive database is located\n",
    "  -conf <path>           : config file describing db connection\n",
    "  -dbhost <machine>      : mysql database host <machine>\n",
    "  -dbport <port#>        : mysql port number\n",
    "  -dbname <name>         : mysql database <name>\n",
    "  -dbuser <name>         : mysql connection user <name>\n",
    "  -dbpass <pass>         : mysql connection password\n",
    "  -batch_size <num>      : #jobs a worker can claim at once\n",
    "  -limit <num>           : #jobs to run before worker can die naturally\n",
    "  -run                   : show and run the needed jobs\n",
    "  -dead                  : clean overdue jobs for resubmission\n",
    "  -alldead               : register all outstanding workers as dead\n",
    "  -loop                  : run autonomously\n",
    "lsf_beekeeper.pl v1.0\n",
  );

  exit(1);
}


# parse_conf
#   Loads a Perl-syntax configuration file and installs its database section
#   as the script's connection parameters.
#
#   Arg 1 : $self      - script state hash; $self->{'db_conf'} is replaced
#                        by the matching configuration entry
#   Arg 2 : $conf_file - path of the configuration file (may be undef)
#
#   The file is expected to evaluate to a reference to an array of hashes.
#   Every entry whose TYPE is 'COMPARA' or 'DATABASE' becomes the new
#   'db_conf' (the last matching entry wins).  Nothing is changed when no
#   file is given or the file does not exist.  Dies with a descriptive
#   message when the file exists but cannot be evaluated to an array ref.
sub parse_conf {
  my $self      = shift;
  my $conf_file = shift;

  return unless($conf_file);

  unless(-e $conf_file) {
    warn("configuration file '$conf_file' does not exist, ignoring it\n");
    return;
  }

  # 'do FILE' searches @INC for relative paths, and '.' is no longer part
  # of @INC on recent perls; an absolute path is always read directly
  require File::Spec;
  my $conf_path = File::Spec->rel2abs($conf_file);

  #read configuration file from disk
  my $conf_list = do $conf_path;
  if($@) {
    die("Unable to parse configuration file '$conf_file': $@\n");
  }
  unless(defined($conf_list)) {
    die("Unable to read configuration file '$conf_file': $!\n");
  }
  unless(ref($conf_list) eq 'ARRAY') {
    die("Configuration file '$conf_file' did not return an array reference\n");
  }

  foreach my $confPtr (@{$conf_list}) {
    # skip entries that are not hashes or carry no TYPE at all
    next unless(ref($confPtr) eq 'HASH' and defined($confPtr->{TYPE}));

    if(($confPtr->{TYPE} eq 'COMPARA') or ($confPtr->{TYPE} eq 'DATABASE')) {
      $self->{'db_conf'} = $confPtr;
    }
  }
  return;
}


# run_next_worker_clutch
#   For every analysis that currently needs workers, builds the matching
#   'bsub' command line, prints it, and executes it when -run was given.
#
#   Arg 1 : $self  - script state hash; $self->{'run'} enables submission
#   Arg 2 : $queen - hive Queen object used to reach the database adaptors
#
#   Uses the file-level option variables $conf_file, $url, $limit and
#   $batch_size to build the runWorker.pl arguments.
sub run_next_worker_clutch
{
  my $self = shift;
  my $queen = shift;

  my $clutches = $queen->db->get_AnalysisStatsAdaptor->fetch_by_needed_workers();

  print("\n");
  foreach my $analysis_stats (@{$clutches}) {

    my $analysis_id   = $analysis_stats->analysis_id;
    my $count         = $analysis_stats->num_required_workers;
    my $analysis      = $analysis_stats->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($analysis_id);
    my $hive_capacity = $analysis_stats->hive_capacity;

    my $worker_cmd = "./runWorker.pl -bk LSF -logic_name " . $analysis->logic_name;

    $worker_cmd .= " -conf $conf_file" if($conf_file);
    $worker_cmd .= " -url $url" if($url);
    if (defined $limit) {
      $worker_cmd .= " -limit $limit";
    } elsif (defined($hive_capacity) and $hive_capacity < 0) {
      # negative hive_capacity: fall back to the analysis batch size as limit
      $worker_cmd .= " -limit " . $analysis_stats->batch_size;
    }
    $worker_cmd .= " -batch_size $batch_size" if (defined $batch_size);

    # several workers are submitted as one LSF job array named W<analysis_id>
    my $cmd;
    if($count>1) { $cmd = "bsub -JW$analysis_id\[1-$count\] $worker_cmd"; }
    else         { $cmd = "bsub -JW$analysis_id $worker_cmd"; }
    print("$cmd\n");

    if($self->{'run'}) {
      my $status = system($cmd);
      warn("submission failed (system status $status) : $cmd\n") if($status != 0);
    }
  }
}


# check_for_dead_workers
#   Asks the queen for overdue workers and, for each one that was submitted
#   through LSF, looks for its process on the worker's host via ssh.
#   Workers whose process cannot be found are registered as dead.
#
#   Arg 1 : $self  - script state hash (not used here)
#   Arg 2 : $queen - hive Queen object
#
#   NOTE(review): an empty result from the ssh/ps command is treated as
#   "process gone", so an unreachable host also counts as a dead worker.
sub check_for_dead_workers {
  my $self = shift;
  my $queen = shift;

  my $overdueWorkers = $queen->fetch_overdue_workers(15*60);  #overdue by 15 minutes
  print(scalar(@{$overdueWorkers}), " overdue workers\n");

  foreach my $worker (@{$overdueWorkers}) {
    # only workers started by the LSF beekeeper are checked
    next unless($worker->beekeeper eq 'LSF');

    printf("%10d %20s    analysis_id=%d : ", $worker->hive_id,$worker->host, $worker->analysis->dbID);

    # 'grep -v PID' drops the ps header line, so any remaining output means
    # the process is still listed on the remote host
    my $cmd = "ssh -x ". $worker->host . " ps -p ". $worker->process_id . "|grep -v PID";
    my $check = qx/$cmd/;

    if($check) {
      print("ALIVE and running\n");
    }
    else {
      print("worker is missing => it DIED!!\n");
      $queen->register_worker_death($worker);
    }
  }
}


# register_all_workers_dead
#   Fetches the workers the queen reports as overdue with a threshold of 0
#   and registers each of them as dead.
#
#   Arg 1 : $self  - script state hash (not used here)
#   Arg 2 : $queen - hive Queen object
sub register_all_workers_dead {
  my ($self, $queen) = @_;

  my $outstanding = $queen->fetch_overdue_workers(0);

  foreach my $lost_worker (@{$outstanding}) {
    $queen->register_worker_death($lost_worker);
  }
}


# run_autonomously
#   Loop mode (-loop): every 15 minutes checks for dead workers, refreshes
#   the analysis statistics and control rules, and submits as many LSF
#   workers as the queen reports are needed.  Returns once nothing is
#   running and nothing is left to do.
#
#   Arg 1 : $self  - script state hash
#   Arg 2 : $queen - hive Queen object
#
#   Uses the file-level option variables $loopit, $conf_file, $url, $limit
#   and $batch_size.  Unlike run_next_worker_clutch, the bsub command is
#   always executed here, regardless of -run.
sub run_autonomously {
  my $self = shift;
  my $queen = shift;

  while($loopit) {
    check_for_dead_workers($self, $queen);

    $queen->update_analysis_stats();
    $queen->check_blocking_control_rules;

    my $load  = $queen->get_hive_current_load();
    my $count = $queen->get_num_needed_workers();

    # nothing running and nothing todo => done
    # (this must be a comparison: the former '$count=0' assignment never
    #  let the loop end and wiped out the worker count whenever load was 0)
    return if($load==0 and $count==0);

    if($count) {
      # pass on the same connection options as run_next_worker_clutch does
      my $worker_cmd = "./runWorker.pl -bk LSF";
      $worker_cmd .= " -conf $conf_file" if($conf_file);
      $worker_cmd .= " -url $url" if($url);
      $worker_cmd .= " -limit $limit" if(defined $limit);
      $worker_cmd .= " -batch_size $batch_size" if(defined $batch_size);

      # several workers are submitted as one LSF job array
      my $cmd;
      if($count>1) { $cmd = "bsub -JW\[1-$count\] $worker_cmd"; }
      else         { $cmd = "bsub -JW $worker_cmd"; }
      print("$cmd\n");
      system($cmd);
    }
    sleep(15*60);  #sleep 15 minutes before repeating
  }
}