Process.py 9.87 KB
Newer Older
1

2
import eHive.Params
3
4
5
6
7
8
9
10

import os
import sys
import json
import numbers
import warnings
import traceback

__version__ = "0.1"

__doc__ = """
This module mainly implements python's counterpart of GuestProcess. Read
the latter for more information about the JSON protocol used to communicate.
"""

class Job:
    """Plain attribute container describing the job being run.

    The class defines nothing itself; the runnable attaches the job's
    attributes (dbID, input_id, retry_count, autoflow, ...) to an instance
    at run time.
    """

class CompleteEarlyException(Exception):
    """Raise from a BaseRunnable subclass to end the job early, successfully.

    The remaining steps among fetch_input/run/write_output are skipped (post_cleanup
    still runs) and the exception message is recorded as a non-error warning.
    """
class JobFailedException(Exception):
    """Raise from a BaseRunnable subclass to end the job early, unsuccessfully.

    Like any other uncaught exception, it marks the job as failed and its
    traceback is reported to the parent process as an error.
    """
class HiveJSONMessageException(Exception):
    """Signals a bad message from the parent process (GuestProcess).

    Raised when the incoming line cannot be parsed as JSON, or when the
    parent's reply is not the expected one.
    """


class BaseRunnable(object):
    """This is the equivalent of GuestProcess. Note that most of the methods
    are private to be hidden in the derived classes.

    This class can be used as a base-class for people to redefine fetch_input(),
    run() and/or write_output() (and/or pre_cleanup(), post_cleanup()).
    Jobs are supposed to raise CompleteEarlyException in case they complete
    before reaching the end of their life-cycle. They can also raise
    JobFailedException to indicate a general failure.
    """

    # Private BaseRunnable interface
    #################################

    def __init__(self, read_fileno, write_fileno):
        """Open the pipes to/from the parent process and run the whole life-cycle.

        read_fileno: file descriptor the parent's messages are read from.
        write_fileno: file descriptor messages to the parent are written to.
        Note that the constructor only returns once the parent sends a message
        without 'input_job' (end of the wrapper).
        """
        # We need the binary mode to disable the buffering
        self.read_pipe = os.fdopen(read_fileno, mode='rb', buffering=0)
        self.write_pipe = os.fdopen(write_fileno, mode='wb', buffering=0)
        self.pid = os.getpid()
        self.debug = 0
        self.__process_life_cycle()

    def __print_debug(self, *args):
        """Print args to stderr, prefixed with the PID, when debug > 1"""
        if self.debug > 1:
            print("PYTHON {0}".format(self.pid), *args, file=sys.stderr)

    def __send_message(self, event, content):
        """Serializes the message in JSON and sends it to the parent process"""
        def default_json_encoder(o):
            # Objects json cannot handle are replaced by a placeholder instead
            # of aborting the whole message
            self.__print_debug("Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
            return 'UNSERIALIZABLE OBJECT'
        j = json.dumps({'event': event, 'content': content}, indent=None, default=default_json_encoder)
        self.__print_debug('__send_message:', j)
        # json.dumps() escapes non-ASCII characters (ensure_ascii defaults to
        # True), so the encoded line is plain ASCII
        self.write_pipe.write(bytes(j + "\n", 'utf-8'))

    def __send_response(self, response):
        """Sends a response message to the parent process"""
        self.__print_debug('__send_response:', response)
        # Serialize with json.dumps so that quotes / backslashes in the response
        # are escaped (plain string concatenation produced invalid JSON for them).
        # For a plain value such as 'OK' the output is {"response": "OK"}
        self.write_pipe.write(bytes(json.dumps({'response': str(response)}) + "\n", 'utf-8'))

    def __read_message(self):
        """Read one line from the parent and parse it as JSON.

        Raises HiveJSONMessageException if the line cannot be decoded/parsed
        (including an empty read, e.g. when the pipe has been closed).
        """
        try:
            self.__print_debug("__read_message ...")
            line = self.read_pipe.readline()
            # Decode the whole line once: slicing the bytes first could cut a
            # multi-byte character in half
            text = line.decode()
            self.__print_debug(" ... -> ", text.rstrip("\n"))
            return json.loads(text)
        except ValueError as e:
            # HiveJSONMessageException is a more meaningful name than ValueError
            raise HiveJSONMessageException from e

    def __send_message_and_wait_for_OK(self, event, content):
        """Send a message and expects a response to be 'OK'"""
        self.__send_message(event, content)
        response = self.__read_message()
        # Guard against replies that are not {"response": ...} objects so that
        # protocol errors always surface as HiveJSONMessageException
        if not isinstance(response, dict) or response.get('response') != 'OK':
            raise HiveJSONMessageException("Received '{0}' instead of OK".format(response))

    def __process_life_cycle(self):
        """Simple loop: wait for job parameters, do the job's life-cycle"""
        self.__send_message_and_wait_for_OK('VERSION', __version__)
        self.__send_message_and_wait_for_OK('PARAM_DEFAULTS', self.param_defaults())
        while True:
            self.__print_debug("waiting for instructions")
            config = self.__read_message()
            if 'input_job' not in config:
                self.__print_debug("no params, this is the end of the wrapper")
                return
            self.__job_life_cycle(config)

    def __job_life_cycle(self, config):
        """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent"""
        self.__print_debug("__life_cycle")

        # Params
        self.p = eHive.Params.ParamContainer(config['input_job']['parameters'])

        # Job attributes
        self.input_job = Job()
        for x in ['dbID', 'input_id', 'retry_count']:
            setattr(self.input_job, x, config['input_job'][x])
        self.input_job.autoflow = True
        self.input_job.lethal_for_worker = False
        self.input_job.transient_error = True

        # Worker attributes
        self.debug = config['debug']

        # Which methods should be run
        steps = ['fetch_input', 'run']
        if self.input_job.retry_count > 0:
            steps.insert(0, 'pre_cleanup')
        if config['execute_writes']:
            steps.append('write_output')
        self.__print_debug("steps to run:", steps)
        self.__send_response('OK')

        # The actual life-cycle
        # __traceback(2) skips the frames of __job_life_cycle and
        # __run_method_if_exists, leaving only the runnable's own frames.
        died_somewhere = False
        try:
            for s in steps:
                self.__run_method_if_exists(s)
        except CompleteEarlyException as e:
            self.warning(e.args[0] if len(e.args) else repr(e), False)
        except BaseException:
            # Deliberately broad (same as a bare except): any failure of the
            # runnable must be reported to the parent instead of killing the wrapper
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        try:
            self.__run_method_if_exists('post_cleanup')
        except BaseException:
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        job_end_structure = {'complete': not died_somewhere, 'job': {}, 'params': {'substituted': self.p._param_hash, 'unsubstituted': self.p._unsubstituted_param_hash}}
        for x in ['autoflow', 'lethal_for_worker', 'transient_error']:
            job_end_structure['job'][x] = getattr(self.input_job, x)
        self.__send_message_and_wait_for_OK('JOB_END', job_end_structure)

    def __run_method_if_exists(self, method):
        """method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
        We only call the method if it exists to save a trip to the database."""
        if hasattr(self, method):
            self.__send_message_and_wait_for_OK('JOB_STATUS_UPDATE', method)
            getattr(self, method)()

    def __traceback(self, skipped_traces):
        """Remove "skipped_traces" lines from the stack trace (the eHive part)"""
        (etype, value, tb) = sys.exc_info()
        s1 = traceback.format_exception_only(etype, value)
        l = traceback.extract_tb(tb)[skipped_traces:]
        s2 = traceback.format_list(l)
        return "".join(s1 + s2)

    # Public BaseRunnable interface
    ################################

    def warning(self, message, is_error=False):
        """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not"""
        self.__send_message_and_wait_for_OK('WARNING', {'message': message, 'is_error': is_error})

    def dataflow(self, output_ids, branch_name_or_code=1):
        """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns"""
        if branch_name_or_code == 1:
            # Explicit flow on branch 1 replaces the job's autoflow. This must be
            # set on input_job: that is what is reported in JOB_END (setting
            # self.autoflow had no effect).
            self.input_job.autoflow = False
        self.__send_message('DATAFLOW', {'output_ids': output_ids, 'branch_name_or_code': branch_name_or_code, 'params': {'substituted': self.p._param_hash, 'unsubstituted': self.p._unsubstituted_param_hash}})
        return self.__read_message()

    def worker_temp_directory(self):
        """Returns the full path of the temporary directory created by the worker.
        Runnables can implement "worker_temp_directory_name()" to return the name
        they would like to use
        """
        # The path is requested from the parent once, then cached on the instance
        if not hasattr(self, '_created_worker_temp_directory'):
            template_name = self.worker_temp_directory_name() if hasattr(self, 'worker_temp_directory_name') else None
            self.__send_message('WORKER_TEMP_DIRECTORY', template_name)
            self._created_worker_temp_directory = self.__read_message()
        return self._created_worker_temp_directory

    # Param interface
    ##################

    def param_defaults(self):
        """Returns the defaults parameters for this runnable"""
        return {}

    def param_required(self, param_name):
        """Returns the value of the parameter "param_name" or raises an exception
        if anything wrong happens. The exception is marked as non-transient."""
        t = self.input_job.transient_error
        self.input_job.transient_error = False
        # Deliberately no try/finally: if get_param() raises, transient_error is
        # left False so that the failure is reported as non-transient
        v = self.p.get_param(param_name)
        self.input_job.transient_error = t
        return v

    def param(self, param_name, *args):
        """When called as a setter: sets the value of the parameter "param_name".
        When called as a getter: returns the value of the parameter "param_name".
        It does not raise an exception if the parameter (or another one in the
        substitution stack) is undefined"""
        # As a setter
        if len(args):
            return self.p.set_param(param_name, args[0])

        # As a getter
        try:
            return self.p.get_param(param_name)
        except KeyError as e:
            # stacklevel=2 attributes the warning to the caller of param()
            warnings.warn("parameter '{0}' cannot be initialized because {1} is not defined !\n".format(param_name, e), eHive.Params.ParamWarning, 2)
            return None

    def param_exists(self, param_name):
        """Returns True or False, whether the parameter exists (it doesn't mean it can be successfully substituted)"""
        return self.p.has_param(param_name)

    def param_is_defined(self, param_name):
        """Returns True or False, whether the parameter exists, can be successfully substituted, and is not None"""
        if not self.param_exists(param_name):
            return False
        try:
            return self.p.get_param(param_name) is not None
        except KeyError:
            return False