Commit 15edd207 authored by David Mendez

Implement the operation that deletes all jobs by a given type

parent 70ede7bd
......@@ -13,6 +13,7 @@ from app.blueprints.swagger_description.swagger_description_blueprint import SWA
from app.blueprints.job_submission.controllers.job_submissions_controller import SUBMISSION_BLUEPRINT
from app.blueprints.job_status.job_status_controller import JOB_STATUS_BLUEPRINT
from app.blueprints.admin.controllers.authorisation_controller import ADMIN_AUTH_BLUEPRINT
from app.blueprints.admin.controllers.admin_tasks_controller import ADMIN_TASKS_BLUEPRINT
def create_app():
......@@ -52,6 +53,7 @@ def create_app():
flask_app.register_blueprint(SUBMISSION_BLUEPRINT, url_prefix=f'{base_path}/submit')
flask_app.register_blueprint(JOB_STATUS_BLUEPRINT, url_prefix=f'{base_path}/status')
flask_app.register_blueprint(ADMIN_AUTH_BLUEPRINT, url_prefix=f'{base_path}/admin')
flask_app.register_blueprint(ADMIN_TASKS_BLUEPRINT, url_prefix=f'{base_path}/admin')
return flask_app
......
"""
Blueprint for the administrative tasks of the system
"""
from flask import Blueprint, jsonify, request, make_response
ADMIN_TASKS_BLUEPRINT = Blueprint('admin_tasks', __name__)
@ADMIN_TASKS_BLUEPRINT.route('/delete_all_jobs_by_type', methods=['POST'])
def delete_all_jobs_by_type():
return jsonify({'operation_result': 'Done'})
\ No newline at end of file
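The route above is still a stub that always reports 'Done'. As an illustration only, here is a sketch of how the controller could delegate to the delete_all_jobs_by_type model function added further down in this commit; the 'job_type' form field is an assumption, not something this commit defines:

from flask import Blueprint, jsonify, request

from app.models import delayed_job_models

ADMIN_TASKS_BLUEPRINT = Blueprint('admin_tasks', __name__)

@ADMIN_TASKS_BLUEPRINT.route('/delete_all_jobs_by_type', methods=['POST'])
def delete_all_jobs_by_type():
    # Assumed contract: the job type arrives as a 'job_type' form field.
    job_type = request.form.get('job_type')
    num_deleted = delayed_job_models.delete_all_jobs_by_type(job_type)
    return jsonify({'operation_result': f'Deleted {num_deleted} jobs'})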
......@@ -424,6 +424,23 @@ def delete_all_expired_jobs():
return num_deleted
def delete_all_jobs_by_type(job_type):
"""
Deletes all the jobs of the given type, along with their run and output directories.
:param job_type: type of the jobs to delete
:return: the number of jobs that were deleted.
"""
jobs_to_delete = DelayedJob.query.filter_by(type=job_type)
num_deleted = 0
for job in jobs_to_delete:
run_dir_path = job.run_dir_path
output_dir_path = job.output_dir_path
delete_job(job)
shutil.rmtree(run_dir_path, ignore_errors=True)
shutil.rmtree(output_dir_path, ignore_errors=True)
num_deleted += 1
return num_deleted
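For reference, a minimal usage sketch of delete_all_jobs_by_type from outside the request cycle, assuming the create_app factory from this repository; the 'TEST' job type is purely illustrative:

from app import create_app
from app.models import delayed_job_models

flask_app = create_app()
with flask_app.app_context():
    # Deletes every job of the illustrative 'TEST' type plus its run and output dirs.
    num_deleted = delayed_job_models.delete_all_jobs_by_type('TEST')
    print(f'Deleted {num_deleted} jobs')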
def get_lsf_job_ids_to_check(lsf_host):
"""
:param lsf_host: lsf host for which to return the jobs to check
......
......@@ -11,6 +11,7 @@ from pathlib import Path
from app import create_app
from app.models import delayed_job_models
from app.models.test import utils
class TestExpiredJobDeletion(unittest.TestCase):
......@@ -38,37 +39,6 @@ class TestExpiredJobDeletion(unittest.TestCase):
shutil.rmtree(self.ABS_RUN_DIR_PATH)
shutil.rmtree(self.ABS_OUT_DIR_PATH)
def simulate_inputs_to_job(self, job, job_run_dir):
"""
Creates some input files and adds them to the job object
:param job: job to add the input files to
:param job_run_dir: directory where the job runs
"""
inputs_path = os.path.join(job_run_dir, 'inputs')
os.makedirs(inputs_path, exist_ok=True)
for i in range(1, 5):
input_path = os.path.join(inputs_path, f'input{i}.txt')
with open(input_path, 'w') as input_file:
input_file.write(f'This is input {i}')
job_input_file = delayed_job_models.InputFile(internal_path=input_path)
delayed_job_models.add_input_file_to_job(job, job_input_file)
def simulate_outputs_of_job(self, job, output_dir):
"""
Creates some output files and adds them to the job object
:param job: job to add the output files to
:param output_dir: directory where the job writes the outputs
"""
outputs_path = os.path.join(output_dir, 'outputs')
os.makedirs(outputs_path, exist_ok=True)
for i in range(1, 5):
output_path = os.path.join(outputs_path, f'output{i}.txt')
with open(output_path, 'w') as output_file:
output_file.write(f'This is output {i}')
job_output_file = delayed_job_models.OutputFile(internal_path=output_path)
delayed_job_models.add_output_file_to_job(job, job_output_file)
def simulate_finished_job(self, expires_at):
"""
......@@ -95,13 +65,12 @@ class TestExpiredJobDeletion(unittest.TestCase):
output_dir = os.path.join(self.ABS_OUT_DIR_PATH, job.id)
job.output_dir_path = output_dir
os.makedirs(output_dir, exist_ok=True)
print('output_dir: ', output_dir)
# Add some inputs
self.simulate_inputs_to_job(job, job_run_dir)
utils.simulate_inputs_to_job(job, job_run_dir)
# Add some outputs
self.simulate_outputs_of_job(job, output_dir)
utils.simulate_outputs_of_job(job, output_dir)
job.status = delayed_job_models.JobStatuses.FINISHED
job.expires_at = expires_at
......
"""
Tests for deleting jobs of a certain type
"""
import unittest
from pathlib import Path
import os
import shutil
import random
import string
from app import create_app
from app.models import delayed_job_models
from app.models.test import utils
class TestJobDeletionByType(unittest.TestCase):
"""
Class to test deletion of jobs by a given type
"""
TEST_RUN_DIR_NAME = 'test_run_dir'
ABS_RUN_DIR_PATH = str(Path(TEST_RUN_DIR_NAME).resolve())
OUT_RUN_DIR_NAME = 'test_out_dir'
ABS_OUT_DIR_PATH = str(Path(OUT_RUN_DIR_NAME).resolve())
def setUp(self):
self.flask_app = create_app()
self.client = self.flask_app.test_client()
os.makedirs(self.ABS_RUN_DIR_PATH, exist_ok=True)
os.makedirs(self.ABS_OUT_DIR_PATH, exist_ok=True)
def tearDown(self):
with self.flask_app.app_context():
delayed_job_models.delete_all_jobs()
shutil.rmtree(self.ABS_RUN_DIR_PATH)
shutil.rmtree(self.ABS_OUT_DIR_PATH)
def simulate_finished_job_of_a_type(self, job_type):
"""
Creates a finished job in the database, of the type given as a parameter.
The job parameters are randomised so that each call produces a distinct job id.
:param job_type: type that you want the job to be
"""
params = {
'structure': ''.join(random.choice(string.ascii_lowercase) for i in range(10)),
}
docker_image_url = 'some_url'
job = delayed_job_models.get_or_create(job_type, params, docker_image_url)
# simulate it finished
job_run_dir = os.path.join(self.ABS_RUN_DIR_PATH, job.id)
job.run_dir_path = job_run_dir
os.makedirs(job_run_dir, exist_ok=True)
output_dir = os.path.join(self.ABS_OUT_DIR_PATH, job.id)
job.output_dir_path = output_dir
os.makedirs(output_dir, exist_ok=True)
# Add some inputs
utils.simulate_inputs_to_job(job, job_run_dir)
# Add some outputs
utils.simulate_outputs_of_job(job, output_dir)
job.status = delayed_job_models.JobStatuses.FINISHED
delayed_job_models.save_job(job)
return job
def test_deletes_all_jobs_of_a_given_type(self):
"""
Tests that the operation of deleting a job of a certain type is executed correctly.
"""
with self.flask_app.app_context():
type_to_delete = 'FOR_DELETION'
type_to_not_delete = 'DO_NOT_DELETE'
job_per_type = 6
for job_type in [type_to_delete, type_to_not_delete]:
for i in range(0, job_per_type):
self.simulate_finished_job_of_a_type(job_type)
num_deleted_got = delayed_job_models.delete_all_jobs_by_type(type_to_delete)
self.assertEqual(job_per_type, num_deleted_got, msg='The correct number of jobs was not deleted!')
num_dirs_to_keep = 0
for job in delayed_job_models.DelayedJob.query.all():
should_have_been_deleted = job.type == type_to_delete
self.assertFalse(should_have_been_deleted,
msg='A job was not deleted correctly')
run_dir = job.run_dir_path
self.assertTrue(os.path.exists(run_dir),
msg="The job run dir was deleted, it didn't expire!")
out_dir = job.output_dir_path
self.assertTrue(os.path.exists(out_dir),
msg="The job output dir was deleted, it didn't expire!")
num_dirs_to_keep += 1
num_run_dirs_got = len(os.listdir(self.ABS_RUN_DIR_PATH))
self.assertEqual(num_run_dirs_got, num_dirs_to_keep, msg='Some run dirs were not deleted!')
num_out_dirs_got = len(os.listdir(self.ABS_OUT_DIR_PATH))
self.assertEqual(num_out_dirs_got, num_dirs_to_keep, msg='Some output dirs were not deleted!')
"""
Module with utils for these tests
"""
import os
from app.models import delayed_job_models
def simulate_inputs_to_job(job, job_run_dir):
"""
Creates some input files and adds them to the job object
:param job: job to add the input files to
:param job_run_dir: directory where the job runs
"""
inputs_path = os.path.join(job_run_dir, 'inputs')
os.makedirs(inputs_path, exist_ok=True)
for i in range(1, 5):
input_path = os.path.join(inputs_path, f'input{i}.txt')
with open(input_path, 'w') as input_file:
input_file.write(f'This is input {i}')
job_input_file = delayed_job_models.InputFile(internal_path=input_path)
delayed_job_models.add_input_file_to_job(job, job_input_file)
def simulate_outputs_of_job(job, output_dir):
"""
Creates some output files and adds them to the job object
:param job: job to add the output files to
:param output_dir: directory where the job writes the outputs
"""
outputs_path = os.path.join(output_dir, 'outputs')
os.makedirs(outputs_path, exist_ok=True)
for i in range(1, 5):
output_path = os.path.join(outputs_path, f'output{i}.txt')
with open(output_path, 'w') as output_file:
output_file.write(f'This is output {i}')
job_output_file = delayed_job_models.OutputFile(internal_path=output_path)
delayed_job_models.add_output_file_to_job(job, job_output_file)
\ No newline at end of file
......@@ -203,7 +203,8 @@ paths:
tags:
- 'Admin'
summary: 'Deletes all jobs of the type passed as parameter.'
description: 'Deletes all jobs of the type passed as parameter. Fails if the job type does not exist'
description: "Deletes all jobs of the type passed as parameter. If the job type does not exist it doesn't delete
anything"
operationId: 'admin_delete_all_jobs_by_type'
produces:
- 'application/json'
......@@ -220,8 +221,6 @@ paths:
$ref: '#/definitions/AdminOperationResult'
"401":
description: 'Invalid Admin token supplied'
'400':
description: 'The given type does not exist!'
security:
- adminTokenAuth: []
securityDefinitions:
......
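For manual verification of the documented operation, a hedged sketch using Flask's test client; the admin-token header name, the 'job_type' form field, and the empty base path are all assumptions, since the parameter and securityDefinitions blocks are elided above:

from app import create_app

client = create_app().test_client()
# Hypothetical header and field names; adjust them to the real security scheme.
response = client.post('/admin/delete_all_jobs_by_type',
                       data={'job_type': 'TEST'},
                       headers={'X-Admin-Key': 'some-admin-token'})
print(response.status_code, response.get_json())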