Commit d6622b88 authored by Benjamin Wingfield

end to end tests

parent 6d604d8f
@@ -4,7 +4,7 @@ metadata:
  name: {{ .Release.Name }}-globus
stringData:
  secrets.properties: |
-    globus.guest-collection.domain={{ .Values.secrets.GLOBUS_DOMAIN }}
-    globus.aai.client-id={{ .Values.secrets.GLOBUS_CLIENT_ID }}
-    globus.aai.client-secret={{ .Values.secrets.GLOBUS_CLIENT_SECRET }}
-    globus.aai.scopes={{ .Values.secrets.GLOBUS_SCOPES }}
\ No newline at end of file
+    globus.guest-collection.domain={{ .Values.secrets.globusDomain }}
+    globus.aai.client-id={{ .Values.secrets.globusClientId }}
+    globus.aai.client-secret={{ .Values.secrets.globusClientSecret }}
+    globus.aai.scopes={{ .Values.secrets.globusScopes }}
\ No newline at end of file
@@ -45,10 +45,9 @@ globflowParams:
# values are set with --set, using environment variables from the local machine
# https://stackoverflow.com/a/49930497
secrets:
-  GLOBUS_DOMAIN: dummy
-  GLOBUS_CLIENT_ID: dummy
-  GLOBUS_CLIENT_SECRET: dummy
-  GLOBUS_SCOPES: dummy
+  globusDomain: dummy
+  globusClientId: dummy
+  globusClientSecret: dummy
+  globusScopes: dummy
  towerToken: dummy
  towerId: dummy
import enum
from typing import Optional

from pydantic import Field, DirectoryPath
from pydantic_settings import BaseSettings


class K8SNamespace(enum.Enum):
    DEV = "intervene-dev"
    TEST = "intervene-test"
    PROD = "intervene-prod"

    def __str__(self):
        return str(self.value)


class Settings(BaseSettings):
    HELM_CHART_PATH: DirectoryPath = Field(
        description="Path to the helmvatti chart directory"
@@ -16,12 +26,20 @@ class Settings(BaseSettings):
    )
    TOWER_TOKEN: str = Field(description="Seqera platform token")
    TOWER_WORKSPACE: str = Field(description="Seqera platform workspace ID")
    GLOBUS_DOMAIN: str = Field(description="Globus collection domain")
    GLOBUS_CLIENT_ID: str = Field(description="Globus client ID")
    GLOBUS_CLIENT_SECRET: str = Field(description="Secret for Globus API")
    GLOBUS_SCOPES: str = Field(description="Globus scopes")
    GCP_PROJECT: Optional[str] = Field(
        default=None, description="Google Cloud Platform (GCP) project ID"
    )
    GCP_LOCATION: Optional[str] = Field(
        default=None, description="Location to request GCP resources from"
    )
    NAMESPACE: K8SNamespace = Field(
        default=K8SNamespace.DEV,
        description="Kubernetes namespace to deploy resources to",
    )


settings = Settings()
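For context, Settings is a pydantic-settings BaseSettings subclass, so each field is read from an environment variable of the same name when the module is imported. A minimal sketch of how a deployment might provide them (not from this commit; the values and the pyvatti.config import path are assumptions):

# Sketch only, not part of the commit. Import path and values are assumptions.
import os

os.environ["HELM_CHART_PATH"] = "/path/to/helmvatti"  # DirectoryPath: must already exist
os.environ["TOWER_TOKEN"] = "dummy-token"
os.environ["TOWER_WORKSPACE"] = "12345"
os.environ["GLOBUS_DOMAIN"] = "example.data.globus.org"
os.environ["GLOBUS_CLIENT_ID"] = "dummy-client-id"
os.environ["GLOBUS_CLIENT_SECRET"] = "dummy-secret"
os.environ["GLOBUS_SCOPES"] = "urn:globus:auth:scope:transfer.api.globus.org:all"
os.environ["NAMESPACE"] = "intervene-test"  # coerced to K8SNamespace.TEST by value

from pyvatti.config import Settings, K8SNamespace  # assumed package layout

settings = Settings()
assert settings.NAMESPACE is K8SNamespace.TEST
assert settings.GCP_PROJECT is None  # optional fields default to None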
"""
This module provides classes for validating and rendering a helm template.
It's assumed input parameters are validated by JobModels. This module aims to model and
validate generated job configuration, like work bucket names.
"""
import json
import pathlib
from functools import lru_cache
from typing import Optional
import yaml
from pydantic import BaseModel, Field, field_validator
from fastapi.encoders import jsonable_encoder
from .config import settings
from .jobmodels import JobModel
@lru_cache
def parse_value_template():
values_template = pathlib.Path(settings.HELM_CHART_PATH / "values.yaml")
values_template = pathlib.Path(settings.HELM_CHART_PATH / "values-example.yaml")
return yaml.safe_load(values_template.read_text())
@@ -20,14 +29,7 @@ def check_gcp_bucket(name: str) -> str:
    return name


-def valid_target_build(cls, target_build: str) -> str:
-    good_builds = ["GRCh37", "GRCh38"]
-    if target_build not in good_builds:
-        raise ValueError(f"Target build not in {good_builds}")
-    return target_build


-class HelmNextflowValues(BaseModel, validate_assignment=True):
+class NextflowParams(BaseModel, validate_assignment=True):
    """Represents nextflow configuration values that can be templated by helm"""

    workBucketPath: str
@@ -40,19 +42,46 @@ class HelmNextflowValues(BaseModel, validate_assignment=True):
    check_bucket = field_validator("workBucketPath")(check_gcp_bucket)


-class HelmJobValues(BaseModel, validate_assignment=True):
+class CalcJobParams(BaseModel, validate_assignment=True):
    """Represents workflow instance values that can be templated by helm"""

    input: str
    min_overlap: float = Field(ge=0, le=1)
-    pgs_id: Optional[str] = Field(pattern="PGS[0-9]{6}")
-    pgp_id: Optional[str] = Field(pattern="PGP[0-9]{6}")
+    pgs_id: Optional[str]
+    pgp_id: Optional[str]
    trait_efo: Optional[str]
    target_build: str
    format: str
    outdir: str

    check_bucket = field_validator("outdir")(check_gcp_bucket)
-    check_build = field_validator("target_build")(valid_target_build)


+class JobInput(BaseModel):
+    sampleset: str
+    chrom: Optional[str]
+    vcf_import_dosage: bool
+    geno: str
+    pheno: str
+    variants: str
+    format: str


+class GlobflowParams(BaseModel):
+    input: str
+    outdir: str
+    config_secrets: str


+class Secrets(BaseModel):
+    """These secrets must be templated with pyvatti environment variables"""
+
+    globusDomain: str
+    globusClientId: str
+    globusClientSecret: str
+    globusScopes: str
+    towerToken: str
+    towerId: str


class HelmValues(BaseModel, validate_assignment=True):
@@ -61,14 +90,61 @@ class HelmValues(BaseModel, validate_assignment=True):
    baseImage: str
    dockerTag: str
    pullPolicy: str
-    towerToken: str
-    towerId: str

-    # do model these
-    nxf: HelmNextflowValues
-    job: HelmJobValues
-    json_input: str

-    # don't model this
-    serviceAccount: dict
+    nxfParams: NextflowParams

+    # a JSON string
+    calcWorkflowInput: str
+    calcJobParams: CalcJobParams

+    # a JSON string
+    globflowInput: str
+    globflowParams: GlobflowParams

+    secrets: Secrets

+    # don't model this
+    serviceAccount: dict


HELM_VALUES = HelmValues(**parse_value_template())
+def _add_secrets(job: HelmValues):
+    """Add secrets from the fastAPI settings object"""
+    job.secrets.towerToken = settings.TOWER_TOKEN
+    job.secrets.towerId = settings.TOWER_WORKSPACE
+    job.secrets.globusDomain = settings.GLOBUS_DOMAIN
+    job.secrets.globusClientId = settings.GLOBUS_CLIENT_ID
+    job.secrets.globusClientSecret = settings.GLOBUS_CLIENT_SECRET
+    job.secrets.globusScopes = settings.GLOBUS_SCOPES


+def _add_bucket_path(job, bucketPath):
+    """Add bucket details to the job request"""
+    for x in ("geno", "pheno", "variants"):
+        for genome in job.pipeline_param.target_genomes:
+            setattr(genome, x, f"gs://{bucketPath}/data/{getattr(genome, x)}")


+def render_template(
+    job: JobModel, work_bucket_path: str, results_bucket_path: str
+) -> dict:
+    """Render the helm template using new values from the job model"""
+    _add_bucket_path(job, work_bucket_path)

+    job_values: HelmValues = HelmValues(**parse_value_template())
+    _add_secrets(job_values)

+    # set bucket paths to follow nextflow standards (gs:// prefix and can't use root of bucket)
+    job_values.globflowParams.outdir = f"gs://{work_bucket_path}/data"
+    job_values.nxfParams.workBucketPath = f"gs://{work_bucket_path}/work"
+    job_values.calcJobParams.outdir = f"gs://{results_bucket_path}/results"

+    job_values.calcWorkflowInput = json.dumps(
+        job.pipeline_param.target_genomes, default=jsonable_encoder
+    ).replace("\n", "")
+    job_values.globflowInput = job.globus_details.json()

+    for x in ("pgs_id", "pgp_id", "trait_efo", "target_build"):
+        setattr(
+            job_values.calcJobParams, x, getattr(job.pipeline_param.nxf_params_file, x)
+        )

+    return job_values.dict()
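The dictionary returned by render_template is meant to be written to a temporary YAML values file and passed to helm install -f, which is what helm_install in the resources module does further down. A minimal usage sketch (not from this commit), where job stands in for an already-validated JobModel and the bucket names and import path are placeholders:

# Sketch only, not part of the commit; `job` is a placeholder JobModel instance.
import yaml

from pyvatti.helm import render_template  # assumed import path

values: dict = render_template(
    job,
    work_bucket_path="intervene-dev-example-work",        # placeholder bucket names
    results_bucket_path="intervene-dev-example-results",
)

with open("values-rendered.yaml", "w") as f:
    yaml.safe_dump(values, f)
# then: helm install <release> <chart dir> -n <namespace> -f values-rendered.yaml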
# type: ignore
"""This module contains a state machine that represents job states and their transitions"""
import enum
import logging
from transitions import EventData
from transitions.extensions.asyncio import AsyncMachine
from .resources import GoogleResourceHandler
from .jobstates import States
logger = logging.getLogger(__name__)
class States(enum.Enum):
REQUESTED = "requested"
CREATED = "created"
DEPLOYED = "deployed"
FAILED = "failed"
SUCCEEDED = "succeeded"
class PolygenicScoreJob(AsyncMachine):
"""This is a state machine for polygenic score calculation jobs
@@ -145,7 +137,7 @@ class PolygenicScoreJob(AsyncMachine):
    async def destroy_resources(self, event: EventData):
        """Delete all resources associated with this job"""
        print(f"deleting all resources: {self.intp_id}")
-        await self.handler.destroy_resources()
+        await self.handler.destroy_resources(state=event.state.value)

    async def notify(self, event):
        """Notify the backend about the job state"""
@@ -6,7 +6,7 @@ from typing import Optional, Self
from pgscatalog.core import GenomeBuild
-from pydantic import BaseModel, UUID4, model_validator, field_validator
+from pydantic import BaseModel, model_validator, field_validator, Field


class GlobusFile(BaseModel):
@@ -19,7 +19,6 @@ class GlobusFile(BaseModel):
class GlobusConfig(BaseModel):
    """Details required to stage files from Globus for working on"""

-    guest_collection_id: UUID4
    dir_path_on_guest_collection: str
    files: list[GlobusFile]
@@ -59,8 +58,8 @@ class SamplesheetFormat(enum.Enum):
class PGSParams(BaseModel):
    """Runtime parameters for the PGS calculation workflow"""

-    pgs_id: Optional[str] = None
-    pgp_id: Optional[str] = None
+    pgs_id: Optional[str] = Field(pattern="PGS[0-9]{6}", default=None)
+    pgp_id: Optional[str] = Field(pattern="PGP[0-9]{6}", default=None)
    trait_efo: Optional[str] = None
    target_build: GenomeBuild
    format: SamplesheetFormat = SamplesheetFormat.JSON
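Moving the accession patterns onto PGSParams means malformed PGS Catalog identifiers are rejected when a job request is parsed, instead of later during helm templating. A small sketch of the expected behaviour (not from this commit), assuming the model is importable from pyvatti.jobmodels and that pgscatalog.core exposes a GenomeBuild.GRCh38 member:

# Sketch only, not part of the commit.
from pgscatalog.core import GenomeBuild
from pydantic import ValidationError

from pyvatti.jobmodels import PGSParams  # assumed import path

# a well-formed score accession passes the new pattern check
params = PGSParams(pgs_id="PGS000802", target_build=GenomeBuild.GRCh38)

# a malformed accession now fails fast
try:
    PGSParams(pgs_id="not-an-accession", target_build=GenomeBuild.GRCh38)
except ValidationError as err:
    print(err)  # roughly: "String should match pattern 'PGS[0-9]{6}'"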
import enum


class States(enum.Enum):
    REQUESTED = "requested"
    CREATED = "created"
    DEPLOYED = "deployed"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
@@ -3,11 +3,15 @@
import abc
import asyncio
import logging
import tempfile

import yaml
from google.cloud import storage

from .jobmodels import JobModel
from .helm import render_template
from .config import settings
from .jobstates import States

logger = logging.getLogger(__name__)
@@ -28,10 +32,12 @@ class ResourceHandler(abc.ABC):
        ...

    @abc.abstractmethod
-    def destroy_resources(self):
+    def destroy_resources(self, state):
        """Destroy the created resources

        Cleaning up properly is very important to keep sensitive data safe

        In the error state all buckets should be cleared up if they weren't already present
        """
        ...
@@ -47,10 +53,12 @@ class GoogleResourceHandler(ResourceHandler):
    ):
        super().__init__(intp_id=intp_id.lower())
        self.project_id = project_id
-        self._work_bucket = f"{self.intp_id}-work"
-        self._results_bucket = f"{self.intp_id}-results"
+        self._bucket_root = f"{str(settings.NAMESPACE)}-{self.intp_id}"
+        self._work_bucket = f"{self._bucket_root}-work"
+        self._results_bucket = f"{self._bucket_root}-results"
        self._location = location
        self._work_bucket_existed_on_create = False
+        self._results_bucket_existed_on_create = False
    async def create_resources(self, job_model: JobModel):
        """Create some resources to run the job, including:
@@ -61,14 +69,19 @@
        """
        self.make_buckets(job_model=job_model)
-        await helm_install(job_model=job_model)
+        await helm_install(
+            job_model=job_model,
+            work_bucket_path=self._work_bucket,
+            results_bucket_path=self._results_bucket,
+        )

-    async def destroy_resources(self):
-        # TODO: if the bucket exists already, we shouldn't destroy it in the error state
-        await helm_uninstall(
-            namespace="intervene-dev", release_name="helmvatti-1712756412"
-        )
-        self._delete_work_bucket()
+    async def destroy_resources(self, state):
+        await helm_uninstall(self.project_id)
+        if state == States.FAILED:
+            self._delete_buckets(results=True)
+        else:
+            self._delete_buckets(results=False)
    def make_buckets(self, job_model: JobModel):
        """Create the buckets needed to run the job"""
@@ -118,6 +131,7 @@ class GoogleResourceHandler(ResourceHandler):
        soft_policy.retention_duration_seconds = 0
        iam = storage.bucket.IAMConfiguration(bucket=bucket)
        iam.public_access_prevention = "enforced"
+        iam.uniform_bucket_level_access_enabled = True
        bucket.create(location=self._location)
@@ -128,6 +142,7 @@
        if bucket.exists():
            logger.critical(f"Bucket {self._results_bucket} exists!")
+            self._results_bucket_existed_on_create = True
            raise FileExistsError

        # results stay live for 7 days
@@ -139,10 +154,11 @@
        soft_policy.retention_duration_seconds = 0
        iam = storage.bucket.IAMConfiguration(bucket=bucket)
        iam.public_access_prevention = "enforced"
+        iam.uniform_bucket_level_access_enabled = True
        bucket.create(location=self._location)
-    def _delete_work_bucket(self):
+    def _delete_buckets(self, results=False):
        # TODO: what if this is slow? it's not async!
        if self._work_bucket_existed_on_create:
            # don't delete a bucket that existed before the job was created
@@ -168,21 +184,28 @@
        logger.info(f"Deleting {bucket}")
        bucket.delete(force=True)

+        if results:
+            results_bucket = client.get_bucket(self._results_bucket)
+            results_bucket.delete(force=True)
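One way to address the "it's not async" TODO above (not part of this commit) would be to push the blocking google-cloud-storage calls onto a worker thread so bucket deletion does not stall the event loop. A minimal sketch, assuming a plain bucket name string and default client credentials:

# Sketch only, not part of the commit.
import asyncio

from google.cloud import storage


async def delete_bucket(bucket_name: str) -> None:
    """Delete a bucket and its contents without blocking the event loop."""

    def _delete() -> None:
        client = storage.Client()
        bucket = client.get_bucket(bucket_name)
        # force=True removes contained objects first (only suitable for small buckets)
        bucket.delete(force=True)

    # run the synchronous HTTP calls in a worker thread (Python >= 3.9)
    await asyncio.to_thread(_delete)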
-async def helm_install(job_model: JobModel):
-    if GoogleResourceHandler.dry_run:
-        dry_run = "--dry-run"
-        logger.info("{dry_run} enabled")
-    else:
-        dry_run = ""
-    # TODO: add chart path and values file
-    cmd = f"helm # install -n intervene-dev {dry_run}"
-    proc = await asyncio.create_subprocess_shell(
-        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
-    )
-    stdout, stderr = await proc.communicate()
+async def helm_install(
+    job_model: JobModel, work_bucket_path: str, results_bucket_path: str
+):
+    release_name: str = job_model.pipeline_param.id.lower()
+    template = render_template(
+        job_model,
+        work_bucket_path=work_bucket_path,
+        results_bucket_path=results_bucket_path,
+    )
+    with tempfile.NamedTemporaryFile(mode="wt") as temp_f:
+        yaml.dump(template, temp_f)
+        cmd = f"helm install {release_name} {settings.HELM_CHART_PATH} -n {str(settings.NAMESPACE)} -f {temp_f.name}"
+        proc = await asyncio.create_subprocess_shell(
+            cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+        )
+        stdout, stderr = await proc.communicate()

    if proc.returncode != 0:
        logger.critical(f"{stderr.decode()}")
@@ -191,18 +214,8 @@ async def helm_install(job_model: JobModel):
    logger.info("helm install OK")


-async def helm_render(job_model: JobModel):
-    pass


-async def helm_uninstall(release_name: str, namespace: str):
-    if GoogleResourceHandler.dry_run:
-        dry_run = "--dry-run"
-        logger.info(f"{dry_run} enabled")
-    else:
-        dry_run = ""
-
-    cmd = f"helm # uninstall --namespace {namespace} {dry_run} {release_name}"
+async def helm_uninstall(release_name: str):
+    cmd = f"helm uninstall --namespace {str(settings.NAMESPACE)} {release_name.lower()}"
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )