-
Benjamin Wingfield authored45b6379e
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
main.rs 2.77 KiB
use std::fs;
use std::path::{PathBuf};
use clap::Parser;
use log::info;
use rusqlite::Connection;
use crate::db::ingest::message::ingest_message;
use crate::db::job::load::get_valid_jobs;
use crate::slurm::job_request::JobRequest;
mod db;
mod request;
mod slurm;
// Command-line arguments accepted by hattivatti, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields below are picked up by clap as the per-option help
// text, so they are user-facing strings rather than ordinary documentation.
#[derive(Debug, Parser)]
#[command(
    name = "hattivatti",
    author = "Benjamin Wingfield <bwingfield@ebi.ac.uk>",
    version = "0.1",
    about = "Submit pgsc_calc jobs to Puhti",
    long_about =
"This program reads job request messages from the INTERVENE backend and submits a sensitive data
processing task to the SLURM scheduler. The program also monitors the state of submitted jobs,
and notifies the INTERVENE backend when a requested job has succeeded or failed."
)]
struct Args {
    /// A directory path that contains a set of JSON schema to validate messages in the job queue
    #[arg(short, long)]
    schema_dir: PathBuf,

    /// A directory where hattivatti can store jobs before submitting them to the SLURM scheduler
    #[arg(short, long)]
    work_dir: PathBuf,

    /// Read messages from the queue and create SLURM job files, but don't submit them to the SLURM scheduler
    #[arg(long)]
    dry_run: bool,
}
/// The directory hattivatti works in.
///
/// `main` builds this from the `--work-dir` argument, creates the directory on disk, and then
/// hands a reference to the database opener and to job creation.
pub struct WorkingDirectory {
    /// Location of the working directory on the filesystem.
    path: PathBuf,
}
#[tokio::main]
async fn main() {
env_logger::init();
info!("terve! starting up :)");
let args = Args::parse();
let wd = WorkingDirectory { path: args.work_dir };
fs::create_dir_all(&wd.path).expect("Can't create working directory");
let conn: Connection = db::open::open_db(&wd)
.expect("Database connection");
let schema = request::schema::load_schema(args.schema_dir.as_path());
let s3_client = request::message::make_allas_client();
let messages = request::message::fetch_all(&s3_client, &schema).await;
if let Some(messages) = messages {
for message in messages {
// todo: check result of loading before deleting
// todo: rollback insertions and updates with dry run
// todo: prevent deletion with dry run
let _ = ingest_message(&conn, &message);
if !args.dry_run {
message.delete(&s3_client).await;
} else {
info!("--dry-run set, not deleting message in queue");
}
}
} else {
info!("No new jobs in queue");
}
let jobs: Option<Vec<JobRequest>> = get_valid_jobs(&conn, args.dry_run);
if let Some(jobs) = jobs {
for job in jobs {
job.create_job(&wd);
if !args.dry_run {
job.stage(&conn);
} else {
info!("--dry-run set, not submitting job to slurm");
}
}
} else {
info!("No jobs to load from database");
}
info!("finished :D")
}