"""
This Module tests the basic functions of the status daemon
"""
import unittest

import socket
from pathlib import Path
from datetime import datetime, timedelta
from os import path
import shutil
import os

from sqlalchemy import and_

from app import create_app
from app.models import delayed_job_models
from app.config import RUN_CONFIG
from app.job_status_daemon import daemon
from app.blueprints.job_submission.services import job_submission_service
from app.job_status_daemon import locks

class TestJobStatusDaemon(unittest.TestCase):
    """
    Class to test Job Status Daemon
    """

    def setUp(self):
        """Create a fresh app and test client, then start from a clean slate."""
        self.flask_app = create_app()
        self.client = self.flask_app.test_client()

        with self.flask_app.app_context():
            # Ensure no jobs or agent run files from a previous test survive.
            delayed_job_models.delete_all_jobs()
            shutil.rmtree(daemon.AGENT_RUN_DIR, ignore_errors=True)

    def tearDown(self):
        """Wipe every job and the agent run dir so tests stay independent."""
        with self.flask_app.app_context():
            delayed_job_models.delete_all_jobs()
            shutil.rmtree(daemon.AGENT_RUN_DIR, ignore_errors=True)

    def create_test_jobs_0(self):
        """
        This will create:
        - 2 Jobs in created state, each running in a different lsf cluster
        - 2 Jobs in queued state, each running in a different lsf cluster
        - 2 Jobs in running state, each running in a different lsf cluster
        - 2 Jobs in error state, each running in a different lsf cluster
        - 2 Jobs in finished state, each running in a different lsf cluster
        """
        lsf_config = RUN_CONFIG.get('lsf_submission')
        lsf_host = lsf_config['lsf_host']
        run_environment = RUN_CONFIG.get('run_env')

        statuses = [delayed_job_models.JobStatuses.CREATED, delayed_job_models.JobStatuses.QUEUED,
                    delayed_job_models.JobStatuses.RUNNING, delayed_job_models.JobStatuses.FINISHED,
                    delayed_job_models.JobStatuses.ERROR]
        hosts = [lsf_host, 'another_host']

        with self.flask_app.app_context():
            # One job per (status, host) combination; the counter doubles as the LSF job id.
            combinations = [(status, host) for status in statuses for host in hosts]
            for lsf_id, (status, assigned_host) in enumerate(combinations):
                job = delayed_job_models.DelayedJob(
                    id=f'Job-{assigned_host}-{status}',
                    type='TEST',
                    lsf_job_id=lsf_id,
                    status=status,
                    lsf_host=assigned_host,
                    run_environment=run_environment,
                    created_at=datetime.utcnow(),
                    started_at=datetime.utcnow() + timedelta(seconds=1),
                    finished_at=datetime.utcnow() + timedelta(seconds=2),
                )
                job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
                os.makedirs(job.output_dir_path, exist_ok=True)
                delayed_job_models.save_job(job)

    def create_test_jobs_1(self):
        """
        This will create:
        - 2 Jobs in error state, each running in a different lsf cluster
        - 2 Jobs in finished state, each running in a different lsf cluster
        """
        run_environment = RUN_CONFIG.get('run_env')
        lsf_config = RUN_CONFIG.get('lsf_submission')
        lsf_host = lsf_config['lsf_host']

        statuses = [delayed_job_models.JobStatuses.FINISHED,
                    delayed_job_models.JobStatuses.ERROR]
        hosts = [lsf_host, 'another_host']

        with self.flask_app.app_context():
            # One job per (status, host) combination; the counter doubles as the LSF job id.
            combinations = [(status, host) for status in statuses for host in hosts]
            for lsf_id, (status, assigned_host) in enumerate(combinations):
                job = delayed_job_models.DelayedJob(
                    id=f'Job-{assigned_host}-{status}',
                    type='TEST',
                    lsf_job_id=lsf_id,
                    status=status,
                    lsf_host=assigned_host,
                    run_environment=run_environment,
                    created_at=datetime.utcnow(),
                    started_at=datetime.utcnow() + timedelta(seconds=1),
                    finished_at=datetime.utcnow() + timedelta(seconds=2)
                )
                job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
                os.makedirs(job.output_dir_path, exist_ok=True)
                delayed_job_models.save_job(job)

    def create_test_jobs_2(self):
        """
        This will create:
        - 2 Jobs in created state, each running in a different run environment
        - 2 Jobs in queued state, each running in a different run environment
        - 2 Jobs in running state, each running in a different run environment
        - 2 Jobs in error state, each running in a different run environment
        - 2 Jobs in finished state, each running in a different run environment
        """
        lsf_config = RUN_CONFIG.get('lsf_submission')
        lsf_host = lsf_config['lsf_host']
        run_environment = RUN_CONFIG.get('run_env')

        statuses = [delayed_job_models.JobStatuses.CREATED, delayed_job_models.JobStatuses.QUEUED,
                    delayed_job_models.JobStatuses.RUNNING, delayed_job_models.JobStatuses.FINISHED,
                    delayed_job_models.JobStatuses.ERROR]
        environments = [run_environment, 'another_environment']

        with self.flask_app.app_context():
            # One job per (status, environment) combination; the counter doubles as the LSF job id.
            combinations = [(status, env) for status in statuses for env in environments]
            for lsf_id, (status, run_env) in enumerate(combinations):
                job = delayed_job_models.DelayedJob(
                    id=f'Job-{status}-{run_env}',
                    type='TEST',
                    lsf_job_id=lsf_id,
                    status=status,
                    lsf_host=lsf_host,
                    run_environment=run_env,
                    created_at=datetime.utcnow(),
                    started_at=datetime.utcnow() + timedelta(seconds=1),
                    finished_at=datetime.utcnow() + timedelta(seconds=2)
                )
                job.output_dir_path = job_submission_service.get_job_output_dir_path(job)
                os.makedirs(job.output_dir_path, exist_ok=True)
                delayed_job_models.save_job(job)

    # TODO: CONTINUE HERE
    def test_produces_a_correct_job_status_check_script_path(self):
        """
        Test that produces a correct path for the job status script
        """
        expected_filename = f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_check_lsf_job_status.sh'
        script_path_must_be = Path(daemon.AGENT_RUN_DIR).joinpath(socket.gethostname(), expected_filename)

        script_path_got = daemon.get_check_job_status_script_path()

        # Compare with the last character dropped: the seconds digit may tick over
        # between building the expected name above and the daemon building its own.
        self.assertEqual(str(script_path_must_be)[:-1], str(script_path_got)[:-1],
                         msg='The path for the job status checking job was not produced correctly!')

    def test_prepares_the_job_status_script(self):
        """
        Test that the job status script is created and can be executed.
        """
        self.create_test_jobs_0()

        with self.flask_app.app_context():
            lsf_ids_to_check = daemon.get_lsf_job_ids_to_check()
            script_path = daemon.prepare_job_status_check_script(lsf_ids_to_check)

            self.assertTrue(path.isfile(script_path), msg='The job status check script has not been created!')
            # The daemon must also have set the executable bit on the script.
            self.assertTrue(os.access(script_path, os.X_OK),
                            msg=f'The script file for the job ({script_path}) is not executable!')

    def load_sample_file(self, file_path):
        """
        Loads a file with a sample read from the path specified as a parameter
        :param file_path: path of the sample
        :return: a string with the contents of the file
        """
        sample_output_file_path = Path(file_path).resolve()
        with open(sample_output_file_path, 'rt') as sample_output_file:
            sample_output = sample_output_file.read()
            return sample_output

    def test_parses_the_output_of_bjobs_when_no_jobs_were_found(self):
        """
        Generates mock jobs, then sends a mock output to the function under test to check that it
        interpreted the output accordingly: when bjobs reports no jobs, no status may change.
        """
        self.create_test_jobs_0()

        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_0.txt')

        with self.flask_app.app_context():
            daemon.parse_bjobs_output(sample_output)

            # The config lookup is loop-invariant, so read it once instead of once per status.
            lsf_config = RUN_CONFIG.get('lsf_submission')
            lsf_host = lsf_config['lsf_host']

            # No status should have changed
            for status_must_be in [delayed_job_models.JobStatuses.CREATED, delayed_job_models.JobStatuses.QUEUED,
                                   delayed_job_models.JobStatuses.RUNNING, delayed_job_models.JobStatuses.FINISHED,
                                   delayed_job_models.JobStatuses.ERROR]:

                for assigned_host in [lsf_host, 'another_host']:
                    id_to_check = f'Job-{assigned_host}-{status_must_be}'
                    job = delayed_job_models.get_job_by_id(id_to_check)
                    status_got = job.status
                    self.assertEqual(status_got, status_must_be,
                                     msg='The status was modified! This should have not modified the status')

    def test_parses_the_output_of_bjobs_running_job(self):
        """
        Generates mock jobs, then sends a mock output to the function to test that it interpreted
        the output accordingly. This test focuses on a job that switched to running state.
        """
        self.create_test_jobs_0()

        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_1.txt')

        with self.flask_app.app_context():
            daemon.parse_bjobs_output(sample_output)

            # The job with lsf id 0 should now be in RUNNING state.
            job = delayed_job_models.get_job_by_lsf_id(0)
            self.assertEqual(job.status, delayed_job_models.JobStatuses.RUNNING,
                             msg='The status of the job was not changed accordingly!')

    def test_parses_the_output_of_bjobs_pending_job(self):
        """
        Generates mock jobs, then sends a mock output to the function to test that it interpreted
        the output accordingly. This test focuses on a job that switched to pending state.
        """
        self.create_test_jobs_0()

        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_2.txt')

        with self.flask_app.app_context():
            daemon.parse_bjobs_output(sample_output)

            # The job with lsf id 0 should now be in QUEUED state (LSF reports it as pending).
            job = delayed_job_models.get_job_by_lsf_id(0)
            self.assertEqual(job.status, delayed_job_models.JobStatuses.QUEUED,
                             msg='The status of the job was not changed accordingly!')

    def test_parses_the_output_of_bjobs_error_job(self):
        """
        Generates mock jobs, then sends a mock output to the function to test that it interpreted
        the output accordingly. This test focuses on a job that switched to error state.
        """
        self.create_test_jobs_0()

        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_1.txt')

        with self.flask_app.app_context():
            daemon.parse_bjobs_output(sample_output)

            # The job with lsf id 2 should now be in ERROR state.
            job = delayed_job_models.get_job_by_lsf_id(2)
            self.assertEqual(job.status, delayed_job_models.JobStatuses.ERROR,
                             msg='The status of the job was not changed accordingly!')

    def test_parses_the_output_of_bjobs_job_not_found(self):
        """
        Generates mock jobs, then sends a mock output to the function to test that it interpreted
        the output accordingly. This test focuses on an error on the system but still gives output.
        """
        self.create_test_jobs_0()

        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_3.txt')

        with self.flask_app.app_context():
            daemon.parse_bjobs_output(sample_output)

            # When bjobs cannot find the job, its status must become UNKNOWN.
            job = delayed_job_models.get_job_by_lsf_id(2)
            self.assertEqual(job.status, delayed_job_models.JobStatuses.UNKNOWN,
                             msg='The status of the job was not changed accordingly!')

    def test_parses_the_output_of_bjobs_finished_job(self):
        """
        Generates mock jobs, then sends a mock output to the function to test that it interpreted
        the output accordingly. This test focuses on a job that switched to finished state, and
        checks that the expiration date is derived from the finish time.
        """
        self.create_test_jobs_0()

        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_1.txt')

        with self.flask_app.app_context():
            daemon.parse_bjobs_output(sample_output)

            # The job with lsf id 4 should now be in FINISHED state.
            job = delayed_job_models.get_job_by_lsf_id(4)
            self.assertEqual(job.status, delayed_job_models.JobStatuses.FINISHED,
                             msg='The status of the job was not changed accordingly!')

            # The expiration date must be finished_at plus the configured number of days.
            expiration_delta = timedelta(days=RUN_CONFIG.get('job_expiration_days'))
            expiration_date_must_be = job.finished_at + expiration_delta

            # Compare as timestamps with a one second tolerance to absorb clock jitter.
            self.assertAlmostEqual(job.expires_at.timestamp(), expiration_date_must_be.timestamp(),
                                   msg='The expiration date was not calculated correctly', delta=1)

    def test_collects_the_urls_for_the_outputs_of_a_finished_job(self):
        """
        Generates some mock jobs, then sends a mock output to the function to test that it
        interprets that it finished. The finished job should have now the output files set.
        """
        self.create_test_jobs_0()

        sample_output = self.load_sample_file('app/job_status_daemon/test/data/sample_lsf_output_1.txt')

        with self.flask_app.app_context():
            # Prepare the test scenario
            lsf_job_id = 4
            job = delayed_job_models.get_job_by_lsf_id(lsf_job_id)

            # These config values are loop-invariant: compute them once instead of on
            # every output-file iteration.
            server_base_path = RUN_CONFIG.get('base_path', '')
            if server_base_path == '':
                server_base_path_with_slash = ''
            else:
                server_base_path_with_slash = f'{server_base_path}/'
            outputs_base_path = RUN_CONFIG.get('outputs_base_path')

            output_urls_must_be = []

            for i in range(2):

                for subdir in ['', 'subdir/']:

                    out_file_name = f'output_{i}.txt'
                    out_file_path = f'{job.output_dir_path}/{subdir}{out_file_name}'
                    os.makedirs(Path(out_file_path).parent, exist_ok=True)
                    with open(out_file_path, 'wt') as out_file:
                        out_file.write(f'This is output file {i}')

                    output_url_must_be = f'/{server_base_path_with_slash}{outputs_base_path}/' \
                                         f'{job.id}/{subdir}{out_file_name}'

                    output_urls_must_be.append(output_url_must_be)

            # END to prepare the test scenario

            daemon.parse_bjobs_output(sample_output)

            job_outputs_got = job.output_files
            self.assertEqual(len(job_outputs_got), 4, msg='There must be 4 outputs for this job!')

            for output_file in job_outputs_got:
                self.assertIn(output_file.public_url, output_urls_must_be,
                              msg='The output url was not set correctly')

    def test_daemon_creates_lock_when_checking_lsf(self):
        """
        Tests that the daemon creates a lock while checking LSF
        """
        with self.flask_app.app_context():
            daemon.check_jobs_status(delete_lock_after_finishing=False)

            current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
            lock_got = locks.get_lock_for_lsf_host(current_lsf_host)
            self.assertIsNotNone(lock_got, msg='The LSF lock was not created!')

            # The lock must record this machine as its owner.
            self.assertEqual(lock_got.get('owner'), socket.gethostname(),
                             msg='The lock was not saved correctly!')

            # Clean up so other tests are not affected by a lingering lock.
            locks.delete_lsf_lock(current_lsf_host)

    def test_agent_respects_a_lock(self):
        """
        Tests that when a lock has been created for another host, the agent respects it:
        it does not check anything in LSF and only reports a sleep time.
        """
        with self.flask_app.app_context():
            current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
            locks.set_lsf_lock(current_lsf_host, 'another_owner')

            sleep_time_got, jobs_were_checked = daemon.check_jobs_status()
            self.assertFalse(jobs_were_checked, msg='The jobs should have not been checked')

            # The sleep time must fall inside the configured window.
            status_agent_config = RUN_CONFIG.get('status_agent')
            min_sleep_time = status_agent_config.get('min_sleep_time')
            max_sleep_time = status_agent_config.get('max_sleep_time')
            self.assertTrue(min_sleep_time <= sleep_time_got <= max_sleep_time,
                            msg='The sleep time was not calculated correctly!')

            # Clean up the lock created for the scenario.
            locks.delete_lsf_lock(current_lsf_host)

    def test_deletes_lock_after_finishing(self):
        """
        Tests that it requests the deletion of the lock after checking the jobs
        """
        with self.flask_app.app_context():
            daemon.check_jobs_status()

            current_lsf_host = RUN_CONFIG.get('lsf_submission').get('lsf_host')
            self.assertIsNone(locks.get_lock_for_lsf_host(current_lsf_host),
                              msg='The LSF lock was not deleted!')