Commit 11eb1e68 authored by David Mendez's avatar David Mendez
Browse files

Simplify the locking system and use the cache to store the locks

parent cc36e01f
......@@ -14,7 +14,7 @@ from app.config import RUN_CONFIG
from app.config import RunEnvs
from app.db import DB
from app.models import delayed_job_models
from app.cache import CACHE
def create_app():
"""
......@@ -40,6 +40,7 @@ def create_app():
with flask_app.app_context():
DB.init_app(flask_app)
CACHE.init_app(flask_app)
create_tables = RUN_CONFIG.get('sql_alchemy').get('create_tables', False)
if create_tables:
......
"""
Module that handles the connection with the cache
"""
from flask_caching import Cache
CACHE = Cache(config={'CACHE_TYPE': 'simple'})
\ No newline at end of file
......@@ -14,6 +14,7 @@ import random
from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks
DEFAULT_SLEEP_TIME = 1
......@@ -29,54 +30,47 @@ print('-------------------------------------------------------------------------
print(f'AGENT_RUN_DIR: {AGENT_RUN_DIR}')
print('------------------------------------------------------------------------------')
def check_jobs_status():
def check_jobs_status(delete_lock_after_finishing=True):
"""
The main function of this module. Checks for jobs to check the status, and checks their status in lsf
:return: the amount of seconds to wait for the next run
:param delete_lock_after_finishing: determines if explicitly deletes the lock after finishing
:return: (sleeptime, jobs_were_checked) the amount of seconds to wait for the next run and if the jobs
were checked or not
"""
lsf_config = RUN_CONFIG.get('lsf_submission')
current_lsf_host = lsf_config['lsf_host']
my_hostname = socket.gethostname()
existing_lock = delayed_job_models.get_lock_for_lsf_host(current_lsf_host)
existing_lock = locks.get_lock_for_lsf_host(current_lsf_host)
if existing_lock is not None:
lock_expired = existing_lock.expires_at < datetime.datetime.utcnow()
i_must_respect_lock = existing_lock.lsf_host == current_lsf_host and \
existing_lock.lock_owner != my_hostname and \
not lock_expired
sleep_time = DEFAULT_SLEEP_TIME + random.random()
print(f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again')
return sleep_time, False
if i_must_respect_lock:
sleep_time = DEFAULT_SLEEP_TIME + random.random()
print(f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again')
return sleep_time
else:
print(f'I ({my_hostname}) found a lock, but it expired, I will disrespect it.')
delayed_job_models.delete_lock(existing_lock)
delayed_job_models.lock_lsf_status_daemon(current_lsf_host, my_hostname)
else:
print(f'Locking LSF status check for {current_lsf_host}, I am {my_hostname}')
delayed_job_models.lock_lsf_status_daemon(current_lsf_host, my_hostname)
locks.set_lsf_lock(current_lsf_host, my_hostname)
print('Checking for jobs to check...')
print('Looking for jobs to check...')
lsf_job_ids_to_check = get_lsf_job_ids_to_check()
print(f'lsf_job_ids_to_check: {lsf_job_ids_to_check}')
if len(lsf_job_ids_to_check) == 0:
return DEFAULT_SLEEP_TIME
return DEFAULT_SLEEP_TIME, True
script_path = prepare_job_status_check_script(lsf_job_ids_to_check)
must_run_script = RUN_CONFIG.get('run_status_script', True)
if not must_run_script:
print('Not running script because run_status_script is False')
return DEFAULT_SLEEP_TIME
return DEFAULT_SLEEP_TIME, False
try:
script_output = get_status_script_output(script_path)
os.remove(script_path) # Remove the script after running so it doesn't fill up the NFS
print(f'deleted script: {script_path}')
parse_bjobs_output(script_output)
return DEFAULT_SLEEP_TIME
return DEFAULT_SLEEP_TIME, True
except JobStatusDaemonError as error:
print(error)
......
"""
Module that handles the locking system for the status daemons
"""
from app.config import RUN_CONFIG
from app.cache import CACHE
def get_lock_for_lsf_host(lsf_host):
    """
    Look up the status-daemon lock currently held for the given LSF host.

    :param lsf_host: lsf host for which get the lock
    :return: dict with the lock for the lsf_host given as parameter, None if it doesn't exist
    """
    # Expiry is delegated to the cache (the lock is stored with a timeout in
    # set_lsf_lock), so a plain lookup is sufficient here.
    stored_lock = CACHE.get(key=lsf_host)
    return stored_lock
def set_lsf_lock(lsf_host, lock_owner):
    """
    Creates a lock on the lsf_host given as parameter in the name of the owner given as parameter.

    The entry is keyed by lsf_host and expires automatically after the number
    of seconds configured in status_agent.lock_validity_seconds.

    :param lsf_host: cluster to lock
    :param lock_owner: identifier (normally a hostname) of the process that owns the lock
    """
    validity_seconds = RUN_CONFIG.get('status_agent').get('lock_validity_seconds')
    # The cache's own timeout handles expiry, so no expiry timestamp is stored.
    CACHE.set(key=lsf_host, value={'owner': lock_owner}, timeout=validity_seconds)
\ No newline at end of file
......@@ -12,7 +12,7 @@ def run():
flask_app = create_app()
with flask_app.app_context():
while True:
sleep_time = daemon.check_jobs_status()
sleep_time, jobs_were_checked = daemon.check_jobs_status()
time.sleep(sleep_time)
if __name__ == "__main__":
......
......@@ -17,6 +17,7 @@ from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.job_status_daemon import daemon
from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks
class TestJobStatusDaemon(unittest.TestCase):
......@@ -330,11 +331,17 @@ class TestJobStatusDaemon(unittest.TestCase):
Tests that the daemon creates a lock while checking LSF
"""
with self.flask_app.app_context():
daemon.check_jobs_status()
daemon.check_jobs_status(delete_lock_after_finishing=False)
current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
lock_got = delayed_job_models.get_lock_for_lsf_host(current_lsf_host)
lock_got = locks.get_lock_for_lsf_host(current_lsf_host)
self.assertIsNotNone(lock_got, msg='The LSF lock was not created!')
lock_hostname_got = lock_got.get('owner')
lock_hostname_must_be = socket.gethostname()
self.assertEqual(lock_hostname_got, lock_hostname_must_be, msg='The lock was not saved correctly!')
def test_agent_respects_a_lock(self):
"""
Tests that when a lock has been created for another host, the agent respects it. This means that
......@@ -342,25 +349,7 @@ class TestJobStatusDaemon(unittest.TestCase):
"""
with self.flask_app.app_context():
current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
delayed_job_models.lock_lsf_status_daemon(current_lsf_host, 'another_owner')
sleep_time_got = daemon.check_jobs_status()
self.assertNotAlmostEqual(sleep_time_got % daemon.DEFAULT_SLEEP_TIME, 0, places=4,
msg='The sleep time must be greater than the default time and not a multiple of it '
'because there was a lock')
def test_agent_does_not_respect_an_expired_lock(self):
"""
Tests that when a lock has been created for another host, the agent does not respect it and creates its own lock
"""
with self.flask_app.app_context():
current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
delayed_job_models.lock_lsf_status_daemon(current_lsf_host, 'another_owner', seconds_valid=-1)
sleep_time_got = daemon.check_jobs_status()
self.assertEquals(sleep_time_got, daemon.DEFAULT_SLEEP_TIME,
msg='The lock must not be respected because it expired!')
locks.set_lsf_lock(current_lsf_host, 'another_owner')
my_hostname = socket.gethostname()
lock_got = delayed_job_models.get_lock_for_lsf_host(current_lsf_host)
self.assertEquals(lock_got.lock_owner, my_hostname, msg='A new lock must have been created owned by me!')
\ No newline at end of file
sleep_time_got, jobs_were_checked = daemon.check_jobs_status()
self.assertFalse(jobs_were_checked, msg='The jobs should have not been checked')
......@@ -9,4 +9,5 @@ python-dateutil
gunicorn==20.0.4
mysqlclient==1.4.2.post1
flask-cors==3.0.8
marshmallow==3.5.0
\ No newline at end of file
marshmallow==3.5.0
Flask-Caching==1.8.0
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment