"""
Module that implements the daemon that checks the statuses of jobs in LSF
"""
import os
from pathlib import Path
import socket
import datetime
import stat
import subprocess
import re
import json
import time
import shutil

from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks
from app.job_statistics import statistics_saver
from app.job_status_daemon.job_statistics import statistics_generator
from app import utils

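# Directory where the generated status-check scripts are written (one subdirectory per hostname).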
AGENT_RUN_DIR = RUN_CONFIG.get('status_agent_run_dir', str(Path().absolute()) + '/status_agents_run')
if not os.path.isabs(AGENT_RUN_DIR):
    AGENT_RUN_DIR = Path(AGENT_RUN_DIR).resolve()
os.makedirs(AGENT_RUN_DIR, exist_ok=True)


class JobStatusDaemonError(Exception):
    """Base class for exceptions in this module."""


print('------------------------------------------------------------------------------')
print(f'AGENT_RUN_DIR: {AGENT_RUN_DIR}')
print('------------------------------------------------------------------------------')


def check_job_status(job_id, delete_lock_after_finishing=True):
    """
    The main function of this module. Checks the status of the given job in LSF and updates the job
    in the database accordingly.
    :param job_id: id of the job to check
    :param delete_lock_after_finishing: whether to explicitly delete the lock after finishing
    :return: a tuple (sleep_time, job_was_checked): the number of seconds to wait before the next
    run, and whether the job was actually checked
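
    Example (sketch, assuming a job with id 'Job-1' exists):
        sleep_time, job_was_checked = check_job_status('Job-1')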
    """
    my_hostname = socket.gethostname()
    sleep_time = RUN_CONFIG.get('status_agent').get('sleep_time')

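    # Several daemon instances can run at once; a lock records which host is currently checking this job.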
    existing_lock = locks.get_lock_for_job(job_id)
    if existing_lock is not None:
        if existing_lock != my_hostname:
            check_msg = f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again'
            delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
            return sleep_time, False

    check_msg = f'Locking LSF status check for job {job_id}, I am {my_hostname}'
    delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)

    locks.set_lock_for_job(job_id, my_hostname)

    lsf_job_id = delayed_job_models.get_job_by_id(job_id).lsf_job_id
    check_msg = f'lsf_job_id: {lsf_job_id}'
    delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)

    script_path = prepare_job_status_check_script(job_id=job_id, lsf_job_id=lsf_job_id)
    must_run_script = RUN_CONFIG.get('run_status_script', True)
    if not must_run_script:
        check_msg = 'Not running script because run_status_script is False'
        delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
        if delete_lock_after_finishing:
            locks.delete_lock_for_job(job_id)
        return sleep_time, False

    try:
        script_output = get_status_script_output(job_id, my_hostname, script_path)
        os.remove(script_path)  # Remove the script after running so it doesn't fill up the NFS
        check_msg = f'deleted script: {script_path}'
        delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
        parse_bjobs_output(script_output)
        if delete_lock_after_finishing:
            locks.delete_lock_for_job(job_id)
        return sleep_time, True
    except JobStatusDaemonError as error:
        print(error)
        check_msg = f'ERROR: \n {error}'
        delayed_job_models.append_to_lsf_check_log(job_id, my_hostname, check_msg)
        return sleep_time, False


def get_check_job_status_script_path(job_id, lsf_job_id):
    """
    :param job_id: job id for which to get the script path
    :param lsf_job_id: lsf job id to check
    :return: the path to use for creating the job status script
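
    Example result (illustrative timestamp, hostname and ids):
        <AGENT_RUN_DIR>/<hostname>/2020-01-01-00-00-00_job_id-Job-1_lsf_job_id-123_check_lsf_job_status.sh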
    """

    filename = f'{utils.get_utc_now().strftime("%Y-%m-%d-%H-%M-%S")}_job_id-{job_id}_lsf_job_id-{lsf_job_id}' \
               f'_check_lsf_job_status.sh'
    job_status_check_script_path = Path(AGENT_RUN_DIR).joinpath(socket.gethostname(), filename)

    return job_status_check_script_path


# ----------------------------------------------------------------------------------------------------------------------
# Preparing status script
# ----------------------------------------------------------------------------------------------------------------------
def prepare_job_status_check_script(job_id, lsf_job_id):
    """
    Prepares the script that will check the job status in LSF
    :param job_id: the job id being checked
    :param lsf_job_id: the lsf job id to check
    :return: the final path of the script that was created
    """

    job_status_script_template_path = os.path.join(Path().absolute(), 'templates', 'get_jobs_status.sh')
    with open(job_status_script_template_path, 'r') as template_file:
        job_status_template = template_file.read()

        lsf_config = RUN_CONFIG.get('lsf_submission')
        lsf_user = lsf_config['lsf_user']
        lsf_host = lsf_config['lsf_host']

        job_submission_script = job_status_template.format(
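            # Placeholders defined in templates/get_jobs_status.sh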
            LSF_JOB_IDS=lsf_job_id,
            LSF_USER=lsf_user,
            LSF_HOST=lsf_host
        )

        status_script_path = get_check_job_status_script_path(job_id=job_id, lsf_job_id=lsf_job_id)
        status_script_path.parent.mkdir(parents=True, exist_ok=True)
        with open(status_script_path, 'w') as status_script_file:
            status_script_file.write(job_submission_script)

        print(f'created script: {status_script_path}')
        # make sure file is executable
        file_stats = os.stat(status_script_path)
        os.chmod(status_script_path, file_stats.st_mode | stat.S_IEXEC)

    return status_script_path


# ----------------------------------------------------------------------------------------------------------------------
# Parsing status script output
# ----------------------------------------------------------------------------------------------------------------------
def get_status_script_output(job_id, checker_name, script_path):
    """
    Runs the status script and returns its stdout as text; raises an exception if the script fails
    :param job_id: id of the job, used to save the outputs to the check log
    :param checker_name: name of the process that checks the job in lsf
    :param script_path: path of the script
    :return: the text output of stdout
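
    The command that is run looks like this (illustrative paths):
        /status_agents_run/<hostname>/..._check_lsf_job_status.sh /path/to/id_rsa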
    """
    lsf_config = RUN_CONFIG.get('lsf_submission')
    id_rsa_path = lsf_config['id_rsa_file']
    run_command = f'{script_path} {id_rsa_path}'
    check_msg = f'Going to run job status script, command: {run_command}'
    delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
    status_check_process = subprocess.run(run_command.split(' '), stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    return_code = status_check_process.returncode
    check_msg = f'script return code was: {return_code}'
    delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)
    delayed_job_models.set_last_lsf_check_status(job_id, return_code)
    delayed_job_models.set_last_lsf_checked_at(job_id)

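    # A return code of 255 (ssh's generic failure exit code) is not treated as a hard error here.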
    if return_code not in [0, 255]:

        status_output_path = f'{script_path}.out'
        status_error_path = f'{script_path}.err'

        with open(status_output_path, 'wb') as status_out_file:
            status_out_file.write(status_check_process.stdout)

        with open(status_error_path, 'wb') as status_err_file:
            status_err_file.write(status_check_process.stderr)

        check_msg = f'There was an error when running the job status script! ' \
                    f'Check {status_output_path} and {status_error_path} for logs!'
        delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)

        check_msg = f'Output: \n {status_check_process.stdout}'
        delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)

        check_msg = f'Error: \n {status_check_process.stderr}'
        delayed_job_models.append_to_lsf_check_log(job_id, checker_name, check_msg)

        raise JobStatusDaemonError('There was an error when running the job status script! Please check the logs')
    else:
        return status_check_process.stdout.decode()


def parse_bjobs_output(script_output):
    """
    Parses the output passed as parameter and modifies the status of the job in the database accordingly
    :param script_output: string output of the script that requests the status of the job
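
    The script output is expected to contain the bjobs JSON between two markers, e.g.:
        START_REMOTE_SSH
        {"RECORDS": [{"JOBID": "123", "STAT": "DONE", ...}]}
        FINISH_REMOTE_SSH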
    """

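    # Extract the chunk between the START_REMOTE_SSH and FINISH_REMOTE_SSH markers printed by the script.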
    match = re.search(r'START_REMOTE_SSH[\s\S]*(FINISH_REMOTE_SSH|$)', script_output)
    if match is None:
        print('unable to find the status markers in the output. Will try again later anyway')
        return
    bjobs_output_str = re.split(r'(START_REMOTE_SSH\n|\nFINISH_REMOTE_SSH)', match.group(0))[2]

    try:
        json_output = json.loads(bjobs_output_str)
        react_to_bjobs_json_output(json_output)
    except json.decoder.JSONDecodeError as error:
        print(f'unable to decode output. Will try again later anyway {error}')
202

203

def react_to_bjobs_json_output(json_output):
    """
    Reads the dict obtained from the status script output, modifies the jobs accordingly
    :param json_output: dict with the output parsed from running the command
    """
    for record in json_output['RECORDS']:
        lsf_id = record['JOBID']
        lsf_status = record.get('STAT', 'ERROR')

        new_status_must_be = map_lsf_status_to_job_status(lsf_status)
        job = delayed_job_models.get_job_by_lsf_id(lsf_id)

        old_status = job.status
        status_changed = old_status != new_status_must_be
        if not status_changed:
            continue

        if new_status_must_be == delayed_job_models.JobStatuses.RUNNING:

            parse_job_started_at_time_if_not_set(job, record)

        elif new_status_must_be == delayed_job_models.JobStatuses.ERROR:

            # If the job ran too fast, the started_at time may not have been captured by a previous run.
            parse_job_started_at_time_if_not_set(job, record)
            parse_job_finished_at_time_if_not_set(job, record)
            if job.num_failures is None:
                job.num_failures = 0
            job.num_failures += 1

        elif new_status_must_be == delayed_job_models.JobStatuses.FINISHED:

            parse_job_started_at_time_if_not_set(job, record)
            parse_job_finished_at_time_if_not_set(job, record)
            save_job_outputs(job)

        job.update_run_status(new_status_must_be)

        delayed_job_models.save_job(job)
        print(f'Job {job.id} with lsf id {job.lsf_job_id} new state is {new_status_must_be}')

        if job.status in [delayed_job_models.JobStatuses.ERROR, delayed_job_models.JobStatuses.FINISHED]:
            save_job_statistics(job)


def save_job_statistics(job):
    """
    Saves the corresponding statistics for the job entered as parameter
    :param job: job object for which to save the statistics
    """
    statistics_saver.save_job_record(
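        # started_at and finished_at are reported in milliseconds since the epoch.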
        job_type=str(job.type),
        run_env_type=RUN_CONFIG.get('run_env'),
        lsf_host=job.lsf_host,
        started_at=job.started_at.timestamp() * 1000,
        finished_at=job.finished_at.timestamp() * 1000,
        seconds_taken_from_created_to_running=statistics_generator.get_seconds_from_created_to_running(job),
        seconds_taken_from_running_to_finished_or_error=statistics_generator.get_seconds_from_running_to_finished(job),
        final_state=job.status,
        num_output_files=statistics_generator.get_num_output_files_of_job(job),
        total_output_bytes=statistics_generator.get_total_bytes_of_output_files_of_job(job),
        num_input_files=statistics_generator.get_num_input_files_of_job(job),
        total_input_bytes=statistics_generator.get_total_bytes_of_input_files_of_job(job)
    )


def parse_job_started_at_time_if_not_set(job, lsf_record):
    """
    saves the started at time of the job from the lsf record obtained if it was not set before
    :param job: job object to which to save the started at time
    :param lsf_record: record obtained from bjobs output
    """
    if job.started_at is None:
        lsf_date_str = lsf_record.get('START_TIME')
        started_at = parse_bjobs_output_date(lsf_date_str)
        job.started_at = started_at
        print(f'Job {job.id} started at time is {started_at}')


def parse_job_finished_at_time_if_not_set(job, lsf_record):
    """
    saves the finished at time of the job from the lsf record obtained if it was not set before
    :param job: job object to which to save the finished at time
    :param lsf_record: record obtained from bjobs output
    """
    if job.finished_at is None:
        lsf_date_str = lsf_record.get('FINISH_TIME')
        finished_at = parse_bjobs_output_date(lsf_date_str)
        job.finished_at = finished_at
        print(f'Job {job.id} finished at time is {finished_at}')


def map_lsf_status_to_job_status(lsf_status):
    """
    maps the lsf status to a status defined in models
    :param lsf_status: status obtained from lsf
    :return: one of the statuses defined in the delayed_job_models module
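
    Mapping: RUN -> RUNNING, PEND -> QUEUED, EXIT -> ERROR, DONE -> FINISHED, anything else -> UNKNOWN.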
    """
    if lsf_status == 'RUN':
        return delayed_job_models.JobStatuses.RUNNING
    elif lsf_status == 'PEND':
        return delayed_job_models.JobStatuses.QUEUED
    elif lsf_status == 'EXIT':
        return delayed_job_models.JobStatuses.ERROR
    elif lsf_status == 'DONE':
        return delayed_job_models.JobStatuses.FINISHED
    else:
        return delayed_job_models.JobStatuses.UNKNOWN


def parse_bjobs_output_date(lsf_date_str):
    """
    Parses the date string provided in the bjobs output
    :param lsf_date_str: the string received from bjobs
    :return: a python datetime representing the date parsed
    """
    # Just return current date, to avoid date parsing issues. LSF is not responding to the -hms parameter
    return datetime.datetime.utcnow()


def save_job_outputs(job):
    """
    Lists the files of the output dir of the job and saves the corresponding output objects
    :param job: job that is finished
    """
    job_outputs_dir = job.output_dir_path

    paths_list = []
    append_files_in_dir(job_outputs_dir, paths_list)

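    # Build the public URL of each output file from its path relative to the jobs output dir.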
    for absolute_path in paths_list:
        relative_path = absolute_path.replace(f'{job_submission_service.JOBS_OUTPUT_DIR}/', '', 1)
        output_url = get_output_file_url(relative_path)
        delayed_job_models.add_output_to_job(job, absolute_path, output_url)
        print(f'Added output file {absolute_path} with url {output_url} to job {job.id}')


def append_files_in_dir(path, paths_list):
    """
    Appends to the list all the paths of the files in path and its subdirectories, recursively
    :param path: base directory for which to list the files
    :param paths_list: list where to accumulate the paths
    """
    for item in os.listdir(path):
        abs_path = Path(path).joinpath(item).resolve()
        if os.path.isfile(abs_path):
            paths_list.append(str(abs_path))
        else:
            append_files_in_dir(abs_path, paths_list)


def get_output_file_url(file_relative_path):
    """
    :param file_relative_path: the relative path from the job outputs dir.
    For example: Job-1/subdir/output_0.txt
    :return: the url of an output file given a path from the job outputs dir
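
    Example (illustrative config values): with base_path='app' and outputs_base_path='outputs',
    'Job-1/subdir/output_0.txt' -> 'app/outputs/Job-1/subdir/output_0.txt'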
    """

    server_base_path = RUN_CONFIG.get('base_path', '')
    if server_base_path == '':
        server_base_path_with_slash = ''
    else:
        server_base_path_with_slash = f'{server_base_path}/'
        if server_base_path_with_slash.startswith('//'):
            server_base_path_with_slash = server_base_path_with_slash[1:]
        if server_base_path_with_slash.endswith('/'):
            server_base_path_with_slash = server_base_path_with_slash[:-1]

    outputs_base_path = RUN_CONFIG.get('outputs_base_path')

    return f'{server_base_path_with_slash}/{outputs_base_path}/{file_relative_path}'


def delete_old_daemon_files():
    """
    Deletes old daemon scripts that may have been created by the daemons
    The age at which a script is considered old is determined by the configuration.
    :return: the number of items that were deleted
    """
    daemon_scripts_expiration_minutes = RUN_CONFIG.get('daemon_scripts_expiration_minutes')

    now = time.time()
    expiration_time = now - (daemon_scripts_expiration_minutes * 60)

    num_items_deleted = 0

    for item in os.listdir(AGENT_RUN_DIR):

        full_path = os.path.join(AGENT_RUN_DIR, item)
        modified_time = os.path.getmtime(full_path)

        if modified_time < expiration_time:
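            # Plain files are removed directly; per-hostname directories are removed recursively.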

            if os.path.isfile(full_path):
                os.remove(full_path)
            else:
                shutil.rmtree(full_path)

            num_items_deleted += 1

    return num_items_deleted