Commit 44bb0662 authored by David Mendez's avatar David Mendez
Browse files

When someone checks or update job status, summon a new meeseeks if the previous one fails

parent b292f185
...@@ -2,10 +2,13 @@ ...@@ -2,10 +2,13 @@
Module that provides a service to get or modify the status or jobs Module that provides a service to get or modify the status or jobs
""" """
from app.models import delayed_job_models from app.models import delayed_job_models
from app.job_status_daemon import meeseeks_launcher
class JobNotFoundError(Exception): class JobNotFoundError(Exception):
"""Base class for exceptions.""" """Base class for exceptions."""
class InputFileNotFoundError(Exception): class InputFileNotFoundError(Exception):
"""Base class for exceptions.""" """Base class for exceptions."""
...@@ -21,10 +24,12 @@ def get_job_status(job_id, server_base_url='http://0.0.0.0:5000'): ...@@ -21,10 +24,12 @@ def get_job_status(job_id, server_base_url='http://0.0.0.0:5000'):
try: try:
job = delayed_job_models.get_job_by_id(job_id, force_refresh=True) job = delayed_job_models.get_job_by_id(job_id, force_refresh=True)
meeseeks_launcher.summon_new_meeseeks_if_previous_died_prematurely(job)
return job.public_dict(server_base_url) return job.public_dict(server_base_url)
except delayed_job_models.JobNotFoundError: except delayed_job_models.JobNotFoundError:
raise JobNotFoundError() raise JobNotFoundError()
def get_input_file_path(job_id, input_key): def get_input_file_path(job_id, input_key):
""" """
:param job_id: the id of the job for which the status is required :param job_id: the id of the job for which the status is required
...@@ -51,6 +56,7 @@ def update_job_progress(job_id, progress, status_log, status_description): ...@@ -51,6 +56,7 @@ def update_job_progress(job_id, progress, status_log, status_description):
try: try:
delayed_job_models.update_job_progress(job_id, progress, status_log, status_description) delayed_job_models.update_job_progress(job_id, progress, status_log, status_description)
job = delayed_job_models.get_job_by_id(job_id) job = delayed_job_models.get_job_by_id(job_id)
meeseeks_launcher.summon_new_meeseeks_if_previous_died_prematurely(job)
return job.public_dict() return job.public_dict()
except delayed_job_models.JobNotFoundError: except delayed_job_models.JobNotFoundError:
raise JobNotFoundError() raise JobNotFoundError()
...@@ -6,6 +6,8 @@ import shlex ...@@ -6,6 +6,8 @@ import shlex
import subprocess import subprocess
import app.app_logging as app_logging import app.app_logging as app_logging
from app.models import delayed_job_models
from app import utils
def summon_meeseeks_for_job(job_id): def summon_meeseeks_for_job(job_id):
...@@ -23,3 +25,19 @@ def summon_meeseeks_for_job(job_id): ...@@ -23,3 +25,19 @@ def summon_meeseeks_for_job(job_id):
app_logging.debug(f'Mr Meeseeks env is: {checker_env}') app_logging.debug(f'Mr Meeseeks env is: {checker_env}')
meeseeks_process = subprocess.Popen(shlex.split(meeseeks_command), env=checker_env) meeseeks_process = subprocess.Popen(shlex.split(meeseeks_command), env=checker_env)
app_logging.debug(f'Mr Meeseeks summoned! PID is {meeseeks_process.pid}') app_logging.debug(f'Mr Meeseeks summoned! PID is {meeseeks_process.pid}')
def summon_new_meeseeks_if_previous_died_prematurely(job):
"""
If if is detected that the previous job checker died prematurely, a new one is summoned
:param job: job to monitor
"""
if job.needs_to_be_checked_in_lsf() and job.job_checker_seems_to_have_died():
msg = f'The job {job.id} needs to be checked in LSF and the previous checker seems to have died '
f'previously. I will summon another Meeseeks.'
app_logging.debug(msg)
delayed_job_models.append_to_lsf_check_log(job.id, msg)
summon_meeseeks_for_job(job.id)
# I update the last checked at to avoid many summonings at the same time.
job.last_lsf_checked_at = utils.get_utc_now()
delayed_job_models.save_job(job)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment