
import eHive.Params

import os
import sys
import json
import numbers
import unittest
import warnings
import traceback

# Version of the communication protocol/wrapper; it is sent to the parent
# process in the initial 'VERSION' message
__version__ = "0.2"

__doc__ = """
This module mainly implements python's counterpart of GuestProcess. Read
the latter for more information about the JSON protocol used to communicate.
"""

class Job(object):
    """Dummy class to hold job-related information.

    Instances are plain attribute bags. BaseRunnable populates them at the
    beginning of each job with:
      - dbID, input_id, retry_count: copied from the job description sent
        by the parent process
      - autoflow, lethal_for_worker, transient_error: flags that are sent
        back to the parent process at the end of the job
    """


class CompleteEarlyException(Exception):
    """Can be raised by a derived class of BaseRunnable to indicate an early successful termination"""
    pass
class JobFailedException(Exception):
    """Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination"""
    pass
class HiveJSONMessageException(Exception):
    """Raised when we could not parse the JSON message coming from GuestProcess"""
    pass
class LostHiveConnectionException(Exception):
    """Raised when the process has lost the communication pipe with the Perl side.

    The argument is the name of the pipe on which the failure was detected
    ("__read_pipe" or "__write_pipe")."""
    pass


class BaseRunnable(object):
    """This is the counterpart of GuestProcess. Note that most of the methods
    are private to be hidden in the derived classes.

    This class can be used as a base-class for people to redefine fetch_input(),
    run() and/or write_output() (and/or pre_cleanup(), post_healthcheck(),
    post_cleanup()).
    Jobs are supposed to raise CompleteEarlyException in case they complete
    before reaching the end of their life-cycle. They can also raise
    JobFailedException to indicate a general failure.
    """

    # Private BaseRunnable interface
    #################################

    def __init__(self, read_fileno, write_fileno):
        """Open the pipes to the parent process and serve jobs until told to stop.

        read_fileno and write_fileno are the file descriptors used to receive
        messages from, and send messages to, the parent process.
        The constructor only returns once the parent has sent a message that
        does not describe a job (see __process_life_cycle)."""
        # We need the binary mode to disable the buffering
        self.__read_pipe = os.fdopen(read_fileno, mode='rb', buffering=0)
        self.__write_pipe = os.fdopen(write_fileno, mode='wb', buffering=0)
        self.__pid = os.getpid()
        self.debug = 0
        self.__process_life_cycle()

    def __print_debug(self, *args):
        """Print args on stderr, prefixed with the process ID, when debug is above 1"""
        if self.debug > 1:
            print("PYTHON {0}".format(self.__pid), *args, file=sys.stderr)

    def __write_line(self, text):
        """Write "text", UTF-8 encoded and newline-terminated, to the parent process.

        Raises LostHiveConnectionException if the pipe has been closed."""
        # UTF8 encoding has never been tested. Just hope it works :)
        try:
            self.__write_pipe.write(bytes(text + "\n", 'utf-8'))
        except BrokenPipeError:
            raise LostHiveConnectionException("__write_pipe") from None

    def __send_message(self, event, content):
        """Serializes the message in JSON and sends it to the parent process"""
        def default_json_encoder(o):
            # Objects that JSON cannot represent are replaced with a
            # placeholder rather than aborting the whole message
            self.__print_debug("Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
            return 'UNSERIALIZABLE OBJECT'
        j = json.dumps({'event': event, 'content': content}, indent=None, default=default_json_encoder)
        self.__print_debug('__send_message:', j)
        self.__write_line(j)

    def __send_response(self, response):
        """Sends a response message to the parent process"""
        self.__print_debug('__send_response:', response)
        # json.dumps takes care of escaping quotes and backslashes, which a
        # manual string concatenation would let through as invalid JSON
        self.__write_line(json.dumps({'response': str(response)}))

    def __read_message(self):
        """Read a message from the parent and parse it.

        Raises HiveJSONMessageException if the line is not valid JSON."""
        try:
            self.__print_debug("__read_message ...")
            line = self.__read_pipe.readline()
            self.__print_debug(" ... -> ", line[:-1].decode())
            # NOTE(review): an empty line (end-of-file on the pipe) is not
            # valid JSON and thus ends up as a HiveJSONMessageException
            return json.loads(line.decode())
        except BrokenPipeError:
            raise LostHiveConnectionException("__read_pipe") from None
        except ValueError as e:
            # HiveJSONMessageException is a more meaningful name than ValueError
            raise HiveJSONMessageException from e

    def __send_message_and_wait_for_OK(self, event, content):
        """Send a message and expects a response to be 'OK'"""
        self.__send_message(event, content)
        response = self.__read_message()
        if response['response'] != 'OK':
            raise HiveJSONMessageException("Received '{0}' instead of OK".format(response))

    def __param_hashes(self):
        """Return the current parameters in the structure expected by the parent process"""
        return {'substituted': self.__params.param_hash, 'unsubstituted': self.__params.unsubstituted_param_hash}

    def __process_life_cycle(self):
        """Simple loop: wait for job parameters, do the job's life-cycle"""
        self.__send_message_and_wait_for_OK('VERSION', __version__)
        self.__send_message_and_wait_for_OK('PARAM_DEFAULTS', self.param_defaults())
        # Filled in lazily by worker_temp_directory()
        self.__created_worker_temp_directory = None
        while True:
            self.__print_debug("waiting for instructions")
            config = self.__read_message()
            if 'input_job' not in config:
                self.__print_debug("no params, this is the end of the wrapper")
                return
            self.__job_life_cycle(config)

    def __job_life_cycle(self, config):
        """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent"""
        self.__print_debug("__life_cycle")

        # Worker attributes. This is set first so that the parameter container
        # below is created with the debug level that comes with this job
        self.debug = config['debug']

        # Params
        self.__params = eHive.Params.ParamContainer(config['input_job']['parameters'], self.debug > 1)

        # Job attributes
        self.input_job = Job()
        for x in ['dbID', 'input_id', 'retry_count']:
            setattr(self.input_job, x, config['input_job'][x])
        self.input_job.autoflow = True
        self.input_job.lethal_for_worker = False
        self.input_job.transient_error = True

        # Which methods should be run
        steps = ['fetch_input', 'run']
        if self.input_job.retry_count > 0:
            steps.insert(0, 'pre_cleanup')
        if config['execute_writes']:
            steps.append('write_output')
            steps.append('post_healthcheck')
        self.__print_debug("steps to run:", steps)
        self.__send_response('OK')

        # The actual life-cycle
        died_somewhere = False
        try:
            for s in steps:
                self.__run_method_if_exists(s)
        except CompleteEarlyException as e:
            # Not a failure: only record the message, as a plain warning
            self.warning(e.args[0] if len(e.args) else repr(e), False)
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            # Anything else raised by the runnable (JobFailedException
            # included) marks the job as failed and is reported to the parent
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        # post_cleanup is attempted whether the steps above succeeded or not
        try:
            self.__run_method_if_exists('post_cleanup')
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        job_end_structure = {'complete': not died_somewhere, 'job': {}, 'params': self.__param_hashes()}
        for x in ['autoflow', 'lethal_for_worker', 'transient_error']:
            job_end_structure['job'][x] = getattr(self.input_job, x)
        self.__send_message_and_wait_for_OK('JOB_END', job_end_structure)

    def __run_method_if_exists(self, method):
        """method is one of "pre_cleanup", "fetch_input", "run", "write_output",
        "post_healthcheck", "post_cleanup".
        We only call the method if it exists to save a trip to the database."""
        if hasattr(self, method):
            self.__send_message_and_wait_for_OK('JOB_STATUS_UPDATE', method)
            getattr(self, method)()

    def __traceback(self, skipped_traces):
        """Format the exception being handled, removing the first
        "skipped_traces" entries of its stack trace (the eHive part)"""
        (etype, value, tb) = sys.exc_info()
        s1 = traceback.format_exception_only(etype, value)
        l = traceback.extract_tb(tb)[skipped_traces:]
        s2 = traceback.format_list(l)
        return "".join(s1 + s2)

    # Public BaseRunnable interface
    ################################

    def warning(self, message, is_error = False):
        """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not"""
        self.__send_message_and_wait_for_OK('WARNING', {'message': message, 'is_error': is_error})

    def dataflow(self, output_ids, branch_name_or_code = 1):
        """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns"""
        if branch_name_or_code == 1:
            # The flag has to be set on the job itself, since this is where it
            # is read from when the end of the job is reported to the parent
            self.input_job.autoflow = False
        self.__send_message('DATAFLOW', {'output_ids': output_ids, 'branch_name_or_code': branch_name_or_code, 'params': self.__param_hashes()})
        return self.__read_message()['response']

    def worker_temp_directory(self):
        """Returns the full path of the temporary directory created by the worker.
        Runnables can implement "worker_temp_directory_name()" to return the name
        they would like to use
        """
        # The parent is only asked once; the answer is then cached
        if self.__created_worker_temp_directory is None:
            template_name = self.worker_temp_directory_name() if hasattr(self, 'worker_temp_directory_name') else None
            self.__send_message('WORKER_TEMP_DIRECTORY', template_name)
            self.__created_worker_temp_directory = self.__read_message()['response']
        return self.__created_worker_temp_directory

    # Param interface
    ##################

    def param_defaults(self):
        """Returns the defaults parameters for this runnable"""
        return {}

    def param_required(self, param_name):
        """Returns the value of the parameter "param_name" or raises an exception
        if anything wrong happens or the value is None. The exception is
        marked as non-transient."""
        # The flag is switched off for the duration of the lookup, and only
        # restored on success, so that any exception leaves it to False
        t = self.input_job.transient_error
        self.input_job.transient_error = False
        v = self.__params.get_param(param_name)
        if v is None:
            raise eHive.Params.NullParamException(param_name)
        self.input_job.transient_error = t
        return v

    def param(self, param_name, *args):
        """When called as a setter: sets the value of the parameter "param_name".
        When called as a getter: returns the value of the parameter "param_name".
        It does not raise an exception if the parameter (or another one in the
        substitution stack) is undefined"""
        # As a setter
        if len(args):
            return self.__params.set_param(param_name, args[0])

        # As a getter
        try:
            return self.__params.get_param(param_name)
        except KeyError as e:
            warnings.warn("parameter '{0}' cannot be initialized because {1} is missing !".format(param_name, e), eHive.Params.ParamWarning, 2)
            return None

    def param_exists(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted, None if the substitution fails, False if it is missing"""
        if not self.__params.has_param(param_name):
            return False
        try:
            self.__params.get_param(param_name)
            return True
        except KeyError:
            return None

    def param_is_defined(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted to a defined value, None if the substitution fails,
        False if it is missing or evaluates as None"""
        e = self.param_exists(param_name)
        if not e:
            # False or None
            return e
        try:
            return self.__params.get_param(param_name) is not None
        except KeyError:
            return False


class RunnableTest(unittest.TestCase):
    """Unit-tests of the parameter interface of BaseRunnable"""

    def test_job_param(self):
        """Check param_exists(), param_is_defined(), param() and param_required()
        against five kinds of parameters: defined ("a"), null ("b"), depending
        on a missing parameter ("c"), missing ("d") and self-referencing ("e")"""

        class FakeRunnableWithParams(BaseRunnable):
            """Runnable that has parameters but no connection to a parent process"""
            def __init__(self, d):
                # BaseRunnable.__init__() is deliberately not called: it would
                # open the pipes and wait for messages from the parent process
                self._BaseRunnable__params = eHive.Params.ParamContainer(d)
                self.input_job = Job()
                self.input_job.transient_error = True

        j = FakeRunnableWithParams({
            'a': 3,
            'b': None,
            'c': '#other#',
            'e': '#e#'
        })

        # param_exists
        self.assertIs( j.param_exists('a'), True, '"a" exists' )
        self.assertIs( j.param_exists('b'), True, '"b" exists' )
        self.assertIs( j.param_exists('c'), None, '"c"\'s existence is unclear' )
        self.assertIs( j.param_exists('d'), False, '"d" doesn\'t exist' )
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_exists('e')

        # param_is_defined
        self.assertIs( j.param_is_defined('a'), True, '"a" is defined' )
        self.assertIs( j.param_is_defined('b'), False, '"b" is not defined' )
        self.assertIs( j.param_is_defined('c'), None, '"c"\'s defined-ness is unclear' )
        self.assertIs( j.param_is_defined('d'), False, '"d" is not defined (it doesn\'t exist)' )
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_is_defined('e')

        # param
        # Integers are compared by value: identity of equal integers is an
        # implementation detail of the interpreter
        self.assertEqual( j.param('a'), 3, '"a" is 3' )
        self.assertIs( j.param('b'), None, '"b" is None' )
        with self.assertWarns(eHive.Params.ParamWarning):
            self.assertIs( j.param('c'), None, '"c"\'s value is unclear' )
        with self.assertWarns(eHive.Params.ParamWarning):
            self.assertIs( j.param('d'), None, '"d" is not defined (it doesn\'t exist)' )
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param('e')

        # param_required
        self.assertEqual( j.param_required('a'), 3, '"a" is 3' )
        with self.assertRaises(eHive.Params.NullParamException):
            j.param_required('b')
        with self.assertRaises(KeyError):
            j.param_required('c')
        with self.assertRaises(KeyError):
            j.param_required('d')
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_required('e')