local_beekeeper.pl 5.81 KB
Newer Older
1
2
3
4
5
#!/usr/local/ensembl/bin/perl -w

use strict;
use DBI;
use Getopt::Long;
6
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
7
8
9
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::Queen;
use Bio::EnsEMBL::Hive::URLFactory;
10
use Sys::Hostname;
11
use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# This script has no real class of its own, so all shared state is kept in
# a single blessed hash ref ($self) that is handed to every subroutine.
# That keeps the subroutines free of scattered globals.
my $self = bless {}, __PACKAGE__;

# Connection defaults. A -conf file replaces this hash wholesale (see
# parse_conf); the -db* options then override individual keys.
$self->{'db_conf'} = {
  '-user' => 'ensro',
  '-port' => 3306,
};

$self->{'analysis_id'} = undef;
$self->{'outdir'}      = "/ecs4/work2/ensembl/jessica/data/hive-output";

my $conf_file;
my ($help, $host, $user, $pass, $dbname, $port, $adaptor, $url);
my ($limit, $batch_size);

# GetOptions returns false when it meets an unknown or malformed option
# (it has already warned on STDERR by then). Show the usage text and stop
# instead of carrying on with a half-understood command line.
GetOptions(
  'help'         => \$help,
  'url=s'        => \$url,
  'conf=s'       => \$conf_file,
  'dbhost=s'     => \$host,
  'dbport=i'     => \$port,
  'dbuser=s'     => \$user,
  'dbpass=s'     => \$pass,
  'dbname=s'     => \$dbname,
  'dead'         => \$self->{'all_dead'},
  'run'          => \$self->{'run'},
  'limit=i'      => \$limit,
  'batch_size=i' => \$batch_size,
) or usage();

# An optional leading positional argument is taken as the analysis_id.
# At file scope @_ is always empty, so the previous `if(@_)` guard could
# never fire; arguments left over after GetOptions live in @ARGV.
$self->{'analysis_id'} = shift(@ARGV) if(@ARGV);

if ($help) { usage(); }

# May replace $self->{'db_conf'} with the settings found in the -conf file.
parse_conf($self, $conf_file);

my $DBA;

if($url) {
  # A -url takes precedence over every other way of naming the database.
  $DBA = Bio::EnsEMBL::Hive::URLFactory->fetch($url);
  die("Unable to connect to $url\n") unless($DBA);
} else {
  # Command-line values win over whatever is already in db_conf.
  if($host)   { $self->{'db_conf'}->{'-host'}   = $host; }
  if($port)   { $self->{'db_conf'}->{'-port'}   = $port; }
  if($dbname) { $self->{'db_conf'}->{'-dbname'} = $dbname; }
  if($user)   { $self->{'db_conf'}->{'-user'}   = $user; }
  if($pass)   { $self->{'db_conf'}->{'-pass'}   = $pass; }

  unless(defined($self->{'db_conf'}->{'-host'})
         and defined($self->{'db_conf'}->{'-user'})
         and defined($self->{'db_conf'}->{'-dbname'}))
  {
    print "\nERROR : must specify host, user, and database to connect\n\n";
    usage();
  }

  # connect to database specified
  # (direct method call rather than the ambiguous indirect-object `new Class`)
  $DBA = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new(%{$self->{'db_conf'}});
}
74

75
76
# Main flow: everything below is driven through the Queen object handed
# out by the database adaptor.
my $queen = $DBA->get_Queen();

# -dead : register every overdue worker as dead before doing anything else.
check_for_dead_workers($self, $queen) if($self->{'all_dead'});

# Bring the analysis statistics and control-rule state up to date, then
# print the hive status.
$queen->update_analysis_stats();
$queen->check_blocking_control_rules();
$queen->print_hive_status();

# Print (and, with -run, submit) the worker commands that are needed now.
run_next_worker_clutch($self, $queen);

Bio::EnsEMBL::Hive::URLFactory->cleanup();

exit(0);


#######################
#
# subroutines
#
#######################

# usage()
#
# Prints the command-line help for this script on the currently selected
# output handle and terminates the process with exit status 1.
# Never returns to the caller.
sub usage {
  my @help_lines = (
    'local_beekeeper.pl [options]',
    '  -help                  : print this help',
    '  -url <url string>      : url defining where hive database is located',
    '  -conf <path>           : config file describing db connection',
    '  -dbhost <machine>      : mysql database host <machine>',
    '  -dbport <port#>        : mysql port number',
    '  -dbname <name>         : mysql database <name>',
    '  -dbuser <name>         : mysql connection user <name>',
    '  -dbpass <pass>         : mysql connection password',
    '  -batch_size <num>      : #jobs a worker can claim at once',
    '  -limit <num>           : #jobs to run before worker can die naturally',
    '  -run                   : show and run the needed jobs',
    '  -dead                  : clean overdue jobs for resubmission',
    'local_beekeeper.pl v1.0',
  );

  foreach my $help_line (@help_lines) {
    print($help_line . "\n");
  }

  exit(1);
}


# parse_conf($self, $conf_file)
#
# Loads database connection settings from a Perl config file.
#
#   $self      : the script state hash; $self->{'db_conf'} is replaced when
#                a suitable entry is found
#   $conf_file : path to a file of Perl code that evaluates to a reference
#                to an array of hash refs; may be undef
#
# Every entry whose TYPE is 'COMPARA' or 'DATABASE' replaces
# $self->{'db_conf'}, so the last such entry in the file wins. Nothing
# happens when $conf_file is empty or names a file that does not exist.
# Dies with a descriptive message when the file exists but cannot be
# evaluated to an array reference.
sub parse_conf {
  my $self      = shift;
  my $conf_file = shift;

  return unless($conf_file and (-e $conf_file));

  # `do FILE` looks a relative path up in @INC, and '.' has not been part of
  # @INC since perl 5.26, so "-conf my.conf" would fail to load even though
  # the -e test above succeeded. An absolute path is always read directly.
  require File::Spec;
  my $conf_path = File::Spec->rel2abs($conf_file);

  #read configuration file from disk
  my $conf_list = do $conf_path;
  unless(ref($conf_list) eq 'ARRAY') {
    # $@ is set on a compile/runtime error in the file; $! when the file
    # could not be read at all.
    my $reason = $@                  ? $@
               : defined($conf_list) ? 'it did not return an array reference'
               :                       $!;
    die("Unable to load config file $conf_file : $reason\n");
  }

  foreach my $confPtr (@{$conf_list}) {
    # skip anything that is not a typed hash entry rather than warn or die on it
    next unless(ref($confPtr) eq 'HASH' and defined($confPtr->{TYPE}));

    if(($confPtr->{TYPE} eq 'COMPARA') or ($confPtr->{TYPE} eq 'DATABASE')) {
      $self->{'db_conf'} = $confPtr;
    }
  }

  return;
}

134

135
# run_next_worker_clutch($self, $queen)
#
# For every analysis that the AnalysisStatsAdaptor reports as needing
# workers, builds the matching `bsub ... ./runWorker.pl ...` command,
# prints it, and - only when $self->{'run'} is set - executes it.
#
#   $self  : script state hash; only $self->{'run'} is read here
#   $queen : Queen object, used to reach the database adaptors
#
# Also reads the file-scoped option variables $conf_file, $url, $limit and
# $batch_size. When more than one worker is required the job is submitted
# as a job array named W<analysis_id>[1-<count>].
sub run_next_worker_clutch
{
  my $self  = shift;
  my $queen = shift;

  my $clutches = $queen->db->get_AnalysisStatsAdaptor->fetch_by_needed_workers();

  print("\n");
  foreach my $analysis_stats (@{$clutches}) {
    my $analysis_id   = $analysis_stats->analysis_id;
    my $count         = $analysis_stats->num_required_workers;
    my $analysis      = $analysis_stats->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($analysis_id);
    my $hive_capacity = $analysis_stats->hive_capacity;

    unless($analysis) {
      warn("no analysis found for analysis_id=$analysis_id, skipping\n");
      next;
    }

    # The command is kept as an argument list so that it can be handed to
    # system() without a shell: a -url or -conf value containing shell
    # metacharacters can then neither break nor alter the command.
    my @worker_cmd = ('./runWorker.pl', '-logic_name', $analysis->logic_name);

    push(@worker_cmd, '-conf', $conf_file) if($conf_file);
    push(@worker_cmd, '-url',  $url)       if($url);

    if (defined $limit) {
      push(@worker_cmd, '-limit', $limit);
    } elsif (defined($hive_capacity) and $hive_capacity < 0) {
      # negative hive_capacity: limit the worker to one batch of jobs
      push(@worker_cmd, '-limit', $analysis_stats->batch_size);
    }
    push(@worker_cmd, '-batch_size', $batch_size) if (defined $batch_size);

    my $job_name = ($count>1) ? "-JW$analysis_id\[1-$count\]"
                              : "-JW$analysis_id";
    my @cmd = ('bsub', $job_name, @worker_cmd);

    print(join(' ', @cmd) . "\n");
    next unless($self->{'run'});

    # return of bsub looks like this
    #Job <6392054> is submitted to default queue <normal>.
    if(system(@cmd) != 0) {
      my $reason = ($? == -1) ? "could not execute bsub: $!"
                              : "bsub exited with status " . ($? >> 8);
      warn("failed to submit workers for analysis_id=$analysis_id : $reason\n");
    }
  }
}


176
177
178
179
180
181
# check_for_dead_workers($self, $queen)
#
# Asks the queen for all overdue workers, prints how many there are plus
# one line per worker (hive_id, host, analysis dbID), and registers each of
# them as dead with the queen.
#
#   $self  : script state hash (not used here)
#   $queen : Queen object providing fetch_overdue_workers() and
#            register_worker_death()
#
# Every listed worker is registered as dead unconditionally. The earlier
# code ran `ps -p <pid>` through the shell for each worker but never looked
# at the output, so that call (and the unused local hostname lookup) has
# been dropped; the set of workers registered as dead is unchanged.
sub check_for_dead_workers {
  my $self  = shift;
  my $queen = shift;

  my $overdueWorkers = $queen->fetch_overdue_workers(0);  #0 minutes means check all outstanding workers

  print(scalar(@{$overdueWorkers}), " overdue workers\n");
  foreach my $worker (@{$overdueWorkers}) {
    printf("%10d %20s    analysis_id=%d\n", $worker->hive_id,$worker->host, $worker->analysis->dbID);

    # NOTE(review): a per-worker guard used to be sketched here -
    #   if(($worker->beekeeper eq '') and ($worker->host eq $host))
    # - but it was never enabled. Confirm whether unconditional
    # registration is really what is wanted.
    $queen->register_worker_death($worker);
  }
}

196