From dc33e57c4900be57f624081226c60a67ff3d6418 Mon Sep 17 00:00:00 2001
From: Jessica Severin <jessica@ebi.ac.uk>
Date: Thu, 27 May 2004 17:58:06 +0000
Subject: [PATCH] Created new AnalysisStats class and adaptor to replace
 extension to the analysis table/object.  New object class and tables includes
 a good guess of what kind of stats I'll need for the Queen to monitor a hive
 Also cleaned up some bugs caused by the extraction from compara.

---
 modules/Bio/EnsEMBL/Hive/AnalysisStats.pm     | 168 ++++++++++
 .../EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm  |  41 ++-
 .../Hive/DBSQL/AnalysisStatsAdaptor.pm        | 300 ++++++++++++++++++
 modules/Bio/EnsEMBL/Hive/Extensions.pm        |  46 ---
 modules/Bio/EnsEMBL/Hive/Queen.pm             |  34 +-
 modules/Bio/EnsEMBL/Hive/Worker.pm            |  13 +
 scripts/runWorker.pl                          |  12 +-
 sql/tables.sql                                |  14 +-
 8 files changed, 556 insertions(+), 72 deletions(-)
 create mode 100755 modules/Bio/EnsEMBL/Hive/AnalysisStats.pm
 create mode 100644 modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm

diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm
new file mode 100755
index 000000000..2bca0153b
--- /dev/null
+++ b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm
@@ -0,0 +1,168 @@
+#
+# You may distribute this module under the same terms as perl itself
+#
+# POD documentation - main docs before the code
+
+=pod 
+
+=head1 NAME
+
+Bio::EnsEMBL::Hive::AnalysisStats
+
+=cut
+
+=head1 SYNOPSIS
+
+Object which encapsulates the overall statistics on an analysis and
+all the jobs associated with it in the hive.  Used as a cache of the
+stats at a given moment in time (last_update_time).  The Queen is
+responsible for monitoring the Hive and updating most stats.  Certain
+status states(ALL_CLAIMED) and batch_size are updated by the workers.
+
+Hive based processing is a concept based on a more controlled version
+of an autonomous agent type system.  Each worker is not told what to do
+(like a centralized control system - like the current pipeline system)
+but rather queries a central database for jobs (give me jobs).
+
+Each worker is linked to an analysis_id, registers its self on creation
+into the Hive, creates a RunnableDB instance of the Analysis->module,
+gets $runnable->batch_size() jobs from the analysis_job table, does its
+work, creates the next layer of analysis_job entries by querying simple_rule
+table where condition_analysis_id = $self->analysis_id.  It repeats
+this cycle until it's lived it's lifetime or until there are no more jobs left.
+The lifetime limit is just a safety limit to prevent these from 'infecting'
+a system.
+
+The Queens job is to simply birth Workers of the correct analysis_id to get the
+work down.  The only other thing the Queen does is free up jobs that were
+claimed by Workers that died unexpectantly so that other workers can take
+over the work.
+
+=cut
+
+=head1 DESCRIPTION
+
+=cut
+
+=head1 CONTACT
+
+Jessica Severin, jessica@ebi.ac.uk
+
+=cut
+
+=head1 APPENDIX
+
+The rest of the documentation details each of the object methods. 
+Internal methods are usually preceded with a _
+
+=cut
+
+package Bio::EnsEMBL::Hive::AnalysisStats;
+
+use strict;
+
+use Bio::EnsEMBL::Root;
+use Bio::EnsEMBL::Analysis;
+use Bio::EnsEMBL::DBSQL::DBAdaptor;
+use Bio::EnsEMBL::Hive::Worker;
+
+use vars qw(@ISA);
+
+@ISA = qw(Bio::EnsEMBL::Root);
+
+=head3
+CREATE TABLE analysis_stats (
+  analysis_id           int(10) NOT NULL,
+  status                enum('BLOCKED', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE')
+                          DEFAULT 'READY' NOT NULL,
+  batch_size            int(10) NOT NULL,
+  hive_capacity         int(10) NOT NULL,
+  total_job_count       int(10) NOT NULL,
+  unclaimed_job_count   int(10) NOT NULL,
+  done_job_count        int(10) NOT NULL,
+  num_required_workers  int(10) NOT NULL,
+  last_update           datetime NOT NULL,
+
+  UNIQUE KEY   (analysis_id)
+);
+=cut
+
+
+sub adaptor {
+  my $self = shift;
+  $self->{'_adaptor'} = shift if(@_);
+  return $self->{'_adaptor'};
+}
+
+sub dbID {
+  my $self = shift;
+  $self->{'_dbID'} = shift if(@_);
+  return $self->{'_dbID'};
+}
+
+sub status {
+  my ($self, $value ) = @_;
+
+  if(defined $value) {
+    $self->{'_status'} = $value;
+    $self->adaptor->update($self) if($self->adaptor);
+  }
+  return $self->{'_status'};
+}
+
+sub batch_size {
+  my $self = shift;
+  $self->{'_batch_size'} = shift if(@_);
+  return $self->{'_batch_size'};
+}
+
+sub hive_capacity {
+  my $self = shift;
+  $self->{'_hive_capacity'} = shift if(@_);
+  return $self->{'_hive_capacity'};
+}
+
+sub total_job_count {
+  my $self = shift;
+  $self->{'_total_job_count'} = shift if(@_);
+  return $self->{'_total_job_count'};
+}
+
+sub unclaimed_job_count {
+  my $self = shift;
+  $self->{'_unclaimed_job_count'} = shift if(@_);
+  return $self->{'_unclaimed_job_count'};
+}
+
+sub done_job_count {
+  my $self = shift;
+  $self->{'_done_job_count'} = shift if(@_);
+  return $self->{'_done_job_count'};
+}
+
+sub num_required_workers {
+  my $self = shift;
+  $self->{'_num_required_workers'} = shift if(@_);
+  return $self->{'_num_required_workers'};
+}
+
+sub seconds_since_last_update {
+  my( $self, $value ) = @_;
+  $self->{'_last_update'} = time() + $value if(defined($value));
+  return time() - $self->{'_last_update'};
+}
+
+sub print_stats {
+  my $self = shift;
+  print("ANALYSIS_STATS: analysis_id=",$self->dbID,"\n"
+       ," status=",$self->status,"\n"
+       ," batch_size=",$self->batch_size,"\n"
+       ," hive_capacity=" . $self->hive_capacity(),"\n"
+       ,",total_job_count=" . $self->total_job_count(),"\n"
+       ,",unclaimed_job_count=" . $self->unclaimed_job_count(),"\n"
+       ,",done_job_count=" . $self->done_job_count(),"\n"
+       ,",num_required_workers=" . $self->num_required_workers(),"\n"
+       ,",last_update==", $self->{'_last_update'},"\n");
+}
+
+1;
diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
index fadddde9b..30298652e 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
@@ -1,4 +1,43 @@
-package Bio::EnsEMBL::Hive::AnalysisJobAdaptor;
+# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
+#
+# Date of creation: 22.03.2004
+# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
+#
+# Copyright EMBL-EBI 2004
+#
+# You may distribute this module under the same terms as perl itself
+
+# POD documentation - main docs before the code
+
+=head1 NAME
+
+Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
+
+=head1 SYNOPSIS
+
+  $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
+  $analysisJobAdaptor = $analysisJob->adaptor;
+
+=head1 DESCRIPTION
+
+  Module to encapsulate all db access for persistent class AnalysisJob.
+  There should be just one per application and database connection.
+
+=head1 CONTACT
+
+    Contact Jessica Severin on implemetation/design detail: jessica@ebi.ac.uk
+    Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
+
+=head1 APPENDIX
+
+The rest of the documentation details each of the object methods. Internal methods are usually preceded with a _
+
+=cut
+
+
+# Let the code begin...
+
+package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
 
 use strict;
 use Bio::EnsEMBL::Hive::Worker;
diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm
new file mode 100644
index 000000000..63c073e03
--- /dev/null
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm
@@ -0,0 +1,300 @@
+# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
+#
+# Date of creation: 22.03.2004
+# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
+#
+# Copyright EMBL-EBI 2004
+#
+# You may distribute this module under the same terms as perl itself
+
+# POD documentation - main docs before the code
+
+=head1 NAME
+
+Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
+
+=head1 SYNOPSIS
+
+  $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
+  $analysisStatsAdaptor = $analysisStats->adaptor;
+
+=head1 DESCRIPTION
+
+  Module to encapsulate all db access for persistent class AnalysisStats.
+  There should be just one per application and database connection.
+
+=head1 CONTACT
+
+    Contact Jessica Severin on implemetation/design detail: jessica@ebi.ac.uk
+    Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
+
+=head1 APPENDIX
+
+The rest of the documentation details each of the object methods. Internal methods are usually preceded with a _
+
+=cut
+
+
+# Let the code begin...
+
+package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
+
+use strict;
+use Bio::EnsEMBL::Hive::AnalysisStats;
+use Bio::EnsEMBL::DBSQL::BaseAdaptor;
+
+our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
+
+
+=head2 fetch_by_dbID
+
+  Arg [1]    : int $id
+               the unique database identifier for the feature to be obtained
+  Example    : $feat = $adaptor->fetch_by_dbID(1234);
+  Description: Returns the feature created from the database defined by the
+               the id $id.
+  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
+  Exceptions : thrown if $id is not defined
+  Caller     : general
+
+=cut
+
+sub fetch_by_dbID {
+  my ($self,$id) = @_;
+
+  unless(defined $id) {
+    $self->throw("fetch_by_dbID must have an id");
+  }
+
+  my $constraint = "ast.analysis_id = $id";
+
+  #return first element of _generic_fetch list
+  my ($obj) = @{$self->_generic_fetch($constraint)};
+  unless(defined($obj)) {
+    $self->_create_new_for_analysis_id($id);
+    ($obj) = @{$self->_generic_fetch($constraint)};  
+  }
+  return $obj;
+}
+
+=head2 fetch_all
+
+  Arg        : None
+  Example    : 
+  Description: 
+  Returntype : 
+  Exceptions : 
+  Caller     : 
+
+=cut
+
+sub fetch_all {
+  my $self = shift;
+
+  return $self->_generic_fetch();
+}
+
+
+
+#
+# INTERNAL METHODS
+#
+###################
+
+=head2 _generic_fetch
+
+  Arg [1]    : (optional) string $constraint
+               An SQL query constraint (i.e. part of the WHERE clause)
+  Arg [2]    : (optional) string $logic_name
+               the logic_name of the analysis of the features to obtain
+  Example    : $fts = $a->_generic_fetch('contig_id in (1234, 1235)', 'Swall');
+  Description: Performs a database fetch and returns feature objects in
+               contig coordinates.
+  Returntype : listref of Bio::EnsEMBL::SeqFeature in contig coordinates
+  Exceptions : none
+  Caller     : BaseFeatureAdaptor, ProxyDnaAlignFeatureAdaptor::_generic_fetch
+
+=cut
+  
+sub _generic_fetch {
+  my ($self, $constraint, $join) = @_;
+  
+  my @tables = $self->_tables;
+  my $columns = join(', ', $self->_columns());
+  
+  if ($join) {
+    foreach my $single_join (@{$join}) {
+      my ($tablename, $condition, $extra_columns) = @{$single_join};
+      if ($tablename && $condition) {
+        push @tables, $tablename;
+        
+        if($constraint) {
+          $constraint .= " AND $condition";
+        } else {
+          $constraint = " $condition";
+        }
+      } 
+      if ($extra_columns) {
+        $columns .= ", " . join(', ', @{$extra_columns});
+      }
+    }
+  }
+      
+  #construct a nice table string like 'table1 t1, table2 t2'
+  my $tablenames = join(', ', map({ join(' ', @$_) } @tables));
+
+  my $sql = "SELECT $columns FROM $tablenames";
+
+  my $default_where = $self->_default_where_clause;
+  my $final_clause = $self->_final_clause;
+
+  #append a where clause if it was defined
+  if($constraint) { 
+    $sql .= " WHERE $constraint ";
+    if($default_where) {
+      $sql .= " AND $default_where ";
+    }
+  } elsif($default_where) {
+    $sql .= " WHERE $default_where ";
+  }
+
+  #append additional clauses which may have been defined
+  $sql .= " $final_clause";
+  #rint STDOUT $sql,"\n";
+
+  my $sth = $self->prepare($sql);
+  $sth->execute;  
+
+
+  return $self->_objs_from_sth($sth);
+}
+
+sub _tables {
+  my $self = shift;
+
+  return (['analysis_stats', 'ast']);
+}
+
+sub _columns {
+  my $self = shift;
+=head3
+CREATE TABLE analysis_stats (
+  analysis_id           int(10) NOT NULL,
+  status                enum('BLOCKED', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE')
+                          DEFAULT 'READY' NOT NULL,
+  batch_size            int(10) NOT NULL,
+  hive_capacity         int(10) NOT NULL,
+  total_job_count       int(10) NOT NULL,
+  unclaimed_job_count   int(10) NOT NULL,
+  done_job_count        int(10) NOT NULL,
+  num_required_workers  int(10) NOT NULL,
+  last_update           datetime NOT NULL,
+
+  UNIQUE KEY   (analysis_id)
+);
+=cut
+  my @columns = qw (ast.analysis_id
+                    ast.status
+                    ast.batch_size
+                    ast.hive_capacity
+                    ast.total_job_count
+                    ast.unclaimed_job_count
+                    ast.done_job_count
+                    ast.num_required_workers
+                    ast.last_update
+                   );
+  push @columns , "NOW()-ast.last_update seconds_since_last_update";
+  return @columns;            
+}
+
+sub _objs_from_sth {
+  my ($self, $sth) = @_;
+  
+  my %column;
+  $sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } ));
+
+  my @statsArray = ();
+
+  while ($sth->fetch()) {
+    my $analStats = new Bio::EnsEMBL::Hive::AnalysisStats;
+
+    $analStats->dbID($column{'analysis_id'});
+    $analStats->status($column{'status'});
+    $analStats->batch_size($column{'batch_size'});
+    $analStats->hive_capacity($column{'hive_capacity'});
+    $analStats->total_job_count($column{'total_job_count'});
+    $analStats->unclaimed_job_count($column{'unclaimed_job_count'});
+    $analStats->done_job_count($column{'done_job_count'});
+    $analStats->num_required_workers($column{'num_required_workers'});
+    $analStats->seconds_since_last_update($column{'seconds_since_last_update'});
+    $analStats->adaptor($self);
+
+    push @statsArray, $analStats;    
+  }
+  $sth->finish;
+  
+  return \@statsArray
+}
+
+sub _default_where_clause {
+  my $self = shift;
+  return '';
+}
+
+sub _final_clause {
+  my $self = shift;
+  return '';
+}
+
+
+#
+# STORE / UPDATE METHODS
+#
+################
+
+=head2 update
+
+  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
+  Example    :
+  Description:
+  Returntype : Bio::EnsEMBL::Hive::Worker
+  Exceptions :
+  Caller     :
+
+=cut
+
+sub update {
+  my ($self, $stats) = @_;
+
+  my $sql = "UPDATE analysis_stats SET status='".$stats->status."' ";
+  $sql .= ",batch_size=" . $stats->batch_size();
+  $sql .= ",hive_capacity=" . $stats->hive_capacity();
+  $sql .= ",total_job_count=" . $stats->total_job_count();
+  $sql .= ",unclaimed_job_count=" . $stats->unclaimed_job_count();
+  $sql .= ",done_job_count=" . $stats->done_job_count();
+  $sql .= ",num_required_workers=" . $stats->num_required_workers(); 
+  $sql .= ",last_update=NOW()";
+  $sql .= " WHERE analysis_id='".$stats->dbID."' ";
+  
+  my $sth = $self->prepare($sql);
+  $sth->execute();
+  $sth->finish;
+  $stats->seconds_since_last_update(0); #not exact but good enough :)
+}
+
+
+sub _create_new_for_analysis_id {
+  my ($self, $analysis_id) = @_;
+
+  my $sql;
+
+  $sql = "INSERT ignore INTO analysis_stats SET analysis_id='$analysis_id' ";
+  print("$sql\n");
+  my $sth = $self->prepare($sql);
+  $sth->execute();
+  $sth->finish;
+}
+
+1;
+
+
diff --git a/modules/Bio/EnsEMBL/Hive/Extensions.pm b/modules/Bio/EnsEMBL/Hive/Extensions.pm
index 177df759f..a592d7055 100755
--- a/modules/Bio/EnsEMBL/Hive/Extensions.pm
+++ b/modules/Bio/EnsEMBL/Hive/Extensions.pm
@@ -65,52 +65,6 @@ sub Bio::EnsEMBL::Analysis::runnableDB
   return $runobj
 }
 
-=head3
-Section dealing with getting/setting analysis_stats.status
-=cut
-
-sub Bio::EnsEMBL::Analysis::status
-{
-  my( $self, $value ) = @_;
-
-  return undef unless($self->adaptor);
-  
-  if($value) {
-    $self->adaptor->update_status($self->dbID, $value);
-    return $value;
-  }
-  return $self->adaptor->get_status($self->dbID);
-}
-
-
-sub Bio::EnsEMBL::DBSQL::AnalysisAdaptor::update_status
-{
-  my($self, $analysis_id, $value ) = @_;
-
-  return undef unless($analysis_id and $value);
-
-  my $sql = "UPDATE analysis_stats SET status='$value'".
-            " WHERE analysis_id=" . $analysis_id;
-  my $sth = $self->prepare($sql);
-  $sth->execute();
-  $sth->finish;            
-}
-
-
-sub Bio::EnsEMBL::DBSQL::AnalysisAdaptor::get_status
-{
-  my($self, $analysis_id) = @_;
-
-  return undef unless($analysis_id);
-
-  my $sql = "SELECT status FROM analysis_stats ".
-            " WHERE analysis_id=" . $analysis_id;
-  my $sth = $self->prepare($sql);
-  $sth->execute();
-  my ($status) = $sth->fetchrow;
-  $sth->finish;
-  return $status;
-}
 
 
 1;
diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index dc6ba6cb4..e0cb293cc 100755
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -62,17 +62,20 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
 sub create_new_worker {
   my ($self,$analysis_id) = @_;
 
-  if($self->db->get_AnalysisAdaptor) {
-    my $status = $self->db->get_AnalysisAdaptor->get_status($analysis_id);
-    return undef unless($status);
-    if($status eq 'BLOCKED') {
-      print("Analysis is BLOCKED, can't create workers\n");
-      return undef;
-    }
-    if($status eq 'DONE') {
-      print("Analysis is DONE, don't need to create workers\n");
-      return undef;
-    }
+  my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor;
+  return undef unless($analStatsDBA);
+  
+  my $analysisStats = $analStatsDBA->fetch_by_dbID($analysis_id);
+  return undef unless($analysisStats);
+  $analysisStats->print_stats;
+  
+  if($analysisStats->status eq 'BLOCKED') {
+    print("Analysis is BLOCKED, can't create workers\n");
+    return undef;
+  }
+  if($analysisStats->status eq 'DONE') {
+    print("Analysis is DONE, don't need to create workers\n");
+    return undef;
   }
 
   my $host = hostname;
@@ -91,8 +94,8 @@ sub create_new_worker {
   my $worker = $self->_fetch_by_hive_id($hive_id);
   $worker=undef unless($worker and $worker->analysis);
 
-  if($worker) {
-    $worker->analysis->status('WORKING');
+  if($worker and $analysisStats) {
+    $analysisStats->status('WORKING');
   }
   return $worker;
 }
@@ -110,6 +113,10 @@ sub register_worker_death {
   my $sth = $self->prepare($sql);
   $sth->execute();
   $sth->finish;
+
+  if($worker->cause_of_death eq "NO_WORK") {
+    $worker->analysis_stats->status("ALL_CLAIMED");
+  }
 }
 
 
@@ -275,6 +282,7 @@ sub _objs_from_sth {
 
     if($column{'analysis_id'} and $self->db->get_AnalysisAdaptor) {
       $worker->analysis($self->db->get_AnalysisAdaptor->fetch_by_dbID($column{'analysis_id'}));
+      $worker->analysis_stats($self->db->get_AnalysisStatsAdaptor->fetch_by_dbID($column{'analysis_id'}));
     }
 
     push @workers, $worker;
diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm
index cd00c0f0f..daa484f59 100755
--- a/modules/Bio/EnsEMBL/Hive/Worker.pm
+++ b/modules/Bio/EnsEMBL/Hive/Worker.pm
@@ -112,6 +112,19 @@ sub analysis {
   return $self->{'_analysis'};
 }
 
+sub analysis_stats {
+  my $self = shift;
+  my $analysisStats = shift;
+
+  if(defined($analysisStats)) {
+    $self->throw("arg must be a [Bio::EnsEMBL::Hive::AnalysisStats] not a [$analysisStats]")
+       unless($analysisStats->isa('Bio::EnsEMBL::Hive::AnalysisStats'));
+    $self->{'_analysis_stats'} = $analysisStats;
+  }
+
+  return $self->{'_analysis_stats'};
+}
+
 
 =head2 life_span
   Arg [1] : (optional) integer $value (in seconds)
diff --git a/scripts/runWorker.pl b/scripts/runWorker.pl
index 2c3564c85..2b57911ae 100755
--- a/scripts/runWorker.pl
+++ b/scripts/runWorker.pl
@@ -3,8 +3,9 @@
 use strict;
 use DBI;
 use Getopt::Long;
-use Bio::EnsEMBL::DBSQL::DBAdaptor;
+use Bio::EnsEMBL::Compara::DBSQL::DBAdaptor;
 use Bio::EnsEMBL::Hive::Worker;
+use Bio::EnsEMBL::Hive::Queen;
 
 
 # ok this is a hack, but I'm going to pretend I've got an object here
@@ -35,7 +36,7 @@ GetOptions('help'           => \$help,
            'outdir=s'       => \$self->{'outdir'},
           );
 
-$self->{'analysis_id'} = shift;
+$self->{'analysis_id'} = shift if(@_);
 
 if ($help) { usage(); }
 
@@ -61,9 +62,10 @@ unless(defined($self->{'analysis_id'})) {
   usage();
 }
 
-my $DBA = new Bio::EnsEMBL::DBSQL::DBAdaptor(%{$self->{'db_conf'}});
-my $queen = new Bio::EnsEMBL::Hive::Queen($DBA);
-$DBA->add_db_adaptor('Queen', $queen);
+# connect to database specified
+my $DBA = new Bio::EnsEMBL::Compara::DBSQL::DBAdaptor(%{$self->{'db_conf'}});
+
+my $queen = $DBA->get_Queen();
 
 my $worker = $queen->create_new_worker($self->{'analysis_id'});
 die("couldn't create worker for analysis_id ".$self->{'analysis_id'}."\n") unless($worker);
diff --git a/sql/tables.sql b/sql/tables.sql
index f3a4f3ecd..dd5f8e63a 100644
--- a/sql/tables.sql
+++ b/sql/tables.sql
@@ -111,11 +111,11 @@ CREATE TABLE dataflow_rule (
 --   condition_analysis_url  - foreign key to net distributed analysis reference
 --   ctrled_analysis_id      - foreign key to analysis table analysis_id
 
-CREATE TABLE dataflow_rule (
+CREATE TABLE analysis_ctrl_rule (
   condition_analysis_url     varchar(255) default '' NOT NULL,
   ctrled_analysis_id         int(10) unsigned NOT NULL,
 
-  UNIQUE (from_analysis_id, to_analysis_url)
+  UNIQUE (condition_analysis_url, ctrled_analysis_id)
 );
 
 
@@ -156,7 +156,7 @@ CREATE TABLE analysis_job (
 
   PRIMARY KEY                  (analysis_job_id),
   UNIQUE KEY input_id_analysis (input_id, analysis_id),
-  INDEX job_claim_analysis     (job_claim, analysis_id)
+  INDEX job_claim_analysis     (job_claim, analysis_id),
   INDEX job_status_analysis    (status, analysis_id)
 );
 
@@ -205,11 +205,11 @@ CREATE TABLE analysis_stats (
                           DEFAULT 'READY' NOT NULL,
   batch_size            int(10) NOT NULL,
   hive_capacity         int(10) NOT NULL,
-  num_jobs              int(10) NOT NULL,
-  num_unclaimed         int(10) NOT NULL,
-  num_done              int(10) NOT NULL,
+  total_job_count       int(10) NOT NULL,
+  unclaimed_job_count   int(10) NOT NULL,
+  done_job_count        int(10) NOT NULL,
   num_required_workers  int(10) NOT NULL,
-  last_update           datetime NOT NULL,
+  last_update           timestamp NOT NULL,
   
   UNIQUE KEY   (analysis_id)
 );
-- 
GitLab