Skip to content
Snippets Groups Projects
Commit 1f4ca4bf authored by Benjamin Wingfield's avatar Benjamin Wingfield
Browse files

set up pydantic-settings

parent 9dc8ebaa
No related branches found
No related tags found
No related merge requests found
......@@ -883,6 +883,25 @@ files = [
[package.dependencies]
typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0"
[[package]]
name = "pydantic-settings"
version = "2.2.1"
description = "Settings management using Pydantic"
optional = false
python-versions = ">=3.8"
files = [
{file = "pydantic_settings-2.2.1-py3-none-any.whl", hash = "sha256:0235391d26db4d2190cb9b31051c4b46882d28a51533f97440867f012d4da091"},
{file = "pydantic_settings-2.2.1.tar.gz", hash = "sha256:00b9f6a5e95553590434c0fa01ead0b216c3e10bc54ae02e37f359948643c5ed"},
]
[package.dependencies]
pydantic = ">=2.3.0"
python-dotenv = ">=0.21.0"
[package.extras]
toml = ["tomli (>=2.0.1)"]
yaml = ["pyyaml (>=6.0.1)"]
[[package]]
name = "pyliftover"
version = "0.4.1"
......@@ -1538,4 +1557,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.12"
content-hash = "de29b14f18c17cc28ae1e11d5fccf69b571e725f226cc50709177b277c893570"
content-hash = "32d9727b794d46dc74a8ee81f9e990207ebbff9a70deb2c50b3751f8d91ecd4b"
......@@ -16,6 +16,7 @@ uvicorn = {extras = ["standard"], version = "^0.29.0"}
pgscatalog-core = "^0.1.0"
transitions = "^0.9.0"
google-cloud-storage = "^2.16.0"
pydantic-settings = "^2.2.1"
[tool.poetry.group.dev.dependencies]
pytest = "^8.1.1"
......
"""Package-level shared resources: logging, async lock, HTTP client, and shelf path.

These module-level names are created once at import time and shared across the
package (imported elsewhere as ``from . import CLIENT, SHELF_LOCK, SHELF_PATH,
TEMP_DIR``).
"""
import asyncio
import logging
import pathlib
import tempfile
import httpx
logger = logging.getLogger(__name__)
# Timestamped log format applied process-wide at import time.
log_fmt = "%(name)s: %(asctime)s %(levelname)-8s %(message)s"
logging.basicConfig(format=log_fmt, datefmt="%Y-%m-%d %H:%M:%S", level=logging.INFO)
# Serialises access to the shelve file across coroutines
# (shelve is presumably not safe for concurrent writers — TODO confirm).
SHELF_LOCK = asyncio.Lock()
# Shared httpx connection pool; closed by the FastAPI lifespan handler.
CLIENT = httpx.AsyncClient()
# Temporary directory holding the shelf; removed on shutdown via shutil.rmtree.
TEMP_DIR = tempfile.mkdtemp()
# Path to the shelve database used to persist job instances between requests.
SHELF_PATH = str(pathlib.Path(TEMP_DIR) / "shelve.dat")
logger.info(f"Created temporary shelf file {SHELF_PATH}")
from pydantic import Field, DirectoryPath
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Application configuration sourced from environment variables.

    Values are validated by pydantic on instantiation: the chart path must be
    an existing directory and the timeout must be a positive integer.
    """

    # Location of the helmvatti chart; DirectoryPath validates it exists.
    HELM_CHART_PATH: DirectoryPath = Field(
        description="Path to the helmvatti chart directory"
    )
    # Maximum lifetime of an active job before it is forced into FAILED.
    TIMEOUT_SECONDS: int = Field(
        default=86400,  # one day, in seconds
        gt=0,
        description="Number of seconds before active (requested, created, "
        "or deployed) jobs are transitioned to FAILED state",
    )


# Singleton settings object shared by the rest of the package.
settings = Settings()
import asyncio
import pathlib
import shutil
import tempfile
from contextlib import asynccontextmanager
import datetime
import httpx
from fastapi import FastAPI, BackgroundTasks, HTTPException
import logging
import shelve
......@@ -15,22 +12,18 @@ from starlette import status
from .job import PolygenicScoreJob
from .jobmodels import JobModel
from .logmodels import LogMessage, LogEvent, MonitorMessage, SummaryTrace
from .config import settings
from . import CLIENT, SHELF_LOCK, SHELF_PATH, TEMP_DIR
logger = logging.getLogger()
logger.setLevel(logging.INFO)
SHELF_LOCK = asyncio.Lock()
CLIENT = httpx.AsyncClient()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler: run the app, then release shared resources.

    Nothing to set up on startup (the shelf directory and HTTP client are
    created at package import time); on shutdown, remove the temporary shelf
    directory and close the shared httpx connection pool.

    NOTE(review): the original diff residue interleaved the pre-commit body
    (per-app tempdir + Config.SHELF_PATH) with the post-commit one; only the
    post-commit behaviour is kept here.
    """
    yield
    # Remove the package-level temp dir that holds the shelve file.
    shutil.rmtree(TEMP_DIR)
    logger.info(f"Cleaned up {SHELF_PATH}")
    # close the connection pool
    await CLIENT.aclose()
    logger.info("Closed httpx thread pool")
......@@ -47,7 +40,7 @@ async def launch_job(job_model: JobModel):
await job_instance.create(job_model=job_model, client=CLIENT)
async with SHELF_LOCK:
with shelve.open(Config.SHELF_PATH) as db:
with shelve.open(SHELF_PATH) as db:
db[id] = job_instance
......@@ -55,11 +48,11 @@ async def timeout_job(job_id: str):
"""Background task to check if a job is still on the shelf after a timeout.
If it is, trigger the error state, which will force a cleanup and notify the backend"""
logger.info(f"Async timeout for {Config.TIMEOUT_SECONDS}s started for {job_id}")
await asyncio.sleep(Config.TIMEOUT_SECONDS)
logger.info(f"Async timeout for {settings.TIMEOUT_SECONDS}s started for {job_id}")
await asyncio.sleep(settings.TIMEOUT_SECONDS)
async with SHELF_LOCK:
with shelve.open(Config.SHELF_PATH) as db:
with shelve.open(SHELF_PATH) as db:
job_instance: PolygenicScoreJob = db.get(job_id, None)
if job_instance is not None:
......@@ -74,7 +67,7 @@ async def timeout_job(job_id: str):
@app.post("/launch", status_code=status.HTTP_201_CREATED)
async def launch(job: JobModel, background_tasks: BackgroundTasks):
with shelve.open(Config.SHELF_PATH) as db:
with shelve.open(SHELF_PATH) as db:
if job.pipeline_param.id in db:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
......@@ -118,20 +111,15 @@ async def monitor(message: LogMessage):
async def update_job_state(state, message: MonitorMessage, delete=False):
    """Trigger a state transition on a shelved job and persist the result.

    Loads the job keyed by ``message.run_name`` from the shelf, fires the
    ``state`` transition (which may notify the backend via the shared client),
    then writes the updated instance back — or removes it when ``delete`` is
    true (terminal states).

    NOTE(review): the original diff residue duplicated each ``shelve.open``
    line in both its pre-commit (``Config.SHELF_PATH``) and post-commit
    (``SHELF_PATH``) form; only the post-commit lines are kept here.
    """
    # Read outside the lock: a plain lookup, no mutation of the shelf yet.
    with shelve.open(SHELF_PATH) as db:
        job_instance: PolygenicScoreJob = db[message.run_name]

    logger.info(f"Triggering state {state}")
    await job_instance.trigger(state, client=CLIENT, message=message)

    # Write back under the lock so concurrent coroutines can't clobber the shelf.
    async with SHELF_LOCK:
        with shelve.open(SHELF_PATH) as db:
            if not delete:
                db[message.run_name] = job_instance
            else:
                db.pop(message.run_name)
class Config:
    """Legacy module-level configuration holder.

    Presumably superseded by the pydantic-settings ``Settings`` object this
    commit introduces — kept byte-compatible here.
    """

    # No shelf file exists until one is assigned at startup.
    SHELF_PATH = None
    # Default job timeout: one day, expressed in seconds.
    TIMEOUT_SECONDS = 86400
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment