From 85df55ad4883a135e317c60be5b7d6d5c49168f1 Mon Sep 17 00:00:00 2001
From: Benjamin Wingfield <bwingfield@ebi.ac.uk>
Date: Tue, 20 Jun 2023 14:35:13 +0100
Subject: [PATCH] add db

---
 Cargo.lock                         | 89 ++++++++++++++++++++++++++++--
 Cargo.toml                         |  1 +
 data/db/setup.sql                  |  9 +++
 src/db.rs                          |  1 +
 src/db/open.rs                     | 14 +++++
 src/main.rs                        | 58 +++++++------------
 src/{request/mod.rs => request.rs} |  1 +
 src/request/message.rs             | 51 ++++++++++++-----
 src/request/read.rs                |  9 +++
 src/request/schema.rs              |  8 +--
 10 files changed, 182 insertions(+), 59 deletions(-)
 create mode 100644 data/db/setup.sql
 create mode 100644 src/db.rs
 create mode 100644 src/db/open.rs
 rename src/{request/mod.rs => request.rs} (69%)
 create mode 100644 src/request/read.rs

diff --git a/Cargo.lock b/Cargo.lock
index 7069ae3..8c3eb53 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -24,6 +24,12 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "allocator-api2"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "56fc6cf8dc8c4158eed8649f9b8b0ea1518eb62b544fe9490d66fa0b349eafe9"
+
 [[package]]
 name = "anstream"
 version = "0.3.2"
@@ -112,6 +118,12 @@ version = "1.3.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 
+[[package]]
+name = "bitflags"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6dbe3c979c178231552ecba20214a8272df4e09f232a87aef4320cf06539aded"
+
 [[package]]
 name = "bumpalo"
 version = "3.13.0"
@@ -161,7 +173,7 @@ checksum = "c1458a1df40e1e2afebb7ab60ce55c1fa8f431146205aa5f4887e0b111c27636"
 dependencies = [
  "anstream",
  "anstyle",
- "bitflags",
+ "bitflags 1.3.2",
  "clap_lex",
  "strsim",
 ]
@@ -233,6 +245,18 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "fallible-iterator"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
+
+[[package]]
+name = "fallible-streaming-iterator"
+version = "0.1.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+
 [[package]]
 name = "fancy-regex"
 version = "0.11.0"
@@ -354,6 +378,25 @@ version = "0.12.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
 
+[[package]]
+name = "hashbrown"
+version = "0.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
+dependencies = [
+ "ahash",
+ "allocator-api2",
+]
+
+[[package]]
+name = "hashlink"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "312f66718a2d7789ffef4f4b7b213138ed9f1eb3aa1d0d82fc99f88fb3ffd26f"
+dependencies = [
+ "hashbrown 0.14.0",
+]
+
 [[package]]
 name = "hattivatti"
 version = "0.1.0"
@@ -363,6 +406,7 @@ dependencies = [
  "env_logger",
  "jsonschema",
  "log",
+ "rusqlite",
  "serde",
  "serde_json",
  "url",
@@ -470,7 +514,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99"
 dependencies = [
  "autocfg",
- "hashbrown",
+ "hashbrown 0.12.3",
 ]
 
 [[package]]
@@ -568,6 +612,17 @@ version = "0.2.146"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
 
+[[package]]
+name = "libsqlite3-sys"
+version = "0.26.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326"
+dependencies = [
+ "cc",
+ "pkg-config",
+ "vcpkg",
+]
+
 [[package]]
 name = "linux-raw-sys"
 version = "0.3.8"
@@ -768,6 +823,12 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 
+[[package]]
+name = "pkg-config"
+version = "0.3.27"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964"
+
 [[package]]
 name = "proc-macro2"
 version = "1.0.60"
@@ -792,7 +853,7 @@ version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29"
 dependencies = [
- "bitflags",
+ "bitflags 1.3.2",
 ]
 
 [[package]]
@@ -846,13 +907,27 @@ dependencies = [
  "winreg",
 ]
 
+[[package]]
+name = "rusqlite"
+version = "0.29.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2"
+dependencies = [
+ "bitflags 2.3.2",
+ "fallible-iterator",
+ "fallible-streaming-iterator",
+ "hashlink",
+ "libsqlite3-sys",
+ "smallvec",
+]
+
 [[package]]
 name = "rustix"
 version = "0.37.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b96e891d04aa506a6d1f318d2771bcb1c7dfda84e126660ace067c9b474bb2c0"
 dependencies = [
- "bitflags",
+ "bitflags 1.3.2",
  "errno",
  "io-lifetimes",
  "libc",
@@ -1113,6 +1188,12 @@ version = "1.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0fa2982af2eec27de306107c027578ff7f423d65f7250e40ce0fea8f45248b81"
 
+[[package]]
+name = "vcpkg"
+version = "0.2.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+
 [[package]]
 name = "version_check"
 version = "0.9.4"
diff --git a/Cargo.toml b/Cargo.toml
index 5a49a43..965fffd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,3 +14,4 @@ jsonschema = "0.17.0"
 url = "2.4.0"
 anyhow = "1.0.71"
 clap = { version = "4.3.4", features = ["derive"] }
+rusqlite = { version = "0.29.0", features = ["bundled"] }
diff --git a/data/db/setup.sql b/data/db/setup.sql
new file mode 100644
index 0000000..ecca090
--- /dev/null
+++ b/data/db/setup.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS job (
+    job_id INTEGER PRIMARY KEY,
+    intervene_id TEXT AS (json_extract(Manifest, '$.pipeline_param.id')) UNIQUE,
+    manifest TEXT,
+    valid INTEGER,
+    valid_status TEXT,
+    submitted INTEGER,
+    created_at TEXT
+);
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..9272d74
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1 @@
+pub mod open;
\ No newline at end of file
diff --git a/src/db/open.rs b/src/db/open.rs
new file mode 100644
index 0000000..74ba218
--- /dev/null
+++ b/src/db/open.rs
@@ -0,0 +1,14 @@
+use std::path::Path;
+
+use anyhow::Result;
+use rusqlite::{Connection, OpenFlags};
+
+pub fn open_db(path: &Path) -> Result<Connection> {
+    // open flags changed to error if database doesn't exist
+    let db = Connection::open_with_flags(path,
+                                         OpenFlags::SQLITE_OPEN_READ_WRITE
+                                             | OpenFlags::SQLITE_OPEN_URI
+                                             | OpenFlags::SQLITE_OPEN_NO_MUTEX)?;
+    return Ok(db);
+}
+
diff --git a/src/main.rs b/src/main.rs
index 843b112..0b1ec04 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,13 +3,15 @@ use std::path::{Path, PathBuf};
 
 use log::{info, warn};
 use serde::{Deserialize, Serialize};
-use serde_json::{error, Result, Value};
 use serde_json::json;
 
-mod request;
-
 use clap::Parser;
-use crate::request::message::Message;
+use rusqlite::{Connection, OpenFlags};
+use crate::request::message::{JobRequest, Message};
+use anyhow::Result;
+
+mod db;
+mod request;
 
 #[derive(Parser, Debug)]
 #[command(name = "hattivatti")]
@@ -24,7 +26,9 @@ struct Args {
     #[arg(short, long)]
     schema_dir: PathBuf,
     #[arg(short, long)]
-    message_dir: PathBuf
+    message_dir: PathBuf,
+    #[arg(short, long)]
+    db_path: String
 }
 
 fn main() {
@@ -33,44 +37,24 @@ fn main() {
 
     let args = Args::parse();
 
+    let conn = db::open::open_db(Path::new( &args.db_path))
+        .expect("Database connection");
+
     let schema = request::schema::load_schema(args.schema_dir.as_path());
 
-    let m: Message = Message {
-        path: PathBuf::from(Path::new("/Users/bwingfield/Downloads/msgs/invalid_msg.json")),
-        compiled_schema: schema
-    };
+    let messages: Result<Vec<Message>> = request::message::from_dir(args.message_dir.as_path());
+
+    let mut list: Vec<JobRequest> = Vec::new();
 
-    let x = m.read();
-    println!("Valid serde? {:?}", x.is_some());
+    for message in messages.unwrap() {
+        let x: JobRequest = message.read(&schema).unwrap();
+        info!("{:#?}", x.pipeline_param);
+    }
 
 
-    // let valid_message = load_message(false);
-    // let result = schema.validate(&valid_message);
-    //
-    // if let Err(errors) = result {
-    //     for error in errors {
-    //         warn!("Message fails validation");
-    //         warn!("Validation error: {}", error);
-    //         warn!("Instance path: {}", error.instance_path);
-    //     }
-    // } else {
-    //     info!("Message passes validation")
-    // }
+    //let x = m.read();
+    //println!("Valid serde? {:?}", x.is_ok());
 
 
     info!("hattivatti finished")
 }
-
-fn load_message(valid: bool) -> Value {
-    let valid_path = Path::new("/Users/bwingfield/Downloads/msgs/valid_msg.json");
-    let invalid_path = Path::new("/Users/bwingfield/Downloads/msgs/invalid_msg.json");
-    let path = if valid {
-        valid_path
-    } else {
-        invalid_path
-    };
-
-    let string_message = fs::read_to_string(&path).expect("File");
-    let message: Value = serde_json::from_str(&string_message).expect("Valid JSON");
-    return message;
-}
\ No newline at end of file
diff --git a/src/request/mod.rs b/src/request.rs
similarity index 69%
rename from src/request/mod.rs
rename to src/request.rs
index 01f31ce..03761bf 100644
--- a/src/request/mod.rs
+++ b/src/request.rs
@@ -1,2 +1,3 @@
+pub mod read;
 pub mod message;
 pub mod schema;
\ No newline at end of file
diff --git a/src/request/message.rs b/src/request/message.rs
index 8b907e8..9f87fed 100644
--- a/src/request/message.rs
+++ b/src/request/message.rs
@@ -1,10 +1,13 @@
 use std::fs;
 use std::path::{Path, PathBuf};
+
 use jsonschema::JSONSchema;
 use log::{info, warn};
-use serde_json::Value;
 use serde::{Deserialize, Serialize};
-
+use serde_json::Value;
+use crate::request::read;
+use anyhow::Result;
+use crate::request::message::MessageError::JSONValidationError;
 
 #[derive(Debug)]
 pub enum MessageError {
@@ -15,31 +18,50 @@ pub enum MessageError {
 }
 
 pub struct Message {
-    pub path: PathBuf,
-    pub compiled_schema: JSONSchema,
+    pub path: PathBuf
+}
+
+pub fn from_dir(dir: &Path) -> Result<Vec<Message>> {
+    let mut list: Vec<Message> = Vec::new();
+
+    let paths= read::get_message_paths(dir)?;
+
+    for path in paths {
+        let m = Message { path };
+        list.push(m);
+    }
+
+    Ok(list)
 }
 
 impl Message {
-    pub fn read(&self) -> Result<JobRequest, MessageError> {
+    pub fn read(&self, schema: &JSONSchema) -> Result<JobRequest, MessageError> {
         let json: Value = self.parse_untyped_json()?;
 
-        match self.validate(&json) {
+        match self.validate(&json, schema) {
             Ok(_) => {
                 info!("Message is valid");
                 self.parse_json(json)
             }
             Err(err) => {
                 warn!("Message fails validation");
+                warn!("{:?}", err);
                 Err(err)
             }
         }
     }
 
-    fn validate(&self, json_string: &Value) -> Result<(), MessageError> {
-        info!("Validating raw message against JSON schema");
-        match self.compiled_schema.validate(json_string) {
+    fn validate(&self, json_string: &Value, schema: &JSONSchema) -> Result<(), MessageError> {
+        info!("Validating raw message against JSON validate");
+        match schema.validate(json_string) {
             Ok(_) => Ok(()),
-            Err(_) => Err(MessageError::JSONValidationError),
+            Err(errors) => {
+                for error in errors {
+                    warn!("Validation error: {}", error);
+                    warn!("Instance path: {}", error.instance_path);
+                }
+                Err(JSONValidationError)
+            }
         }
     }
 
@@ -62,6 +84,7 @@ impl Message {
     fn parse_untyped_json(&self) -> Result<Value, MessageError> {
         info!("Parsing JSON into untyped structure");
         let json_string = self.read_file()?;
+        info!("{}", json_string);
         // from_value is a generic function, so request Value (generic json) specifically
         serde_json::from_str::<Value>(&json_string)
             .map_err(|_| MessageError::JSONDecodeError)
@@ -70,7 +93,7 @@ impl Message {
 
 
 #[derive(Debug, Deserialize, Serialize)]
-struct PipelineParam {
+pub struct PipelineParam {
     id: String,
     target_genomes: Vec<TargetGenome>,
     nxf_params_file: NxfParamsFile,
@@ -94,13 +117,13 @@ struct NxfParamsFile {
 }
 
 #[derive(Debug, Deserialize, Serialize)]
-struct GlobusDetails {
+pub struct GlobusDetails {
     guest_collection_id: String,
     dir_path_on_guest_collection: String,
 }
 
 #[derive(Debug, Deserialize, Serialize)]
 pub struct JobRequest {
-    pipeline_param: PipelineParam,
-    globus_details: GlobusDetails,
+    pub pipeline_param: PipelineParam,
+    pub globus_details: GlobusDetails,
 }
diff --git a/src/request/read.rs b/src/request/read.rs
new file mode 100644
index 0000000..fbcef37
--- /dev/null
+++ b/src/request/read.rs
@@ -0,0 +1,9 @@
+use std::{fs, io};
+use std::io::Error;
+use std::path::{Path, PathBuf};
+
+pub fn get_message_paths(dir: &Path) -> Result<Vec<PathBuf>, Error> {
+    fs::read_dir(dir)?
+        .map(|res| res.map(|e| e.path()))
+        .collect::<Result<Vec<PathBuf>, io::Error>>()
+}
\ No newline at end of file
diff --git a/src/request/schema.rs b/src/request/schema.rs
index ed20978..f93d6f6 100644
--- a/src/request/schema.rs
+++ b/src/request/schema.rs
@@ -4,8 +4,8 @@ use std::sync::Arc;
 
 use anyhow::anyhow;
 use jsonschema::{JSONSchema, SchemaResolver, SchemaResolverError};
-use log::{info, warn};
-use serde_json::{json, Value};
+use log::{info};
+use serde_json::{Value};
 use url::Url;
 
 pub fn load_schema(schema_dir: &Path) -> JSONSchema {
@@ -34,8 +34,8 @@ fn compile_schema(schema: &Value, schema_dir: &Path) -> JSONSchema {
 }
 
 /*
-Set up a resolver that will work with local JSON schema
-The local schema contain relative references to local files in the same directory
+Set up a resolver that will work with local JSON validate
+The local validate contain relative references to local files in the same directory
 In the future we should change to online schemas with absolute references
 */
 struct LocalResolver {
-- 
GitLab