Commit e82a3e0b authored by David Mendez

Status Agent: Set up a url for every output found after a job finishes

parent d6eb9946
@@ -21,7 +21,12 @@ def create_app():
    """
    base_path = RUN_CONFIG.get('base_path', '')
    flask_app = Flask(__name__, static_url_path=f'{base_path}/outputs', static_folder=job_submission_service.JOBS_OUTPUT_DIR)
    outputs_base_path = RUN_CONFIG.get('outputs_base_path', 'outputs')
    flask_app = Flask(__name__,
                      static_url_path=f'{base_path}/{outputs_base_path}',
                      static_folder=job_submission_service.JOBS_OUTPUT_DIR)
    flask_app.config['SERVER_NAME'] = RUN_CONFIG.get('server_public_host')
    flask_app.config['SQLALCHEMY_DATABASE_URI'] = RUN_CONFIG.get('sql_alchemy').get('database_uri')
    flask_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = RUN_CONFIG.get('sql_alchemy').get('track_modifications')
    flask_app.config['SECRET_KEY'] = RUN_CONFIG.get('server_secret_key')
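Note (not part of the change): with this wiring, Flask's built-in static handler serves everything under job_submission_service.JOBS_OUTPUT_DIR at '{base_path}/{outputs_base_path}'. A minimal sketch of how a public URL for a job output file resolves, assuming the defaults added below (base_path '', outputs_base_path 'outputs') and a hypothetical file Job-1/output_0.txt inside the outputs dir:

import flask
from app import create_app

flask_app = create_app()
with flask_app.app_context():
    # 'static' is Flask's built-in endpoint for the static_folder;
    # the filename is the path relative to JOBS_OUTPUT_DIR
    url = flask.url_for('static', filename='Job-1/output_0.txt', _external=True)
    # -> 'http://some_server:30001/outputs/Job-1/output_0.txt' with the sample config at the bottom of this commit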
@@ -75,6 +75,12 @@ except FileNotFoundError:
if not RUN_CONFIG.get('server_public_host'):
    RUN_CONFIG['server_public_host'] = '0.0.0.0:5000'
if not RUN_CONFIG.get('base_path'):
    RUN_CONFIG['base_path'] = ''
if not RUN_CONFIG.get('outputs_base_path'):
    RUN_CONFIG['outputs_base_path'] = 'outputs'
# Hash keys and passwords
RUN_CONFIG['admin_password'] = hash_secret(RUN_CONFIG.get('admin_password'))
@@ -12,6 +12,7 @@ import json
from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.blueprints.job_submission.services import job_submission_service
AGENT_RUN_DIR = RUN_CONFIG.get('status_agent_run_dir', str(Path().absolute()) + '/status_agents_run')
if not os.path.isabs(AGENT_RUN_DIR):
@@ -179,6 +180,8 @@ def react_to_bjobs_json_output(json_output):
lsf_date_str = record['FINISH_TIME']
finished_at = parse_bjobs_output_date(lsf_date_str)
job.finished_at = finished_at
save_job_outputs(job)
delayed_job_models.save_job(job)
print(f'Job {job.id} with lsf id {job.lsf_job_id} new state is {new_status}')
@@ -209,5 +212,57 @@ def parse_bjobs_output_date(lsf_date_str):
# Just return current date, to avoid date parsing issues. LSF is not responding to the -hms parameter
return datetime.datetime.now(tz=datetime.timezone.utc)
def save_job_outputs(job):
    """
    Lists the files of the output dir of the job and saves the corresponding output objects
    :param job: job that is finished
    """
    print('SAVE JOB OUTPUTS ', job.id)
    job_outputs_dir = job.output_dir_path
    print('job_outputs_dir: ', job_outputs_dir)
    paths_list = []
    append_files_in_dir(job_outputs_dir, paths_list)
    print('paths_list: ', paths_list)
    for path in paths_list:
        relative_path = path.replace(f'{job_submission_service.JOBS_OUTPUT_DIR}/', '', 1)
        print('relative_path: ', relative_path)
        output_url = get_output_file_url(relative_path)
        print('output_url: ', output_url)
        print('--------------')
def append_files_in_dir(path, paths_list):
    """
    Appends to the list all the paths of the files in path and its subdirectories, recursively
    :param path: base directory for which to list the files
    :param paths_list: list where to accumulate the paths
    """
    for item in os.listdir(path):
        abs_path = Path(path).joinpath(item).resolve()
        print('abs_path: ', abs_path)
        if os.path.isfile(abs_path):
            paths_list.append(str(abs_path))
        else:
            append_files_in_dir(abs_path, paths_list)
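Note (not part of the change): the recursive helper above is what the commit uses. For comparison, a roughly equivalent sketch with os.walk from the standard library (list_output_files is a hypothetical name, and the same os/Path imports are assumed):

def list_output_files(base_dir):
    """Sketch: collect the absolute paths of every file under base_dir using os.walk."""
    collected = []
    for current_dir, _subdirs, file_names in os.walk(base_dir):
        for file_name in file_names:
            collected.append(str(Path(current_dir).joinpath(file_name).resolve()))
    return collected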
def get_output_file_url(file_relative_path):
    """
    :param file_relative_path: the relative path from the job outputs dir.
    For example: Job-1/subdir/output_0.txt
    :return: the url of an output file given a path from the job outputs dir
    """
    server_name = RUN_CONFIG.get("server_public_host")
    server_base_path = RUN_CONFIG.get('base_path', '')
    if server_base_path == '':
        server_base_path_with_slash = ''
    else:
        server_base_path_with_slash = f'{server_base_path}/'
    outputs_base_path = RUN_CONFIG.get('outputs_base_path')
    return f'{server_name}/{server_base_path_with_slash}{outputs_base_path}/{file_relative_path}'
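Worked example (not part of the change), assuming the sample config at the bottom of this commit (server_public_host: some_server:30001, outputs_base_path: 'outputs') and base_path falling back to its default '':

get_output_file_url('Job-1/subdir/output_0.txt')
# -> 'some_server:30001/outputs/Job-1/subdir/output_0.txt'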
@@ -10,11 +10,13 @@ import shutil
import os
from sqlalchemy import and_
import flask
from app import create_app
from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.job_status_daemon import daemon
from app.blueprints.job_submission.services import job_submission_service
class TestJobStatusDaemon(unittest.TestCase):
@@ -56,8 +58,10 @@ class TestJobStatusDaemon(unittest.TestCase):
type='TEST',
lsf_job_id=i,
status=status,
lsf_host=assigned_host
lsf_host=assigned_host,
)
job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
os.makedirs(job.output_dir_path, exist_ok=True)
delayed_job_models.save_job(job)
i += 1
@@ -84,6 +88,8 @@ class TestJobStatusDaemon(unittest.TestCase):
status=status,
lsf_host=assigned_host
)
job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
os.makedirs(job.output_dir_path, exist_ok=True)
delayed_job_models.save_job(job)
i += 1
@@ -274,3 +280,56 @@ class TestJobStatusDaemon(unittest.TestCase):
        Generates some mock jobs, then sends a mock output to the function to test that it interprets that the job finished.
        The finished job should now have the output files set
        """
        self.create_test_jobs_0()
        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_1.txt')
        with self.flask_app.app_context():
            # Prepare the test scenario
            lsf_job_id = 4
            job = delayed_job_models.get_job_by_lsf_id(lsf_job_id)
            print('output_dir_path: ', job.output_dir_path)
            daemon.parse_bjobs_output(sample_output)
            output_urls_must_be = []
            for i in range(0, 2):
                for subdir in ['', 'subdir/']:
                    out_file_name = f'output_{i}.txt'
                    out_file_path = f'{job.output_dir_path}/{subdir}{out_file_name}'
                    print('out_file_path ', out_file_path)
                    os.makedirs(Path(out_file_path).parent, exist_ok=True)
                    with open(out_file_path, 'wt') as out_file:
                        out_file.write(f'This is output file {i}')
                    server_base_path = RUN_CONFIG.get('base_path', '')
                    if server_base_path == '':
                        server_base_path_with_slash = ''
                    else:
                        server_base_path_with_slash = f'{server_base_path}/'
                    outputs_base_path = RUN_CONFIG.get('outputs_base_path')
                    output_url_must_be = f'{RUN_CONFIG.get("server_public_host")}/' \
                                         f'{server_base_path_with_slash}{outputs_base_path}/' \
                                         f'{job.id}/{subdir}{out_file_name}'
                    output_urls_must_be.append(output_url_must_be)
                    print('output_url_must_be: ', output_url_must_be)
                    print('---')
            print('output_urls_must_be: ', output_urls_must_be)
            # FINISH to prepare the test scenario
            job_outputs_got = job.output_files
            print('job_outputs_got: ', job_outputs_got)
            self.assertEqual(len(job_outputs_got), 4, msg='There must be 4 outputs for this job!')
            base_path = RUN_CONFIG.get('base_path', '')
            base_static_path = f'{base_path}/outputs'
            print('base_static_path: ', base_static_path)
            print('jobs output: ', flask.url_for('static', filename='some_file.txt'))
@@ -353,4 +353,4 @@ def get_lsf_job_ids_to_check(lsf_host):
ids = [job.lsf_job_id for job in job_to_check_status]
DB.session.commit()
return ids
return ids
\ No newline at end of file
@@ -15,4 +15,5 @@ server_public_host: some_server:30001 # The base url for the jobs to send feedback
enable_cors: True
generate_default_config: True # Generates a default configuration for the jobs
status_agent_run_dir: 'Where the status agents will run their scripts'
run_status_script: False # Sets if I should actually run the status script, if missing assumed true. Useful for testing
\ No newline at end of file
run_status_script: False # Sets if I should actually run the status script, if missing assumed true. Useful for testing
outputs_base_path: 'outputs' # base path under which the job outputs are served
\ No newline at end of file