Commit 7b387c4c authored by David Mendez

Remove functions that determine which jobs to check in LSF

parent 54b050ee
......@@ -19,7 +19,6 @@ from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks
from app.job_statistics import statistics_saver
from app.job_status_daemon.job_statistics import statistics_generator
import app.app_logging as app_logging
AGENT_RUN_DIR = RUN_CONFIG.get('status_agent_run_dir', str(Path().absolute()) + '/status_agents_run')
if not os.path.isabs(AGENT_RUN_DIR):
......@@ -36,10 +35,11 @@ print(f'AGENT_RUN_DIR: {AGENT_RUN_DIR}')
print('------------------------------------------------------------------------------')
def check_jobs_status(delete_lock_after_finishing=True):
def check_jobs_status(delete_lock_after_finishing=True, only_job_id=None):
"""
The main function of this module. Looks for jobs whose status needs to be checked and checks their status in LSF.
:param delete_lock_after_finishing: determines whether to explicitly delete the lock after finishing
:param only_job_id: used to specify a single job id to check, instead of the ones that are available for checking
:return: (sleeptime, jobs_were_checked) the number of seconds to wait before the next run and whether the jobs
were checked or not
"""
......@@ -63,7 +63,10 @@ def check_jobs_status(delete_lock_after_finishing=True):
locks.set_lsf_lock(current_lsf_host, my_hostname)
print('Looking for jobs to check...')
lsf_job_ids_to_check = get_lsf_job_ids_to_check()
if only_job_id is None:
lsf_job_ids_to_check = get_lsf_job_ids_to_check()
else:
lsf_job_ids_to_check = [only_job_id]
print(f'lsf_job_ids_to_check: {lsf_job_ids_to_check}')
if len(lsf_job_ids_to_check) == 0:
......@@ -88,20 +91,6 @@ def check_jobs_status(delete_lock_after_finishing=True):
print(error)
def get_lsf_job_ids_to_check():
"""
:return: a list of LSF job IDs for which it is necessary to check the status in the LSF cluster. The jobs that are
checked are the ones that:
1. Were submitted to the same LSF cluster that I am running with (defined in configuration)
2. Are not in Error or Finished state.
"""
lsf_config = RUN_CONFIG.get('lsf_submission')
lsf_host = lsf_config['lsf_host']
return delayed_job_models.get_lsf_job_ids_to_check(lsf_host)
def get_check_job_status_script_path():
"""
:return: the path to use for creating the job status script
......
......@@ -13,6 +13,14 @@ def get_lock_for_lsf_host(lsf_host):
"""
return cache.fail_proof_get(key=lsf_host)
# def get_lock_for_lsf_host(lsf_host):
# """
# Returns a lock for an lsf host if it exists
# :param lsf_host: lsf host for which to get the lock
# :return: dict with the lock for the lsf_host given as parameter, None if it doesn't exist
# """
# return cache.fail_proof_get(key=lsf_host)
def set_lsf_lock(lsf_host, lock_owner):
"""
......
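For context, a hedged sketch of how the lock helpers in this module are typically combined by the daemon; the exact flow is an assumption based on check_jobs_status, not something shown in this diff:

# Sketch only: 'locks' is this module as imported by the daemon, and
# current_lsf_host / my_hostname come from the daemon's configuration.
existing_lock = locks.get_lock_for_lsf_host(current_lsf_host)
if existing_lock is None:
    # No other daemon instance holds the lock for this LSF host, so claim it.
    locks.set_lsf_lock(current_lsf_host, my_hostname)
    # ...then proceed to check the status of the jobs for this host...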
......@@ -146,75 +146,7 @@ class TestJobStatusDaemon(unittest.TestCase):
delayed_job_models.save_job(job)
i += 1
def test_determines_for_which_jobs_check_status_0(self):
"""
Given a set of jobs currently in the database, determines for which of them the status must be checked.
In this case, some jobs require a check.
"""
self.create_test_jobs_0()
with self.flask_app.app_context():
lsf_config = RUN_CONFIG.get('lsf_submission')
lsf_host = lsf_config['lsf_host']
status_is_not_error_or_finished = delayed_job_models.DelayedJob.status.notin_(
[delayed_job_models.JobStatuses.ERROR, delayed_job_models.JobStatuses.FINISHED]
)
lsf_host_is_my_host = delayed_job_models.DelayedJob.lsf_host == lsf_host
job_to_check_status_must_be = delayed_job_models.DelayedJob.query.filter(
and_(lsf_host_is_my_host, status_is_not_error_or_finished)
)
lsf_ids_to_check_status_must_be = [job.lsf_job_id for job in job_to_check_status_must_be]
lsf_ids_to_check_got = daemon.get_lsf_job_ids_to_check()
self.assertListEqual(lsf_ids_to_check_status_must_be, lsf_ids_to_check_got,
msg='The jobs for which to check the status were not created correctly!')
def test_determines_for_which_jobs_check_status_1(self):
"""
Given a set of jobs currently in the database, determines for which of them the status must be checked.
In this case, some jobs require a check: the ones running in the same run environment.
"""
self.create_test_jobs_2()
current_run_environment = RUN_CONFIG.get('run_env')
with self.flask_app.app_context():
lsf_config = RUN_CONFIG.get('lsf_submission')
lsf_host = lsf_config['lsf_host']
status_is_not_error_or_finished = delayed_job_models.DelayedJob.status.notin_(
[delayed_job_models.JobStatuses.ERROR, delayed_job_models.JobStatuses.FINISHED]
)
lsf_host_is_my_host = delayed_job_models.DelayedJob.lsf_host == lsf_host
run_environment_is_my_current_environment = \
delayed_job_models.DelayedJob.run_environment == current_run_environment
job_to_check_status_must_be = delayed_job_models.DelayedJob.query.filter(
and_(lsf_host_is_my_host, status_is_not_error_or_finished, run_environment_is_my_current_environment)
)
lsf_ids_to_check_status_must_be = [job.lsf_job_id for job in job_to_check_status_must_be]
job_ids_must_be = [job.id for job in job_to_check_status_must_be]
lsf_ids_to_check_got = daemon.get_lsf_job_ids_to_check()
self.assertListEqual(lsf_ids_to_check_status_must_be, lsf_ids_to_check_got,
msg=f'The jobs for which to check the status were not created '
f'correctly! jobs must be {job_ids_must_be}')
def test_determines_for_which_jobs_check_status_2(self):
"""
Given a set of jobs currently in the database, determines for which of them the status must be checked.
In this case, NO jobs require a check.
"""
self.create_test_jobs_1()
with self.flask_app.app_context():
lsf_ids_to_check_status_must_be = []
lsf_ids_to_check_got = daemon.get_lsf_job_ids_to_check()
self.assertListEqual(lsf_ids_to_check_status_must_be, lsf_ids_to_check_got,
msg='The jobs for which to check the status were not created correctly!')
# TODO: CONTINUE HERE
def test_produces_a_correct_job_status_check_script_path(self):
"""
Test that produces a correct path for the job status script
......
......@@ -137,6 +137,7 @@ class DelayedJob(DB.Model):
num_failures = DB.Column(DB.Integer, default=0) # How many times the job has failed.
lsf_job_id = DB.Column(DB.Integer)
lsf_host = DB.Column(DB.Text)
lsf_check_lock_owner = DB.Column(DB.String(length=240))
last_lsf_checked_at = DB.Column(DB.DateTime)
last_lsf_check_status = DB.Column(DB.Integer)
lsf_check_log = DB.Column(DB.Text) # log for the checks in LSF
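As an illustration only, the new lsf_check_lock_owner column could be filled in like this when a daemon instance records that it performed a check; this usage is an assumption and does not appear in the commit:

# Sketch under assumptions: 'job' is a DelayedJob instance already loaded in a
# session, and 'my_hostname' identifies the daemon instance doing the check.
import datetime

job.lsf_check_lock_owner = my_hostname
job.last_lsf_checked_at = datetime.datetime.utcnow()
DB.session.commit()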
......@@ -606,40 +607,6 @@ def get_custom_config_values(job_type):
return CustomJobConfig.query.filter_by(job_type=job_type)
def get_lsf_job_ids_to_check(lsf_host):
"""
:param lsf_host: lsf host for which to return the jobs to check
:return: a list of LSF job IDs for which it is necessary to check the status in the LSF cluster. The jobs that are
checked are the ones that:
1. Were submitted to the same LSF cluster that I am running with (defined in configuration)
2. Are not in Error or Finished state.
"""
DB.session.commit()
status_is_not_error_or_finished = DelayedJob.status.notin_(
[JobStatuses.ERROR, JobStatuses.FINISHED]
)
lsf_host_is_my_host = DelayedJob.lsf_host == lsf_host
current_run_environment = RUN_CONFIG.get('run_env')
run_environment_is_my_current_environment = \
DelayedJob.run_environment == current_run_environment
job_to_check_status = DelayedJob.query.filter(
and_(lsf_host_is_my_host, status_is_not_error_or_finished, run_environment_is_my_current_environment)
)
# Make sure there are no None values. This can happen when the server has created a job and is still submitting it
# while, at the same time, the daemon asks for jobs to check. A None value would make the daemon crash.
ids = [job.lsf_job_id for job in job_to_check_status if job.lsf_job_id is not None]
DB.session.commit()
return ids
def add_output_to_job(job, internal_path, public_url):
"""
Adds an output to the job given as a parameter
......