update templates

# set up job status reporting to the INTERVENE backend
# the opening curly bracket must be escaped to prevent a templating error
# (the comment above only makes sense in the original template file)
callback_exit_handler() \{
# report the status of the job to the INTERVENE backend
# todo: don't change the url, change the message content
if [ $exit_code -eq 0 ]; then
# do a callback
curl -s "$url"
exit $exit_code
# run everything in "strict mode". error early, error often!
set -euxo pipefail
trap callback_exit_handler EXIT
# ------------------------------------------------------------------------------
# these variables shouldn't be sensitive
# set up environment variables
# secrets are inherited from parent environment (hattivatti deployment)
# nextflow configuration
export NXF_OPTS="-Xms500M -Xmx4G"
export NXF_ANSI_TERM=false
export NXF_EXECUTOR=local
export NXF_WORK=$TMPDIR/work
export NXF_HOME=$TMPDIR/home
export CAPSULE_LOG=none
# configuration
export GLOBUS_BASE_URL={globus_base_url}
export GLOBUS_GUEST_COLLECTION_ID={guest_collection_id}
export JOB_MESSAGE="{message}"
export JOB_MESSAGE='{message}'
# ------------------------------------------------------------------------------
......@@ -5,6 +5,12 @@
# script was automatically generated by hattivatti for the INTERVENE platform.
# created at {time_now}
# ------------------------------------------------------------------------------
# request an exclusive node with local storage from the SLURM scheduler
# all sensitive data is processed on _local storage only_
# local storage is automatically wiped after job exits
#SBATCH --job-name={name}
#SBATCH --account=project_2004504
#SBATCH --partition=small
......@@ -12,6 +18,6 @@
#SBATCH --gres=nvme:256
#SBATCH --exclusive
#SBATCH --mem=32G
# pass through secrets from deployment environment
#SBATCH --export=ALL
# ------------------------------------------------------------------------------
# step 0. load required modules
# time to start doing work!
# step 0. load dependencies
# (curl, jq, and singularity/apptainer are preloaded on Puhti)
module load nextflow/22.10.1
# the job _must be_ scheduled on a node with local storage
# local storage is cleaned up by the system after the job finishes
# can't use --chdir because $TMPDIR is set when the job starts
# step 1: transfer data using HTTPS + curl
# (assume is on PATH) {work_dir}
# just dumps files into current working directory
# step 2. run pgsc_calc!
nextflow run pgscatalog/pgsc_calc -profile test,singularity \
-c {work_dir}/{name}/config \
# step 2: run pgsc_calc and calculate some scores
nextflow run {pgsc_calc_dir} -profile singularity \
-c {work_dir}/{name}/allas.config \
-params-file {work_dir}/{name}/params.json \
--input {work_dir}/{name}/input.json \
--outdir s3://intervene-dev/{name} \
--min_overlap 0.01 \
--max_cpus 40 \
--max_memory "32.GB" \
# step 3: report job status to INTERVENE platform
# (automatically executed in callback_exit_handler by trap)
# ------------------------------------------------------------------------------
set -euxo pipefail
module load nextflow/22.10.1
use std::io;
use std::{fs, io};
use std::path::{Path, PathBuf};
use clap::Parser;
......@@ -43,6 +43,8 @@ fn main() {
let args = Args::parse();
let wd = WorkingDirectory { path: args.work_dir };
fs::create_dir_all(&wd.path).expect("Can't create working directory");
let conn: Connection = db::open::open_db(Path::new(&args.db_path))
.expect("Database connection");
use std::{fs, io};
use std::fs::OpenOptions;
use std::io;
use std::io::Write;
use std::path::Path;
use chrono::Utc;
use log::info;
use serde::Serialize;
use serde_json::Value;
use tinytemplate::error::Error;
use tinytemplate::TinyTemplate;
use chrono::Utc;
use crate::slurm::job_request::{JobRequest, NxfParamsFile, PipelineParam};
use crate::slurm::job_request::{JobRequest, NxfParamsFile, PipelineParam, TargetGenome};
use crate::WorkingDirectory;
pub fn create_job(request: JobRequest, wd: &WorkingDirectory) {
info!("Creating job {}", &;
let instance_wd = WorkingDirectory { path: wd.path.join(& };
info!("Creating job {} in working directory {}", &, &instance_wd.path.display());
fs::create_dir(&instance_wd.path).expect("Can't create working directory");
let header: Header = render_header(&request.pipeline_param);
let callback: Callback = render_callback(&request.pipeline_param);
let vars: EnvVars = render_environment_variables(&request);
let workflow: Workflow = render_nxf(&request.pipeline_param, &wd.path);
let job = JobTemplate { header, vars, workflow };
let _ = make_input_file(&request.pipeline_param.nxf_params_file);
let job = JobTemplate { header, callback, vars, workflow };
job.write(&instance_wd.path.join("")).expect("Can't write job script");
write_samplesheet(&request.pipeline_param, &instance_wd);
write_config(&request.pipeline_param.nxf_params_file, &instance_wd);
struct JobTemplate {
header: Header,
callback: Callback,
vars: EnvVars,
workflow: Workflow,
......@@ -38,9 +42,9 @@ impl JobTemplate {
[self.header.content, self.vars.content, self.workflow.content].map(
[self.header.content, self.callback.content, self.vars.content, self.workflow.content].map(
|str| {
file.write_all(str.as_bytes()).expect("Can't write job");
......@@ -48,6 +52,10 @@ impl JobTemplate {
struct Callback {
content: String,
struct Header {
content: String,
......@@ -64,7 +72,7 @@ struct Workflow {
struct HeaderContext {
name: String,
job_time: String,
time_now: String
time_now: String,
......@@ -78,8 +86,36 @@ struct EnvVarContext {
struct NextflowContext {
name: String,
work_dir: String,
pgsc_calc_dir: String,
struct CallbackContext {
name: String,
fn write_config(nxf_params: &NxfParamsFile, wd: &WorkingDirectory) {
let params_file: String = serde_json::to_string(nxf_params).expect("Deserialised");
let out_path = wd.path.join("params.json");
info!("Writing params to {}", out_path.display());
fs::write(out_path, params_file).expect("Can't write config");
fn write_samplesheet(param: &PipelineParam, wd: &WorkingDirectory) {
let genomes: &Vec<TargetGenome> = &param.target_genomes;
let samplesheet: String = serde_json::to_string(genomes).expect("Deserialised");
let out_path = wd.path.join("input.json");
info!("Writing samplesheet to {}", out_path.display());
fs::write(out_path, samplesheet).expect("Can't write file");
fn write_allas(wd: &WorkingDirectory) {
let allas: AllasConfig = allas_config();
let out_path = wd.path.join("allas.config");
info!("Writing allas config to {}", out_path.display());
fs::write(out_path, allas.content).expect("Can't write file");
fn render_header(param: &PipelineParam) -> Header {
static HEADER: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/header.txt"));
......@@ -90,7 +126,7 @@ fn render_header(param: &PipelineParam) -> Header {
// (todo: run job for 1 hour)
job_time: "01:00:00".to_string(),
time_now: Utc::now().to_string()
time_now: Utc::now().to_string(),
Header { content: tt.render("header", &context).expect("Rendered document") }
......@@ -103,23 +139,34 @@ fn render_environment_variables(request: &JobRequest) -> EnvVars {
tt.add_template("env_var", ENV_VARS).expect("Template");
// todo: set globus base url dynamically
let globus_base_url: String = "".to_string();
let guest_collection_id = request.globus_details.guest_collection_id.clone();
let message: String = serde_json::to_string(&request.pipeline_param).expect("Deserialised");
let message: String = serde_json::to_string(&request).expect("Deserialised");
let context = EnvVarContext { globus_base_url, guest_collection_id, message };
EnvVars { content: tt.render("env_var", &context).expect("Rendered document") }
fn render_nxf(param: &PipelineParam, work_dir: &Path) -> Workflow {
static NXF: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/nxf.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("nxf", NXF).expect("Template");
let name: &String = &;
let wd = work_dir.to_str().expect("path").to_string();
let context = NextflowContext { name: name.clone(), work_dir: wd };
Workflow { content: tt.render("nxf", &context).expect("Rendered document") }
// todo: make dynamic based on deployment namespace
static PGSC_CALC_DIR: &str = "/scratch/project_2004504/pgsc_calc/";
let context = NextflowContext { name: name.clone(), work_dir: wd, pgsc_calc_dir: PGSC_CALC_DIR.to_string() };
Workflow { content: tt.render("nxf", &context).expect("Rendered nextflow") }
fn render_callback(param: &PipelineParam) -> Callback {
static CALLBACK: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/callback.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("callback", CALLBACK).expect("Template");
let name: &String = &;
let context = CallbackContext { name: name.clone() };
Callback { content: tt.render("callback", &context).expect("Rendered callback") }
struct AllasConfig {
......@@ -131,9 +178,3 @@ fn allas_config() -> AllasConfig {
AllasConfig { content: ALLAS.to_string() }
// pgsc_calc requires --input in json format (samplesheet)
// this describes the structure of the input genomes
fn make_input_file(params_file: &NxfParamsFile) {
let val: Value = serde_json::to_value(params_file).expect("Deserialised params file");
info!("{:?}", val);
\ No newline at end of file
