Commit 7128a126 authored by David Mendez

Merge branch 'staging' into 'master'

Remove cache since now not necessary

See merge request !27
parents b197e6fe 5f971351
Pipeline #266183 failed with stage in 1 minute and 17 seconds
@@ -6,7 +6,6 @@ from flask import Flask
 from flask_cors import CORS
 from app.config import RUN_CONFIG
-from app.cache import CACHE

 from app.blueprints.swagger_description.swagger_description_blueprint import SWAGGER_BLUEPRINT
 from app.blueprints.subset_generator_blueprint.controllers.subset_generator_controller import SUBSET_GENERATOR_BLUEPRINT
 from app.blueprints.admin.controllers.authorisation_controller import ADMIN_AUTH_BLUEPRINT
@@ -31,7 +30,6 @@ def create_app():
     flask_app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = RUN_CONFIG.get('sql_alchemy').get('track_modifications')
     flask_app.config['SECRET_KEY'] = RUN_CONFIG.get('server_secret_key')

     with flask_app.app_context():
         DB.init_app(flask_app)
@@ -39,17 +37,6 @@ def create_app():
         if create_tables:
             DB.create_all()

-        CACHE.init_app(flask_app)
-
-        # pylint: disable=protected-access
-        if RUN_CONFIG.get('cache_config').get('CACHE_TYPE') == 'memcached':
-            CACHE.cache._client.behaviors['tcp_nodelay'] = True
-            CACHE.cache._client.behaviors['_noreply'] = True
-            CACHE.cache._client.behaviors['no_block'] = True
-            CACHE.cache._client.behaviors['remove_failed'] = 10
-            CACHE.cache._client.behaviors['retry_timeout'] = 10
-            CACHE.cache._client.behaviors['retry_timeout'] = 600
     flask_app.register_blueprint(SWAGGER_BLUEPRINT, url_prefix=f'{base_path}/swagger')
     flask_app.register_blueprint(SUBSET_GENERATOR_BLUEPRINT, url_prefix=f'{base_path}/es_subsets')
     flask_app.register_blueprint(ADMIN_AUTH_BLUEPRINT, url_prefix=f'{base_path}/admin')
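Review note: the removed block reached through Flask-Caching's private _client attribute to tune the underlying pylibmc connection (hence the pylint override), and it assigned retry_timeout twice, with the second value, 600, silently winning. For reference, a standalone sketch of the same tuning applied directly to pylibmc (assumes pylibmc is installed and a memcached server is reachable on localhost; not code from this repository):

import pylibmc

client = pylibmc.Client(['127.0.0.1'], binary=True)
client.behaviors['tcp_nodelay'] = True    # send small packets immediately
client.behaviors['no_block'] = True       # don't wait for replies on writes
client.behaviors['remove_failed'] = 10    # drop a server after 10 failures
client.behaviors['retry_timeout'] = 600   # seconds before retrying a dropped server

client.set('greeting', 'hello', time=60)  # cache for 60 seconds
print(client.get('greeting'))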
deleted file (the app.cache module):

-"""
-Module that handles the connection with the cache
-"""
-from flask_caching import Cache
-
-from app.config import RUN_CONFIG
-
-CACHE = Cache(config=RUN_CONFIG['cache_config'])
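Review note: the deleted module followed Flask-Caching's standard two-step wiring, construct Cache with a config dict, then bind it to the app. A minimal sketch of that pattern ('simple', an in-process backend, is chosen here purely for illustration; the project's real backend came from RUN_CONFIG['cache_config']):

from flask import Flask
from flask_caching import Cache

cache = Cache(config={'CACHE_TYPE': 'simple'})  # illustrative backend only
app = Flask(__name__)
cache.init_app(app)

with app.app_context():
    cache.set('answer', 42, timeout=30)
    print(cache.get('answer'))  # -> 42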
@@ -8,12 +8,13 @@ ELASTICSEARCH_HOST = RUN_CONFIG.get('elasticsearch').get('host')
 ELASTICSEARCH_PORT = RUN_CONFIG.get('elasticsearch').get('port')
 ELASTICSEARCH_USERNAME = RUN_CONFIG.get('elasticsearch').get('username')
 ELASTICSEARCH_PASSWORD = RUN_CONFIG.get('elasticsearch').get('password')
-ELASTICSEARCH_TIMEOUT = RUN_CONFIG.get('elasticsearch').get('timeout', 30)
+ELASTICSEARCH_TIMEOUT = RUN_CONFIG.get('elasticsearch').get('timeout', 300)
 ELASTICSEARCH_RETRY_ON_TIMEOUT = RUN_CONFIG.get('elasticsearch').get('retry_on_timeout', True)

 ES = elasticsearch.Elasticsearch(
     hosts=[ELASTICSEARCH_HOST],
     http_auth=(ELASTICSEARCH_USERNAME, ELASTICSEARCH_PASSWORD),
     retry_on_timeout=ELASTICSEARCH_RETRY_ON_TIMEOUT,
-    port=ELASTICSEARCH_PORT
+    port=ELASTICSEARCH_PORT,
+    timeout=ELASTICSEARCH_TIMEOUT,
 )
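Review note: on the old side ELASTICSEARCH_TIMEOUT was computed but never passed to the client, so the library default applied; this hunk raises the fallback to 300 seconds and actually wires the value in. If a single call needs even longer, elasticsearch-py also accepts a per-request override, e.g.:

# Per-request override sketch (assumes elasticsearch-py 7.x, where API methods
# accept request_timeout); 'some-subset-index' is a placeholder name.
response = ES.search(
    index='some-subset-index',
    body={'query': {'match_all': {}}},
    request_timeout=600,  # applies to this call only
)
print(response['hits']['hits'])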
@@ -360,7 +360,10 @@ def delete_all_expired_tasks():
     for task in tasks_to_delete:
         delete_task(task)
-        ES.indices.delete(index=task.destination_index)
+        try:
+            ES.indices.delete(index=task.destination_index)
+        except es_exceptions.NotFoundError:
+            pass  # if the index has already been deleted it's fine
         num_deleted += 1

     return num_deleted
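Review note: the new except clause relies on an es_exceptions name whose import sits outside this hunk; presumably something like the line below at the top of the module (the alias is taken from the hunk, the import path is elasticsearch-py's standard one). Catching only NotFoundError makes the cleanup idempotent while leaving connection and auth errors loud.

# Assumed import backing the except clause above; NotFoundError lives in
# elasticsearch.exceptions in elasticsearch-py.
from elasticsearch import exceptions as es_exceptions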
@@ -3,7 +3,6 @@ Module that implements the daemon that checks for reindexing tasks that need to
 """
 import socket
 import datetime
-import random
 import json

 from app import app_logging
@@ -11,51 +10,6 @@ from app.es_subset_generator import reindexer
 from app.tasks.models import task_models
 from app.task_statistics import statistics_saver
 from app.config import RUN_CONFIG
-from app.tasks_daemon import locks
-
-
-def check_for_tasks_to_do():
-    """
-    Checks if there are reindexing tasks to perform
-    :return: (sleep_time, jobs_were_checked): the number of seconds to wait before the next run, and whether
-    the tasks to do were checked
-    """
-    my_hostname = socket.gethostname()
-
-    min_sleep_time = RUN_CONFIG.get('tasks_daemon').get('min_sleep_time')
-    max_sleep_time = RUN_CONFIG.get('tasks_daemon').get('max_sleep_time')
-    sleep_time = random.uniform(min_sleep_time, max_sleep_time)
-
-    existing_lock = locks.get_lock()
-    if existing_lock is not None:
-        print(f'I ({my_hostname}) found a lock, waiting {sleep_time} seconds before checking again')
-        return sleep_time, False
-
-    print(f'Locking task queue access, I am {my_hostname}')
-    locks.set_lsf_lock(my_hostname)
-
-    print('Checking for tasks to do...')
-    pop_task_to_do()
-    locks.delete_lsf_lock()
-
-    return sleep_time, True
-
-
-def pop_task_to_do():
-    """
-    Pops from the database a task to perform
-    :return: the task if a task was performed. None if no task to do was found.
-    """
-    task_to_do = task_models.pop_task_to_do()
-    if task_to_do is None:
-        app_logging.info('No tasks to do found')
-        return None
-
-    app_logging.info(f'Popped task {task_to_do.id}')
-    perform_task(task_to_do)
-
-    return task_to_do
-
-
 def perform_task(task):
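Review note: the two functions deleted above were the polling half of the daemon; perform_task stays. For orientation, a hypothetical driver loop of the kind that would have called check_for_tasks_to_do before this MR (everything here except that function name is invented, the real entry point is not part of this diff):

import time

def run_daemon():
    # Poll forever, sleeping the randomised interval the check returns so
    # competing daemon hosts desynchronise their queue checks.
    while True:
        sleep_time, _tasks_were_checked = check_for_tasks_to_do()
        time.sleep(sleep_time)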
deleted file (the app.tasks_daemon.locks module):

-"""
-Module that handles the locking system for the task daemons
-"""
-from app.config import RUN_CONFIG
-from app.cache import CACHE
-
-LOCK_KEY = 'ES-SUBSET-GENERATOR-TASKS-DAEMON-LOCK'
-
-
-def get_lock():
-    """
-    Returns the lock if it exists
-    :return: dict with the lock, None if it doesn't exist
-    """
-    return CACHE.get(key=LOCK_KEY)
-
-
-def set_lsf_lock(lock_owner):
-    """
-    Creates a lock in the name of the given owner; it expires after the time set in the configuration value
-    tasks_daemon.lock_validity_seconds
-    :param lock_owner: identifier (normally a hostname) of the process that owns the lock
-    """
-    seconds_valid = RUN_CONFIG.get('tasks_daemon').get('lock_validity_seconds')
-    lock_dict = {
-        'owner': lock_owner
-    }
-    CACHE.set(key=LOCK_KEY, value=lock_dict, timeout=seconds_valid)
-
-
-def delete_lsf_lock():
-    """
-    Deletes the daemon lock
-    """
-    CACHE.delete(key=LOCK_KEY)
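Review note: the deleted module implemented a cache-backed advisory lock; the key's presence means "locked", and the TTL guarantees a crashed holder cannot block the queue forever. A condensed standalone sketch of the pattern (cache stands for any Flask-Caching-style object with get/set/delete):

LOCK_KEY = 'ES-SUBSET-GENERATOR-TASKS-DAEMON-LOCK'

def try_acquire(cache, owner, seconds_valid):
    """Best-effort lock: returns False if another owner already holds it."""
    if cache.get(LOCK_KEY) is not None:
        return False
    cache.set(LOCK_KEY, {'owner': owner}, timeout=seconds_valid)
    return True

def release(cache):
    cache.delete(LOCK_KEY)

Note that the get-then-set pair is not atomic, so two hosts checking at the same instant could both "acquire" the lock; a fully safe version would need an atomic add on the backend.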
@@ -153,7 +153,7 @@ class TestTaskHandling(unittest.TestCase):
         ]

         task_id = subset_generator.prepare_index_and_create_reindex_task(origin_index, items_ids, fields_list)
-        popped_task = daemon.pop_task_to_do()
+        popped_task = task_models.get_task_by_id(task_id)
         task_id_performed = daemon.perform_task(popped_task)

         self.assertEqual(task_id, task_id_performed,
@@ -196,7 +196,7 @@ class TestTaskHandling(unittest.TestCase):
         ]

         task_id = subset_generator.prepare_index_and_create_reindex_task(origin_index, items_ids, fields_list)
-        popped_task = daemon.pop_task_to_do()
+        popped_task = task_models.get_task_by_id(task_id)
         task_id_performed = daemon.perform_task(popped_task)

         self.assertEqual(task_id, task_id_performed,
@@ -9,10 +9,7 @@ flask-cors==3.0.8
 flask-sqlalchemy==2.4.1
 requests==2.24.0
 pyjwt==1.7.1
-Flask-Caching==1.9.0
 psycopg2==2.9.3
-pylibmc==1.6.1
-pymemcache==3.1.0
 Werkzeug==1.0.1
 itsdangerous==2.0.1
 Jinja2==3.0.3