Unverified commit 84ff2168, authored by Benjamin Wingfield, committed by GitHub

Merge pull request #3 from ebi-gdp/namespace

set up namespace support
Parents: a7586a02, fea1db89
@@ -8,7 +8,12 @@ set -euxo pipefail
module load python-data/3.10-23.07
# run the monitor in the background
-python3 {workflow_monitor_path} &
+python3 {workflow_monitor_path} \
+--callback_token $CALLBACK_TOKEN \
+--namespace {namespace} &
# grab workflow monitor pid
workflow_monitor_pid=$!
# ------------------------------------------------------------------------------
@@ -28,7 +28,7 @@ nextflow run {pgsc_calc_dir} -profile singularity \
-c {work_dir}/{name}/allas.config \
-params-file {work_dir}/{name}/params.json \
--input {work_dir}/{name}/input.json \
---outdir s3://intervene-dev/{name} \
+--outdir s3://intervene-{namespace}/{name} \
--min_overlap 0.01 \
--max_cpus 40 \
--max_memory "32.GB" \
@@ -12,7 +12,7 @@
#![warn(missing_docs)]
use std::fs;
-use std::path::{PathBuf};
+use std::path::PathBuf;
use clap::Parser;
use log::info;
@@ -20,11 +20,13 @@ use rusqlite::Connection;
use crate::db::ingest::message::ingest_message;
use crate::db::job::load::get_valid_jobs;
+use crate::namespace::PlatformNamespace;
use crate::slurm::job_request::JobRequest;
mod db;
mod request;
mod slurm;
+mod namespace;
#[derive(Parser, Debug)]
#[command(name = "hattivatti")]
@@ -48,7 +50,10 @@ struct Args {
dry_run: bool,
/// Path to the globus file handler jar
#[arg(short, long)]
-globus_jar_path: PathBuf
+globus_jar_path: PathBuf,
+/// Which platform namespace do you want to deploy to? [dev, test, prod]
+#[arg(short, long, value_enum)]
+namespace: PlatformNamespace
}
/// A directory for storing working data
@@ -70,7 +75,9 @@ async fn main() {
info!("terve! starting up :)");
let args = Args::parse();
-let wd = WorkingDirectory { path: args.work_dir };
+let wd = WorkingDirectory { path: args.work_dir.join(args.namespace.to_string()) };
+info!("Setting work directory to: {:?}", &wd.path);
+fs::create_dir_all(&wd.path).expect("Can't create working directory");
let conn: Connection = db::open::open_db(&wd)
@@ -78,7 +85,7 @@
let schema = request::schema::load_schema(args.schema_dir.as_path());
let s3_client = request::message::make_allas_client();
-let messages = request::message::fetch_all(&s3_client, &schema).await;
+let messages = request::message::fetch_all(&s3_client, &schema, &args.namespace).await;
if let Some(messages) = messages {
for message in messages {
@@ -98,7 +105,7 @@ async fn main() {
if let Some(jobs) = jobs {
for job in jobs {
-let job_path = job.create(&wd, &args.globus_jar_path);
+let job_path = job.create(&wd, &args.globus_jar_path, &args.namespace);
if !args.dry_run {
job.stage(&conn);
job.submit(&conn, job_path);
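
(Illustration only, not part of the commit: the join above gives each deployment namespace its own working directory under the configured work_dir. The helper and base path below are hypothetical; only the join behaviour is shown.)

    use std::path::{Path, PathBuf};
    use crate::namespace::PlatformNamespace;

    /// Hypothetical helper mirroring the change above.
    fn namespaced_work_dir(base: &Path, namespace: &PlatformNamespace) -> PathBuf {
        // e.g. base = /scratch/work, Dev  -> /scratch/work/dev
        //      base = /scratch/work, Prod -> /scratch/work/prod
        base.join(namespace.to_string())
    }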
use std::fmt;
use clap::ValueEnum;
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum PlatformNamespace {
Dev,
Test,
Prod
}
impl fmt::Display for PlatformNamespace {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PlatformNamespace::Dev => write!(f, "dev"),
PlatformNamespace::Test => write!(f, "test"),
PlatformNamespace::Prod => write!(f, "prod")
}
}
}
\ No newline at end of file
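
(A minimal test sketch, not part of the commit, pinning the Display output above to the deployment names used to build bucket and directory names elsewhere in the diff.)

    #[cfg(test)]
    mod tests {
        use super::PlatformNamespace;

        #[test]
        fn display_matches_deployment_names() {
            assert_eq!(PlatformNamespace::Dev.to_string(), "dev");
            assert_eq!(PlatformNamespace::Prod.to_string(), "prod");
            // fetch_all derives the Allas bucket name the same way
            assert_eq!(format!("intervene-{}", PlatformNamespace::Test), "intervene-test");
        }
    }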
@@ -6,6 +6,7 @@ use log::{info, warn};
use rusoto_s3::S3;
use serde_json::Value;
use tokio::io::AsyncReadExt;
+use crate::namespace::PlatformNamespace;
/// Create an s3 client capable of connecting to Allas
///
@@ -41,13 +42,13 @@ pub struct AllasMessage {
}
/// Fetch all messages in the work queue on Allas
-pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) -> Option<Vec<AllasMessage>> {
-let bucket = "intervene-dev";
+pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema, namespace: &PlatformNamespace) -> Option<Vec<AllasMessage>> {
+let bucket = format!("intervene-{namespace}");
let prefix = "job-queue";
info!("Checking Allas queue {bucket}/{prefix} for new messages");
let list_request = rusoto_s3::ListObjectsV2Request {
-bucket: bucket.into(),
+bucket: bucket.clone(),
prefix: Some(prefix.into()),
..Default::default()
};
@@ -66,7 +67,7 @@ pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) ->
for object in objects {
let key = object.key.unwrap();
info!("Object key: {}", key);
-let content = read_job(&s3_client, bucket, &key).await;
+let content = read_job(&s3_client, &bucket, &key).await;
// info!("Object content: {content}");
jobs.push(AllasMessage::new(content,
bucket.to_string(),
@@ -7,6 +7,7 @@ use chrono::Utc;
use log::{info, warn};
use serde::Serialize;
use tinytemplate::TinyTemplate;
+use crate::namespace::PlatformNamespace;
use crate::slurm::job_request::{GlobusDetails, JobRequest, NxfParamsFile, PipelineParam, TargetGenome};
use crate::WorkingDirectory;
@@ -23,7 +24,7 @@ pub struct JobPath {
}
impl JobRequest {
-pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf) -> JobPath {
+pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf, namespace: &PlatformNamespace) -> JobPath {
let instance_wd = WorkingDirectory { path: wd.path.join(&&self.pipeline_param.id) };
info!("Creating job {} in working directory {}", &&self.pipeline_param.id, &instance_wd.path.display());
@@ -34,9 +35,9 @@ impl JobRequest {
fs::create_dir(&instance_wd.path).expect("Create working directory");
let header: Header = render_header(&&self.pipeline_param);
-let callback: Callback = render_callback(&&self.pipeline_param);
+let callback: Callback = render_callback(&&self.pipeline_param, &namespace);
let vars: EnvVars = read_environment_variables();
-let workflow: Workflow = render_nxf(&globus_path, &&self.pipeline_param, &wd.path);
+let workflow: Workflow = render_nxf(&globus_path, &&self.pipeline_param, &wd.path, &namespace);
let job = JobTemplate { header, callback, vars, workflow };
let path = &instance_wd.path.join("job.sh");
@@ -148,6 +149,7 @@ struct EnvVarContext {
struct NextflowContext {
name: String,
work_dir: String,
+namespace: String,
pgsc_calc_dir: String,
globus_path: String,
globus_parent_path: String
@@ -157,7 +159,8 @@ struct NextflowContext {
#[derive(Serialize)]
struct CallbackContext {
name: String,
-workflow_monitor_path: String
+workflow_monitor_path: String,
+namespace: String
}
/// Write nextflow parameters to working directory
@@ -211,18 +214,18 @@ fn read_environment_variables() -> EnvVars {
}
/// Render the workflow commands using TinyTemplate
-fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path) -> Workflow {
+fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path, namespace: &PlatformNamespace) -> Workflow {
/// included workflow template
static NXF: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/nxf.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("nxf", NXF).expect("Template");
let name: &String = &param.id;
let wd = work_dir.to_str().expect("path").to_string();
-// todo: make dynamic based on deployment namespace
/// installation directory of pgsc_calc (TODO: make this a parameter)
static PGSC_CALC_DIR: &str = "/scratch/project_2004504/pgsc_calc/";
let context = NextflowContext { name: name.clone(),
work_dir: wd,
+namespace: namespace.to_string(),
pgsc_calc_dir: PGSC_CALC_DIR.to_string(),
globus_path: globus_path.to_str().expect("Globus path").to_string(),
globus_parent_path: globus_path.parent().expect("Globus parent").to_str().expect("Globus parent path").to_string()
@@ -231,7 +234,7 @@ fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path) ->
}
/// Render the callback using TinyTemplate
-fn render_callback(param: &PipelineParam) -> Callback {
+fn render_callback(param: &PipelineParam, namespace: &PlatformNamespace) -> Callback {
/// included callback template
static CALLBACK: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/callback.txt"));
let mut tt = TinyTemplate::new();
@@ -239,7 +242,9 @@ fn render_callback(param: &PipelineParam) -> Callback {
let name: &String = &param.id;
static WORKFLOW_MONITOR_PATH: &str = "/scratch/project_2004504/bwingfield/workflow-monitor/main.py";
let context = CallbackContext { name: name.clone(),
-workflow_monitor_path: WORKFLOW_MONITOR_PATH.to_string() };
+workflow_monitor_path: WORKFLOW_MONITOR_PATH.to_string(),
+namespace: namespace.to_string()
+};
Callback { content: tt.render("callback", &context).expect("Rendered callback") }
}
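
(Rough sketch, not part of the commit, of how the new namespace placeholder expands at render time; the template string, struct, and values below are illustrative, not the real nxf.txt or callback.txt contents.)

    use serde::Serialize;
    use tinytemplate::TinyTemplate;

    #[derive(Serialize)]
    struct Ctx {
        name: String,
        namespace: String,
    }

    fn main() {
        let mut tt = TinyTemplate::new();
        // a single line lifted from the nextflow template shown earlier in the diff
        tt.add_template("outdir", "--outdir s3://intervene-{namespace}/{name}").expect("template");
        let ctx = Ctx { name: "job-123".into(), namespace: "dev".into() };
        let rendered = tt.render("outdir", &ctx).expect("render");
        assert_eq!(rendered, "--outdir s3://intervene-dev/job-123");
    }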