From 329fd5776bf8b945f359138afc126d9ca41be041 Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield <bwingfield@ebi.ac.uk> Date: Wed, 21 Jun 2023 15:54:44 +0100 Subject: [PATCH] start building templates for loaded jobs --- Cargo.lock | 11 ++ Cargo.toml | 1 + data/templates/allas.config | 7 ++ data/templates/env_vars.txt | 7 ++ data/templates/header.txt | 12 ++ data/templates/nxf.sh | 8 ++ data/templates/setup.txt | 2 + src/db.rs | 3 +- src/db/load.rs | 1 + src/db/{job.rs => load/message.rs} | 5 +- src/db/submit.rs | 1 + src/db/submit/load.rs | 28 +++++ src/main.rs | 19 +++- src/request.rs | 3 +- src/slurm.rs | 2 + src/slurm/job.rs | 111 +++++++++++++++++++ src/{request/job.rs => slurm/job_request.rs} | 16 +-- 17 files changed, 217 insertions(+), 20 deletions(-) create mode 100644 data/templates/allas.config create mode 100644 data/templates/env_vars.txt create mode 100644 data/templates/header.txt create mode 100644 data/templates/nxf.sh create mode 100644 data/templates/setup.txt create mode 100644 src/db/load.rs rename src/db/{job.rs => load/message.rs} (82%) create mode 100644 src/db/submit.rs create mode 100644 src/db/submit/load.rs create mode 100644 src/slurm.rs create mode 100644 src/slurm/job.rs rename src/{request/job.rs => slurm/job_request.rs} (66%) diff --git a/Cargo.lock b/Cargo.lock index 8c3eb53..9f107c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -409,6 +409,7 @@ dependencies = [ "rusqlite", "serde", "serde_json", + "tinytemplate", "url", ] @@ -1067,6 +1068,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 965fffd..df8b299 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,4 @@ url = "2.4.0" anyhow = "1.0.71" clap = { version = "4.3.4", features = ["derive"] } rusqlite = { version = "0.29.0", features = ["bundled"] } +tinytemplate = "1.2.1" diff --git a/data/templates/allas.config b/data/templates/allas.config new file mode 100644 index 0000000..cea17c9 --- /dev/null +++ b/data/templates/allas.config @@ -0,0 +1,7 @@ +// (this config file is static) +aws { + client { + endpoint = "a3s.fi" + protocol = "https" + } +} \ No newline at end of file diff --git a/data/templates/env_vars.txt b/data/templates/env_vars.txt new file mode 100644 index 0000000..fe22cc7 --- /dev/null +++ b/data/templates/env_vars.txt @@ -0,0 +1,7 @@ +export NXF_OPTS="-Xms500M -Xmx4G" +export NXF_ANSI_TERM=false +export NXF_EXECUTOR=local +export NXF_WORK=$TMPDIR/work +export GLOBUS_BASE_URL={globus_base_url} +export GLOBUS_GUEST_COLLECTION_ID={guest_collection_id} +export JOB_MESSAGE="{message}" diff --git a/data/templates/header.txt b/data/templates/header.txt new file mode 100644 index 0000000..22d4a56 --- /dev/null +++ b/data/templates/header.txt @@ -0,0 +1,12 @@ +#!/usr/bin/env bash + +#SBATCH --job-name={name} +#SBATCH --account=project_2004504 +#SBATCH --partition=small +#SBATCH --time={time} +#SBATCH --gres=nvme:256 +#SBATCH --exclusive +#SBATCH --mem=32G +# pass through secrets from hattivatti deployment +#SBATCH --export=AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,GLOBUS_SECRET_TOKEN + diff --git a/data/templates/nxf.sh b/data/templates/nxf.sh new file mode 100644 index 0000000..38fabe4 --- /dev/null +++ b/data/templates/nxf.sh @@ -0,0 +1,8 @@ +nextflow run pgscatalog/pgsc_calc -profile test,singularity \ + -c ${HATTIVATTI_WORK_DIR}/{name}/config \ + -c ${HATTIVATTI_WORK_DIR}/allas.config \ + --input ${HATTIVATTI_WORK_DIR}/{name}/input.json \ + --min_overlap 0.01 \ + --max_cpus 40 \ + --max_memory "32.GB" \ + --parallel diff --git a/data/templates/setup.txt b/data/templates/setup.txt new file mode 100644 index 0000000..eded93a --- /dev/null +++ b/data/templates/setup.txt @@ -0,0 +1,2 @@ +set -euxo pipefail +module load nextflow/22.10.1 diff --git a/src/db.rs b/src/db.rs index e2e891a..d553bed 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,2 +1,3 @@ pub mod open; -pub mod job; \ No newline at end of file +pub mod submit; +pub mod load; \ No newline at end of file diff --git a/src/db/load.rs b/src/db/load.rs new file mode 100644 index 0000000..7397c12 --- /dev/null +++ b/src/db/load.rs @@ -0,0 +1 @@ +pub mod message; \ No newline at end of file diff --git a/src/db/job.rs b/src/db/load/message.rs similarity index 82% rename from src/db/job.rs rename to src/db/load/message.rs index e1a3fc5..d4db1c2 100644 --- a/src/db/job.rs +++ b/src/db/load/message.rs @@ -2,11 +2,10 @@ use std::{fs, io}; use std::path::Path; use log::{info, warn}; use rusqlite::Connection; -use crate::request::job::JobRequest; use anyhow::Result; -// once a message is read, start -pub fn add_job(conn: &Connection, job: Result<(), io::Error>, path: &Path) -> Result<()> { +// load all messages into the database +pub fn load_message(conn: &Connection, job: Result<(), io::Error>, path: &Path) -> Result<()> { info!("Adding job to db"); // read raw message content again to store in db let json: String = fs::read_to_string(path)?; diff --git a/src/db/submit.rs b/src/db/submit.rs new file mode 100644 index 0000000..f2e2ce3 --- /dev/null +++ b/src/db/submit.rs @@ -0,0 +1 @@ +pub mod load; \ No newline at end of file diff --git a/src/db/submit/load.rs b/src/db/submit/load.rs new file mode 100644 index 0000000..d3b92a5 --- /dev/null +++ b/src/db/submit/load.rs @@ -0,0 +1,28 @@ +use log::info; +use rusqlite::Connection; +use serde_json::Result as JsonResult; +use crate::slurm::job_request::JobRequest; + +pub fn get_valid_jobs(conn: &Connection) -> Result<Vec<JobRequest>, rusqlite::Error> { + let mut stmt = conn.prepare("SELECT manifest FROM job WHERE valid == 1 AND submitted == 0")?; + let rows = stmt.query_map([], |row| row.get(0))?; + + let mut json: Vec<String> = Vec::new(); + for row in rows { + let json_string: String = row?; + info!("Loading valid job from db: {} ...", &json_string[..50]); + json.push(json_string); + } + + let jobs = deserialise(json).expect("Deserialised JSON"); + Ok(jobs) +} + +fn deserialise(json_strings: Vec<String>) -> JsonResult<Vec<JobRequest>> { + let mut jobs: Vec<JobRequest> = Vec::new(); + for string in json_strings { + let job: JobRequest = serde_json::from_str(&string)?; + jobs.push(job); + } + Ok(jobs) +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index d04e4c6..ad25400 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,15 +4,15 @@ use std::path::{Path, PathBuf}; use clap::Parser; use log::info; use rusqlite::Connection; -use serde::de::Error; -use request::job::JobRequest; - -use crate::db::job::add_job; +use crate::db::load::message::load_message; +use crate::db::submit::load::get_valid_jobs; use crate::request::message::Message; +use crate::slurm::job::create_job; mod db; mod request; +mod slurm; #[derive(Parser, Debug)] #[command(name = "hattivatti")] @@ -30,6 +30,8 @@ struct Args { message_dir: PathBuf, #[arg(short, long)] db_path: String, + #[arg(short, long)] + work_dir: PathBuf } fn main() { @@ -47,10 +49,15 @@ fn main() { for message in messages.unwrap() { let job: Result<(), io::Error> = message.read(&schema); - let _ = add_job(&conn, job, message.path.as_path()); + let _ = load_message(&conn, job, message.path.as_path()); } - // todo: select messages where valid = 1 and submitted = 0 + let jobs = get_valid_jobs(&conn).unwrap(); + info!("{:?}", jobs); + + for job in jobs { + let _ = create_job(job); + } //let x = m.read(); diff --git a/src/request.rs b/src/request.rs index 0a7c9aa..01f31ce 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,3 +1,2 @@ pub mod message; -pub mod schema; -pub mod job; \ No newline at end of file +pub mod schema; \ No newline at end of file diff --git a/src/slurm.rs b/src/slurm.rs new file mode 100644 index 0000000..cbe791a --- /dev/null +++ b/src/slurm.rs @@ -0,0 +1,2 @@ +pub mod job_request; +pub mod job; \ No newline at end of file diff --git a/src/slurm/job.rs b/src/slurm/job.rs new file mode 100644 index 0000000..3f6d361 --- /dev/null +++ b/src/slurm/job.rs @@ -0,0 +1,111 @@ +use std::fs::OpenOptions; +use std::io; +use std::io::Write; +use std::path::Path; +use log::info; +use tinytemplate::TinyTemplate; +use crate::slurm::job_request::{JobRequest, NxfParamsFile, PipelineParam}; +use serde::Serialize; +use serde_json::Value; +use tinytemplate::error::Error; + +pub fn create_job(request: JobRequest) { + info!("Creating job {}", &request.pipeline_param.id); + let header: Header = render_header(&request.pipeline_param); + let vars: EnvVars = get_environment_variables(&request); + let job = JobTemplate { header, vars }; + job.write(Path::new("/Users/bwingfield/Downloads/test.txt")).expect("out"); + let _ = make_input_file(&request.pipeline_param.nxf_params_file); +} + +struct JobTemplate { + header: Header, + vars: EnvVars, +} + +impl JobTemplate { + fn write(self, out_path: &Path) -> Result<(), io::Error> { + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(out_path)?; + + [self.header.content, self.vars.content].map( + |str| { + file.write_all(str.as_bytes()); + } + ); + + Ok(()) + } +} + +struct Header { + content: String +} + +struct EnvVars { + content: String +} + +#[derive(Serialize)] +struct HeaderContext { + name: String, + time: String +} + +#[derive(Serialize)] +struct EnvVarContext { + globus_base_url: String, + guest_collection_id: String, + message: String +} + +fn render_header(param: &PipelineParam) -> Header { + static HEADER: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/header.txt")); + let mut tt = TinyTemplate::new(); + tt.add_template("header", HEADER).expect("Template"); + + let context = HeaderContext { + name: param.id.to_string(), + // (todo: run job for 1 hour) + time: "01:00:00".to_string() + }; + + Header { content: tt.render("header", &context).expect("Rendered document") } +} + +fn get_environment_variables(request: &JobRequest) -> EnvVars { + static ENV_VARS: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/env_vars.txt")); + let mut tt = TinyTemplate::new(); + // html escape breaks JSON + tt.set_default_formatter(&tinytemplate::format_unescaped); + tt.add_template("env_var", ENV_VARS).expect("Template"); + + let globus_base_url: String = "https://g-1504d5.dd271.03c0.data.globus.org".to_string(); + let guest_collection_id = request.globus_details.guest_collection_id.clone(); + let message: String = serde_json::to_string(&request.pipeline_param).expect("Deserialised"); + let context = EnvVarContext { globus_base_url, guest_collection_id, message }; + + EnvVars { content: tt.render("env_var", &context).expect("Rendered document") } +} + +fn render_nxf(param: &PipelineParam) { + +} + +struct AllasConfig { + content: String +} + +fn allas_config() -> AllasConfig { + static ALLAS: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/allas.config")); + AllasConfig { content: ALLAS.to_string() } +} + +// pgsc_calc requires --input in json format (samplesheet) +// this describes the structure of the input genomes +fn make_input_file(params_file: &NxfParamsFile) { + let val: Value = serde_json::to_value(params_file).expect("Deserialised params file"); + info!("{:?}", val); +} \ No newline at end of file diff --git a/src/request/job.rs b/src/slurm/job_request.rs similarity index 66% rename from src/request/job.rs rename to src/slurm/job_request.rs index 7788920..414d69f 100644 --- a/src/request/job.rs +++ b/src/slurm/job_request.rs @@ -3,13 +3,13 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Deserialize, Serialize)] pub struct PipelineParam { pub id: String, - target_genomes: Vec<TargetGenome>, - nxf_params_file: NxfParamsFile, - nxf_work: String, + pub target_genomes: Vec<TargetGenome>, + pub nxf_params_file: NxfParamsFile, + pub nxf_work: String, } #[derive(Debug, Deserialize, Serialize)] -struct TargetGenome { +pub struct TargetGenome { pvar: String, pgen: String, psam: String, @@ -18,7 +18,7 @@ struct TargetGenome { } #[derive(Debug, Deserialize, Serialize)] -struct NxfParamsFile { +pub struct NxfParamsFile { pgs_id: String, format: String, target_build: String, @@ -26,12 +26,12 @@ struct NxfParamsFile { #[derive(Debug, Deserialize, Serialize)] pub struct GlobusDetails { - guest_collection_id: String, - dir_path_on_guest_collection: String, + pub guest_collection_id: String, + pub dir_path_on_guest_collection: String, } #[derive(Debug, Deserialize, Serialize)] pub struct JobRequest { pub pipeline_param: PipelineParam, - globus_details: GlobusDetails, + pub globus_details: GlobusDetails, } -- GitLab