Commit 6e5e5afd authored by David Mendez's avatar David Mendez
Browse files

Status Daemon: append to lsk check log instead of printing to std out

parent 85efdf90
......@@ -18,6 +18,7 @@ from app.models import delayed_job_models
from app.cache import CACHE
from app.rate_limiter import RATE_LIMITER
def create_app():
"""
Creates the flask app
......@@ -65,5 +66,6 @@ def create_app():
return flask_app
if __name__ == '__main__':
flask_app = create_app()
......@@ -46,35 +46,41 @@ def check_jobs_status(job_id, delete_lock_after_finishing=True):
sleep_time = RUN_CONFIG.get('status_agent').get('sleep_time')
existing_lock = locks.get_lock_for_job(job_id)
print('existing_lock: ')
print(existing_lock)
if existing_lock is not None:
if existing_lock != my_hostname:
print(f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again')
check_msg = f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
return sleep_time, False
print(f'Locking LSF status check for job {job_id}, I am {my_hostname}')
check_msg = f'Locking LSF status check for job {job_id}, I am {my_hostname}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
locks.set_lock_for_job(job_id, my_hostname)
lsf_job_id = delayed_job_models.get_job_by_id(job_id).lsf_job_id
print(f'lsf_job_id: {lsf_job_id}')
check_msg = f'lsf_job_id: {lsf_job_id}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
script_path = prepare_job_status_check_script(job_id=job_id, lsf_job_id=lsf_job_id)
must_run_script = RUN_CONFIG.get('run_status_script', True)
if not must_run_script:
print('Not running script because run_status_script is False')
check_msg = 'Not running script because run_status_script is False'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None
return sleep_time, False
try:
script_output = get_status_script_output(script_path)
os.remove(script_path) # Remove the script after running so it doesn't fill up the NFS
print(f'deleted script: {script_path}')
check_msg = f'deleted script: {script_path}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
parse_bjobs_output(script_output)
locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None
return sleep_time, True
except JobStatusDaemonError as error:
print(error)
check_msg = f'ERROR: \n {error}'
delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
def get_check_job_status_script_path(job_id, lsf_job_id):
......
......@@ -476,6 +476,21 @@ def update_job_progress(job_id, progress, status_log, status_description):
return job
def append_to_lsf_check_log(job_id, checker_name, check_log):
"""
Appends to the lsf check log
:param job_id: id of the job to modify
:param checker_name: name of the process that checked the job in lsf
:param check_log: text to append
"""
job = get_job_by_id(job_id)
if job.lsf_check_log is None:
job.lsf_check_log = ''
job.lsf_check_log += f'{datetime.datetime.now().isoformat()}-{checker_name}: {check_log}\n'
save_job(job)
def add_input_file_to_job(job, input_file):
"""
Adds an input file to a job and saves the job
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment