Commit 5e3db761 authored by Benjamin Wingfield

set up namespace support

parent a7586a02
@@ -10,5 +10,8 @@ module load python-data/3.10-23.07
# run the monitor in the background
python3 {workflow_monitor_path} &
+# grab workflow monitor pid
+workflow_monitor_pid=$!
# ------------------------------------------------------------------------------
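($! expands to the PID of the most recently backgrounded process, so a later step in the template can signal or wait on the monitor once the workflow finishes.)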
@@ -28,7 +28,7 @@ nextflow run {pgsc_calc_dir} -profile singularity \
-c {work_dir}/{name}/allas.config \
-params-file {work_dir}/{name}/params.json \
--input {work_dir}/{name}/input.json \
---outdir s3://intervene-dev/{name} \
+--outdir s3://intervene-{namespace}/{name} \
--min_overlap 0.01 \
--max_cpus 40 \
--max_memory "32.GB" \
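Each namespace now gets its own results bucket. A minimal sketch (job id hypothetical) of the --outdir values the template produces once {namespace} and {name} are filled in:

fn main() {
    // "dev", "test" and "prod" are the Display renderings of PlatformNamespace
    for namespace in ["dev", "test", "prod"] {
        let name = "job-123"; // hypothetical pipeline id
        println!("--outdir s3://intervene-{namespace}/{name}");
    }
}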
@@ -12,7 +12,7 @@
#![warn(missing_docs)]
use std::fs;
-use std::path::{PathBuf};
+use std::path::PathBuf;
use clap::Parser;
use log::info;
@@ -20,11 +20,13 @@ use rusqlite::Connection;
use crate::db::ingest::message::ingest_message;
use crate::db::job::load::get_valid_jobs;
+use crate::namespace::PlatformNamespace;
use crate::slurm::job_request::JobRequest;
mod db;
mod request;
mod slurm;
+mod namespace;
#[derive(Parser, Debug)]
#[command(name = "hattivatti")]
@@ -48,7 +50,10 @@ struct Args {
dry_run: bool,
/// Path to the globus file handler jar
#[arg(short, long)]
-globus_jar_path: PathBuf
+globus_jar_path: PathBuf,
+/// Which platform namespace do you want to deploy to? [dev, test, prod]
+#[arg(short, long, value_enum)]
+namespace: PlatformNamespace
}
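The new flag is validated by clap before anything runs. A minimal, self-contained sketch (separate from the real Args struct) of the parsing behaviour; ValueEnum lowercases variant names by default, so the CLI accepts --namespace dev|test|prod and rejects anything else:

use clap::{Parser, ValueEnum};

#[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)]
enum PlatformNamespace { Dev, Test, Prod }

#[derive(Parser, Debug)]
struct Cli {
    /// Which platform namespace do you want to deploy to? [dev, test, prod]
    #[arg(short, long, value_enum)]
    namespace: PlatformNamespace,
}

fn main() {
    let cli = Cli::parse_from(["demo", "--namespace", "dev"]);
    assert_eq!(cli.namespace, PlatformNamespace::Dev);
    // "--namespace staging" would exit with a usage error listing dev, test, prod
}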
/// A directory for storing working data
@@ -70,7 +75,9 @@ async fn main() {
info!("terve! starting up :)");
let args = Args::parse();
-let wd = WorkingDirectory { path: args.work_dir };
+let wd = WorkingDirectory { path: args.work_dir.join(args.namespace.to_string()) };
+info!("Setting work directory to: {:?}", &wd.path);
fs::create_dir_all(&wd.path).expect("Can't create working directory");
let conn: Connection = db::open::open_db(&wd)
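Joining the namespace onto the work directory keeps each deployment's state isolated on disk. A small sketch (paths hypothetical) of the resulting layout:

use std::path::PathBuf;

fn main() {
    let work_dir = PathBuf::from("/scratch/hattivatti"); // hypothetical --work-dir
    let wd = work_dir.join("dev"); // args.namespace.to_string()
    assert_eq!(wd, PathBuf::from("/scratch/hattivatti/dev"));
}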
@@ -78,7 +85,7 @@ async fn main() {
let schema = request::schema::load_schema(args.schema_dir.as_path());
let s3_client = request::message::make_allas_client();
-let messages = request::message::fetch_all(&s3_client, &schema).await;
+let messages = request::message::fetch_all(&s3_client, &schema, &args.namespace).await;
if let Some(messages) = messages {
for message in messages {
@@ -98,7 +105,7 @@ async fn main() {
if let Some(jobs) = jobs {
for job in jobs {
-let job_path = job.create(&wd, &args.globus_jar_path);
+let job_path = job.create(&wd, &args.globus_jar_path, &args.namespace);
if !args.dry_run {
job.stage(&conn);
job.submit(&conn, job_path);
use std::fmt;
use clap::ValueEnum;
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum PlatformNamespace {
Dev,
Test,
Prod
}
impl fmt::Display for PlatformNamespace {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PlatformNamespace::Dev => write!(f, "dev"),
PlatformNamespace::Test => write!(f, "test"),
PlatformNamespace::Prod => write!(f, "prod")
}
}
}
\ No newline at end of file
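The Display impl is what the bucket and directory names above are built from. A sketch of a unit test (not part of the commit) pinning down the lowercase renderings:

#[cfg(test)]
mod tests {
    use super::PlatformNamespace;

    #[test]
    fn renders_lowercase_names() {
        assert_eq!(PlatformNamespace::Dev.to_string(), "dev");
        assert_eq!(format!("intervene-{}", PlatformNamespace::Prod), "intervene-prod");
    }
}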
@@ -6,6 +6,7 @@ use log::{info, warn};
use rusoto_s3::S3;
use serde_json::Value;
use tokio::io::AsyncReadExt;
+use crate::namespace::PlatformNamespace;
/// Create an s3 client capable of connecting to Allas
///
@@ -41,13 +42,13 @@ pub struct AllasMessage {
}
/// Fetch all messages in the work queue on Allas
-pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) -> Option<Vec<AllasMessage>> {
-let bucket = "intervene-dev";
+pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema, namespace: &PlatformNamespace) -> Option<Vec<AllasMessage>> {
+let bucket = format!("intervene-{namespace}");
let prefix = "job-queue";
info!("Checking Allas queue {bucket}/{prefix} for new messages");
let list_request = rusoto_s3::ListObjectsV2Request {
-bucket: bucket.into(),
+bucket: bucket.clone(),
prefix: Some(prefix.into()),
..Default::default()
};
@@ -66,7 +67,7 @@ pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) ->
for object in objects {
let key = object.key.unwrap();
info!("Object key: {}", key);
-let content = read_job(&s3_client, bucket, &key).await;
+let content = read_job(&s3_client, &bucket, &key).await;
// info!("Object content: {content}");
jobs.push(AllasMessage::new(content,
bucket.to_string(),
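bucket.into() becomes bucket.clone() because bucket is now an owned String built at runtime: moving it into the list request would leave nothing behind for the later read_job(&s3_client, &bucket, &key) call or for AllasMessage::new. A minimal sketch of the ownership constraint:

fn main() {
    let bucket = format!("intervene-{}", "dev");
    let for_request: String = bucket.clone(); // the S3 request takes an owned String
    let later_borrow = &bucket;               // later borrows stay valid
    println!("{for_request} {later_borrow}");
}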
@@ -7,6 +7,7 @@ use chrono::Utc;
use log::{info, warn};
use serde::Serialize;
use tinytemplate::TinyTemplate;
+use crate::namespace::PlatformNamespace;
use crate::slurm::job_request::{GlobusDetails, JobRequest, NxfParamsFile, PipelineParam, TargetGenome};
use crate::WorkingDirectory;
@@ -23,7 +24,7 @@ pub struct JobPath {
}
impl JobRequest {
-pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf) -> JobPath {
+pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf, namespace: &PlatformNamespace) -> JobPath {
let instance_wd = WorkingDirectory { path: wd.path.join(&&self.pipeline_param.id) };
info!("Creating job {} in working directory {}", &&self.pipeline_param.id, &instance_wd.path.display());
@@ -36,7 +37,7 @@ impl JobRequest {
let header: Header = render_header(&&self.pipeline_param);
let callback: Callback = render_callback(&&self.pipeline_param);
let vars: EnvVars = read_environment_variables();
-let workflow: Workflow = render_nxf(&globus_path, &&self.pipeline_param, &wd.path);
+let workflow: Workflow = render_nxf(&globus_path, &&self.pipeline_param, &wd.path, &namespace);
let job = JobTemplate { header, callback, vars, workflow };
let path = &instance_wd.path.join("job.sh");
@@ -148,6 +149,7 @@ struct EnvVarContext {
struct NextflowContext {
name: String,
work_dir: String,
+namespace: String,
pgsc_calc_dir: String,
globus_path: String,
globus_parent_path: String
@@ -211,18 +213,18 @@ fn read_environment_variables() -> EnvVars {
}
/// Render the workflow commands using TinyTemplate
-fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path) -> Workflow {
+fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path, namespace: &PlatformNamespace) -> Workflow {
/// included workflow template
static NXF: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/nxf.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("nxf", NXF).expect("Template");
let name: &String = &param.id;
let wd = work_dir.to_str().expect("path").to_string();
-// todo: make dynamic based on deployment namespace
/// installation directory of pgsc_calc (TODO: make this a parameter)
static PGSC_CALC_DIR: &str = "/scratch/project_2004504/pgsc_calc/";
let context = NextflowContext { name: name.clone(),
work_dir: wd,
+namespace: namespace.to_string(),
pgsc_calc_dir: PGSC_CALC_DIR.to_string(),
globus_path: globus_path.to_str().expect("Globus path").to_string(),
globus_parent_path: globus_path.parent().expect("Globus parent").to_str().expect("Globus parent path").to_string()
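render_nxf passes the namespace through NextflowContext, and TinyTemplate substitutes it into the template placeholders. A minimal sketch (template string shortened; the real one lives in data/templates/nxf.txt):

use serde::Serialize;
use tinytemplate::TinyTemplate;

#[derive(Serialize)]
struct Ctx {
    name: String,
    namespace: String,
}

fn main() {
    let mut tt = TinyTemplate::new();
    tt.add_template("nxf", "--outdir s3://intervene-{namespace}/{name}")
        .expect("Template");
    let ctx = Ctx { name: "job-123".into(), namespace: "dev".into() };
    let rendered = tt.render("nxf", &ctx).expect("render");
    assert_eq!(rendered, "--outdir s3://intervene-dev/job-123");
}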