Skip to content
Snippets Groups Projects
Commit bb01d4f2 authored by Benjamin Wingfield's avatar Benjamin Wingfield
Browse files

add cli and message validation

parent 54bc3f98
No related branches found
No related tags found
No related merge requests found
This diff is collapsed.
...@@ -9,3 +9,8 @@ edition = "2021" ...@@ -9,3 +9,8 @@ edition = "2021"
serde_json = "1.0.97" serde_json = "1.0.97"
log = "0.4" log = "0.4"
env_logger = "0.10.0" env_logger = "0.10.0"
serde = { version = "1.0.164", features = ["derive"]}
jsonschema = "0.17.0"
url = "2.4.0"
anyhow = "1.0.71"
clap = { version = "4.3.4", features = ["derive"] }
use std::fs; use std::fs;
use std::path::Path; use std::path::{Path, PathBuf};
use log::{info,warn};
use log::{info, warn};
use serde::{Deserialize, Serialize};
use serde_json::{error, Result, Value};
use serde_json::json;
mod request;
use clap::Parser;
use crate::request::message::Message;
// Command-line arguments for the program; the parser is derived by clap.
// Plain `//` comments are used on purpose: `///` doc comments would be picked
// up by clap and alter the generated help output.
#[derive(Parser, Debug)]
#[command(
    name = "hattivatti",
    author = "Benjamin Wingfield <bwingfield@ebi.ac.uk>",
    version = "0.1",
    about = "Submit pgsc_calc jobs to Puhti",
    long_about = "This program reads job request messages from the INTERVENE backend and submits a sensitive data
processing task to the SLURM scheduler. The program also monitors the state of submitted jobs,
and notifies the INTERVENE backend when a requested job has succeeded or failed."
)]
struct Args {
    // Directory handed to `request::schema::load_schema` in `main`.
    #[arg(short, long)]
    schema_dir: PathBuf,

    // Directory of job request messages (parsed here; not read by the visible `main` body).
    #[arg(short, long)]
    message_dir: PathBuf,
}
fn main() { fn main() {
env_logger::init(); env_logger::init();
info!("terve! starting up :)"); info!("terve! starting up :)");
let path = Path::new("/Users/bwingfield/Downloads/msgs/msg.jsn"); let args = Args::parse();
let x = read_job_request(&path);
println!("{}", x.unwrap_or_default());
} let schema = request::schema::load_schema(args.schema_dir.as_path());
let m: Message = Message {
path: PathBuf::from(Path::new("/Users/bwingfield/Downloads/msgs/invalid_msg.json")),
compiled_schema: schema
};
fn read_job_request(file_path: &Path) -> Option<String> { let x = m.read();
return match fs::read_to_string(file_path) { println!("Valid serde? {:?}", x.is_some());
Ok(string) => {
info!("Reading job request: {}", file_path.display());
Some(string) // let valid_message = load_message(false);
} // let result = schema.validate(&valid_message);
Err(_) => { //
warn!("Can't read job request at path: {}", file_path.display()); // if let Err(errors) = result {
None // for error in errors {
} // warn!("Message fails validation");
} // warn!("Validation error: {}", error);
// warn!("Instance path: {}", error.instance_path);
// }
// } else {
// info!("Message passes validation")
// }
info!("hattivatti finished")
} }
/// Loads one of two hard-coded sample messages from disk as untyped JSON.
///
/// `valid` selects `valid_msg.json` when `true` and `invalid_msg.json` otherwise.
///
/// # Panics
///
/// Panics with `"File"` when the selected file cannot be read, and with
/// `"Valid JSON"` when its contents do not parse as JSON.
fn load_message(valid: bool) -> Value {
    let sample = if valid {
        Path::new("/Users/bwingfield/Downloads/msgs/valid_msg.json")
    } else {
        Path::new("/Users/bwingfield/Downloads/msgs/invalid_msg.json")
    };

    let raw = fs::read_to_string(sample).expect("File");
    serde_json::from_str(&raw).expect("Valid JSON")
}
\ No newline at end of file
use std::{error, fs, io, panic};
use std::path::{Path, PathBuf};
use jsonschema::{ErrorIterator, JSONSchema, ValidationError};
use log::{info, warn};
use serde::{Deserialize, Serialize};
use serde_json::{Error, Value};
/// Reasons a job request message can be rejected while it is being validated.
///
/// Implements `Debug`, `Display` and `std::error::Error` so a failure can be
/// logged, compared in tests, or propagated with `?` into a boxed/`anyhow` error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageError {
    /// The message did not satisfy the compiled JSON schema.
    ValidationFailed,
    /// The parsed message could not be converted back into a JSON value.
    SerializationError,
    /// The message file could not be read or did not contain JSON.
    ParseError,
}

impl std::fmt::Display for MessageError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            MessageError::ValidationFailed => "message failed JSON schema validation",
            MessageError::SerializationError => "message could not be serialised to a JSON value",
            MessageError::ParseError => "message could not be read or parsed as JSON",
        };
        f.write_str(description)
    }
}

