Commit 5e357bdc authored by Benjamin Wingfield

add docs

parent 6bc0e7e1
/target
/docs
.DS_Store
.idea
//! All job state is stored in a SQLite database
/// Connect to a SQLite database
pub mod open;
pub mod job;
/// Stream and validate job request messages
pub mod ingest;
\ No newline at end of file
......@@ -4,6 +4,10 @@ use rusqlite::Connection;
use crate::request::message::AllasMessage;
/// Load an AllasMessage into a database
///
/// The AllasMessage is stored in a JSON column and the schema will automatically extract the
/// INTERVENE ID and add an insertion timestamp
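///
/// A minimal usage sketch (hypothetical setup; assumes the schema from the `open`
/// module has already been applied to the connection):
///
/// ```ignore
/// let conn = open_db(&working_directory)?;   // connection with the job table created
/// for message in &messages {                 // messages fetched from Allas
///     ingest_message(&conn, message)?;       // stores message.content as JSON
/// }
/// ```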
pub fn ingest_message(conn: &Connection, message: &AllasMessage) -> Result<()> {
info!("Adding {} to db", &message.key);
let json = &message.content;
......
//! Job loading, updating, and submission to SLURM scheduler
//!
//! Takes care of deserialising unsubmitted jobs from the database into a JobRequest.
//! Also responsible for updating the database once JobRequests are staged (rendered templates written
//! to disk) or submitted (sbatch system command run).
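//!
//! A lifecycle sketch (the exact function names in `load` and `update` beyond
//! `get_valid_jobs` are assumptions based on the description above):
//!
//! ```ignore
//! let jobs = load::get_valid_jobs(&conn, dry_run);  // deserialise unsubmitted jobs
//! // ... render templates, run sbatch ...
//! // ... then record the new state (staged / submitted) back in the database
//! ```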
pub mod load;
pub mod update;
pub mod state;
\ No newline at end of file
......@@ -4,6 +4,7 @@ use serde_json::Result as JsonResult;
use crate::slurm::job_request::JobRequest;
/// Fetch and deserialise valid unsubmitted jobs from the database
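///
/// A usage sketch (hypothetical connection; `dry_run` is assumed to control whether
/// the transaction is committed or rolled back, see `release_or_rollback`):
///
/// ```ignore
/// if let Some(jobs) = get_valid_jobs(&conn, false) {
///     for job in jobs {
///         // each JobRequest can now be staged and submitted
///     }
/// }
/// ```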
pub fn get_valid_jobs(conn: &Connection, dry_run: bool) -> Option<Vec<JobRequest>> {
let mut stmt = conn.prepare("SELECT manifest FROM job WHERE valid == 1 AND staged == 0 AND submitted == 0").expect("Prepared statement");
let rows = stmt.query_map([], |row| row.get(0)).expect("Valid query");
......@@ -24,6 +25,7 @@ pub fn get_valid_jobs(conn: &Connection, dry_run: bool) -> Option<Vec<JobRequest
}
}
/// Deserialise validated JSON into a [JobRequest]
fn deserialise(json_strings: Vec<String>) -> JsonResult<Vec<JobRequest>> {
let mut jobs: Vec<JobRequest> = Vec::new();
for string in json_strings {
......@@ -33,6 +35,7 @@ fn deserialise(json_strings: Vec<String>) -> JsonResult<Vec<JobRequest>> {
Ok(jobs)
}
/// Release (commit transaction) or roll back (abort transaction) messages in the database
fn release_or_rollback(conn: &Connection, dry_run: bool) {
match dry_run {
true => {
......
......@@ -3,8 +3,14 @@ pub enum JobState {
Submitted
}
/// A simple way to keep track of job state.
///
/// Currently only two states are supported: staged (rendered templates written to disk) and
/// submitted (after sbatch system command exits 0). Other job states could include things like
/// INITIALISED (request received) or PENDING (parsing squeue output) in the future.
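///
/// A small sketch of how states map to database values (the `"submitted"` string is
/// assumed by analogy with `"staged"`, which is shown below):
///
/// ```ignore
/// assert_eq!(JobState::Staged.to_string(), "staged");
/// assert_eq!(JobState::Submitted.to_string(), "submitted");
/// ```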
impl JobState {
/// DB columns are all lower case; the enum value is used directly in SQL statements
/// TODO: migrate to a single enum column called "state"
pub fn to_string(&self) -> &str {
match self {
JobState::Staged => "staged",
......
use log::info;
use crate::WorkingDirectory;
/// Open a connection to an existing database, or create a new one if it doesn't exist
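///
/// A usage sketch (the directory path is illustrative):
///
/// ```ignore
/// let wd = WorkingDirectory { path: PathBuf::from("/tmp/hattivatti") };
/// let conn = open_db(&wd)?;   // creates hattivatti.db and applies the schema if needed
/// ```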
pub fn open_db(wd: &WorkingDirectory) -> rusqlite::Result<rusqlite::Connection> {
let path = &wd.path.join("hattivatti.db");
if !path.exists() { info!("Creating new database {}", path.display()) }
let conn = rusqlite::Connection::open(&path)?;
/// A SQLite database schema that stores job status
static SCHEMA: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/db/schema.sql"));
conn.execute(SCHEMA, [])?;
......
//! `hattivatti` submits [`pgsc_calc`](https://github.com/PGScatalog/pgsc_calc) jobs to
//! [Puhti HPC](https://docs.csc.fi/computing/systems-puhti/) at CSC. Jobs are configured to execute
//! in a secure way because genomes are sensitive data. `hattivatti` does the following:
//!
//! - Check [Allas](https://docs.csc.fi/data/Allas/) bucket for messages (JSON files)
//! - Stream messages and validate them with JSON Schema
//! - Ingest into SQLite database and delete message in bucket
//! - Load valid messages from database and deserialise into [JobRequest]
//! - Render job templates to [WorkingDirectory]
//! - Submit jobs with sbatch system command and update the database with `SLURM_JOB_ID`
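//!
//! A simplified sketch of the flow described above (module paths and the CLI field
//! names are assumptions, not guaranteed to match the crate layout):
//!
//! ```ignore
//! let schema = schema::load_schema(&args.schema_dir);
//! let s3 = message::make_allas_client();
//! let conn = db::open::open_db(&work_dir)?;
//! if let Some(messages) = message::fetch_all(&s3, &schema).await {
//!     for m in &messages {
//!         db::ingest::ingest_message(&conn, m)?;  // store the request in SQLite
//!         m.delete(&s3).await;                    // then remove it from the bucket
//!     }
//! }
//! ```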
#![warn(missing_docs)]
use std::fs;
use std::path::{PathBuf};
......@@ -22,6 +35,7 @@ mod slurm;
"This program reads job request messages from the INTERVENE backend and submits a sensitive data
processing task to the SLURM scheduler. The program also monitors the state of submitted jobs,
and notifies the INTERVENE backend when a requested job has succeeded or failed.")]
/// CLI arguments (automatically parsed by clap)
struct Args {
/// A directory path that contains a set of JSON schema to validate messages in the job queue
#[arg(short, long)]
......@@ -34,10 +48,19 @@ struct Args {
dry_run: bool
}
/// A directory for storing working data
///
/// Working data includes:
/// - a SQLite database to store job request content and status
/// - Rendered SLURM templates for each job request (split into directories based on INTERVENE ID)
///
/// TODO:
/// - [ ] Clean up completed job templates
pub struct WorkingDirectory {
path: PathBuf,
}
/// Entrypoint to the program
#[tokio::main]
async fn main() {
env_logger::init();
......
//! Read job request messages from a bucket and validate their contents
pub mod message;
pub mod schema;
\ No newline at end of file
......@@ -7,6 +7,10 @@ use rusoto_s3::S3;
use serde_json::Value;
use tokio::io::AsyncReadExt;
/// Create an s3 client capable of connecting to Allas
///
/// [Allas](https://docs.csc.fi/data/Allas/) is an s3-compatible object store hosted at CSC and
/// requires some configuration.
pub fn make_allas_client() -> rusoto_s3::S3Client {
let region = rusoto_core::Region::Custom {
name: "us-east-1".to_owned(),
......@@ -25,6 +29,10 @@ pub fn make_allas_client() -> rusoto_s3::S3Client {
region)
}
/// A message in the work queue on Allas
///
/// Messages may be invalid or valid. All messages are ingested into the database, but only valid
/// messages are loaded for submission after being ingested.
pub struct AllasMessage {
pub bucket: String,
pub key: String,
......@@ -32,6 +40,7 @@ pub struct AllasMessage {
pub valid: bool
}
/// Fetch all messages in the work queue on Allas
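///
/// A usage sketch (bucket and prefix are hard-coded below; the schema directory shown
/// here is illustrative):
///
/// ```ignore
/// let schema = load_schema(Path::new("data/schema"));
/// let s3 = make_allas_client();
/// if let Some(messages) = fetch_all(&s3, &schema).await {
///     info!("Fetched {} messages", messages.len());
/// }
/// ```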
pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) -> Option<Vec<AllasMessage>> {
let bucket = "intervene-dev";
let prefix = "job-queue";
......@@ -70,6 +79,7 @@ pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) ->
Some(jobs)
}
/// Stream job content (JSON) from a bucket object into a String
async fn read_job(s3_client: &rusoto_s3::S3Client, bucket: &str, key: &String) -> String {
let get_object_request = rusoto_s3::GetObjectRequest {
bucket: bucket.into(),
......@@ -93,7 +103,7 @@ async fn read_job(s3_client: &rusoto_s3::S3Client, bucket: &str, key: &String) -
String::from_utf8_lossy(&body).to_string()
}
/// Validate job content with a JSON schema
fn validate_message(json_string: &Value, schema: &JSONSchema) -> Result<(), io::Error> {
info!("Validating message against JSON schema");
match schema.validate(json_string) {
......@@ -113,6 +123,11 @@ fn validate_message(json_string: &Value, schema: &JSONSchema) -> Result<(), io::
}
impl AllasMessage {
/// Create a new AllasMessage. Message content and a schema reference must be supplied at
/// creation time.
///
/// It's important to keep track of an AllasMessage's bucket and key so it can be deleted
/// after being ingested into the database.
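///
/// A construction sketch (the object key is hypothetical; the bucket matches the value
/// used in `fetch_all`):
///
/// ```ignore
/// let msg = AllasMessage::new(
///     json_string,                           // raw message content
///     "intervene-dev".to_string(),
///     "job-queue/example.json".to_string(),  // hypothetical object key
///     &schema,
/// );
/// if msg.valid {
///     // safe to load for submission after ingestion
/// }
/// ```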
pub fn new(content: String, bucket: String, key: String, schema: &JSONSchema) -> AllasMessage {
info!("Parsing JSON into untyped structure");
let value: Value = serde_json::from_str(&content).expect("Valid JSON");
......@@ -120,6 +135,11 @@ impl AllasMessage {
AllasMessage { content, bucket, key, valid }
}
/// Delete messages in the work queue
///
/// It's important to delete the message only after the job has been ingested into the
/// database. Jobs in the database must have unique identifiers, and violating this
/// constraint will currently cause a panic.
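///
/// An ordering sketch (identifiers are hypothetical):
///
/// ```ignore
/// ingest_message(&conn, &msg)?;    // record the job in SQLite first
/// msg.delete(&s3_client).await;    // then remove it from the Allas queue
/// ```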
pub async fn delete(&self, s3_client: &rusoto_s3::S3Client) {
let bucket = self.bucket.to_string();
let key = self.key.to_string();
......
......@@ -8,22 +8,26 @@ use log::{info};
use serde_json::{Value};
use url::Url;
/// Read, resolve, and compile the INTERVENE API JSON schema
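///
/// A usage sketch (the directory is illustrative and must contain `api.json`):
///
/// ```ignore
/// let schema = load_schema(Path::new("data/schema"));
/// ```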
pub fn load_schema(schema_dir: &Path) -> JSONSchema {
info!("Reading schema from {}", schema_dir.display());
let schema_json = read_schema(schema_dir);
compile_schema(&schema_json, schema_dir)
}
/// Read the main INTERVENE API schema document
fn read_schema(schema_dir: &Path) -> Value {
let schema_path = schema_dir.join("api.json");
read_json_from_path(schema_path.as_path())
}
/// Read and load valid generic JSON
fn read_json_from_path(path: &Path) -> Value {
let json_string = fs::read_to_string(path).expect("Valid path");
serde_json::from_str(&json_string).expect("Valid JSON")
}
/// Resolve and compile a set of JSON schema for fast validation
fn compile_schema(schema: &Value, schema_dir: &Path) -> JSONSchema {
let resolver = LocalResolver { schema_dir: PathBuf::from(schema_dir) };
info!("Compiling JSON schema");
......@@ -33,16 +37,17 @@ fn compile_schema(schema: &Value, schema_dir: &Path) -> JSONSchema {
.expect("Valid schema")
}
/*
Set up a resolver that will work with local JSON validate
The local validate contain relative references to local files in the same directory
In the future we should change to online schemas with absolute references
*/
/// A [SchemaResolver] that supports local JSON references
///
/// The local schemas contain relative references to other files in the same directory.
/// In the future we should switch to online schemas with absolute references.
struct LocalResolver {
schema_dir: PathBuf,
}
impl SchemaResolver for LocalResolver {
/// Resolve linked schemas, assuming they are present in the same directory as the parent
/// schema
fn resolve(&self, _root_schema: &Value, url: &Url, _original_reference: &str) -> Result<Arc<Value>, SchemaResolverError> {
match url.scheme() {
"json-schema" => {
......
//! Load job configuration templates and render them
/// Valid JSON messages are deserialised into a set of structs defined here
pub mod job_request;
/// Read configuration templates and render them with message content
pub mod job;
\ No newline at end of file
......@@ -11,6 +11,13 @@ use tinytemplate::TinyTemplate;
use crate::slurm::job_request::{JobRequest, NxfParamsFile, PipelineParam, TargetGenome};
use crate::WorkingDirectory;
/// A JobPath is the path to a job script that's submitted to SLURM via sbatch
///
/// A JobPath **requires** the following files in the same directory:
/// - AllasConfig -> allas.config
/// - target_genomes -> input.json
/// - NxfParamsFile -> params.json
// TODO: add these paths to the struct to make this clearer
pub struct JobPath {
pub path: PathBuf,
}
......@@ -42,6 +49,7 @@ impl JobRequest {
}
}
/// All rendered data necessary to submit an INTERVENE pgsc_calc job to SLURM
struct JobTemplate {
header: Header,
callback: Callback,
......@@ -50,12 +58,14 @@ struct JobTemplate {
}
impl JobTemplate {
/// Write complete job script to disk by appending rendered template sections to the file
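///
/// A usage sketch (the output path is illustrative; `template` holds all rendered
/// sections):
///
/// ```ignore
/// template.write(Path::new("work/job.sh"))?;   // appends each section in order
/// ```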
fn write(self, out_path: &Path) -> Result<(), io::Error> {
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(out_path)?;
// order is important when writing the file
let contents = [
self.header.content,
self.callback.content,
......@@ -71,22 +81,52 @@ impl JobTemplate {
}
}
/// Rendered HTTP callback
///
/// Uses curl to do an HTTP POST to the INTERVENE backend with job status. Currently supports two
/// states depending on exit status: 0 (succeeded) or not 0 (failed). Uses a bash trap to trigger
/// the callback when an error happens.
struct Callback {
content: String,
}
/// Rendered SBATCH header
///
/// SLURM job options can be parsed by sbatch using #SBATCH headers [before executable commands](https://slurm.schedmd.com/sbatch.html#SECTION_DESCRIPTION).
/// Parts of the header should be set from message parameters, metadata, or CLI options, but only
/// some are implemented so far:
/// - [X] job name
/// - [ ] queue / partition (small)
/// - [X] job time
/// - [ ] local node storage (256gb)
/// - [ ] job RAM
/// - [ ] account for billing usage
///
/// Other options shouldn't be changed:
/// - exclusive node execution
/// - exporting all environment variables
struct Header {
content: String,
}
/// Rendered environment variables section
///
/// Environment variables are used to control nextflow execution and the globus transfer.
struct EnvVars {
content: String,
}
/// Rendered workflow commands
///
/// Workflow commands include:
/// - loading dependencies using environment modules
/// - staging sensitive data to local node storage with an HTTP globus transfer
/// - running pgsc_calc with job-specific configuration
struct Workflow {
content: String,
}
/// Rendering context for header
#[derive(Serialize)]
struct HeaderContext {
name: String,
......@@ -94,6 +134,7 @@ struct HeaderContext {
time_now: String,
}
/// Rendering context for environment variables
#[derive(Serialize)]
struct EnvVarContext {
globus_base_url: String,
......@@ -101,6 +142,7 @@ struct EnvVarContext {
message: String,
}
/// Rendering context for workflow
#[derive(Serialize)]
struct NextflowContext {
name: String,
......@@ -108,11 +150,13 @@ struct NextflowContext {
pgsc_calc_dir: String,
}
/// Rendering context for callback
#[derive(Serialize)]
struct CallbackContext {
name: String,
}
/// Write nextflow parameters to working directory
fn write_config(nxf_params: &NxfParamsFile, wd: &WorkingDirectory) {
let params_file: String = serde_json::to_string(nxf_params).expect("Deserialised");
let out_path = wd.path.join("params.json");
......@@ -121,6 +165,7 @@ fn write_config(nxf_params: &NxfParamsFile, wd: &WorkingDirectory) {
}
/// Extract the target_genomes object to a JSON file (`pgsc_calc --input` parameter)
fn write_samplesheet(param: &PipelineParam, wd: &WorkingDirectory) {
let genomes: &Vec<TargetGenome> = &param.target_genomes;
let samplesheet: String = serde_json::to_string(genomes).expect("Deserialised");
......@@ -129,6 +174,7 @@ fn write_samplesheet(param: &PipelineParam, wd: &WorkingDirectory) {
fs::write(out_path, samplesheet).expect("Can't write file");
}
/// Write static Allas configuration to the working directory
fn write_allas(wd: &WorkingDirectory) {
let allas: AllasConfig = allas_config();
let out_path = wd.path.join("allas.config");
......@@ -136,7 +182,9 @@ fn write_allas(wd: &WorkingDirectory) {
fs::write(out_path, allas.content).expect("Can't write file");
}
/// Render the SBATCH header using TinyTemplate
fn render_header(param: &PipelineParam) -> Header {
/// included header template
static HEADER: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/header.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("header", HEADER).expect("Template");
......@@ -151,7 +199,9 @@ fn render_header(param: &PipelineParam) -> Header {
Header { content: tt.render("header", &context).expect("Rendered document") }
}
/// Render environment variables using TinyTemplate
fn render_environment_variables(request: &JobRequest) -> EnvVars {
/// included environment variables template
static ENV_VARS: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/env_vars.txt"));
let mut tt = TinyTemplate::new();
// html escape breaks JSON
......@@ -167,19 +217,24 @@ fn render_environment_variables(request: &JobRequest) -> EnvVars {
EnvVars { content: tt.render("env_var", &context).expect("Rendered document") }
}
/// Render the workflow commands using TinyTemplate
fn render_nxf(param: &PipelineParam, work_dir: &Path) -> Workflow {
/// included workflow template
static NXF: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/nxf.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("nxf", NXF).expect("Template");
let name: &String = &param.id;
let wd = work_dir.to_str().expect("path").to_string();
// todo: make dynamic based on deployment namespace
/// installation directory of pgsc_calc (TODO: make this a parameter)
static PGSC_CALC_DIR: &str = "/scratch/project_2004504/pgsc_calc/";
let context = NextflowContext { name: name.clone(), work_dir: wd, pgsc_calc_dir: PGSC_CALC_DIR.to_string() };
Workflow { content: tt.render("nxf", &context).expect("Rendered nextflow") }
}
/// Render the callback using TinyTemplate
fn render_callback(param: &PipelineParam) -> Callback {
/// included callback template
static CALLBACK: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/callback.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("callback", CALLBACK).expect("Template");
......@@ -188,11 +243,14 @@ fn render_callback(param: &PipelineParam) -> Callback {
Callback { content: tt.render("callback", &context).expect("Rendered callback") }
}
/// Static nextflow configuration for publishing results to Allas
struct AllasConfig {
content: String,
}
/// Load static allas configuration
fn allas_config() -> AllasConfig {
/// included allas configuration (static)
static ALLAS: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/allas.config"));
AllasConfig { content: ALLAS.to_string() }
}
......