From ee28abae97acf6d096e01386b062b2e36451db40 Mon Sep 17 00:00:00 2001
From: Benjamin Wingfield <bwingfield@ebi.ac.uk>
Date: Tue, 20 Jun 2023 17:46:00 +0100
Subject: [PATCH] load jobs into db

---
 data/db/setup.sql      |  3 +--
 src/db.rs              |  3 ++-
 src/db/job.rs          | 35 +++++++++++++++++++++++++++++++++++
 src/main.rs            | 28 ++++++++++++++--------------
 src/request.rs         |  3 ++-
 src/request/job.rs     | 37 +++++++++++++++++++++++++++++++++++++
 src/request/message.rs | 39 +--------------------------------------
 7 files changed, 92 insertions(+), 56 deletions(-)
 create mode 100644 src/db/job.rs
 create mode 100644 src/request/job.rs

diff --git a/data/db/setup.sql b/data/db/setup.sql
index ecca090..6fcd48c 100644
--- a/data/db/setup.sql
+++ b/data/db/setup.sql
@@ -3,7 +3,6 @@ CREATE TABLE IF NOT EXISTS job (
     intervene_id TEXT AS (json_extract(Manifest, '$.pipeline_param.id')) UNIQUE,
     manifest TEXT,
     valid INTEGER,
-    valid_status TEXT,
     submitted INTEGER,
-    created_at TEXT
+    created_at TEXT DEFAULT CURRENT_TIMESTAMP
 );
