""" Module that implements the daemon that checks the statuses of jobs in LSF """ import os from pathlib import Path import socket import datetime import stat import subprocess import re import json import time import shutil from app.models import delayed_job_models from app.config import RUN_CONFIG from app.blueprints.job_submission.services import job_submission_service from app.job_status_daemon import locks from app.job_statistics import statistics_saver from app.job_status_daemon.job_statistics import statistics_generator from app import utils AGENT_RUN_DIR = RUN_CONFIG.get('status_agent_run_dir', str(Path().absolute()) + '/status_agents_run') if not os.path.isabs(AGENT_RUN_DIR): AGENT_RUN_DIR = Path(AGENT_RUN_DIR).resolve() os.makedirs(AGENT_RUN_DIR, exist_ok=True) class JobStatusDaemonError(Exception): """Base class for exceptions in this module.""" print('------------------------------------------------------------------------------') print(f'AGENT_RUN_DIR: {AGENT_RUN_DIR}') print('------------------------------------------------------------------------------') def check_job_status(job_id, delete_lock_after_finishing=True): """ The main function of this module. Checks for jobs to check the status, and checks their status in lsf :param job_id: job id to check :param delete_lock_after_finishing: determines if explicitly deletes the lock after finishing :return: (sleeptime, jobs_were_checked) the amount of seconds to wait for the next run and if the jobs were checked or not """ my_hostname = socket.gethostname() sleep_time = RUN_CONFIG.get('status_agent').get('sleep_time') existing_lock = locks.get_lock_for_job(job_id) if existing_lock is not None: if existing_lock != my_hostname: check_msg = f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again' delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg) return sleep_time, False check_msg = f'Locking LSF status check for job {job_id}, I am {my_hostname}' delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg) locks.set_lock_for_job(job_id, my_hostname) lsf_job_id = delayed_job_models.get_job_by_id(job_id).lsf_job_id check_msg = f'lsf_job_id: {lsf_job_id}' delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg) script_path = prepare_job_status_check_script(job_id=job_id, lsf_job_id=lsf_job_id) must_run_script = RUN_CONFIG.get('run_status_script', True) if not must_run_script: check_msg = 'Not running script because run_status_script is False' delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg) locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None return sleep_time, False try: script_output = get_status_script_output(job_id, my_hostname, script_path) os.remove(script_path) # Remove the script after running so it doesn't fill up the NFS check_msg = f'deleted script: {script_path}' delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg) parse_bjobs_output(script_output) locks.delete_lock_for_job(job_id) if delete_lock_after_finishing else None return sleep_time, True except JobStatusDaemonError as error: print(error) check_msg = f'ERROR: \n {error}' delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg) def get_check_job_status_script_path(job_id, lsf_job_id): """ :param job_id: job id for which to get the script path :param lsf_job_id: lsf job id to check :return: the path to use for creating the job status script """ filename = 
def get_check_job_status_script_path(job_id, lsf_job_id):
    """
    :param job_id: job id for which to get the script path
    :param lsf_job_id: lsf job id to check
    :return: the path to use for creating the job status script
    """
    filename = f'{utils.get_utc_now().strftime("%Y-%m-%d-%H-%M-%S")}_job_id-{job_id}_lsf_job_id-{lsf_job_id}' \
               f'_check_lsf_job_status.sh'

    job_status_check_script_path = Path(AGENT_RUN_DIR).joinpath(socket.gethostname(), filename)
    return job_status_check_script_path


# ----------------------------------------------------------------------------------------------------------------------
# Preparing status script
# ----------------------------------------------------------------------------------------------------------------------
def prepare_job_status_check_script(job_id, lsf_job_id):
    """
    Prepares the script that will check the status of the job in LSF
    :param job_id: the job id being checked
    :param lsf_job_id: the lsf job id to check
    :return: the final path of the script that was created
    """
    job_status_script_template_path = os.path.join(Path().absolute(), 'templates', 'get_jobs_status.sh')
    with open(job_status_script_template_path, 'r') as template_file:
        job_status_template = template_file.read()

    lsf_config = RUN_CONFIG.get('lsf_submission')
    lsf_user = lsf_config['lsf_user']
    lsf_host = lsf_config['lsf_host']

    job_submission_script = job_status_template.format(
        LSF_JOB_IDS=lsf_job_id,
        LSF_USER=lsf_user,
        LSF_HOST=lsf_host
    )

    status_script_path = get_check_job_status_script_path(job_id=job_id, lsf_job_id=lsf_job_id)
    status_script_path.parent.mkdir(parents=True, exist_ok=True)
    with open(status_script_path, 'w') as status_script_file:
        status_script_file.write(job_submission_script)

    print(f'created script: {status_script_path}')

    # make sure the file is executable
    file_stats = os.stat(status_script_path)
    os.chmod(status_script_path, file_stats.st_mode | stat.S_IEXEC)

    return status_script_path


# ----------------------------------------------------------------------------------------------------------------------
# Parsing status script output
# ----------------------------------------------------------------------------------------------------------------------
def get_status_script_output(job_id, checker_name, script_path):
    """
    Runs the status script and returns its stdout as text; raises an exception if the script fails
    :param job_id: id of the job, used to save the outputs to the check log
    :param checker_name: name of the process that checks the job in lsf
    :param script_path: path of the script
    :return: the text output of stdout
    """
    lsf_config = RUN_CONFIG.get('lsf_submission')
    id_rsa_path = lsf_config['id_rsa_file']
    run_command = f'{script_path} {id_rsa_path}'

    check_msg = f'Going to run job status script, command: {run_command}'
    delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)

    status_check_process = subprocess.run(run_command.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    return_code = status_check_process.returncode
    check_msg = f'script return code was: {return_code}'
    delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
    delayed_job_models.set_last_lsf_check_status(job_id, return_code)
    delayed_job_models.set_last_lsf_checked_at(job_id)

    if return_code not in [0, 255]:
        status_output_path = f'{script_path}.out'
        status_error_path = f'{script_path}.err'

        with open(status_output_path, 'wb') as status_out_file:
            status_out_file.write(status_check_process.stdout)

        with open(status_error_path, 'wb') as status_err_file:
            status_err_file.write(status_check_process.stderr)

        check_msg = f'There was an error when running the job status script! ' \
                    f'Check {status_output_path} and {status_error_path} for the logs!'
        delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
        check_msg = f'Output: \n {status_check_process.stdout}'
        delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
        check_msg = f'Error: \n {status_check_process.stderr}'
        delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)

        raise JobStatusDaemonError('There was an error when running the job status script! Please check the logs')

    return status_check_process.stdout.decode()


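# For reference, parse_bjobs_output below expects the raw script output to wrap a bjobs
# JSON payload between START_REMOTE_SSH / FINISH_REMOTE_SSH marker lines. The exact
# payload depends on the LSF cluster; this is only an illustrative sketch (made-up values)
# showing the fields that are actually read here (RECORDS, JOBID, STAT, START_TIME,
# FINISH_TIME); real output may contain additional fields:
#
#     START_REMOTE_SSH
#     {
#       "RECORDS": [
#         {"JOBID": "12345", "STAT": "DONE",
#          "START_TIME": "Oct  7 10:10", "FINISH_TIME": "Oct  7 10:14"}
#       ]
#     }
#     FINISH_REMOTE_SSH
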
def parse_bjobs_output(script_output):
    """
    Parses the output passed as parameter and modifies the status of the job in the database accordingly
    :param script_output: string output of the script that requests the status of the job
    """
    # The remote bjobs output is wrapped between the START_REMOTE_SSH and FINISH_REMOTE_SSH markers;
    # extract the JSON payload between them.
    match = re.search(r'START_REMOTE_SSH[\s\S]*(FINISH_REMOTE_SSH|$)', script_output)
    bjobs_output_str = re.split(r'(START_REMOTE_SSH\n|\nFINISH_REMOTE_SSH)', match.group(0))[2]

    try:
        json_output = json.loads(bjobs_output_str)
        react_to_bjobs_json_output(json_output)
    except json.decoder.JSONDecodeError as error:
        print(f'unable to decode output. Will try again later anyway {error}')


def react_to_bjobs_json_output(json_output):
    """
    Reads the dict obtained from the status script output and modifies the jobs accordingly
    :param json_output: dict with the output parsed from running the command
    """
    for record in json_output['RECORDS']:
        lsf_id = record['JOBID']
        lsf_status = record.get('STAT', 'ERROR')
        new_status_must_be = map_lsf_status_to_job_status(lsf_status)

        job = delayed_job_models.get_job_by_lsf_id(lsf_id)
        old_status = job.status
        status_changed = old_status != new_status_must_be
        if not status_changed:
            continue

        if new_status_must_be == delayed_job_models.JobStatuses.RUNNING:
            parse_job_started_at_time_if_not_set(job, record)
        elif new_status_must_be == delayed_job_models.JobStatuses.ERROR:
            # If the job ran too fast, the started_at time may not have been captured by a previous run.
            parse_job_started_at_time_if_not_set(job, record)
            parse_job_finished_at_time_if_not_set(job, record)
            if job.num_failures is None:
                job.num_failures = 0
            job.num_failures += 1
        elif new_status_must_be == delayed_job_models.JobStatuses.FINISHED:
            parse_job_started_at_time_if_not_set(job, record)
            parse_job_finished_at_time_if_not_set(job, record)
            save_job_outputs(job)

        job.update_run_status(new_status_must_be)
        delayed_job_models.save_job(job)
        print(f'Job {job.id} with lsf id {job.lsf_job_id} new state is {new_status_must_be}')

        if job.status in [delayed_job_models.JobStatuses.ERROR, delayed_job_models.JobStatuses.FINISHED]:
            save_job_statistics(job)


def save_job_statistics(job):
    """
    Saves the corresponding statistics for the job entered as parameter
    :param job: job object for which to save the statistics
    """
    statistics_saver.save_job_record(
        job_type=str(job.type),
        run_env_type=RUN_CONFIG.get('run_env'),
        lsf_host=job.lsf_host,
        started_at=job.started_at.timestamp() * 1000,
        finished_at=job.finished_at.timestamp() * 1000,
        seconds_taken_from_created_to_running=statistics_generator.get_seconds_from_created_to_running(job),
        seconds_taken_from_running_to_finished_or_error=statistics_generator.get_seconds_from_running_to_finished(job),
        final_state=job.status,
        num_output_files=statistics_generator.get_num_output_files_of_job(job),
        total_output_bytes=statistics_generator.get_total_bytes_of_output_files_of_job(job),
        num_input_files=statistics_generator.get_num_input_files_of_job(job),
        total_input_bytes=statistics_generator.get_total_bytes_of_input_files_of_job(job)
    )


def parse_job_started_at_time_if_not_set(job, lsf_record):
    """
    Saves the started at time of the job from the lsf record obtained, if it was not set before
    :param job: job object to which to save the started at time
    :param lsf_record: record obtained from bjobs output
    """
    if job.started_at is None:
        lsf_date_str = lsf_record.get('START_TIME')
        started_at = parse_bjobs_output_date(lsf_date_str)
        job.started_at = started_at
        print(f'Job {job.id} started at time is {started_at}')


def parse_job_finished_at_time_if_not_set(job, lsf_record):
    """
    Saves the finished at time of the job from the lsf record obtained, if it was not set before
    :param job: job object to which to save the finished at time
    :param lsf_record: record obtained from bjobs output
    """
    if job.finished_at is None:
        lsf_date_str = lsf_record.get('FINISH_TIME')
        finished_at = parse_bjobs_output_date(lsf_date_str)
        job.finished_at = finished_at
        print(f'Job {job.id} finished at time is {finished_at}')


def map_lsf_status_to_job_status(lsf_status):
    """
    Maps the lsf status to a status defined in models
    :param lsf_status: status obtained from lsf
    :return: one of the statuses defined in the delayed_job_models module
    """
    if lsf_status == 'RUN':
        return delayed_job_models.JobStatuses.RUNNING
    elif lsf_status == 'PEND':
        return delayed_job_models.JobStatuses.QUEUED
    elif lsf_status == 'EXIT':
        return delayed_job_models.JobStatuses.ERROR
    elif lsf_status == 'DONE':
        return delayed_job_models.JobStatuses.FINISHED
    else:
        return delayed_job_models.JobStatuses.UNKNOWN


def parse_bjobs_output_date(lsf_date_str):
    """
    Parses the date string provided by the bjobs output
    :param lsf_date_str: the string received from bjobs
    :return: a python datetime representing the date parsed
    """
    # Just return the current date, to avoid date parsing issues:
    # LSF is not responding to the -hms parameter
    return datetime.datetime.utcnow()


def save_job_outputs(job):
    """
    Lists the files of the output dir of the job and saves the corresponding output objects
    :param job: job that is finished
    """
    job_outputs_dir = job.output_dir_path
    paths_list = []
    append_files_in_dir(job_outputs_dir, paths_list)

    for absolute_path in paths_list:
        relative_path = absolute_path.replace(f'{job_submission_service.JOBS_OUTPUT_DIR}/', '', 1)
        output_url = get_output_file_url(relative_path)
        delayed_job_models.add_output_to_job(job, absolute_path, output_url)
        print(f'Added output file {absolute_path} with url {output_url} to job {job.id}')


def append_files_in_dir(path, paths_list):
    """
    Appends to the list all the paths of the files in path and its subdirectories, recursively
    :param path: base directory for which to list the files
    :param paths_list: list where to accumulate the paths
    """
    for item in os.listdir(path):
        abs_path = Path(path).joinpath(item).resolve()
        if os.path.isfile(abs_path):
            paths_list.append(str(abs_path))
        else:
            append_files_in_dir(abs_path, paths_list)


def get_output_file_url(file_relative_path):
    """
    :param file_relative_path: the relative path from the job outputs dir. For example: Job-1/subdir/output_0.txt
    :return: the url of an output file given a path relative to the job outputs dir
    """
    server_base_path = RUN_CONFIG.get('base_path', '')
    if server_base_path == '':
        server_base_path_with_slash = ''
    else:
        server_base_path_with_slash = f'{server_base_path}/'

    if server_base_path_with_slash.startswith('//'):
        server_base_path_with_slash = server_base_path_with_slash[1:]
    if server_base_path_with_slash.endswith('/'):
        server_base_path_with_slash = server_base_path_with_slash[:-1]

    outputs_base_path = RUN_CONFIG.get('outputs_base_path')
    return f'{server_base_path_with_slash}/{outputs_base_path}/{file_relative_path}'


def delete_old_daemon_files():
    """
    Deletes old daemon scripts that may have been created by the daemons.
    The time after which a script is considered old is determined by the configuration.
    :return: the number of items that were deleted
    """
    daemon_scripts_expiration_minutes = RUN_CONFIG.get('daemon_scripts_expiration_minutes')
    now = time.time()
    expiration_time = now - (daemon_scripts_expiration_minutes * 60)

    num_items_deleted = 0
    for item in os.listdir(AGENT_RUN_DIR):
        full_path = os.path.join(AGENT_RUN_DIR, item)
        modified_time = os.path.getmtime(full_path)
        if modified_time < expiration_time:
            if os.path.isfile(full_path):
                os.remove(full_path)
            else:
                shutil.rmtree(full_path)
            num_items_deleted += 1

    return num_items_deleted