From dc123f420d075cfe565313fb1de51fdac040e046 Mon Sep 17 00:00:00 2001
From: Leo Gordon <lg4@ebi.ac.uk>
Date: Thu, 15 May 2014 11:40:15 +0100
Subject: [PATCH] schema_change: both job and job_file now point to role
 instead of worker

---
 modules/Bio/EnsEMBL/Hive/AnalysisJob.pm       |  6 +-
 .../EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm  | 83 +++++++++----------
 .../EnsEMBL/Hive/DBSQL/LogMessageAdaptor.pm   |  8 +-
 modules/Bio/EnsEMBL/Hive/DBSQL/RoleAdaptor.pm |  7 ++
 modules/Bio/EnsEMBL/Hive/Queen.pm             | 43 ++++------
 modules/Bio/EnsEMBL/Hive/Storable.pm          |  2 +-
 modules/Bio/EnsEMBL/Hive/Worker.pm            |  9 +-
 sql/foreign_keys.sql                          |  4 +-
 sql/patch_2014-05-16.mysql                    | 33 ++++++++
 sql/patch_2014-05-16.pgsql                    | 33 ++++++++
 sql/tables.mysql                              | 12 +--
 sql/tables.pgsql                              | 12 +--
 sql/tables.sqlite                             | 12 +--
 13 files changed, 165 insertions(+), 99 deletions(-)
 create mode 100644 sql/patch_2014-05-16.mysql
 create mode 100644 sql/patch_2014-05-16.pgsql

diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm
index d1c0e821e..21c10ecbd 100644
--- a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm
+++ b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm
@@ -80,10 +80,10 @@ sub accu_id_stack {
     return $self->{'_accu_id_stack'};
 }
 
-sub worker_id {
+sub role_id {
     my $self = shift;
-    $self->{'_worker_id'} = shift if(@_);
-    return $self->{'_worker_id'};
+    $self->{'_role_id'} = shift if(@_);
+    return $self->{'_role_id'};
 }
 
 sub status {
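
For illustration, a hedged sketch of how calling code follows this rename ($job and $new_role are assumed to come from the usual adaptors; not part of the patch itself):

    # before this patch a job recorded its claimant Worker directly:
    #   $job->worker_id( $worker->dbID );
    # after it, the claimant is the Role the Worker is currently playing:
    $job->role_id( $new_role->dbID );       # setter
    my $role_id = $job->role_id;            # getter, same one-method accessor
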
diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
index 53c2f7950..7012c9103 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
@@ -166,10 +166,10 @@ sub fetch_some_by_analysis_id_limit {
 }
 
 
-sub fetch_all_incomplete_jobs_by_worker_id {
-    my ($self, $worker_id) = @_;
+sub fetch_all_incomplete_jobs_by_role_id {
+    my ($self, $role_id) = @_;
 
-    my $constraint = "status IN ('CLAIMED','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP') AND worker_id='$worker_id'";
+    my $constraint = "status IN ('CLAIMED','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP') AND role_id='$role_id'";
     return $self->fetch_all($constraint);
 }
 
@@ -298,12 +298,12 @@ sub store_out_files {
     my ($self, $job) = @_;
 
     if($job->stdout_file or $job->stderr_file) {
-        my $insert_sql = 'REPLACE INTO job_file (job_id, retry, worker_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
+        my $insert_sql = 'REPLACE INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
         my $sth = $self->dbc()->prepare($insert_sql);
-        $sth->execute($job->dbID(), $job->retry_count(), $job->worker_id(), $job->stdout_file(), $job->stderr_file());
+        $sth->execute($job->dbID(), $job->retry_count(), $job->role_id(), $job->stdout_file(), $job->stderr_file());
         $sth->finish();
     } else {
-        my $sql = 'DELETE from job_file WHERE worker_id='.$job->worker_id.' AND job_id='.$job->dbID;
+        my $sql = 'DELETE from job_file WHERE role_id='.$job->role_id.' AND job_id='.$job->dbID;
         $self->dbc->do($sql);
     }
 }
@@ -312,8 +312,8 @@ sub store_out_files {
 =head2 reset_or_grab_job_by_dbID
 
   Arg [1]    : int $job_id
-  Arg [2]    : int $worker_id (optional)
-  Description: resets a job to to 'READY' (if no $worker_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
+  Arg [2]    : int $role_id (optional)
+  Description: resets a job to 'READY' (if no $role_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
                NB: Will also reset a previously 'SEMAPHORED' job to READY.
                The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
                but will not change retry_count if a job has never *really* started.
@@ -322,21 +322,19 @@ sub store_out_files {
 =cut
 
 sub reset_or_grab_job_by_dbID {
-    my $self        = shift;
-    my $job_id      = shift;
-    my $worker_id   = shift;
+    my ($self, $job_id, $role_id) = @_;
 
-    my $new_status  = ($worker_id?'CLAIMED':'READY');
+    my $new_status  = $role_id ? 'CLAIMED' : 'READY';
 
         # Note: the order of the fields being updated is critical!
     my $sql = qq{
         UPDATE job
            SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
              , status=?
-             , worker_id=?
+             , role_id=?
          WHERE job_id=?
     };
-    my @values = ($new_status, $worker_id, $job_id);
+    my @values = ($new_status, $role_id, $job_id);
 
     my $sth = $self->prepare( $sql );
     my $return_code = $sth->execute( @values )
@@ -349,13 +347,14 @@ sub reset_or_grab_job_by_dbID {
 }
 
 
-=head2 grab_jobs_for_worker
+=head2 grab_jobs_for_role
 
-  Arg [1]           : Bio::EnsEMBL::Hive::Worker object $worker
+  Arg [1]           : Bio::EnsEMBL::Hive::Role object $role
+  Arg [2]           : int $how_many_this_batch
   Example: 
-    my $jobs  = $job_adaptor->grab_jobs_for_worker( $worker );
+    my $jobs  = $job_adaptor->grab_jobs_for_role( $role, $how_many );
   Description: 
-    For the specified worker, it will search available jobs, 
+    For the specified Role, it will search for available jobs, 
     and using the how_many_this_batch parameter, claim/fetch that
     number of jobs, and then return them.
   Returntype : 
@@ -364,13 +363,13 @@ sub reset_or_grab_job_by_dbID {
 
 =cut
 
-sub grab_jobs_for_worker {
-    my ($self, $worker, $how_many_this_batch, $workers_rank) = @_;
+sub grab_jobs_for_role {
+    my ($self, $role, $how_many_this_batch) = @_;
   
-    my $current_role    = $worker->current_role;
-    my $analysis_id     = $current_role->analysis_id();
-    my $worker_id       = $worker->dbID();
-    my $offset          = $how_many_this_batch*$workers_rank;
+    my $analysis_id     = $role->analysis_id;
+    my $role_id         = $role->dbID;
+    my $role_rank       = $self->db->get_RoleAdaptor->get_role_rank( $role );
+    my $offset          = $how_many_this_batch * $role_rank;
 
     my $prefix_sql = ($self->dbc->driver eq 'mysql') ? qq{
          UPDATE job j
@@ -381,7 +380,7 @@ sub grab_jobs_for_worker {
                                AND status='READY'
     } : qq{
          UPDATE job
-           SET worker_id='$worker_id', status='CLAIMED'
+           SET role_id='$role_id', status='CLAIMED'
          WHERE job_id in (
                             SELECT job_id
                               FROM job
@@ -394,7 +393,7 @@ sub grab_jobs_for_worker {
     my $suffix_sql = ($self->dbc->driver eq 'mysql') ? qq{
                  ) as x
          USING (job_id)
-           SET j.worker_id='$worker_id', j.status='CLAIMED'
+           SET j.role_id='$role_id', j.status='CLAIMED'
          WHERE j.status='READY'
     } : qq{
                  )
@@ -408,48 +407,48 @@ sub grab_jobs_for_worker {
         }
     }
 
-    return $self->fetch_all_by_worker_id_AND_status($worker_id, 'CLAIMED') ;
+    return $self->fetch_all_by_role_id_AND_status($role_id, 'CLAIMED');
 }
 
 
-=head2 release_undone_jobs_from_worker
+=head2 release_undone_jobs_from_role
 
-  Arg [1]    : Bio::EnsEMBL::Hive::Worker object
+  Arg [1]    : Bio::EnsEMBL::Hive::Role object
   Arg [2]    : optional message to be recorded in 'job_message' table
   Example    :
-  Description: If a worker has died some of its jobs need to be reset back to 'READY'
+  Description: If a Worker has died, some of its jobs need to be reset back to 'READY'
                so they can be rerun.
                Jobs in state CLAIMED are simply reset back to READY.
                If a job was 'in progress' (PRE_CLEANUP, FETCH_INPUT, RUN, WRITE_OUTPUT, POST_CLEANUP)
                the retry_count is increased and the status set back to READY.
                If the retry_count >= $max_retry_count (3 by default) the job is set
                to 'FAILED' and not rerun again.
-  Exceptions : $worker must be defined
+  Exceptions : $role must be defined
   Caller     : Bio::EnsEMBL::Hive::Queen
 
 =cut
 
-sub release_undone_jobs_from_worker {
-    my ($self, $worker, $msg) = @_;
+sub release_undone_jobs_from_role {
+    my ($self, $role, $msg) = @_;
 
-    my $current_role    = $worker->current_role;
-    my $analysis        = $current_role->analysis;
-    my $max_retry_count = $analysis->max_retry_count();
-    my $worker_id       = $worker->dbID();
+    my $role_id         = $role->dbID;
+    my $analysis        = $role->analysis;
+    my $max_retry_count = $analysis->max_retry_count;
+    my $worker          = $role->worker;
 
         #first just reset the claimed jobs, these don't need a retry_count index increment:
-        # (previous worker_id does not matter, because that worker has never had a chance to run the job)
+        # (previous role_id does not matter, because that Role has never had a chance to run the job)
     $self->dbc->do( qq{
         UPDATE job
-           SET status='READY', worker_id=NULL
-         WHERE worker_id='$worker_id'
+           SET status='READY', role_id=NULL
+         WHERE role_id='$role_id'
            AND status='CLAIMED'
     } );
 
     my $sth = $self->prepare( qq{
         SELECT job_id
           FROM job
-         WHERE worker_id='$worker_id'
+         WHERE role_id='$role_id'
            AND status in ('PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
     } );
     $sth->execute();
@@ -490,7 +489,7 @@ sub release_and_age_job {
     $runtime_msec = "NULL" unless(defined $runtime_msec);
         # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
         #
-        # FIXME: would it be possible to retain worker_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
+        # FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
         #
     $self->dbc->do( 
         "UPDATE job "
diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/LogMessageAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/LogMessageAdaptor.pm
index f64b3e6c0..5a0afe91f 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/LogMessageAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/LogMessageAdaptor.pm
@@ -55,9 +55,11 @@ sub store_job_message {
 
         # Note: the timestamp 'time' column will be set automatically
     my $sql = qq{
-        INSERT INTO $table_name (job_id, worker_id, retry, status, msg, is_error)
-                           SELECT job_id, worker_id, retry_count, status, ?, ?
-                             FROM job WHERE job_id=?
+        INSERT INTO $table_name (job_id, role_id, worker_id, retry, status, msg, is_error)
+                           SELECT job_id, role_id, worker_id, retry_count, status, ?, ?
+                             FROM job
+                             JOIN role USING(role_id)
+                            WHERE job_id=?
     };
 
     my $sth = $self->prepare( $sql );
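
Since the job table no longer carries worker_id, the INSERT...SELECT above derives it through the role table; the equivalent standalone lookup would be something like this (the job_id value is illustrative):

    SELECT j.job_id, r.worker_id
      FROM job j
      JOIN role r USING (role_id)
     WHERE j.job_id = 1;
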
diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/RoleAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/RoleAdaptor.pm
index fac6e3aaf..09e8b49ee 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/RoleAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/RoleAdaptor.pm
@@ -134,5 +134,12 @@ sub print_active_role_counts {
 }
 
 
+sub fetch_all_finished_roles_with_unfinished_jobs {
+    my $self = shift;
+
+    return $self->fetch_all( "JOIN job USING(role_id) WHERE when_finished IS NOT NULL AND status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON') GROUP BY role_id" );
+}
+
+
 1;
 
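
The constraint string above is appended to the adaptor's base query, so the call effectively runs something like this (the exact column list generated by the base adaptor is assumed):

    SELECT role.*
      FROM role
      JOIN job USING (role_id)
     WHERE when_finished IS NOT NULL
       AND status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON')
     GROUP BY role_id;
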
diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index 7338d8a32..62b17d2a3 100644
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -210,7 +210,7 @@ sub specialize_new_worker {
         die "At most one of the options {-analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker";
     }
 
-    my ($analysis, $stats, $special_batch);
+    my ($analysis, $stats);
     my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
 
     if($job_id or $analysis_id or $logic_name) {    # probably pre-specialized from command-line
@@ -234,14 +234,7 @@ sub specialize_new_worker {
                 warn "Increasing the semaphore count of the dependent job";
                 $job_adaptor->increase_semaphore_count_for_jobid( $job->semaphored_job_id );
             }
-
-            my $worker_id = $worker->dbID;
-            if($job = $job_adaptor->reset_or_grab_job_by_dbID($job_id, $worker_id)) {
-                $special_batch = [ $job ];
-                $analysis_id = $job->analysis_id;
-            } else {
-                die "Could not claim job with dbID='$job_id' for worker with dbID='$worker_id'";
-            }
+            $analysis_id = $job->analysis_id;
         }
 
         if($logic_name) {
@@ -263,7 +256,7 @@ sub specialize_new_worker {
         $stats = $analysis->stats;
         $self->safe_synchronize_AnalysisStats($stats);
 
-        unless($special_batch or $force) {    # do we really need to run this analysis?
+        unless($job_id or $force) {    # do we really need to run this analysis?
             if($self->db->get_RoleAdaptor->get_hive_current_load() >= 1.1) {
                 $worker->cause_of_death('HIVE_OVERLOAD');
                 die "Hive is overloaded, can't specialize a worker";
@@ -298,8 +291,15 @@ sub specialize_new_worker {
     $role_adaptor->store( $new_role );
     $worker->current_role( $new_role );
 
-    if($special_batch) {
-        $worker->special_batch( $special_batch );
+    if($job_id) {
+        my $role_id = $new_role->dbID;
+        if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {
+
+            $worker->special_batch( [ $job ] );
+        } else {
+            die "Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
+        }
+
     } else {    # count it as autonomous worker sharing the load of that analysis:
 
         $analysis_stats_adaptor->update_status($analysis_id, 'WORKING');
@@ -355,7 +355,7 @@ sub register_worker_death {
             or  $cause_of_death eq 'HIVE_OVERLOAD'
             or  $cause_of_death eq 'LIFESPAN'
         ) {
-                $self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_worker($worker);
+                $self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_role( $current_role );
         }
 
             # re-sync the analysis_stats when a worker dies as part of dynamic sync system
@@ -444,14 +444,14 @@ sub check_for_dead_workers {    # scans the whole Valley for lost Workers (but i
 
         # the following bit is completely Meadow-agnostic and only restores database integrity:
     if($check_buried_in_haste) {
-        warn "GarbageCollector:\tChecking for Workers buried in haste...\n";
-        my $buried_in_haste_list = $self->fetch_all_dead_workers_with_jobs();
+        warn "GarbageCollector:\tChecking for Workers/Roles buried in haste...\n";
+        my $buried_in_haste_list = $self->db->get_RoleAdaptor->fetch_all_finished_roles_with_unfinished_jobs();
         if(my $bih_number = scalar(@$buried_in_haste_list)) {
             warn "GarbageCollector:\tfound $bih_number jobs, reclaiming.\n\n";
             if($bih_number) {
-                my $job_adaptor = $self->db->get_AnalysisJobAdaptor();
-                foreach my $worker (@$buried_in_haste_list) {
-                    $job_adaptor->release_undone_jobs_from_worker($worker);
+                my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
+                foreach my $role (@$buried_in_haste_list) {
+                    $job_adaptor->release_undone_jobs_from_role( $role );
                 }
             }
         } else {
@@ -522,13 +522,6 @@ sub fetch_overdue_workers {
 }
 
 
-sub fetch_all_dead_workers_with_jobs {
-    my $self = shift;
-
-    return $self->fetch_all( "JOIN job j USING(worker_id) WHERE worker.status='DEAD' AND j.status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON') GROUP BY worker_id" );
-}
-
-
 =head2 synchronize_hive
 
   Arg [1]    : $filter_analysis (optional)
diff --git a/modules/Bio/EnsEMBL/Hive/Storable.pm b/modules/Bio/EnsEMBL/Hive/Storable.pm
index ba1a954ba..5b3c2ffc0 100644
--- a/modules/Bio/EnsEMBL/Hive/Storable.pm
+++ b/modules/Bio/EnsEMBL/Hive/Storable.pm
@@ -156,7 +156,7 @@ sub AUTOLOAD {
                 # attempt to lazy-load:
             } elsif( !$self->{$foo_obj_method_name} and my $foo_object_id = $self->{$foo_id_method_name}) {
                 my $foo_class = 'Bio::EnsEMBL::Hive::'.$AdaptorType;
-                my $collection = $foo_class->collection();
+                my $collection = $foo_class->can('collection') && $foo_class->collection();
                 if( $collection and $self->{$foo_obj_method_name} = $collection->find_one_by('dbID', $foo_object_id) ) { # careful: $AdaptorType may not be unique (aliases)
 #                    warn "Lazy-loading object from $AdaptorType collection\n";
                 } elsif(my $adaptor = $self->adaptor) {
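
The added can() check is a general defensive pattern for probing an optional class method before calling it; a minimal standalone sketch (class name hypothetical):

    my $foo_class = 'My::Hypothetical::Storable';
    # ->can() returns the method's code ref if it exists, undef otherwise,
    # so the && short-circuits instead of dying on an unknown method:
    my $collection = $foo_class->can('collection') && $foo_class->collection();
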
diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm
index 4124a6bf8..8f41efdf7 100644
--- a/modules/Bio/EnsEMBL/Hive/Worker.pm
+++ b/modules/Bio/EnsEMBL/Hive/Worker.pm
@@ -456,12 +456,13 @@ sub run {
         } else {    # a proper "BATCHES" loop
 
             while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
+                my $current_role        = $self->current_role;
 
-                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->dbID ) }) ) {
+                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
                     my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                     $self->worker_say( $msg );
                     $self->cause_of_death('CONTAMINATED');
-                    $job_adaptor->release_undone_jobs_from_worker($self, $msg);
+                    $job_adaptor->release_undone_jobs_from_role($current_role, $msg);
 
                 } elsif( $self->job_limiter->reached()) {
                     $self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
@@ -472,12 +473,10 @@ sub run {
                     $self->cause_of_death('LIFESPAN');
 
                 } else {
-                    my $current_role        = $self->current_role;
                     my $desired_batch_size  = $current_role->analysis->stats->get_or_estimate_batch_size();
                     $desired_batch_size     = $self->job_limiter->preliminary_offer( $desired_batch_size );
 
-                    my $role_rank = $self->adaptor->db->get_RoleAdaptor->get_role_rank( $current_role );
-                    my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size, $role_rank );
+                    my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
                     if(scalar(@$actual_batch)) {
                         my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
                         $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
diff --git a/sql/foreign_keys.sql b/sql/foreign_keys.sql
index a1ead9902..8a92660a7 100644
--- a/sql/foreign_keys.sql
+++ b/sql/foreign_keys.sql
@@ -47,10 +47,10 @@ ALTER TABLE analysis_base           ADD FOREIGN KEY (resource_class_id)
 ALTER TABLE resource_description    ADD FOREIGN KEY (resource_class_id)         REFERENCES resource_class(resource_class_id);
 ALTER TABLE worker                  ADD FOREIGN KEY (resource_class_id)         REFERENCES resource_class(resource_class_id);
 
+ALTER TABLE job                     ADD FOREIGN KEY (role_id)                   REFERENCES role(role_id)                        ON DELETE CASCADE;
+ALTER TABLE job_file                ADD FOREIGN KEY (role_id)                   REFERENCES role(role_id)                        ON DELETE CASCADE;
 ALTER TABLE log_message             ADD FOREIGN KEY (role_id)                   REFERENCES role(role_id)                        ON DELETE CASCADE;
 
-ALTER TABLE job                     ADD FOREIGN KEY (worker_id)                 REFERENCES worker(worker_id)                    ON DELETE CASCADE;
-ALTER TABLE job_file                ADD FOREIGN KEY (worker_id)                 REFERENCES worker(worker_id)                    ON DELETE CASCADE;
 ALTER TABLE log_message             ADD FOREIGN KEY (worker_id)                 REFERENCES worker(worker_id)                    ON DELETE CASCADE;
 ALTER TABLE role                    ADD FOREIGN KEY (worker_id)                 REFERENCES worker(worker_id)                    ON DELETE CASCADE;
 ALTER TABLE worker_resource_usage   ADD FOREIGN KEY (worker_id)                 REFERENCES worker(worker_id)                    ON DELETE CASCADE;
diff --git a/sql/patch_2014-05-16.mysql b/sql/patch_2014-05-16.mysql
new file mode 100644
index 000000000..02ca76676
--- /dev/null
+++ b/sql/patch_2014-05-16.mysql
@@ -0,0 +1,33 @@
+
+    -- First remove the ForeignKeys from job.worker_id and job_file.worker_id:
+ALTER TABLE job DROP FOREIGN KEY job_ibfk_4;
+ALTER TABLE job_file DROP FOREIGN KEY job_file_ibfk_2;
+
+    -- Also remove Indices from the old columns:
+ALTER TABLE job DROP INDEX worker_id;
+ALTER TABLE job_file DROP INDEX worker_id;
+
+    -- Use the existing worker_id columns as reference to add role_id columns:
+ALTER TABLE job ADD COLUMN role_id INTEGER DEFAULT NULL AFTER worker_id;
+ALTER TABLE job_file ADD COLUMN role_id INTEGER DEFAULT NULL AFTER worker_id;
+
+    -- Pretend we had role entries from the very beginning (the data is very approximately correct!):
+UPDATE job j, role r SET j.role_id=r.role_id WHERE r.worker_id=j.worker_id AND CASE WHEN completed IS NOT NULL THEN when_started<=completed AND (when_finished IS NULL OR completed<=when_finished) ELSE when_finished IS NULL END;
+UPDATE job_file jf, job j SET jf.role_id=j.role_id WHERE jf.job_id=j.job_id;
+
+    -- Now we can drop the columns themselves:
+ALTER TABLE job DROP COLUMN worker_id;
+ALTER TABLE job_file DROP COLUMN worker_id;
+
+    -- Add new Indices:
+ALTER TABLE job ADD INDEX role_status (role_id, status);
+ALTER TABLE job_file ADD INDEX role (role_id);
+
+
+    -- Add ForeignKeys on the new columns:
+ALTER TABLE job                     ADD FOREIGN KEY (role_id)                   REFERENCES role(role_id)                        ON DELETE CASCADE;
+ALTER TABLE job_file                ADD FOREIGN KEY (role_id)                   REFERENCES role(role_id)                        ON DELETE CASCADE;
+
+    -- UPDATE hive_sql_schema_version
+UPDATE hive_meta SET meta_value=62 WHERE meta_key='hive_sql_schema_version' AND meta_value='61';
+
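
After applying this patch to a populated database, a quick hedged sanity check that the heuristic backfill matched every in-flight job to a Role (a non-zero count would mean some claimed or running jobs were left orphaned):

    SELECT COUNT(*)
      FROM job
     WHERE role_id IS NULL
       AND status IN ('CLAIMED','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP');
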
diff --git a/sql/patch_2014-05-16.pgsql b/sql/patch_2014-05-16.pgsql
new file mode 100644
index 000000000..41bc25597
--- /dev/null
+++ b/sql/patch_2014-05-16.pgsql
@@ -0,0 +1,33 @@
+
+    -- First remove the ForeignKeys from job.worker_id and job_file.worker_id:
+ALTER TABLE job DROP CONSTRAINT job_worker_id_fkey;
+ALTER TABLE job_file DROP CONSTRAINT job_file_worker_id_fkey;
+
+    -- Also remove Indices from the old columns:
+DROP INDEX job_worker_id_status_idx;
+DROP INDEX job_file_worker_id_idx;
+
+    -- Add role_id columns:
+ALTER TABLE job ADD COLUMN role_id INTEGER DEFAULT NULL;
+ALTER TABLE job_file ADD COLUMN role_id INTEGER DEFAULT NULL;
+
+    -- Pretend we had role entries from the very beginning (the data is very approximately correct!):
+UPDATE job j SET role_id = (SELECT r.role_id FROM role r WHERE r.worker_id=j.worker_id AND CASE WHEN completed IS NOT NULL THEN when_started<=completed AND (when_finished IS NULL OR completed<=when_finished) ELSE when_finished IS NULL END);
+UPDATE job_file jf SET role_id = (SELECT role_id FROM job j WHERE j.job_id=jf.job_id);
+
+    -- Now we can drop the columns themselves:
+ALTER TABLE job DROP COLUMN worker_id;
+ALTER TABLE job_file DROP COLUMN worker_id;
+
+    -- Add new Indices:
+CREATE INDEX ON job (role_id, status);
+CREATE INDEX ON job_file (role_id);
+
+
+    -- Add ForeignKeys on the new columns:
+ALTER TABLE job                     ADD FOREIGN KEY (role_id)                   REFERENCES role(role_id)                        ON DELETE CASCADE;
+ALTER TABLE job_file                ADD FOREIGN KEY (role_id)                   REFERENCES role(role_id)                        ON DELETE CASCADE;
+
+    -- UPDATE hive_sql_schema_version
+UPDATE hive_meta SET meta_value=62 WHERE meta_key='hive_sql_schema_version' AND meta_value='61';
+
diff --git a/sql/tables.mysql b/sql/tables.mysql
index 3a3667a7b..6d69260d7 100644
--- a/sql/tables.mysql
+++ b/sql/tables.mysql
@@ -312,7 +312,7 @@ CREATE TABLE resource_description (
 @column input_id                input data passed into Analysis:RunnableDB to control the work
 @column param_id_stack          a CSV of job_ids whose input_ids contribute to the stack of local variables for the job
 @column accu_id_stack           a CSV of job_ids whose accu's contribute to the stack of local variables for the job
-@column worker_id               link to worker table to define which worker claimed this job
+@column role_id                 links to the Role that claimed this job (NULL means it has never been claimed)
 @column status                  state the job is in
 @column retry_count             number times job had to be reset when worker failed to run it
 @column completed               when the job was completed
@@ -329,7 +329,7 @@ CREATE TABLE job (
     input_id                CHAR(255)   NOT NULL,
     param_id_stack          CHAR(64)    NOT NULL DEFAULT '',
     accu_id_stack           CHAR(64)    NOT NULL DEFAULT '',
-    worker_id               INTEGER              DEFAULT NULL,
+    role_id                 INTEGER              DEFAULT NULL,
     status                  ENUM('SEMAPHORED','READY','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL,
     retry_count             INTEGER     NOT NULL DEFAULT 0,
     completed               TIMESTAMP                    NULL,  -- mysql's special for "TIMESTAMP DEFAULT NULL"
@@ -341,7 +341,7 @@ CREATE TABLE job (
 
     UNIQUE  KEY input_id_stacks_analysis    (input_id, param_id_stack, accu_id_stack, analysis_id), -- to avoid repeating tasks
             KEY analysis_status_retry       (analysis_id, status, retry_count),                     -- for claiming jobs
-            KEY  worker_id                  (worker_id, status)                                     -- for fetching and releasing claimed jobs
+            KEY role_status                 (role_id, status)                                       -- for fetching and releasing claimed jobs
 ) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
 
 
@@ -356,8 +356,8 @@ CREATE TABLE job (
         There is max one entry per job_id and retry.
 
 @column job_id             foreign key
-@column worker_id          link to worker table to define which worker claimed this job
 @column retry              copy of retry_count of job as it was run
+@column role_id            links to the Role that claimed this job
 @column stdout_file        path to the job's STDOUT log
 @column stderr_file        path to the job's STDERR log
 */
@@ -365,12 +365,12 @@ CREATE TABLE job (
 CREATE TABLE job_file (
     job_id                  INTEGER     NOT NULL,
     retry                   INTEGER     NOT NULL,
-    worker_id               INTEGER     NOT NULL,
+    role_id                 INTEGER     NOT NULL,
     stdout_file             VARCHAR(255),
     stderr_file             VARCHAR(255),
 
     PRIMARY KEY job_retry   (job_id, retry),
-            KEY  worker_id  (worker_id)
+            KEY role        (role_id)
 ) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
 
 
diff --git a/sql/tables.pgsql b/sql/tables.pgsql
index 556f06356..10fa0a99b 100644
--- a/sql/tables.pgsql
+++ b/sql/tables.pgsql
@@ -315,7 +315,7 @@ CREATE TABLE resource_description (
 @column input_id                input data passed into Analysis:RunnableDB to control the work
 @column param_id_stack          a CSV of job_ids whose input_ids contribute to the stack of local variables for the job
 @column accu_id_stack           a CSV of job_ids whose accu's contribute to the stack of local variables for the job
-@column worker_id               link to worker table to define which worker claimed this job
+@column role_id                 links to the Role that claimed this job (NULL means it has never been claimed)
 @column status                  state the job is in
 @column retry_count             number times job had to be reset when worker failed to run it
 @column completed               when the job was completed
@@ -335,7 +335,7 @@ CREATE TABLE job (
     input_id                TEXT        NOT NULL,
     param_id_stack          TEXT        NOT NULL DEFAULT '',
     accu_id_stack           TEXT        NOT NULL DEFAULT '',
-    worker_id               INTEGER              DEFAULT NULL,
+    role_id                 INTEGER              DEFAULT NULL,
     status                  jw_status   NOT NULL DEFAULT 'READY',
     retry_count             INTEGER     NOT NULL DEFAULT 0,
     completed               TIMESTAMP            DEFAULT NULL,
@@ -348,7 +348,7 @@ CREATE TABLE job (
     UNIQUE (input_id, param_id_stack, accu_id_stack, analysis_id)   -- to avoid repeating tasks
 );
 CREATE INDEX ON job (analysis_id, status, retry_count); -- for claiming jobs
-CREATE INDEX ON job (worker_id, status);                -- for fetching and releasing claimed jobs
+CREATE INDEX ON job (role_id, status);                  -- for fetching and releasing claimed jobs
 
 
 /**
@@ -362,7 +362,7 @@ CREATE INDEX ON job (worker_id, status);                -- for fetching and rele
         There is max one entry per job_id and retry.
 
 @column job_id             foreign key
-@column worker_id          link to worker table to define which worker claimed this job
 @column retry              copy of retry_count of job as it was run
+@column role_id            links to the Role that claimed this job
 @column stdout_file        path to the job's STDOUT log
 @column stderr_file        path to the job's STDERR log
@@ -371,13 +371,13 @@ CREATE INDEX ON job (worker_id, status);                -- for fetching and rele
 CREATE TABLE job_file (
     job_id                  INTEGER     NOT NULL,
     retry                   INTEGER     NOT NULL,
-    worker_id               INTEGER     NOT NULL,
+    role_id                 INTEGER     NOT NULL,
     stdout_file             VARCHAR(255),
     stderr_file             VARCHAR(255),
 
     PRIMARY KEY (job_id, retry)
 );
-CREATE INDEX ON job_file (worker_id);
+CREATE INDEX ON job_file (role_id);
 
 
 /**
diff --git a/sql/tables.sqlite b/sql/tables.sqlite
index 15d088f0d..f5f63294b 100644
--- a/sql/tables.sqlite
+++ b/sql/tables.sqlite
@@ -310,7 +310,7 @@ CREATE TABLE resource_description (
 @column input_id                input data passed into Analysis:RunnableDB to control the work
 @column param_id_stack          a CSV of job_ids whose input_ids contribute to the stack of local variables for the job
 @column accu_id_stack           a CSV of job_ids whose accu's contribute to the stack of local variables for the job
-@column worker_id               link to worker table to define which worker claimed this job
+@column role_id                 links to the Role that claimed this job (NULL means it has never been claimed)
 @column status                  state the job is in
 @column retry_count             number times job had to be reset when worker failed to run it
 @column completed               when the job was completed
@@ -327,7 +327,7 @@ CREATE TABLE job (
     input_id                CHAR(255)   NOT NULL,
     param_id_stack          CHAR(64)    NOT NULL DEFAULT '',
     accu_id_stack           CHAR(64)    NOT NULL DEFAULT '',
-    worker_id               INTEGER              DEFAULT NULL,
+    role_id                 INTEGER              DEFAULT NULL,
     status                  TEXT        NOT NULL DEFAULT 'READY', /* enum('SEMAPHORED','READY','CLAIMED','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL, */
     retry_count             INTEGER     NOT NULL DEFAULT 0,
     completed               TIMESTAMP            DEFAULT NULL,
@@ -339,7 +339,7 @@ CREATE TABLE job (
 );
 CREATE UNIQUE INDEX job_input_id_stacks_analysis_idx ON job (input_id, param_id_stack, accu_id_stack, analysis_id);
 CREATE        INDEX job_analysis_status_retry_idx ON job (analysis_id, status, retry_count);
-CREATE        INDEX job_worker_idx ON job (worker_id);
+CREATE        INDEX job_role_id_status_idx ON job (role_id, status);
 
 
 /**
@@ -353,8 +353,8 @@ CREATE        INDEX job_worker_idx ON job (worker_id);
         There is max one entry per job_id and retry.
 
 @column job_id             foreign key
-@column worker_id          link to worker table to define which worker claimed this job
 @column retry              copy of retry_count of job as it was run
+@column role_id            links to the Role that claimed this job
 @column stdout_file        path to the job's STDOUT log
 @column stderr_file        path to the job's STDERR log
 */
@@ -362,13 +362,13 @@ CREATE        INDEX job_worker_idx ON job (worker_id);
 CREATE TABLE job_file (
     job_id                  INTEGER     NOT NULL,
     retry                   INTEGER     NOT NULL,
-    worker_id               INTEGER     NOT NULL,
+    role_id                 INTEGER     NOT NULL,
     stdout_file             VARCHAR(255),
     stderr_file             VARCHAR(255),
 
     PRIMARY KEY (job_id, retry)
 );
-CREATE        INDEX job_file_worker_idx ON job_file (worker_id);
+CREATE        INDEX job_file_role_id_idx ON job_file (role_id);
 
 
 /**
-- 
GitLab