Commit 10a98a5f authored by Benjamin Wingfield

set up globus file handler

parent 73f980be
@@ -4,16 +4,24 @@
# step 0: load dependencies
# (curl, jq, and singularity/apptainer are preloaded on Puhti)
module load nextflow/22.10.1
module load parallel
# step 1: transfer data in parallel with the globus file handler
parallel --col-sep ' ' \
-a {work_dir}/{name}/transfer.txt \
java -Xmx256m \
-jar {globus_path} \
--spring.config.location={globus_parent_path}/config/ \
-s \{1} -d $TMPDIR --file_size \{2}
# the job _must be_ scheduled on a node with local storage ($TMPDIR)
# local storage is cleaned up automatically by the system after the job finishes
# can't use sbatch --chdir because $TMPDIR is only set once the job starts
cd $TMPDIR
set_tmpdir() \{
cd $TMPDIR
}
# step 1: transfer data using HTTPS + curl
# (assume transfer.sh is on PATH)
# just dumps files into current working directory
transfer.sh
set_tmpdir
# step 2: run pgsc_calc and calculate some scores
nextflow run {pgsc_calc_dir} -profile singularity \
......
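The job script above is a TinyTemplate template, so GNU parallel's positional placeholders have to be written as `\{1}` and `\{2}` to survive rendering, while `{globus_path}` and `{globus_parent_path}` are filled in from the Rust context built in `render_nxf` further down. A minimal sketch of that escaping behaviour, using a hypothetical jar path:

```rust
use serde::Serialize;
use tinytemplate::TinyTemplate;

#[derive(Serialize)]
struct Ctx {
    globus_path: String,
}

fn main() {
    // In the template source, `\{1}` is a TinyTemplate escape: it renders as a
    // literal `{1}` for GNU parallel, while `{globus_path}` is substituted.
    static LINE: &str = r"java -Xmx256m -jar {globus_path} -s \{1} --file_size \{2}";

    let mut tt = TinyTemplate::new();
    tt.add_template("line", LINE).expect("valid template");

    // hypothetical jar location, for illustration only
    let ctx = Ctx { globus_path: "/opt/globus/handler.jar".to_string() };

    // prints: java -Xmx256m -jar /opt/globus/handler.jar -s {1} --file_size {2}
    println!("{}", tt.render("line", &ctx).expect("rendered line"));
}
```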
@@ -45,7 +45,10 @@ struct Args {
work_dir: PathBuf,
/// Read messages from the queue and create SLURM job files, but don't submit them to the SLURM scheduler
#[arg(long)]
dry_run: bool
dry_run: bool,
/// Path to the globus file handler jar
#[arg(short, long)]
globus_jar_path: PathBuf
}
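With clap's derive macro, the new field surfaces on the command line as `--globus-jar-path` (plus `-g` from `short`). A small sketch of how the flag parses, using a made-up binary name and jar path:

```rust
use std::path::PathBuf;
use clap::Parser;

/// Trimmed-down mirror of the Args struct above, only the fields needed here
#[derive(Parser, Debug)]
struct Args {
    /// Create SLURM job files but don't submit them
    #[arg(long)]
    dry_run: bool,
    /// Path to the globus file handler jar (exposed as --globus-jar-path / -g)
    #[arg(short, long)]
    globus_jar_path: PathBuf,
}

fn main() {
    // parse_from exercises the CLI without a real invocation; values are made up
    let args = Args::parse_from([
        "prog",
        "--dry-run",
        "--globus-jar-path",
        "/opt/globus/handler.jar",
    ]);
    assert!(args.dry_run);
    println!("jar: {}", args.globus_jar_path.display());
}
```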
/// A directory for storing working data
@@ -95,7 +98,7 @@ async fn main() {
if let Some(jobs) = jobs {
for job in jobs {
let job_path = job.create(&wd);
let job_path = job.create(&wd, &args.globus_jar_path);
if !args.dry_run {
job.stage(&conn);
job.submit(&conn, job_path);
......
use std::{fs, io};
use std::fs::OpenOptions;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
@@ -8,7 +8,7 @@ use log::{info, warn};
use serde::Serialize;
use tinytemplate::TinyTemplate;
use crate::slurm::job_request::{JobRequest, NxfParamsFile, PipelineParam, TargetGenome};
use crate::slurm::job_request::{FileData, JobRequest, NxfParamsFile, PipelineParam, TargetGenome};
use crate::WorkingDirectory;
/// A JobPath is the path to a job script that's submitted to SLURM via sbatch
@@ -23,7 +23,7 @@ pub struct JobPath {
}
impl JobRequest {
pub fn create(&self, wd: &WorkingDirectory) -> JobPath {
pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf) -> JobPath {
let instance_wd = WorkingDirectory { path: wd.path.join(&self.pipeline_param.id) };
info!("Creating job {} in working directory {}", &self.pipeline_param.id, instance_wd.path.display());
@@ -36,7 +36,7 @@ impl JobRequest {
let header: Header = render_header(&self.pipeline_param);
let callback: Callback = render_callback(&self.pipeline_param);
let vars: EnvVars = render_environment_variables(self);
let workflow: Workflow = render_nxf(&self.pipeline_param, &wd.path);
let workflow: Workflow = render_nxf(globus_path, &self.pipeline_param, &wd.path);
let job = JobTemplate { header, callback, vars, workflow };
let path = &instance_wd.path.join("job.sh");
@@ -44,6 +44,7 @@ impl JobRequest {
write_samplesheet(&self.pipeline_param, &instance_wd);
write_config(&self.pipeline_param.nxf_params_file, &instance_wd);
write_allas(&instance_wd);
write_transfer(&self.globus_details.files, &instance_wd);
JobPath { path: path.clone() }
}
@@ -148,6 +149,8 @@ struct NextflowContext {
name: String,
work_dir: String,
pgsc_calc_dir: String,
globus_path: String,
globus_parent_path: String
}
/// Rendering context for callback
@@ -218,7 +221,7 @@ fn render_environment_variables(request: &JobRequest) -> EnvVars {
}
/// Render the workflow commands using TinyTemplate
fn render_nxf(param: &PipelineParam, work_dir: &Path) -> Workflow {
fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path) -> Workflow {
/// included workflow template
static NXF: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/nxf.txt"));
let mut tt = TinyTemplate::new();
@@ -228,7 +231,12 @@ fn render_nxf(param: &PipelineParam, work_dir: &Path) -> Workflow {
// todo: make dynamic based on deployment namespace
/// installation directory of pgsc_calc (TODO: make this a parameter)
static PGSC_CALC_DIR: &str = "/scratch/project_2004504/pgsc_calc/";
let context = NextflowContext { name: name.clone(), work_dir: wd, pgsc_calc_dir: PGSC_CALC_DIR.to_string() };
let context = NextflowContext { name: name.clone(),
work_dir: wd,
pgsc_calc_dir: PGSC_CALC_DIR.to_string(),
globus_path: globus_path.to_str().expect("Globus path").to_string(),
globus_parent_path: globus_path.parent().expect("Globus parent").to_str().expect("Globus parent path").to_string()
};
Workflow { content: tt.render("nxf", &context).expect("Rendered nextflow") }
}
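`globus_parent_path` is derived from the jar path so the job script can point Spring at a `config/` directory sitting next to the jar (`--spring.config.location={globus_parent_path}/config/`). A minimal sketch of that derivation, again with a hypothetical jar path:

```rust
use std::path::{Path, PathBuf};

fn main() {
    // hypothetical jar location, for illustration only
    let globus_path = PathBuf::from("/opt/globus/handler.jar");

    // the parent directory of the jar; the template appends /config/ to it
    let globus_parent_path = globus_path.parent().expect("Globus parent");
    assert_eq!(globus_parent_path, Path::new("/opt/globus"));

    println!("--spring.config.location={}/config/", globus_parent_path.display());
}
```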
@@ -255,3 +263,21 @@ fn allas_config() -> AllasConfig {
AllasConfig { content: ALLAS.to_string() }
}
/// Write transfer details to a text file in the working directory
///
/// The text file is space delimited with two columns:
///
/// | filename | file_size |
/// | ------------ | --------- |
/// | hapnest.psam | 8517 |
///
/// (no header is present in the output file)
fn write_transfer(files: &[FileData], wd: &WorkingDirectory) {
info!("Writing transfer data");
let out_path = wd.path.join("transfer.txt");
let mut file = File::create(out_path).expect("Transfer file");
for data in files {
let line = format!("{} {}\n", data.filename, data.file_size);
file.write_all(line.as_bytes()).expect("Line written");
}
}
\ No newline at end of file
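Each line written here becomes one GNU parallel task in the rendered job script: column 1 (`{1}`) is the source filename passed to `-s`, and column 2 (`{2}`) is the expected size passed to `--file_size`. A rough Rust sketch of what a single line expands to, with hypothetical paths standing in for the rendered template values:

```rust
use std::fs;

fn main() -> std::io::Result<()> {
    // hypothetical rendered template values, for illustration only
    let globus_path = "/opt/globus/handler.jar";
    let globus_parent_path = "/opt/globus";
    let tmpdir = "/run/nvme/job_123/tmp";

    for line in fs::read_to_string("transfer.txt")?.lines() {
        // transfer.txt is space delimited: `<filename> <file_size>`, no header
        let mut cols = line.split_whitespace();
        let filename = cols.next().expect("filename column");
        let file_size = cols.next().expect("file_size column");

        // GNU parallel runs one of these commands per line of transfer.txt
        println!(
            "java -Xmx256m -jar {globus_path} \
             --spring.config.location={globus_parent_path}/config/ \
             -s {filename} -d {tmpdir} --file_size {file_size}"
        );
    }
    Ok(())
}
```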
@@ -28,10 +28,17 @@ pub struct NxfParamsFile {
pub struct GlobusDetails {
pub guest_collection_id: String,
pub dir_path_on_guest_collection: String,
pub files: Vec<FileData>,
}
#[derive(Debug, Deserialize, Serialize)]
pub struct JobRequest {
pub pipeline_param: PipelineParam,
pub globus_details: GlobusDetails,
pub globus_details: GlobusDetails
}
#[derive(Debug, Deserialize, Serialize)]
pub struct FileData {
pub filename: String,
pub file_size: u64,
}
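For context, the queue message that deserializes into these structs now carries the file list under `globus_details.files`. A trimmed-down sketch of the expected JSON shape (the collection id, directory, and sizes are illustrative, not real data):

```rust
use serde::Deserialize;

// Trimmed-down mirror of the structs above, just enough to show the message shape
#[derive(Debug, Deserialize)]
struct GlobusDetails {
    guest_collection_id: String,
    dir_path_on_guest_collection: String,
    files: Vec<FileData>,
}

#[derive(Debug, Deserialize)]
struct FileData {
    filename: String,
    file_size: u64,
}

fn main() {
    // illustrative message fragment; values are made up
    let msg = r#"{
        "guest_collection_id": "00000000-0000-0000-0000-000000000000",
        "dir_path_on_guest_collection": "uploads/example",
        "files": [
            { "filename": "hapnest.psam", "file_size": 8517 },
            { "filename": "hapnest.pgen", "file_size": 278000000 }
        ]
    }"#;

    let details: GlobusDetails = serde_json::from_str(msg).expect("valid JSON");
    for f in &details.files {
        // one line per file, matching what write_transfer() puts into transfer.txt
        println!("{} {}", f.filename, f.file_size);
    }
}
```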