Skip to content
Snippets Groups Projects
Commit 45b6379e authored by Benjamin Wingfield's avatar Benjamin Wingfield
Browse files

set up --dry-run

parent 0df7a04f
No related branches found
No related tags found
No related merge requests found
use log::info;
use rusqlite::Connection;
use serde_json::Result as JsonResult;
use crate::slurm::job_request::JobRequest;
pub fn get_valid_jobs(conn: &Connection) -> Option<Vec<JobRequest>> {
pub fn get_valid_jobs(conn: &Connection, dry_run: bool) -> Option<Vec<JobRequest>> {
let mut stmt = conn.prepare("SELECT manifest FROM job WHERE valid == 1 AND staged == 0 AND submitted == 0").expect("");
let rows = stmt.query_map([], |row| row.get(0)).expect("");
......@@ -14,6 +15,8 @@ pub fn get_valid_jobs(conn: &Connection) -> Option<Vec<JobRequest>> {
json.push(json_string);
}
release_or_rollback(&conn, dry_run);
let jobs = deserialise(json).expect("Deserialised JSON");
match jobs.is_empty() {
true => { None }
......@@ -28,4 +31,17 @@ fn deserialise(json_strings: Vec<String>) -> JsonResult<Vec<JobRequest>> {
jobs.push(job);
}
Ok(jobs)
}
/// Finish the `dry_run` savepoint that `open_db` opens on the connection.
///
/// When `--dry-run` is set, every change made since the savepoint was
/// created is rolled back, leaving the database untouched; otherwise the
/// savepoint is released so the changes become part of the enclosing
/// transaction state.
///
/// # Panics
///
/// Panics if the `ROLLBACK TO` / `RELEASE` statement fails — e.g. if no
/// `dry_run` savepoint exists on this connection.
fn release_or_rollback(conn: &Connection, dry_run: bool) {
    // Plain if/else rather than `match` on a bool (clippy: match_bool).
    if dry_run {
        info!("--dry-run set, rolling back database state");
        // NOTE(review): `ROLLBACK TO` undoes changes but leaves the savepoint
        // on the stack; presumably the connection is dropped soon after, so
        // the dangling savepoint is harmless — confirm against the caller.
        conn.execute("ROLLBACK TO dry_run", []).expect("rollback");
    } else {
        info!("--dry-run not set, releasing dry run save point");
        conn.execute("RELEASE dry_run", []).expect("release");
    }
}
\ No newline at end of file
......@@ -9,5 +9,8 @@ pub fn open_db(wd: &WorkingDirectory) -> rusqlite::Result<rusqlite::Connection>
static SCHEMA: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/db/schema.sql"));
conn.execute(SCHEMA, [], )?;
info!("Creating dry run save point");
conn.execute("SAVEPOINT dry_run", []).expect("Start transaction");
Ok(conn)
}
......@@ -29,7 +29,7 @@ struct Args {
/// A directory where hattivatti can store jobs before submitting them to the SLURM scheduler
#[arg(short, long)]
work_dir: PathBuf,
/// TODO: Read messages from the queue and create SLURM job files, but don't submit them to the SLURM scheduler
/// Read messages from the queue and create SLURM job files, but don't submit them to the SLURM scheduler
#[arg(long)]
dry_run: bool
}
......@@ -60,21 +60,26 @@ async fn main() {
// todo: rollback insertions and updates with dry run
// todo: prevent deletion with dry run
let _ = ingest_message(&conn, &message);
message.delete(&s3_client).await;
if !args.dry_run {
message.delete(&s3_client).await;
} else {
info!("--dry-run set, not deleting message in queue");
}
}
} else {
info!("No new jobs in queue");
}
let jobs: Option<Vec<JobRequest>> = get_valid_jobs(&conn);
let jobs: Option<Vec<JobRequest>> = get_valid_jobs(&conn, args.dry_run);
if let Some(jobs) = jobs {
for job in jobs {
job.create_job(&wd);
job.stage(&conn);
if !args.dry_run {
// submit to slurm
job.stage(&conn);
} else {
info!("--dry-run set, not submitting job to slurm");
}
}
} else {
......
......@@ -35,6 +35,7 @@ pub struct AllasMessage {
pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) -> Option<Vec<AllasMessage>> {
let bucket = "intervene-dev";
let prefix = "job-queue";
info!("Checking Allas queue {bucket}/{prefix} for new messages");
let list_request = rusoto_s3::ListObjectsV2Request {
bucket: bucket.into(),
......@@ -52,11 +53,12 @@ pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) ->
match objects {
None => { return None; }
Some(objects) => {
info!("Found new messages in queue");
for object in objects {
let key = object.key.unwrap();
info!("Object key: {}", key);
let content = read_job(&s3_client, bucket, &key).await;
info!("Object content: {content}");
// info!("Object content: {content}");
jobs.push(AllasMessage::new(content,
bucket.to_string(),
key,
......@@ -95,7 +97,10 @@ async fn read_job(s3_client: &rusoto_s3::S3Client, bucket: &str, key: &String) -
fn validate_message(json_string: &Value, schema: &JSONSchema) -> Result<(), io::Error> {
info!("Validating message against JSON schema");
match schema.validate(json_string) {
Ok(_) => Ok(()),
Ok(_) => {
info!("Message is valid");
Ok(())
},
Err(errors) => {
for error in errors {
warn!("Validation error: {}", error);
......
......@@ -4,7 +4,7 @@ use std::io::Write;
use std::path::Path;
use chrono::Utc;
use log::info;
use log::{info, warn};
use serde::Serialize;
use tinytemplate::TinyTemplate;
......@@ -15,12 +15,19 @@ impl JobRequest {
pub fn create_job(&self, wd: &WorkingDirectory) {
let instance_wd = WorkingDirectory { path: wd.path.join(&&self.pipeline_param.id) };
info!("Creating job {} in working directory {}", &&self.pipeline_param.id, &instance_wd.path.display());
fs::create_dir(&instance_wd.path).expect("Can't create working directory");
if instance_wd.path.exists() {
warn!("Job directory already exists, files will be overwritten");
fs::remove_dir_all(&instance_wd.path).expect("Delete existing directory");
}
fs::create_dir(&instance_wd.path).expect("Create working directory");
let header: Header = render_header(&&self.pipeline_param);
let callback: Callback = render_callback(&&self.pipeline_param);
let vars: EnvVars = render_environment_variables(&&self);
let workflow: Workflow = render_nxf(&&self.pipeline_param, &wd.path);
let job = JobTemplate { header, callback, vars, workflow };
job.write(&instance_wd.path.join("job.sh")).expect("Can't write job script");
write_samplesheet(&&self.pipeline_param, &instance_wd);
write_config(&&self.pipeline_param.nxf_params_file, &instance_wd);
......@@ -42,11 +49,16 @@ impl JobTemplate {
.append(true)
.open(out_path)?;
[self.header.content, self.callback.content, self.vars.content, self.workflow.content].map(
|str| {
file.write_all(str.as_bytes()).expect("Can't write job");
}
);
let contents = [
self.header.content,
self.callback.content,
self.vars.content,
self.workflow.content,
];
for content in contents.iter() {
file.write_all(content.as_bytes())?;
}
Ok(())
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment