Commit de398e95 authored by David Mendez
Browse files

Status daemon: save statistics when jobs have finished or errored

parent bbadb33c
......@@ -15,6 +15,8 @@ from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks
from app.job_statistics import statistics_saver
from app.job_status_daemon.job_statistics import statistics_generator
AGENT_RUN_DIR = RUN_CONFIG.get('status_agent_run_dir', str(Path().absolute()) + '/status_agents_run')
if not os.path.isabs(AGENT_RUN_DIR):
......@@ -218,6 +220,7 @@ def react_to_bjobs_json_output(json_output):
if job.num_failures is None:
job.num_failures = 0
job.num_failures += 1
save_job_statistics(job)
elif new_status == delayed_job_models.JobStatuses.FINISHED:
......@@ -225,10 +228,30 @@ def react_to_bjobs_json_output(json_output):
parse_job_finished_at_time_if_not_set(job, record)
set_job_expiration_time(job)
save_job_outputs(job)
save_job_statistics(job)
delayed_job_models.save_job(job)
print(f'Job {job.id} with lsf id {job.lsf_job_id} new state is {new_status}')
def save_job_statistics(job):
    """
    Builds the statistics record of a job and passes it to the statistics saver.

    The record contains the job type, the run environment read from the run configuration, the lsf host,
    the start and finish times, the seconds spent from created to running and from running to finished
    or error, the final state, and the number and total bytes of the job's input and output files.

    :param job: job object for which to save the statistics. Its started_at and finished_at attributes
    are used to call .timestamp(), so they must be set (presumably datetime objects).
    """
    # Resolve the saver first, then fill the record in the same order the values are sent.
    save_record = statistics_saver.save_job_record

    record = {
        'job_type': job.type,
        'run_env_type': RUN_CONFIG.get('run_env'),
        'lsf_host': job.lsf_host,
        # timestamps are multiplied by 1000 (presumably seconds to milliseconds)
        'started_at': job.started_at.timestamp() * 1000,
        'finished_at': job.finished_at.timestamp() * 1000,
        'seconds_taken_from_created_to_running':
            statistics_generator.get_seconds_from_created_to_running(job),
        'seconds_taken_from_running_to_finished_or_error':
            statistics_generator.get_seconds_from_running_to_finished(job),
        'final_state': job.status,
        'num_output_files': statistics_generator.get_num_output_files_of_job(job),
        'total_output_bytes': statistics_generator.get_total_bytes_of_output_files_of_job(job),
        'num_input_files': statistics_generator.get_num_input_files_of_job(job),
        'total_input_bytes': statistics_generator.get_total_bytes_of_input_files_of_job(job),
    }

    save_record(**record)
def set_job_expiration_time(job):
"""
......
......@@ -4,11 +4,11 @@ This Module generates generic statistics for a job
import os
def get_seconds_from_created_to_queued(job):
def get_seconds_from_created_to_running(job):
"""
:param job: DelayedJob object for which to do the calculation.
:return: the amount of seconds that passed from the time the job was created to the time it
was queued
started running
"""
created_at = job.created_at
started_at = job.started_at
......
......@@ -47,7 +47,7 @@ class TestJobStatisticsGeneration(unittest.TestCase):
started_at=started_at
)
seconds_got = statistics_generator.get_seconds_from_created_to_queued(job)
seconds_got = statistics_generator.get_seconds_from_created_to_running(job)
self.assertEqual(seconds_got, seconds_must_be,
msg='The seconds from created to queued were not calculated correctly!')
......
......@@ -66,7 +66,10 @@ class TestJobStatusDaemon(unittest.TestCase):
lsf_job_id=i,
status=status,
lsf_host=assigned_host,
run_environment=run_environment
run_environment=run_environment,
created_at=datetime.utcnow(),
started_at=datetime.utcnow() + timedelta(seconds=1),
finished_at=datetime.utcnow() + timedelta(seconds=2),
)
job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
os.makedirs(job.output_dir_path, exist_ok=True)
......@@ -98,7 +101,10 @@ class TestJobStatusDaemon(unittest.TestCase):
lsf_job_id=i,
status=status,
lsf_host=assigned_host,
run_environment=run_environment
run_environment=run_environment,
created_at=datetime.utcnow(),
started_at=datetime.utcnow() + timedelta(seconds=1),
finished_at=datetime.utcnow() + timedelta(seconds=2)
)
job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
os.makedirs(job.output_dir_path, exist_ok=True)
......@@ -133,7 +139,10 @@ class TestJobStatusDaemon(unittest.TestCase):
lsf_job_id=i,
status=status,
lsf_host=lsf_host,
run_environment=run_env
run_environment=run_env,
created_at=datetime.utcnow(),
started_at=datetime.utcnow() + timedelta(seconds=1),
finished_at=datetime.utcnow() + timedelta(seconds=2)
)
job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
os.makedirs(job.output_dir_path, exist_ok=True)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment