Commit 7fe8c8c9 authored by Matthieu Muffato's avatar Matthieu Muffato
Browse files

Removed the option of setting the worker temp directory directly in the Runnable

The implementation was not aware that a worker can take on multiple
analyses. Being implemented at the Runnable level, there was also the risk
that the method would use job parameters to make the directory name,
unaware that the name is set for all jobs of all analyses the worker takes.

The impact is anyway minimal, as the only Ensembl Runnable that was using
this feature is a Compara Runnable, which can be ported to the new
resource-class based system.

And if Runnables still need to change the worker's temp directory, they
are still free to override worker_temp_directory() and
cleanup_worker_temp_directory().
parent a5ac7ca0
...@@ -204,16 +204,9 @@ In our case, Examl uses MPI and wants to share data via the filesystem too. ...@@ -204,16 +204,9 @@ In our case, Examl uses MPI and wants to share data via the filesystem too.
In this specific Runnable, Examl is set to run in eHive's managed temporary In this specific Runnable, Examl is set to run in eHive's managed temporary
directory, which by default is under /tmp which is not shared across nodes on directory, which by default is under /tmp which is not shared across nodes on
our compute cluster. our compute cluster.
We have to override the eHive method to use a shared directory (``$self->param('examl_dir')``) instead. We have to override the eHive method to use a shared directory (``$self->o('examl_dir')``) instead.
:: This can be done at the resource class level, by adding
``"-worker_base_tmp_dir ".$self->o('examl_dir')`` to the
use Path::Tiny; ``worker_cmd_args`` attribute of the resource-class
sub worker_temp_directory_name {
my $self = shift @_;
my $default_temp_directory_name = $self->SUPER::worker_temp_directory_name(@_);
my $name = path($default_temp_directory_name)->basename;
return $self->param('examl_dir')."/$name/";
}
...@@ -84,7 +84,8 @@ Job (by removing some files, for instance), or to clean it up completely ...@@ -84,7 +84,8 @@ Job (by removing some files, for instance), or to clean it up completely
with ``$self->cleanup_worker_temp_directory``. with ``$self->cleanup_worker_temp_directory``.
By default, this directory will be put under /tmp, but it can be overridden By default, this directory will be put under /tmp, but it can be overridden
by adding a ``worker_temp_directory_name`` method to the runnable. This can by setting the ``-worker_base_tmp_dir`` option for workers through their
resource classes. This can
be used to: be used to:
- use a faster filesystem (although /tmp is usually local to the machine), - use a faster filesystem (although /tmp is usually local to the machine),
......
...@@ -79,7 +79,7 @@ response from GuestProcess): ...@@ -79,7 +79,7 @@ response from GuestProcess):
---> dbIDs of the jobs that have been created ---> dbIDs of the jobs that have been created
<--- WORKER_TEMP_DIRECTORY <--- WORKER_TEMP_DIRECTORY
// The content is the "worker_temp_directory_name" as defined in the runnable (or null otherwise) // No content needed (ignored)
---> returns the temporary directory of the worker ---> returns the temporary directory of the worker
<--- JOB_END <--- JOB_END
...@@ -135,7 +135,7 @@ use base ('Bio::EnsEMBL::Hive::Process'); ...@@ -135,7 +135,7 @@ use base ('Bio::EnsEMBL::Hive::Process');
# -------------------------------------- <versioning of the GuestProcess interface> ------------------------------------------------------- # -------------------------------------- <versioning of the GuestProcess interface> -------------------------------------------------------
our $GUESTPROCESS_PROTOCOL_VERSION = '3'; # Make sure you change this number whenever an incompatible change is introduced our $GUESTPROCESS_PROTOCOL_VERSION = '4'; # Make sure you change this number whenever an incompatible change is introduced
=head2 get_protocol_version =head2 get_protocol_version
...@@ -561,7 +561,6 @@ sub life_cycle { ...@@ -561,7 +561,6 @@ sub life_cycle {
$self->send_response($d); $self->send_response($d);
} elsif ($event eq 'WORKER_TEMP_DIRECTORY') { } elsif ($event eq 'WORKER_TEMP_DIRECTORY') {
$self->{worker_temp_directory_name} = $content;
my $wtd = $self->worker_temp_directory; my $wtd = $self->worker_temp_directory;
$self->send_response($wtd); $self->send_response($wtd);
...@@ -598,33 +597,12 @@ sub life_cycle { ...@@ -598,33 +597,12 @@ sub life_cycle {
} }
} }
=head2 worker_temp_directory_name
Example : $process->worker_temp_directory_name();
Description : Returns the name of the temp directory for this module
The value in $self is initialized at the WORKER_TEMP_DIRECTORY
event above and returned to the caller if defined. This allows
runnables to redefine the name
Returntype : string
Exceptions : none
=cut
sub worker_temp_directory_name {
my $self = shift;
return $self->{worker_temp_directory_name} if $self->{worker_temp_directory_name};
return $self->SUPER::worker_temp_directory_name();
}
### Summary of Process methods ### ### Summary of Process methods ###
## Have to be redefined ## Have to be redefined
# life_cycle # life_cycle
# param_defaults # param_defaults
# worker_temp_directory_name
## Needed, can be reused from the base class ## Needed, can be reused from the base class
# worker_temp_directory # worker_temp_directory
......
...@@ -678,19 +678,13 @@ sub worker_temp_directory { ...@@ -678,19 +678,13 @@ sub worker_temp_directory {
my $self = shift @_; my $self = shift @_;
unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) { unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
$self->{'_tmp_dir'} = $self->worker_temp_directory_name(); $self->{'_tmp_dir'} = $self->worker->temp_directory_name();
make_path( $self->{'_tmp_dir'}, { mode => 0777 } ); make_path( $self->{'_tmp_dir'}, { mode => 0777 } );
throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'}); throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'});
} }
return $self->{'_tmp_dir'}; return $self->{'_tmp_dir'};
} }
sub worker_temp_directory_name {
my $self = shift @_;
return $self->worker->temp_directory_name;
}
=head2 cleanup_worker_temp_directory =head2 cleanup_worker_temp_directory
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
<property name="build" location="build" /> <property name="build" location="build" />
<property name="lib" location="lib" /> <property name="lib" location="lib" />
<property name="doc" location="doc" /> <property name="doc" location="doc" />
<property name="version" value="3.1" /> <property name="version" value="4.0" />
<path id="wrapper_classpath"> <path id="wrapper_classpath">
<fileset dir="${lib}/"> <fileset dir="${lib}/">
......
...@@ -73,7 +73,7 @@ public abstract class BaseRunnable { ...@@ -73,7 +73,7 @@ public abstract class BaseRunnable {
private static final String JOB_END_TYPE = "JOB_END"; private static final String JOB_END_TYPE = "JOB_END";
private static final String OK = "OK"; private static final String OK = "OK";
public final static String VERSION = "3.1"; public final static String VERSION = "4.0";
protected final static Map<String, Object> DEFAULT_PARAMS = new HashMap<>(); protected final static Map<String, Object> DEFAULT_PARAMS = new HashMap<>();
...@@ -317,23 +317,13 @@ public abstract class BaseRunnable { ...@@ -317,23 +317,13 @@ public abstract class BaseRunnable {
*/ */
protected String workerTempDirectory() { protected String workerTempDirectory() {
if (workerTempDirectory == null) { if (workerTempDirectory == null) {
sendEventMessage(WORKER_TEMP_DIRECTORY_TYPE, sendEventMessage(WORKER_TEMP_DIRECTORY_TYPE, null);
getWorkerTemplateName());
workerTempDirectory = (String) (readMessageAndRespond() workerTempDirectory = (String) (readMessageAndRespond()
.get(RESPONSE_KEY)); .get(RESPONSE_KEY));
} }
return workerTempDirectory; return workerTempDirectory;
} }
/**
* Override to provide a special template for the worker temporary directory
*
* @return A String representing the template location for worker directories or null
*/
protected String getWorkerTemplateName() {
return null;
}
/** /**
* Send a message and wait for OK from the parent * Send a message and wait for OK from the parent
* *
......
...@@ -24,7 +24,7 @@ import unittest ...@@ -24,7 +24,7 @@ import unittest
import warnings import warnings
import traceback import traceback
__version__ = "3.0" __version__ = "4.0"
__doc__ = """ __doc__ = """
This module mainly implements python's counterpart of GuestProcess. Read This module mainly implements python's counterpart of GuestProcess. Read
...@@ -219,12 +219,9 @@ class BaseRunnable(object): ...@@ -219,12 +219,9 @@ class BaseRunnable(object):
def worker_temp_directory(self): def worker_temp_directory(self):
"""Returns the full path of the temporary directory created by the worker. """Returns the full path of the temporary directory created by the worker.
Runnables can implement "worker_temp_directory_name()" to return the name
they would like to use
""" """
if self.__created_worker_temp_directory is None: if self.__created_worker_temp_directory is None:
template_name = self.worker_temp_directory_name() if hasattr(self, 'worker_temp_directory_name') else None self.__send_message('WORKER_TEMP_DIRECTORY', None)
self.__send_message('WORKER_TEMP_DIRECTORY', template_name)
self.__created_worker_temp_directory = self.__read_message()['response'] self.__created_worker_temp_directory = self.__read_message()['response']
return self.__created_worker_temp_directory return self.__created_worker_temp_directory
......
...@@ -14,6 +14,11 @@ ...@@ -14,6 +14,11 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# standaloneJob.pl eHive.examples.TestRunnable -language python3
import os
import subprocess
import eHive import eHive
class TestRunnable(eHive.BaseRunnable): class TestRunnable(eHive.BaseRunnable):
...@@ -29,15 +34,20 @@ class TestRunnable(eHive.BaseRunnable): ...@@ -29,15 +34,20 @@ class TestRunnable(eHive.BaseRunnable):
self.warning("Fetch the world !") self.warning("Fetch the world !")
print("alpha is", self.param_required('alpha')) print("alpha is", self.param_required('alpha'))
print("beta is", self.param_required('beta')) print("beta is", self.param_required('beta'))
self.temp_dir = self.worker_temp_directory()
print("my directory name is", self.temp_dir)
def run(self): def run(self):
self.warning("Run the world !") self.warning("Run the world !")
s = self.param('alpha') + self.param('beta') s = self.param('alpha') + self.param('beta')
print("set gamma to", s) print("set gamma to", s)
self.param('gamma', s) self.param('gamma', s)
self.greeting_path = os.path.join(self.temp_dir, "hello")
subprocess.check_call(["touch", self.greeting_path])
def write_output(self): def write_output(self):
self.warning("Write to the world !") self.warning("Write to the world !")
print("gamma is", self.param('gamma')) print("gamma is", self.param('gamma'))
self.dataflow( {'gamma': self.param('gamma')}, 2 ) self.dataflow( {'gamma': self.param('gamma')}, 2 )
print("Greetings in place:", os.path.exists(self.greeting_path))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment