Commit 33812c18 authored by David Mendez

Merge branch 'staging' into 'master'

Implement Mr Meeseeks pattern for checking job status

See merge request !74
parents eb9e7651 2bf11658
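For context: this merge replaces the single long-running status daemon (one per LSF host, coordinated through cache locks) with a short-lived checker process per job, in the spirit of Mr Meeseeks: it is summoned when the job is submitted, polls LSF until the job finishes or errors, and then exits. If a checker dies prematurely, the API summons a new one the next time the job's status is read or updated. Below is a minimal, self-contained sketch of the pattern; all names are illustrative placeholders, not this repository's actual API.

```python
# Minimal sketch of the per-job checker ("Mr Meeseeks") pattern. All names
# here are illustrative placeholders, not this repository's actual API.
import time

FINAL_STATES = {'FINISHED', 'ERROR'}


def fetch_job_state(job_id):
    """Placeholder for the real LSF lookup (bjobs); always finishes here."""
    return 'FINISHED'


def checker_loop(job_id, poll_seconds=1.0):
    """Body of the spawned checker process: it watches exactly one job and
    exits as soon as that job reaches a final state."""
    while fetch_job_state(job_id) not in FINAL_STATES:
        time.sleep(poll_seconds)
    # Purpose fulfilled: the Meeseeks simply ceases to exist.


if __name__ == '__main__':
    checker_loop('some-job-id')
```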
......@@ -4,8 +4,6 @@ variables:
REGISTRY_USER: chembl/chembl
SERVER_APPLICATION: delayed-jobs/delayed-jobs-api/delayed-jobs-server
SERVER_IMAGE_TAG: ${CI_REGISTRY}/${REGISTRY_USER}/${SERVER_APPLICATION}:${CI_COMMIT_SHORT_SHA}
JOB_STATUS_APPLICATION: delayed-jobs/delayed-jobs-api/delayed-jobs-status-daemon
STATUS_DAEMON_IMAGE_TAG: ${CI_REGISTRY}/${REGISTRY_USER}/${JOB_STATUS_APPLICATION}:${CI_COMMIT_SHORT_SHA}
DOCKER_DRIVER: overlay
CONFIGS_FOLDER: run_config
# Config unit tests
......@@ -65,29 +63,6 @@ build_docker_image_server:
- echo Deploying $SERVER_IMAGE_TAG
- docker push $SERVER_IMAGE_TAG
build_docker_image_status_daemon:
image:
name: docker/compose:1.24.1
entrypoint: ["/bin/sh", "-c"]
stage: build
variables:
DOCKER_HOST: tcp://docker:2375
except:
- schedules
services:
- docker:dind
tags:
- dind
script:
- set -x
- docker version # verify docker cli is there. Also prints server info
- echo $CI_REGISTRY_PASSWORD | docker login -u $CI_REGISTRY_USER --password-stdin $CI_REGISTRY
- echo $DOCKERHUB_REGISTRY_PASSWORD | docker login -u $DOCKERHUB_REGISTRY_USER --password-stdin $DOCKERHUB_REGISTRY
- echo building $STATUS_DAEMON_IMAGE_TAG
- docker-compose build delayed-job-status-daemon-prod
- echo Deploying $STATUS_DAEMON_IMAGE_TAG
- docker push $STATUS_DAEMON_IMAGE_TAG
# ----------------------------------------------------------------------------------------------------------------------
# QA
# ----------------------------------------------------------------------------------------------------------------------
......@@ -190,7 +165,6 @@ deploy_to_staging:
- sed -i "s~<UID>~${CBL_ADM_UID}~" k8s-deployment.yaml
- sed -i "s~<GID>~${CBL_PUB_GID}~" k8s-deployment.yaml
- sed -i "s~<SERVER_IMAGE_TAG>~${SERVER_IMAGE_TAG}~" k8s-deployment.yaml
- sed -i "s~<STATUS_DAEMON_IMAGE_TAG>~${STATUS_DAEMON_IMAGE_TAG}~" k8s-deployment.yaml
- sed -i "s~<NFS_MOUNT_PATH>~${NFS_MOUNT_PATH_STAGING}~" k8s-deployment.yaml
- sed -i "s~<NFS_FQDN>~${NFS_FQDN_HX}~" k8s-deployment.yaml
- sed -i "s~<NFS_PATH>~${NFS_PATH_HX}~" k8s-deployment.yaml
......@@ -277,7 +251,6 @@ deploy_to_prod_hx:
- sed -i "s~<UID>~${CBL_ADM_UID}~" k8s-deployment.yaml
- sed -i "s~<GID>~${CBL_PUB_GID}~" k8s-deployment.yaml
- sed -i "s~<SERVER_IMAGE_TAG>~${SERVER_IMAGE_TAG}~" k8s-deployment.yaml
- sed -i "s~<STATUS_DAEMON_IMAGE_TAG>~${STATUS_DAEMON_IMAGE_TAG}~" k8s-deployment.yaml
- sed -i "s~<NFS_MOUNT_PATH>~${NFS_MOUNT_PATH_PROD_HX}~" k8s-deployment.yaml
- sed -i "s~<NFS_FQDN>~${NFS_FQDN_HX}~" k8s-deployment.yaml
- sed -i "s~<NFS_PATH>~${NFS_PATH_HX}~" k8s-deployment.yaml
......@@ -359,7 +332,6 @@ deploy_to_prod_hh:
- sed -i "s~<UID>~${CBL_ADM_UID}~" k8s-deployment.yaml
- sed -i "s~<GID>~${CBL_PUB_GID}~" k8s-deployment.yaml
- sed -i "s~<SERVER_IMAGE_TAG>~${SERVER_IMAGE_TAG}~" k8s-deployment.yaml
- sed -i "s~<STATUS_DAEMON_IMAGE_TAG>~${STATUS_DAEMON_IMAGE_TAG}~" k8s-deployment.yaml
- sed -i "s~<NFS_MOUNT_PATH>~${NFS_MOUNT_PATH_PROD_HH}~" k8s-deployment.yaml
- sed -i "s~<NFS_FQDN>~${NFS_FQDN_HH}~" k8s-deployment.yaml
- sed -i "s~<NFS_PATH>~${NFS_PATH_HH}~" k8s-deployment.yaml
......
......@@ -30,7 +30,4 @@ ENTRYPOINT FLASK_APP=app flask run --host=0.0.0.0
FROM base AS production-server
# Note that the app will read its configuration from the DELAYED_JOBS_RAW_CONFIG variable if the config.yml
# file is not found.
ENTRYPOINT gunicorn wsgi:FLASK_APP -c ${GUNICORN_CONFIG_FILE_PATH}
FROM base AS job-status-daemon
ENTRYPOINT /app/run_daemon.sh
\ No newline at end of file
ENTRYPOINT gunicorn wsgi:FLASK_APP -c ${GUNICORN_CONFIG_FILE_PATH}
\ No newline at end of file
......@@ -18,6 +18,7 @@ from app.models import delayed_job_models
from app.cache import CACHE
from app.rate_limiter import RATE_LIMITER
def create_app():
"""
Creates the flask app
......@@ -50,7 +51,9 @@ def create_app():
create_tables = RUN_CONFIG.get('sql_alchemy').get('create_tables', False)
if create_tables:
print('CREATING TABLES!')
DB.create_all()
print('TABLES CREATED!')
generate_default_config = RUN_CONFIG.get('generate_default_config', False)
if generate_default_config:
......@@ -65,5 +68,6 @@ def create_app():
return flask_app
if __name__ == '__main__':
flask_app = create_app()
......@@ -2,10 +2,13 @@
Module that provides a service to get or modify the status of jobs
"""
from app.models import delayed_job_models
from app.job_status_daemon import meeseeks_launcher
class JobNotFoundError(Exception):
"""Base class for exceptions."""
class InputFileNotFoundError(Exception):
"""Base class for exceptions."""
......@@ -21,10 +24,12 @@ def get_job_status(job_id, server_base_url='http://0.0.0.0:5000'):
try:
job = delayed_job_models.get_job_by_id(job_id, force_refresh=True)
meeseeks_launcher.summon_new_meeseeks_if_previous_died_prematurely(job)
return job.public_dict(server_base_url)
except delayed_job_models.JobNotFoundError:
raise JobNotFoundError()
def get_input_file_path(job_id, input_key):
"""
:param job_id: the id of the job for which the input file is required
......@@ -51,6 +56,7 @@ def update_job_progress(job_id, progress, status_log, status_description):
try:
delayed_job_models.update_job_progress(job_id, progress, status_log, status_description)
job = delayed_job_models.get_job_by_id(job_id)
meeseeks_launcher.summon_new_meeseeks_if_previous_died_prematurely(job)
return job.public_dict()
except delayed_job_models.JobNotFoundError:
raise JobNotFoundError()
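The two `summon_new_meeseeks_if_previous_died_prematurely` calls above mean that every status read or progress update can resurrect a dead checker. The death test itself lives in `delayed_job_models` and is not part of this diff; given the `death_assumption_seconds` default added to the config below, it presumably compares the job's last check timestamp against that threshold, along these lines (a hedged sketch, not the actual model code):

```python
# Hedged sketch of a "checker seems to have died" predicate; the real
# implementation is in delayed_job_models and is not shown in this diff.
from datetime import datetime, timedelta, timezone

DEATH_ASSUMPTION_SECONDS = 10  # mirrors the status_agent default added below


def checker_seems_to_have_died(last_lsf_checked_at):
    """Assume death if no LSF check has been recorded recently enough."""
    if last_lsf_checked_at is None:
        return True
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=DEATH_ASSUMPTION_SECONDS)
    return last_lsf_checked_at < cutoff
```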
......@@ -11,6 +11,8 @@ import subprocess
from pathlib import Path
import re
import os.path
import shlex
from datetime import datetime
import yaml
......@@ -21,6 +23,7 @@ from app.config import RUN_CONFIG
from app.models import delayed_job_models
from app import utils
from app.job_statistics import statistics_saver
from app.job_status_daemon import meeseeks_launcher
JOBS_RUN_DIR = RUN_CONFIG.get('jobs_run_dir', str(Path().absolute()) + '/jobs_run')
if not os.path.isabs(JOBS_RUN_DIR):
......@@ -560,6 +563,8 @@ def submit_job_to_lsf(job):
delayed_job_models.save_job(job)
app_logging.debug(f'LSF Job ID is: {lsf_job_id}')
meeseeks_launcher.summon_meeseeks_for_job(job.id)
def get_lsf_job_id(submission_out):
"""
......
......@@ -86,13 +86,14 @@ if not RUN_CONFIG.get('base_path'):
if not RUN_CONFIG.get('outputs_base_path'):
RUN_CONFIG['outputs_base_path'] = 'outputs'
STATUS_AGENT_CONFIG = RUN_CONFIG.get('status_agent')
if STATUS_AGENT_CONFIG is None:
RUN_CONFIG['status_agent'] = {
'lock_validity_seconds': 1,
'min_sleep_time': 1,
'max_sleep_time': 2
}
STATUS_AGENT_CONFIG = RUN_CONFIG.get('status_agent', {})
RUN_CONFIG['status_agent'] = {
'lock_validity_seconds': 1,
'sleep_time': 1,
'death_assumption_seconds': 10,
**STATUS_AGENT_CONFIG,
}
CACHE_CONFIG = RUN_CONFIG.get('cache_config')
if CACHE_CONFIG is None:
......
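The rewritten `status_agent` block above merges any user-supplied values over the defaults, so a partial config keeps the remaining defaults; because `**STATUS_AGENT_CONFIG` is unpacked last, deployment-provided keys win. For example:

```python
# Precedence of the defaults-merge idiom used in the config change above:
# keys from the user config override the defaults because they come last.
defaults = {'lock_validity_seconds': 1, 'sleep_time': 1, 'death_assumption_seconds': 10}
user_config = {'sleep_time': 5}

merged = {**defaults, **user_config}
print(merged)  # {'lock_validity_seconds': 1, 'sleep_time': 5, 'death_assumption_seconds': 10}
```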
......@@ -6,16 +6,19 @@ from datetime import datetime, timedelta
from app.config import RUN_CONFIG
from app.job_statistics import statistics_saver
from app import utils
class TestStatisticsSaver(unittest.TestCase):
"""
Class to test the statistics saver
"""
def test_generates_the_correct_json_for_a_job_record(self):
"""
Tests that it generates the correct json for a job record to be sent to elasticsearch
"""
current_time = datetime.now()
current_time = utils.get_utc_now()
dict_must_be = {
'job_type': 'TEST',
......@@ -49,7 +52,7 @@ class TestStatisticsSaver(unittest.TestCase):
)
record_date_field_name = RUN_CONFIG.get('job_statistics').get('date_field_name')
del job_record_dict_got[record_date_field_name] # to avoid complications with the current time calculation
del job_record_dict_got[record_date_field_name] # to avoid complications with the current time calculation
self.assertEqual(job_record_dict_got, dict_must_be, 'The job record dict was not generated correctly')
def test_generates_the_correct_json_for_a_job_cache_record(self):
......@@ -61,7 +64,7 @@ class TestStatisticsSaver(unittest.TestCase):
'job_type': 'TEST',
'run_env_type': 'DEV',
'was_cached': True,
'request_date': datetime.now().timestamp() * 1000
'request_date': utils.get_utc_now().timestamp() * 1000
}
job_cache_record_dict_got = statistics_saver.get_job_cache_record_dict(
......
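These tests swap naive `datetime.now()` timestamps for `utils.get_utc_now()`. The helper is not shown in this diff; it presumably returns a UTC timestamp along these lines (an assumption, not the project's actual code):

```python
# Assumed shape of utils.get_utc_now(); the helper is not part of this diff.
from datetime import datetime, timezone


def get_utc_now():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)
```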
......@@ -9,7 +9,6 @@ import stat
import subprocess
import re
import json
import random
import time
import shutil
......@@ -19,7 +18,7 @@ from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks
from app.job_statistics import statistics_saver
from app.job_status_daemon.job_statistics import statistics_generator
import app.app_logging as app_logging
from app import utils
AGENT_RUN_DIR = RUN_CONFIG.get('status_agent_run_dir', str(Path().absolute()) + '/status_agents_run')
if not os.path.isabs(AGENT_RUN_DIR):
......@@ -36,78 +35,64 @@ print(f'AGENT_RUN_DIR: {AGENT_RUN_DIR}')
print('------------------------------------------------------------------------------')
def check_jobs_status(delete_lock_after_finishing=True):
def check_job_status(job_id, delete_lock_after_finishing=True):
"""
The main function of this module. Checks the status of the given job in LSF.
:param job_id: id of the job to check
:param delete_lock_after_finishing: if True, explicitly deletes the lock after finishing
:return: (sleep_time, job_was_checked) the number of seconds to wait before the next run and whether the job
was checked or not
"""
lsf_config = RUN_CONFIG.get('lsf_submission')
current_lsf_host = lsf_config['lsf_host']
my_hostname = socket.gethostname()
sleep_time = RUN_CONFIG.get('status_agent').get('sleep_time')
min_sleep_time = RUN_CONFIG.get('status_agent').get('min_sleep_time')
max_sleep_time = RUN_CONFIG.get('status_agent').get('max_sleep_time')
sleep_time = random.uniform(min_sleep_time, max_sleep_time)
existing_lock = locks.get_lock_for_lsf_host(current_lsf_host)
existing_lock = locks.get_lock_for_job(job_id)
if existing_lock is not None:
if existing_lock != my_hostname:
check_msg = f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
return sleep_time, False
print(f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again')
return sleep_time, False
else:
print(f'Locking LSF status check for {current_lsf_host}, I am {my_hostname}')
locks.set_lsf_lock(current_lsf_host, my_hostname)
check_msg = f'Locking LSF status check for job {job_id}, I am {my_hostname}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
print('Looking for jobs to check...')
lsf_job_ids_to_check = get_lsf_job_ids_to_check()
print(f'lsf_job_ids_to_check: {lsf_job_ids_to_check}')
locks.set_lock_for_job(job_id, my_hostname)
if len(lsf_job_ids_to_check) == 0:
locks.delete_lsf_lock(current_lsf_host) if delete_lock_after_finishing else None
return sleep_time, True
lsf_job_id = delayed_job_models.get_job_by_id(job_id).lsf_job_id
check_msg = f'lsf_job_id: {lsf_job_id}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
script_path = prepare_job_status_check_script(lsf_job_ids_to_check)
script_path = prepare_job_status_check_script(job_id=job_id, lsf_job_id=lsf_job_id)
must_run_script = RUN_CONFIG.get('run_status_script', True)
if not must_run_script:
print('Not running script because run_status_script is False')
locks.delete_lsf_lock(current_lsf_host) if delete_lock_after_finishing else None
check_msg = 'Not running script because run_status_script is False'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None
return sleep_time, False
try:
script_output = get_status_script_output(script_path)
script_output = get_status_script_output(job_id, my_hostname, script_path)
os.remove(script_path) # Remove the script after running so it doesn't fill up the NFS
print(f'deleted script: {script_path}')
check_msg = f'deleted script: {script_path}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
parse_bjobs_output(script_output)
locks.delete_lsf_lock(current_lsf_host) if delete_lock_after_finishing else None
locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None
return sleep_time, True
except JobStatusDaemonError as error:
print(error)
check_msg = f'ERROR: \n {error}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
def get_lsf_job_ids_to_check():
"""
:return: a list of LSF job IDs for which it is necessary to check the status in the LSF cluster. The jobs that are
checked are the ones that:
1. Were submitted to the same LSF cluster that I am running with (defined in configuration)
2. Are not in Error or Finished state.
"""
lsf_config = RUN_CONFIG.get('lsf_submission')
lsf_host = lsf_config['lsf_host']
return delayed_job_models.get_lsf_job_ids_to_check(lsf_host)
def get_check_job_status_script_path():
def get_check_job_status_script_path(job_id, lsf_job_id):
"""
:param job_id: job id for which to get the script path
:param lsf_job_id: lsf job id to check
:return: the path to use for creating the job status script
"""
filename = f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_check_lsf_job_status.sh'
filename = f'{utils.get_utc_now().strftime("%Y-%m-%d-%H-%M-%S")}_job_id-{job_id}_lsf_job_id-{lsf_job_id}' \
f'_check_lsf_job_status.sh'
job_status_check_script_path = Path(AGENT_RUN_DIR).joinpath(socket.gethostname(), filename)
return job_status_check_script_path
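With the job id and LSF job id now embedded in the script name, concurrent checkers on the same host no longer collide on a single timestamped file. A worked example of the naming scheme, with illustrative values:

```python
# Worked example of the script-naming scheme above, with illustrative values.
from datetime import datetime

stamp = datetime(2020, 3, 10, 12, 0, 0).strftime('%Y-%m-%d-%H-%M-%S')
filename = f'{stamp}_job_id-JOB-1_lsf_job_id-123_check_lsf_job_status.sh'
print(filename)  # 2020-03-10-12-00-00_job_id-JOB-1_lsf_job_id-123_check_lsf_job_status.sh
```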
......@@ -116,10 +101,11 @@ def get_check_job_status_script_path():
# ----------------------------------------------------------------------------------------------------------------------
# Preparing status script
# ----------------------------------------------------------------------------------------------------------------------
def prepare_job_status_check_script(lsf_job_ids):
def prepare_job_status_check_script(job_id, lsf_job_id):
"""
Prepares the script that will check the job status in LSF
:lsf_job_ids: the list of lsf job ids for which to check the status
:param job_id: the job id being checked
:param lsf_job_id: the lsf job id to check
:return: the final path of the script that was created
"""
......@@ -132,12 +118,12 @@ def prepare_job_status_check_script(lsf_job_ids):
lsf_host = lsf_config['lsf_host']
job_submission_script = job_status_template.format(
LSF_JOB_IDS=' '.join([str(lsf_job_id) for lsf_job_id in lsf_job_ids]),
LSF_JOB_IDS=lsf_job_id,
LSF_USER=lsf_user,
LSF_HOST=lsf_host
)
status_script_path = get_check_job_status_script_path()
status_script_path = get_check_job_status_script_path(job_id=job_id, lsf_job_id=lsf_job_id)
status_script_path.parent.mkdir(parents=True, exist_ok=True)
with open(status_script_path, 'w') as status_script_file:
status_script_file.write(job_submission_script)
......@@ -153,23 +139,32 @@ def prepare_job_status_check_script(lsf_job_ids):
# ----------------------------------------------------------------------------------------------------------------------
# Parsing status script output
# ----------------------------------------------------------------------------------------------------------------------
def get_status_script_output(script_path):
def get_status_script_output(job_id, checker_name, script_path):
"""
Runs the status script and returns its output as text; raises an exception if there is an error
:param job_id: id of the job, used to save the outputs to the check log
:param checker_name: name of the process that checks the job in lsf
:param script_path: path of the script
:return: the text output of stdout
"""
lsf_config = RUN_CONFIG.get('lsf_submission')
id_rsa_path = lsf_config['id_rsa_file']
run_command = f'{script_path} {id_rsa_path}'
print(f'Going to run job status script, command: {run_command}')
check_msg = f'Going to run job status script, command: {run_command}'
delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
status_check_process = subprocess.run(run_command.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print(f'Output: \n {status_check_process.stdout}')
print(f'Error: \n {status_check_process.stderr}')
check_msg = f'Output: \n {status_check_process.stdout}'
delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
check_msg = f'Error: \n {status_check_process.stderr}'
delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
return_code = status_check_process.returncode
print(f'script return code was: {return_code}')
check_msg = f'script return code was: {return_code}'
delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
delayed_job_models.set_last_lsf_check_status(job_id, return_code)
delayed_job_models.set_last_lsf_checked_at(job_id)
if return_code not in [0, 255]:
......@@ -182,6 +177,9 @@ def get_status_script_output(script_path):
with open(status_error_path, 'wb') as status_err_file:
status_err_file.write(status_check_process.stderr)
check_msg = f'There was an error when running the job status script! Check the directory {script_path} for logs!'
delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
raise JobStatusDaemonError('There was an error when running the job status script! Please check the logs')
else:
return status_check_process.stdout.decode()
......
"""
Module that handles the locking system for the status daemons
"""
from app.config import RUN_CONFIG
from app import cache
from app.models import delayed_job_models
def get_lock_for_lsf_host(lsf_host):
def get_lock_for_job(job_id):
"""
Returns a lock for an lsf host if it exists
:param lsf_host: lsf host for which to get the lock
:return: dict with the lock for the given lsf_host, None if it doesn't exist
Returns a lock for the job if it exists
:param job_id: id of the job for which to get the lock
:return: the lock owner (a hostname) for the given job, or None if it is not locked
"""
return cache.fail_proof_get(key=lsf_host)
job = delayed_job_models.get_job_by_id(job_id)
return job.lsf_check_lock_owner
def set_lsf_lock(lsf_host, lock_owner):
def set_lock_for_job(job_id, lock_owner):
"""
Creates a lock on the given lsf_host in the name of the given owner. It expires after the time
configured by status_agent.lock_validity_seconds
:param lsf_host: cluster to lock
:param lock_owner: identifier (normally a hostname) of the process that owns the lock
Creates a lock on the job with the given job_id in the name of the given owner. It expires after the time
configured by status_agent.lock_validity_seconds
:param job_id: id of the job
:param lock_owner: owner of the lock, typically the hostname of the pod
"""
job = delayed_job_models.get_job_by_id(job_id)
job.lsf_check_lock_owner = lock_owner
delayed_job_models.save_job(job)
seconds_valid = RUN_CONFIG.get('status_agent').get('lock_validity_seconds')
lock_dict = {
'owner': lock_owner
}
cache.fail_proof_set(key=lsf_host, value=lock_dict, timeout=seconds_valid)
def delete_lsf_lock(lsf_host):
def delete_lock_for_job(job_id):
"""
Deletes the lock for the lsf host passed as parameter
:param lsf_host: lsf host for which to delete the lock
Deletes the lock for the job with id passed as parameter
:param job_id: id of the job
"""
cache.delete(key=lsf_host)
job = delayed_job_models.get_job_by_id(job_id)
job.lsf_check_lock_owner = None
delayed_job_models.save_job(job)
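The lock thus moves from a cache entry keyed by LSF host to a plain `lsf_check_lock_owner` column on the job row: one lock per job instead of one per cluster. A self-contained mimic of the acquire/release flow (`Job` here is a stand-in, not the actual model class):

```python
# Self-contained mimic of the per-job lock flow above; Job is a stand-in for
# the delayed_job_models job row, not the actual model class.
import socket


class Job:
    lsf_check_lock_owner = None


def check_with_lock(job, my_hostname):
    """Run the status check only if no other host holds the job's lock."""
    if job.lsf_check_lock_owner not in (None, my_hostname):
        return False                        # another checker owns the lock
    job.lsf_check_lock_owner = my_hostname  # acquire
    try:
        pass                                # ... run the LSF status check ...
    finally:
        job.lsf_check_lock_owner = None     # release
    return True


print(check_with_lock(Job(), socket.gethostname()))  # True
```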
"""
Module that launches the status daemon in a Mr Meeseeks way
"""
import os
import shlex
import socket
import subprocess
import app.app_logging as app_logging
from app.models import delayed_job_models
from app import utils
def summon_meeseeks_for_job(job_id):
"""
Starts the job checking process. The checker will disappear once the job finishes or errors.
:param job_id: id of the job to monitor
"""
app_logging.debug('Going to launch Mr Meeseeks to monitor the job status')
meeseeks_command = f'/app/run_daemon.sh {job_id}'
app_logging.debug(f'Mr Meeseeks command is: {meeseeks_command}')
checker_env = {
'CONFIG_FILE_PATH': os.getenv('CONFIG_FILE_PATH')
}
app_logging.debug(f'Mr Meeseeks env is: {checker_env}')
meeseeks_process = subprocess.Popen(shlex.split(meeseeks_command), env=checker_env)
app_logging.debug(f'Mr Meeseeks summoned! PID is {meeseeks_process.pid}')
def summon_new_meeseeks_if_previous_died_prematurely(job):
"""
If it is detected that the previous job checker died prematurely, a new one is summoned
:param job: job to monitor
"""
if job.needs_to_be_checked_in_lsf() and job.job_checker_seems_to_have_died():
msg = (f'The job {job.id} needs to be checked in LSF and the previous checker seems to have died. '
f'I will summon another Meeseeks.')
app_logging.debug(msg)
delayed_job_models.append_to_lsf_check_log(job.id, socket.gethostname(), msg)
summon_meeseeks_for_job(job.id)
# Update last_lsf_checked_at so that concurrent requests do not summon many Meeseeks at the same time.
job.last_lsf_checked_at = utils.get_utc_now()
delayed_job_models.save_job(job)
......@@ -3,17 +3,27 @@
Script that runs the daemon that checks the job status
"""
import time
import argparse
from app.job_status_daemon import daemon
from app import create_app
from app.models import delayed_job_models
PARSER = argparse.ArgumentParser()
PARSER.add_argument('job_id', help='id of the job to check')
ARGS = PARSER.parse_args()
def run():
def run():
flask_app = create_app()
job_id = ARGS.job_id
with flask_app.app_context():
while True:
sleep_time, jobs_were_checked = daemon.check_jobs_status()
job = delayed_job_models.get_job_by_id(job_id, force_refresh=True)
while job.needs_to_be_checked_in_lsf():
sleep_time, jobs_were_checked = daemon.check_job_status(job_id)
time.sleep(sleep_time)
job = delayed_job_models.get_job_by_id(job_id, force_refresh=True)
if __name__ == "__main__":
run()
\ No newline at end of file
run()
......@@ -12,6 +12,7 @@ from app import create_app
from app.models import delayed_job_models
from app.job_status_daemon import daemon
from app.job_status_daemon.job_statistics import statistics_generator
from app import utils
class TestJobStatisticsGeneration(unittest.TestCase):
"""
......@@ -35,7 +36,7 @@ class TestJobStatisticsGeneration(unittest.TestCase):
test that it calculates the time from created to running
"""
current_time = datetime.now()
current_time = utils.get_utc_now()
created_at = current_time
seconds_must_be = 1.5
started_at = created_at + timedelta(seconds=seconds_must_be)
......@@ -56,7 +57,7 @@ class TestJobStatisticsGeneration(unittest.TestCase):
test that it calculates the time from running to finished
"""
current_time = datetime.now()
current_time = utils.get_utc_now()
started_at = current_time
seconds_must_be = 1.5
finished_at = started_at + timedelta(seconds=seconds_must_be)
......
......@@ -9,14 +9,13 @@ from os import path
import shutil
import os
from sqlalchemy import and_
from app import create_app
from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.job_status_daemon import daemon
from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks
from app import utils
class TestJobStatusDaemon(unittest.TestCase):
......@@ -146,84 +145,19 @@ class TestJobStatusDaemon(unittest.TestCase):
delayed_job_models.save_job(job)
i += 1
def test_determines_for_which_jobs_check_status_0(self):
"""
Given a set of jobs currently in the database, determines for which of them the status must be checked.
In this case, some jobs require a check.
"""
self.create_test_jobs_0()