impl std::error::Error for MessageError {}
/// A job request message stored on disk, paired with the schema it is checked against.
pub struct Message {
/// Location of the message file that is read with `fs::read_to_string`.
pub path: PathBuf,
/// Compiled JSON schema used to validate the raw message.
pub compiled_schema: JSONSchema,
}
impl Message {
pub fn read(&self) -> Option<JobRequest> {
let valid: bool = self.validate().is_ok();
if valid {
info!("Message is valid");
} else {
warning!("Message is invalid");
}
// if validation fails, parsing into strong types will also fail
// todo: change from option to Result
return self.parse_json();
}
fn validate(&self) -> Result<(), MessageError> {
info!("Validating raw message against JSON schema");
let job = self.parse_untyped_json().ok_or(MessageError::ParseError)?;
let value = serde_json::to_value(&job).map_err(|_| MessageError::SerializationError)?;
self.compiled_schema.validate(&value).map_err(|_| MessageError::ValidationFailed)?;
Ok(())
}
fn read_file(&self) -> Result<String, io::Error> {
let path = self.path.as_path();
info!("Reading file at {}", path.display());
fs::read_to_string(path).map_err(|err| {
warn!("Can't read message job request at path {}: {}", path.display(), err);
err
})
}
fn parse_json(&self) -> Option<JobRequest> {
info!("Deserialising JSON into typed Rust object");
self.read_file()
.ok()
.and_then(|string_json| {
serde_json::from_str(&string_json).map_err(|err| {
warn!("Error parsing JSON: {}", err);
}).ok()
})
}
fn parse_untyped_json(&self) -> Option<Value> {
info!("Parsing JSON into untyped structure");
self.read_file()
.ok()
.and_then(|string_json| {
serde_json::from_str(&string_json).map_err(|err| {
warn!("Error parsing JSON: {}", err);
}).ok()
})
}
}
/// The `pipeline_param` section of a [`JobRequest`].
#[derive(Debug, Deserialize, Serialize)]
struct PipelineParam {
/// Identifier carried by the request.
id: String,
/// Target genome entries included in the request.
target_genomes: Vec<TargetGenome>,
/// Contents of the `nxf_params_file` object.
nxf_params_file: NxfParamsFile,
/// NOTE(review): presumably the Nextflow work directory — confirm against the schema.
nxf_work: String,
}
/// One entry of `PipelineParam::target_genomes`.
///
/// NOTE(review): `pvar`, `pgen` and `psam` look like paths to a PLINK 2 fileset —
/// confirm against the message schema.
#[derive(Debug, Deserialize, Serialize)]
struct TargetGenome {
pvar: String,
pgen: String,
psam: String,
/// Name of the sample set this entry belongs to (presumably; verify against schema).
sampleset: String,
/// Optional chromosome value; may be absent from the message.
chrom: Option<String>,
}
/// The `nxf_params_file` object nested inside `PipelineParam`.
///
/// All three fields are required strings; their permitted values are not
/// visible here — see the JSON schema.
#[derive(Debug, Deserialize, Serialize)]
struct NxfParamsFile {
pgs_id: String,
format: String,
target_build: String,
}
/// The `globus_details` section of a [`JobRequest`].
#[derive(Debug, Deserialize, Serialize)]
struct GlobusDetails {
/// Identifier of the guest collection named in the request.
guest_collection_id: String,
/// Directory path on that guest collection.
dir_path_on_guest_collection: String,
}
/// Strongly typed form of a job request message, produced by `Message::read`.
///
/// Deserialisation requires both top-level sections to be present.
#[derive(Debug, Deserialize, Serialize)]
pub struct JobRequest {
/// Pipeline parameters section of the message.
pipeline_param: PipelineParam,
/// Globus details section of the message.
globus_details: GlobusDetails,
}
...@@ -4,11 +4,12 @@ use std::sync::Arc; ...@@ -4,11 +4,12 @@ use std::sync::Arc;
use anyhow::anyhow; use anyhow::anyhow;
use jsonschema::{JSONSchema, SchemaResolver, SchemaResolverError}; use jsonschema::{JSONSchema, SchemaResolver, SchemaResolverError};
use log::warn; use log::{info, warn};
use serde_json::{json, Value}; use serde_json::{json, Value};
use url::Url; use url::Url;
pub fn load_schema(schema_dir: &Path) -> JSONSchema { pub fn load_schema(schema_dir: &Path) -> JSONSchema {
info!("Reading schema from {}", schema_dir.display());
let schema_json = read_schema(schema_dir); let schema_json = read_schema(schema_dir);
compile_schema(&schema_json, schema_dir) compile_schema(&schema_json, schema_dir)
} }
...@@ -25,6 +26,7 @@ fn read_json_from_path(path: &Path) -> Value { ...@@ -25,6 +26,7 @@ fn read_json_from_path(path: &Path) -> Value {
fn compile_schema(schema: &Value, schema_dir: &Path) -> JSONSchema { fn compile_schema(schema: &Value, schema_dir: &Path) -> JSONSchema {
let resolver = LocalResolver { schema_dir: PathBuf::from(schema_dir) }; let resolver = LocalResolver { schema_dir: PathBuf::from(schema_dir) };
info!("Compiling JSON schema");
JSONSchema::options() JSONSchema::options()
.with_resolver(resolver) .with_resolver(resolver)
.compile(schema) .compile(schema)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment