Commit 85efdf90 authored by David Mendez

Status Daemon: save lock in database for every job instead of in the cache

parent 7b387c4c
@@ -86,13 +86,14 @@ if not RUN_CONFIG.get('base_path'):
if not RUN_CONFIG.get('outputs_base_path'):
RUN_CONFIG['outputs_base_path'] = 'outputs'
STATUS_AGENT_CONFIG = RUN_CONFIG.get('status_agent')
if STATUS_AGENT_CONFIG is None:
RUN_CONFIG['status_agent'] = {
'lock_validity_seconds': 1,
'min_sleep_time': 1,
'max_sleep_time': 2
}
STATUS_AGENT_CONFIG = RUN_CONFIG.get('status_agent', {})
RUN_CONFIG['status_agent'] = {
'lock_validity_seconds': 1,
'sleep_time': 1,
'dead_assumption_seconds': 10,
**STATUS_AGENT_CONFIG,
}
CACHE_CONFIG = RUN_CONFIG.get('cache_config')
if CACHE_CONFIG is None:
......
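The new block merges any status_agent settings found in the run config over the defaults via dict unpacking, so a partial override keeps the remaining default values. A minimal sketch of the pattern, with a hypothetical override:

# Sketch of the default-merging pattern above; the override value is hypothetical.
defaults = {
    'lock_validity_seconds': 1,
    'sleep_time': 1,
    'dead_assumption_seconds': 10,
}
override = {'sleep_time': 5}  # e.g. what RUN_CONFIG.get('status_agent', {}) could return
merged = {**defaults, **override}
# merged -> {'lock_validity_seconds': 1, 'sleep_time': 5, 'dead_assumption_seconds': 10}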
@@ -9,7 +9,6 @@ import stat
import subprocess
import re
import json
import random
import time
import shutil
@@ -35,49 +34,36 @@ print(f'AGENT_RUN_DIR: {AGENT_RUN_DIR}')
print('------------------------------------------------------------------------------')
def check_jobs_status(delete_lock_after_finishing=True, only_job_id=None):
def check_jobs_status(job_id, delete_lock_after_finishing=True):
"""
The main function of this module. Checks the status of the given job in LSF
:param job_id: job id to check
:param delete_lock_after_finishing: determines whether the lock is explicitly deleted after finishing
:param only_job_id: used to specify a job id to check, not the ones that are available for checkin
:return: (sleep_time, jobs_were_checked), the number of seconds to wait before the next run and whether the job was checked
"""
lsf_config = RUN_CONFIG.get('lsf_submission')
current_lsf_host = lsf_config['lsf_host']
my_hostname = socket.gethostname()
sleep_time = RUN_CONFIG.get('status_agent').get('sleep_time')
min_sleep_time = RUN_CONFIG.get('status_agent').get('min_sleep_time')
max_sleep_time = RUN_CONFIG.get('status_agent').get('max_sleep_time')
sleep_time = random.uniform(min_sleep_time, max_sleep_time)
existing_lock = locks.get_lock_for_lsf_host(current_lsf_host)
existing_lock = locks.get_lock_for_job(job_id)
print('existing_lock: ')
print(existing_lock)
if existing_lock is not None:
if existing_lock != my_hostname:
print(f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again')
return sleep_time, False
print(f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again')
return sleep_time, False
print(f'Locking LSF status check for job {job_id}, I am {my_hostname}')
locks.set_lock_for_job(job_id, my_hostname)
else:
print(f'Locking LSF status check for {current_lsf_host}, I am {my_hostname}')
locks.set_lsf_lock(current_lsf_host, my_hostname)
print('Looking for jobs to check...')
if only_job_id is None:
lsf_job_ids_to_check = get_lsf_job_ids_to_check()
else:
lsf_job_ids_to_check = [only_job_id]
print(f'lsf_job_ids_to_check: {lsf_job_ids_to_check}')
if len(lsf_job_ids_to_check) == 0:
locks.delete_lsf_lock(current_lsf_host) if delete_lock_after_finishing else None
return sleep_time, True
lsf_job_id = delayed_job_models.get_job_by_id(job_id).lsf_job_id
print(f'lsf_job_id: {lsf_job_id}')
script_path = prepare_job_status_check_script(lsf_job_ids_to_check)
script_path = prepare_job_status_check_script(job_id=job_id, lsf_job_id=lsf_job_id)
must_run_script = RUN_CONFIG.get('run_status_script', True)
if not must_run_script:
print('Not running script because run_status_script is False')
locks.delete_lsf_lock(current_lsf_host) if delete_lock_after_finishing else None
locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None
return sleep_time, False
try:
@@ -85,18 +71,21 @@ def check_jobs_status(delete_lock_after_finishing=True, only_job_id=None):
os.remove(script_path) # Remove the script after running so it doesn't fill up the NFS
print(f'deleted script: {script_path}')
parse_bjobs_output(script_output)
locks.delete_lsf_lock(current_lsf_host) if delete_lock_after_finishing else None
locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None
return sleep_time, True
except JobStatusDaemonError as error:
print(error)
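The reworked check_jobs_status now takes a per-job lock from the database instead of a per-cluster lock from the cache: if another hostname already owns the lock it backs off for the configured sleep_time, otherwise it records its own hostname and runs the status check. A simplified sketch of that decision, reusing the locks helpers introduced later in this commit:

# Simplified sketch of the locking decision in check_jobs_status (names follow this commit).
def can_check_job(job_id, my_hostname):
    existing_lock = locks.get_lock_for_job(job_id)   # hostname string, or None
    if existing_lock is not None and existing_lock != my_hostname:
        return False                                 # another pod is already checking this job
    locks.set_lock_for_job(job_id, my_hostname)      # claim (or refresh) the lock
    return True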
def get_check_job_status_script_path():
def get_check_job_status_script_path(job_id, lsf_job_id):
"""
:param job_id: job id for which to get the script path
:param lsf_job_id: lsf job id to check
:return: the path to use for creating the job status script
"""
filename = f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_check_lsf_job_status.sh'
filename = f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_job_id-{job_id}_lsf_job_id-{lsf_job_id}' \
f'_check_lsf_job_status.sh'
job_status_check_script_path = Path(AGENT_RUN_DIR).joinpath(socket.gethostname(), filename)
return job_status_check_script_path
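Since the job id and the LSF job id are now embedded in the file name, a path returned by this helper would look roughly like the following (the ids, timestamp and directories here are illustrative only):

# Illustrative call; 'Job-123' and 456 are made-up ids, and the run dir depends on the environment.
script_path = get_check_job_status_script_path(job_id='Job-123', lsf_job_id=456)
# e.g. <AGENT_RUN_DIR>/<hostname>/2024-01-31-12-00-00_job_id-Job-123_lsf_job_id-456_check_lsf_job_status.sh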
@@ -105,10 +94,11 @@ def get_check_job_status_script_path():
# ----------------------------------------------------------------------------------------------------------------------
# Preparing status script
# ----------------------------------------------------------------------------------------------------------------------
def prepare_job_status_check_script(lsf_job_ids):
def prepare_job_status_check_script(job_id, lsf_job_id):
"""
Prepares the script that will check the job status in LSF
:lsf_job_ids: the list of job ids for which check the status
:param job_id: the job id being checked
:param lsf_job_id: the lsf job id to check
:return: the final path of the script that was created
"""
@@ -121,12 +111,12 @@ def prepare_job_status_check_script(lsf_job_ids):
lsf_host = lsf_config['lsf_host']
job_submission_script = job_status_template.format(
LSF_JOB_IDS=' '.join([str(lsf_job_id) for lsf_job_id in lsf_job_ids]),
LSF_JOB_IDS=lsf_job_id,
LSF_USER=lsf_user,
LSF_HOST=lsf_host
)
status_script_path = get_check_job_status_script_path()
status_script_path = get_check_job_status_script_path(job_id=job_id, lsf_job_id=lsf_job_id)
status_script_path.parent.mkdir(parents=True, exist_ok=True)
with open(status_script_path, 'w') as status_script_file:
status_script_file.write(job_submission_script)
......
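The job status template itself is loaded outside this hunk, so its exact contents are not shown here; purely to illustrate the placeholders used in the .format call above, a hypothetical single-job template could look like this (the real template in the repository may differ):

# Hypothetical template, only to show how LSF_JOB_IDS, LSF_USER and LSF_HOST are filled in.
job_status_template = 'ssh {LSF_USER}@{LSF_HOST} "bjobs {LSF_JOB_IDS}"'
script_body = job_status_template.format(LSF_JOB_IDS=456, LSF_USER='some_user', LSF_HOST='some.lsf.host')
# -> ssh some_user@some.lsf.host "bjobs 456"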
"""
Module that handles the locking system for the status daemons
"""
from app.config import RUN_CONFIG
from app import cache
from app.models import delayed_job_models
def get_lock_for_lsf_host(lsf_host):
def get_lock_for_job(job_id):
"""
Returns a lock for a lsf host if it exists
:param lsf_host: lsf host for which get the lock
:return: dict with the lock for the lsf_host given as parameter, None if it doesn't exist
Returns a lock for the job if it exists
:param job_id: id of the job for which to get the lock
:return: the owner of the lock (normally a hostname), None if there is no lock
"""
return cache.fail_proof_get(key=lsf_host)
job = delayed_job_models.get_job_by_id(job_id)
return job.lsf_check_lock_owner
# def get_lock_for_lsf_host(lsf_host):
# """
# Returns a lock for a lsf host if it exists
# :param lsf_host: lsf host for which get the lock
# :return: dict with the lock for the lsf_host given as parameter, None if it doesn't exist
# """
# return cache.fail_proof_get(key=lsf_host)
def set_lsf_lock(lsf_host, lock_owner):
def set_lock_for_job(job_id, lock_owner):
"""
Creates a lock on the lsf_host given as parameter in the name of the owner given as parameter, it will expire in the
time set up in the configuration, set by the value status_agent.lock_validity_seconds
:param lsf_host: cluster to lock
:param lock_owner: identifier (normally a hostname) of the process that owns the lock
Creates a lock on the job with the job_id given as parameter, in the name of the given owner. The lock expires
after the time set in the configuration value status_agent.lock_validity_seconds
:param job_id: id of the job
:param lock_owner: owner of the lock, typically the hostname of the pod
"""
seconds_valid = RUN_CONFIG.get('status_agent').get('lock_validity_seconds')
lock_dict = {
'owner': lock_owner
}
cache.fail_proof_set(key=lsf_host, value=lock_dict, timeout=seconds_valid)
job = delayed_job_models.get_job_by_id(job_id)
job.lsf_check_lock_owner = lock_owner
delayed_job_models.save_job(job)
def delete_lsf_lock(lsf_host):
def delete_lock_for_job(job_id):
"""
Deletes the lock for the lsf host passed as parameter
:param lsf_host: lsf host for which to delete the lock
Deletes the lock for the job with id passed as parameter
:param job_id: id of the job
"""
cache.delete(key=lsf_host)
job = delayed_job_models.get_job_by_id(job_id)
job.lsf_check_lock_owner = None
delayed_job_models.save_job(job)
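Taken together, the rewritten helpers persist the lock in the DelayedJob row (the lsf_check_lock_owner column) instead of in the cache. A minimal usage sketch of the lock lifecycle, assuming an application context and an existing job row ('Job-123' and 'pod-a' are placeholder values):

# Hedged usage sketch of the database-backed lock lifecycle.
locks.set_lock_for_job('Job-123', 'pod-a')           # writes 'pod-a' into DelayedJob.lsf_check_lock_owner
assert locks.get_lock_for_job('Job-123') == 'pod-a'  # the lock owner is read back from the job row
locks.delete_lock_for_job('Job-123')                 # resets lsf_check_lock_owner to None
assert locks.get_lock_for_job('Job-123') is None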
@@ -146,16 +146,19 @@ class TestJobStatusDaemon(unittest.TestCase):
delayed_job_models.save_job(job)
i += 1
# TODO: CONTINUE HERE
def test_produces_a_correct_job_status_check_script_path(self):
"""
Test that produces a correct path for the job status script
"""
filename = f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_check_lsf_job_status.sh'
job_id = 'job_1'
lsf_job_id = 1
filename = f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_job_id-{job_id}_lsf_job_id-{lsf_job_id}' \
f'_check_lsf_job_status.sh'
job_status_check_script_path_must_be = Path(daemon.AGENT_RUN_DIR).joinpath(socket.gethostname(), filename)
print('job_status_check_script_path_must_be: ', job_status_check_script_path_must_be)
job_status_check_script_path_got = daemon.get_check_job_status_script_path()
job_status_check_script_path_got = daemon.get_check_job_status_script_path(job_id=job_id, lsf_job_id=lsf_job_id)
# remove the last character (the second) to avoid annoying false negatives
self.assertEqual(str(job_status_check_script_path_must_be)[:-1], str(job_status_check_script_path_got)[:-1],
@@ -165,11 +168,11 @@ class TestJobStatusDaemon(unittest.TestCase):
"""
Test that the job status script is created and can be executed.
"""
self.create_test_jobs_0()
job_id = 'job_1'
lsf_job_id = 1
with self.flask_app.app_context():
lsf_ids_to_check = daemon.get_lsf_job_ids_to_check()
script_path_got = daemon.prepare_job_status_check_script(lsf_ids_to_check)
script_path_got = daemon.prepare_job_status_check_script(job_id=job_id, lsf_job_id=lsf_job_id)
self.assertTrue(path.isfile(script_path_got), msg='The job status check script has not been created!')
self.assertTrue(os.access(script_path_got, os.X_OK),
@@ -365,46 +368,55 @@ class TestJobStatusDaemon(unittest.TestCase):
"""
Tests that the daemon creates a lock while checking LSF
"""
self.create_test_jobs_0()
lsf_config = RUN_CONFIG.get('lsf_submission')
lsf_host = lsf_config['lsf_host']
my_hostname = socket.gethostname()
job_id_to_check = f'Job-{lsf_host}-{delayed_job_models.JobStatuses.QUEUED}'
with self.flask_app.app_context():
daemon.check_jobs_status(delete_lock_after_finishing=False)
daemon.check_jobs_status(job_id=job_id_to_check, delete_lock_after_finishing=False)
current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
lock_got = locks.get_lock_for_lsf_host(current_lsf_host)
self.assertIsNotNone(lock_got, msg='The LSF lock was not created!')
lock_hostname_got = lock_got.get('owner')
lock_hostname_got = locks.get_lock_for_job(job_id_to_check)
self.assertIsNotNone(lock_hostname_got, msg='The lock was not created!')
lock_hostname_must_be = socket.gethostname()
self.assertEqual(lock_hostname_got, lock_hostname_must_be, msg='The lock was not saved correctly!')
locks.delete_lsf_lock(current_lsf_host)
locks.delete_lock_for_job(job_id_to_check)
def test_agent_respects_a_lock(self):
"""
Tests that when a lock has been created for another host, the agent respects it. This means that
the agent does not check anything in lsf
"""
self.create_test_jobs_0()
lsf_config = RUN_CONFIG.get('lsf_submission')
lsf_host = lsf_config['lsf_host']
job_id_to_check = f'Job-{lsf_host}-{delayed_job_models.JobStatuses.QUEUED}'
with self.flask_app.app_context():
current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
locks.set_lsf_lock(current_lsf_host, 'another_owner')
locks.set_lock_for_job(job_id_to_check, 'another_owner')
sleep_time_got, jobs_were_checked = daemon.check_jobs_status()
sleep_time_got, jobs_were_checked = daemon.check_jobs_status(job_id_to_check)
self.assertFalse(jobs_were_checked, msg='The jobs should have not been checked')
min_sleep_time = RUN_CONFIG.get('status_agent').get('min_sleep_time')
max_sleep_time = RUN_CONFIG.get('status_agent').get('max_sleep_time')
self.assertTrue(min_sleep_time <= sleep_time_got <= max_sleep_time,
sleep_time = RUN_CONFIG.get('status_agent').get('sleep_time')
self.assertTrue(sleep_time_got == sleep_time,
msg='The sleep time was not calculated correctly!')
locks.delete_lsf_lock(current_lsf_host)
locks.delete_lock_for_job(job_id_to_check)
def test_deletes_lock_after_finishing(self):
"""
Tests that it requests the deletion of the lock after checking the jobs
"""
self.create_test_jobs_0()
lsf_config = RUN_CONFIG.get('lsf_submission')
lsf_host = lsf_config['lsf_host']
job_id_to_check = f'Job-{lsf_host}-{delayed_job_models.JobStatuses.QUEUED}'
with self.flask_app.app_context():
daemon.check_jobs_status()
current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
daemon.check_jobs_status(job_id_to_check)
lock_got = locks.get_lock_for_lsf_host(current_lsf_host)
lock_got = locks.get_lock_for_job(job_id_to_check)
self.assertIsNone(lock_got, msg='The LSF lock was not deleted!')
@@ -140,7 +140,7 @@ class DelayedJob(DB.Model):
lsf_check_lock_owner = DB.Column(DB.String(length=240))
last_lsf_checked_at = DB.Column(DB.DateTime)
last_lsf_check_status = DB.Column(DB.Integer)
lsf_check_log = DB.Column(DB.Text) # log for the checks in LSF
lsf_check_log = DB.Column(DB.Text) # log for the checks in LSF
requirements_parameters_string = DB.Column(DB.Text)
status_description = DB.Column(DB.Text)
run_environment = DB.Column(DB.String(length=60))
......