diff --git a/src/db.rs b/src/db.rs
index 9272d74..e2e891a 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1 +1,2 @@
-pub mod open;
\ No newline at end of file
+pub mod open;
+pub mod job;
\ No newline at end of file
diff --git a/src/db/job.rs b/src/db/job.rs
new file mode 100644
index 0000000..4b1a25d
--- /dev/null
+++ b/src/db/job.rs
@@ -0,0 +1,35 @@
+use std::fs;
+use std::path::Path;
+use log::{info, warn};
+use rusqlite::Connection;
+use crate::request::job::JobRequest;
+use anyhow::Result;
+
+
+use crate::request::message::MessageError;
+
+// once a message is read, start
+pub fn add_job(conn: &Connection, job: Result<JobRequest, MessageError>, path: &Path) -> Result<()> {
+    info!("Adding job to db");
+    // read raw message content again to store in db
+    let json: String = fs::read_to_string(path)?;
+    let valid: bool = job.is_ok();
+    let is_submitted: bool = false;
+
+    conn.execute(
+        "INSERT INTO job (manifest, valid, submitted) VALUES (?1, ?2, ?3)",
+        (json, valid, is_submitted)
+    )?;
+
+    cleanup_manifest(path);
+
+    Ok(())
+}
+
+// once committed the the database, the original manifest is deleted
+fn cleanup_manifest(path: &Path) {
+    match fs::remove_file(path) {
+        Ok(()) => info!("{} deleted", path.display()),
+        Err(err) => warn!("Couldn't delete {}, {}", path.display(), err)
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 0b1ec04..5b83822 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,14 +1,14 @@
-use std::fs;
 use std::path::{Path, PathBuf};
 
-use log::{info, warn};
-use serde::{Deserialize, Serialize};
-use serde_json::json;
-
-use clap::Parser;
-use rusqlite::{Connection, OpenFlags};
-use crate::request::message::{JobRequest, Message};
 use anyhow::Result;
+use clap::Parser;
+use log::info;
+use rusqlite::Connection;
+
+use request::job::JobRequest;
+
+use crate::db::job::add_job;
+use crate::request::message::{Message, MessageError};
 
 mod db;
 mod request;
@@ -28,7 +28,7 @@ struct Args {
     #[arg(short, long)]
     message_dir: PathBuf,
     #[arg(short, long)]
-    db_path: String
+    db_path: String,
 }
 
 fn main() {
@@ -37,20 +37,20 @@ fn main() {
 
     let args = Args::parse();
 
-    let conn = db::open::open_db(Path::new( &args.db_path))
+    let conn: Connection = db::open::open_db(Path::new(&args.db_path))
         .expect("Database connection");
 
     let schema = request::schema::load_schema(args.schema_dir.as_path());
 
     let messages: Result<Vec<Message>> = request::message::from_dir(args.message_dir.as_path());
 
-    let mut list: Vec<JobRequest> = Vec::new();
-
     for message in messages.unwrap() {
-        let x: JobRequest = message.read(&schema).unwrap();
-        info!("{:#?}", x.pipeline_param);
+        let job: Result<JobRequest, MessageError> = message.read(&schema);
+        let _ = add_job(&conn, job, message.path.as_path());
     }
 
+    // todo: select messages where valid = 1 and submitted = 0
+
 
     //let x = m.read();
     //println!("Valid serde? {:?}", x.is_ok());
diff --git a/src/request.rs b/src/request.rs
index 03761bf..755f29b 100644
--- a/src/request.rs
+++ b/src/request.rs
@@ -1,3 +1,4 @@
 pub mod read;
 pub mod message;
-pub mod schema;
\ No newline at end of file
+pub mod schema;
+pub mod job;
\ No newline at end of file
diff --git a/src/request/job.rs b/src/request/job.rs
new file mode 100644
index 0000000..7788920
--- /dev/null
+++ b/src/request/job.rs
@@ -0,0 +1,37 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct PipelineParam {
+    pub id: String,
+    target_genomes: Vec<TargetGenome>,
+    nxf_params_file: NxfParamsFile,
+    nxf_work: String,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+struct TargetGenome {
+    pvar: String,
+    pgen: String,
+    psam: String,
+    sampleset: String,
+    chrom: Option<String>,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+struct NxfParamsFile {
+    pgs_id: String,
+    format: String,
+    target_build: String,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct GlobusDetails {
+    guest_collection_id: String,
+    dir_path_on_guest_collection: String,
+}
+
+#[derive(Debug, Deserialize, Serialize)]
+pub struct JobRequest {
+    pub pipeline_param: PipelineParam,
+    globus_details: GlobusDetails,
+}
diff --git a/src/request/message.rs b/src/request/message.rs
index 9f87fed..6f82dbf 100644
--- a/src/request/message.rs
+++ b/src/request/message.rs
@@ -3,10 +3,10 @@ use std::path::{Path, PathBuf};
 
 use jsonschema::JSONSchema;
 use log::{info, warn};
-use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use crate::request::read;
 use anyhow::Result;
+use crate::request::job::JobRequest;
 use crate::request::message::MessageError::JSONValidationError;
 
 #[derive(Debug)]
@@ -90,40 +90,3 @@ impl Message {
             .map_err(|_| MessageError::JSONDecodeError)
     }
 }
-
-
-#[derive(Debug, Deserialize, Serialize)]
-pub struct PipelineParam {
-    id: String,
-    target_genomes: Vec<TargetGenome>,
-    nxf_params_file: NxfParamsFile,
-    nxf_work: String,
-}
-
-#[derive(Debug, Deserialize, Serialize)]
-struct TargetGenome {
-    pvar: String,
-    pgen: String,
-    psam: String,
-    sampleset: String,
-    chrom: Option<String>,
-}
-
-#[derive(Debug, Deserialize, Serialize)]
-struct NxfParamsFile {
-    pgs_id: String,
-    format: String,
-    target_build: String,
-}
-
-#[derive(Debug, Deserialize, Serialize)]
-pub struct GlobusDetails {
-    guest_collection_id: String,
-    dir_path_on_guest_collection: String,
-}
-
-#[derive(Debug, Deserialize, Serialize)]
-pub struct JobRequest {
-    pub pipeline_param: PipelineParam,
-    pub globus_details: GlobusDetails,
-}
-- 
GitLab