Unverified commit 55def08d authored by Benjamin Wingfield, committed by GitHub

Merge pull request #1 from ebi-gdp/dev

v0.0.1 release
parents 9d6d4c6f b38bead8
name: build development images

on:
  push:
    branches:
      - dev

env:
  REGISTRY_USERNAME: ${{ secrets.DOCKER_USERNAME }}
  REGISTRY_TOKEN: ${{ secrets.DOCKER_TOKEN }}
  DOCKER_REGISTRY_LOGIN: "dockerhub.ebi.ac.uk"
  SINGULARITY_REGISTRY_LOGIN: "oras://dockerhub.ebi.ac.uk"
  DOCKER_REGISTRY: "dockerhub.ebi.ac.uk/gdp-public/hattivatti"
  SINGULARITY_REGISTRY: "oras://dockerhub.ebi.ac.uk/gdp-public/hattivatti/singularity"
  SINGULARITY_VERSION: "3.8.3"

jobs:
  build_docker_dev:
    name: build docker development image
    runs-on: ubuntu-latest
    steps:
      - name: Set up QEMU
        uses: docker/setup-qemu-action@v2
        with:
          platforms: arm64
      - name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
      - name: Login to Docker Hub
        uses: docker/login-action@v2
        with:
          registry: ${{ env.DOCKER_REGISTRY_LOGIN }}
          username: ${{ env.REGISTRY_USERNAME }}
          password: ${{ env.REGISTRY_TOKEN }}
      - name: Build and push
        uses: docker/build-push-action@v3
        with:
          push: true
          platforms: "linux/amd64,linux/arm64"
          tags: ${{ env.DOCKER_REGISTRY }}/hattivatti:dev

  build_singularity_dev:
    needs: build_docker_dev
    name: build singularity development image
    runs-on: ubuntu-latest
    steps:
      - name: Check out pipeline code
        uses: actions/checkout@v3
      - name: Cache singularity setup
        id: cache-singularity
        uses: actions/cache@v3
        with:
          path: /opt/hostedtoolcache/singularity/${{ env.SINGULARITY_VERSION }}/x64
          key: ${{ runner.os }}-singularity-${{ env.SINGULARITY_VERSION }}
      - name: Set up Singularity
        uses: eWaterCycle/setup-singularity@v7
        if: steps.cache-singularity.outputs.cache-hit != 'true'
        with:
          singularity-version: ${{ env.SINGULARITY_VERSION }}
      - name: Add singularity to path
        if: steps.cache-singularity.outputs.cache-hit == 'true'
        run: |
          echo "/opt/hostedtoolcache/singularity/${{ env.SINGULARITY_VERSION }}/x64/bin" >> $GITHUB_PATH
      - name: Build singularity image
        run: singularity build img.sif "docker://${DOCKER_REGISTRY}/hattivatti:dev"
      - name: Push singularity image
        run: |
          echo "$REGISTRY_TOKEN" | singularity remote login -u "$REGISTRY_USERNAME" --password-stdin "$SINGULARITY_REGISTRY_LOGIN"
          singularity push img.sif "${SINGULARITY_REGISTRY}/hattivatti:dev"
......@@ -2,9 +2,13 @@ name: Rust
on:
push:
branches: [ "main" ]
branches:
- main
- dev
pull_request:
branches: [ "main" ]
branches:
- main
- dev
env:
CARGO_TERM_COLOR: always
......@@ -12,7 +16,8 @@ env:
jobs:
build:
runs-on: ubuntu-latest
runs-on: ubuntu-20.04
container: rust:buster
steps:
- uses: actions/checkout@v3
......
/target
/docs
.DS_Store
.idea
FROM --platform=linux/amd64 rust:bookworm AS build
WORKDIR /opt/build
COPY . ./
RUN cargo build --target x86_64-unknown-linux-gnu
FROM --platform=linux/amd64 debian:stable-slim
COPY --from=build /opt/build/target/x86_64-unknown-linux-gnu/debug/hattivatti /opt/
CMD ["/opt/hattivatti"]
\ No newline at end of file
# hattivatti
# `hattivatti`
> A Finnish [job submitter](https://github.com/ebi-gdp/jobsubmitter).
`hattivatti` submits [`pgsc_calc`](https://github.com/PGScatalog/pgsc_calc) jobs
to [Puhti HPC](https://docs.csc.fi/computing/systems-puhti/) at CSC. Jobs are
configured to execute securely because genomes are sensitive data.
`hattivatti` is a proof of concept for testing sensitive data submission
to CSC.
## Run `hattivatti`
See [Releases](https://github.com/ebi-gdp/hattivatti/releases) for the most recent
stable versions of `hattivatti`. The development version can be run with:
```
$ git clone https://github.com/ebi-gdp/hattivatti.git --branch dev
$ cargo run
```
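For deployment, an optimised binary can be built with a standard cargo release build (a minimal sketch; prebuilt binaries are available on the Releases page):

```
$ cargo build --release
$ ls target/release/hattivatti
```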
## Documentation
```
$ cargo doc --open
```
## Deployment notes
Puhti is currently on RHEL 7 with an old version of glibc.
GitHub Actions builds with the `rust:buster` container to match the glibc version (2.28).
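Before deploying a prebuilt binary, the target host's glibc can be checked with standard tools (a quick sketch, run on a Puhti login node):

```
$ ldd --version | head -n 1
$ ldd ./hattivatti   # lists the shared libraries the binary expects
```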
### Cronjob
The cron shell doesn't load much of the normal login environment:
```
$ # load 'module' command
$ source /appl/profile/zz-csc-env.sh
```
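A crontab entry therefore has to source the CSC profile itself before running `hattivatti`. A minimal sketch (the schedule, paths, and log file are illustrative, not the actual deployment):

```
SHELL=/bin/bash
# the environment variables described below must also be exported for this command
*/15 * * * * source /appl/profile/zz-csc-env.sh && hattivatti --schema-dir repo/data/schemas --work-dir work --globus-jar-path globus.jar >> hattivatti-cron.log 2>&1
```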
### Set environment variables
Sensitive variables:
```
$ export AWS_ACCESS_KEY_ID=<...>
$ export AWS_SECRET_ACCESS_KEY=<...>
$ export CALLBACK_TOKEN=<...>
```
Configuration variables:
```
$ export RUST_LOG=info
$ export NXF_SINGULARITY_CACHEDIR=<path>
```
### Configure `globus-file-handler-cli`
Download the jar and pass its path as a parameter (`--globus-jar-path`).
The Spring config directory **must be in the same directory as the jar** and be named `config/`. The Spring config contains secrets for the transfer.
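As a sketch, the expected layout next to the jar looks something like this (file names are illustrative; only the `config/` directory name is fixed):

```
$ ls /path/to/globus/
config/  globus.jar
$ ls /path/to/globus/config/
application.properties   # Spring config containing the transfer secrets
```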
### Clone pgsc_calc
```
$ cd /scratch/project_XXXXXX/
$ nextflow clone https://github.com/PGScatalog/pgsc_calc.git
```
### Run hattivatti
```
$ hattivatti --schema-dir repo/data/schemas --work-dir work --globus-jar-path globus.jar
```
### Backup database (optional)
After `hattivatti` exits, the database has no open connections, so it's safe to copy.
```
$ module load allas
$ rclone copy work/hattivatti.db s3allas://bucket/hattivatti/hattivatti.db
```
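Restoring the backup before the next run is the same command in reverse (a sketch; the bucket name is illustrative):

```
$ module load allas
$ rclone copy s3allas://bucket/hattivatti/hattivatti.db work/
```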
### Dependencies
* `curl` (callback)
* `globus-file-handler-cli` (data transfer)
* `nextflow` (`pgsc_calc`)
* `java 16` (nextflow and data transfer)
* `parallel` (data transfer)
* `SLURM` (submitting jobs)
Note: `pgsc_calc` is executed using the `singularity` profile. All bioinformatics software is automatically fetched from a container registry. Images are cached after first fetch.
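For example, images land in `$NXF_SINGULARITY_CACHEDIR` on first fetch and are reused by later runs (a sketch; the cache path is illustrative and `test` assumes `pgsc_calc`'s standard test profile):

```
$ export NXF_SINGULARITY_CACHEDIR=/scratch/project_XXXXXX/nxf_singularity_cache
$ nextflow run pgsc_calc -profile test,singularity   # first run pulls and caches images
$ ls "$NXF_SINGULARITY_CACHEDIR"                     # later runs reuse the cached images
```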
......@@ -5,5 +5,6 @@ CREATE TABLE IF NOT EXISTS job (
valid INTEGER,
staged INTEGER DEFAULT 0,
submitted INTEGER DEFAULT 0,
slurm_id TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
);
......@@ -39,11 +39,28 @@
"type": "string",
"description": "A globus collection ID",
"format": "uuid"
},
"files": {
"type": "array",
"items": {
"type": "object",
"properties": {
"filename": {
"type": "string"
},
"file_size": {
"type": "integer",
"minimum": 0
}
},
"required": ["filename", "file_size"]
}
}
},
"required": [
"dir_path_on_guest_collection",
"guest_collection_id"
"guest_collection_id",
"files"
]
}
},
......
File moved
......@@ -101,7 +101,8 @@
},
"required": [
"sampleset",
"chrom"
"chrom",
"pgen"
],
"dependentRequired": {
"pgen": [
......@@ -126,4 +127,4 @@
]
}
}
}
\ No newline at end of file
}
......@@ -7,15 +7,18 @@ callback_exit_handler() \{
# report the status of the job to the INTERVENE backend
exit_code=$?
# todo: don't change the url, change the message content
if [ $exit_code -eq 0 ]; then
url="http://example.com/good/{name}"
body='\{"status": "COMPLETED", "pipelineId": "{name}", "outdir": ""}'
else
url="http://example.com/bad/{name}"
body='\{"status": "FAILED", "pipelineId": "{name}", "outdir": ""}'
fi
# do a callback
curl -s "$url"
url="https://dev.intervenegeneticscores.org/pipeline-manager/pipeline/csc/notify"
# escaping json is a pain
echo "$body" > callback.txt
curl -X POST -H "Content-Type: application/json" -H "Authorization: Basic $CALLBACK_TOKEN" --data @callback.txt "$url"
exit $exit_code
}
......
......@@ -8,9 +8,5 @@ export NXF_EXECUTOR=local
export NXF_WORK=$TMPDIR/work
export NXF_HOME=$TMPDIR/home
export CAPSULE_LOG=none
# transfer.sh configuration
export GLOBUS_BASE_URL={globus_base_url}
export GLOBUS_GUEST_COLLECTION_ID={guest_collection_id}
export JOB_MESSAGE='{message}'
# ------------------------------------------------------------------------------
......@@ -4,16 +4,24 @@
# step 0. load dependencies
# (curl, jq, and singularity/apptainer are preloaded on Puhti)
module load nextflow/22.10.1
module load parallel
# step 1: transfer data in parallel
parallel --col-sep ' ' \
-a {work_dir}/{name}/transfer.txt \
java -Xmx256m \
-jar {globus_path} \
--spring.config.location={globus_parent_path}/config/ \
-s \{1} -d $TMPDIR --file_size \{2}
# the job _must be_ scheduled on a node with local storage
# it's cleaned up by the system after the job finishes
# can't use --chdir, $TMPDIR is set when the job starts
cd $TMPDIR
set_tmpdir() \{
cd $TMPDIR
}
# step 1: transfer data using HTTPS + curl
# (assume transfer.sh is on PATH)
# just dumps files into current working directory
transfer.sh
set_tmpdir
# step 2: run pgsc_calc and calculate some scores
nextflow run {pgsc_calc_dir} -profile singularity \
......@@ -30,11 +38,3 @@ nextflow run {pgsc_calc_dir} -profile singularity \
# (automatically executed in callback_exit_handler by trap)
# ------------------------------------------------------------------------------
cat << EOF
|\__/,| (`\
_.|o o |_ ) )
-(((---(((--------
EOF
#!/usr/bin/env bash
set -euxo pipefail
# A simple way to download a set of files from an INTERVENE globus collection
# using curl and HTTPS. Doesn't retry, resume, or do anything fancy.
# All configuration is done with environment variables.
# This script is kept here as a reference when deploying hattivatti to Puhti.
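# Usage sketch (values below are illustrative): all configuration is read from
# environment variables before the script runs, e.g.
#   export GLOBUS_BASE_URL=<collection HTTPS base URL>
#   export GLOBUS_SECRET_TOKEN=<globus sdk client token>
#   export GLOBUS_GUEST_COLLECTION_ID=<guest collection uuid from the job message>
#   export JOB_MESSAGE="$(cat message.json)"
#   bash transfer.sh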
check_environment_variables () {
    # GLOBUS_BASE_URL: https://docs.globus.org/globus-connect-server/v5/https-access-collections/#determining_the_collection_https_base_url
    # GLOBUS_SECRET_TOKEN: globus sdk client token
    # GLOBUS_GUEST_COLLECTION_ID: uuid, from message
    # JOB_MESSAGE: a validated JSON message sent by the backend
    env_vars=("GLOBUS_BASE_URL" "GLOBUS_SECRET_TOKEN" "GLOBUS_GUEST_COLLECTION_ID" "JOB_MESSAGE")
    for var in "${env_vars[@]}"; do
        echo "Checking environment variable $var"
        # print indirect reference, only errors correctly with set -u
        echo "${!var} OK"
    done
}

grab_access_token () {
    GLOBUS_ACCESS_TOKEN=$(curl -s -X POST -H "Authorization: Basic $GLOBUS_SECRET_TOKEN" \
        -d "scope=https://auth.globus.org/scopes/$GLOBUS_GUEST_COLLECTION_ID/https&grant_type=client_credentials" \
        https://auth.globus.org/v2/oauth2/token \
        | jq -r '.access_token')
}

download_files () {
    GLOBUS_HEADER="\"Authorization: Bearer $GLOBUS_ACCESS_TOKEN\""
    # step 1: extract a list of file names and extensions from the job message
    # e.g. test.pgen
    FILE_PATHS=$(mktemp)
    jq -r '.pipeline_param.target_genomes[] | (.pgen, .pvar, .psam)' <(echo "$JOB_MESSAGE") > $FILE_PATHS
    DIRECTORY=$(echo $JOB_MESSAGE | jq -r '.globus_details.dir_path_on_guest_collection + "/"')
    CURL_CMD="curl -s -f -O -L -X GET \
        -H "$GLOBUS_HEADER" \
        --url "${GLOBUS_BASE_URL}/${DIRECTORY}/{PATH}" \
        -w "%{json}" >> transfer_log.json"
    DOWNLOAD_PATH=$(mktemp)
    # step 2: print a list of curl commands to a temporary file
    xargs -I {PATH} echo $CURL_CMD < $FILE_PATHS > $DOWNLOAD_PATH
    # step 3: run the curl commands to stage the files locally over HTTPS
    source $DOWNLOAD_PATH
}

main () {
    # clean up old transfer logs before starting downloads
    rm -f transfer_log.json
    check_environment_variables
    grab_access_token
    download_files
    # clean up temporary files when exiting the script
    # mktemp is quite secure so not really needed, just being tidy
    trap 'rm -f $FILE_PATHS $DOWNLOAD_PATH' EXIT
}

main
//! All job state is stored in a SQLite database
/// Connect to a SQLite database
pub mod open;
pub mod job;
/// Stream and validate job request messages
pub mod ingest;
\ No newline at end of file
......@@ -4,6 +4,10 @@ use rusqlite::Connection;
use crate::request::message::AllasMessage;
/// Load an AllasMessage into a database
///
/// The AllasMessage is stored in a JSON column and the schema will automatically extract the
/// INTERVENE ID and add an insertion timestamp
pub fn ingest_message(conn: &Connection, message: &AllasMessage) -> Result<()> {
info!("Adding {} to db", &message.key);
let json = &message.content;
......
//! Job loading, updating, and submission to SLURM scheduler
//!
//! Takes care of deserialising unsubmitted jobs from the database into a JobRequest.
//! Also responsible for updating the database once JobRequests are staged (rendered templates written
//! to disk) or submitted (sbatch system command run).
pub mod load;
pub mod update;
pub mod state;
\ No newline at end of file
......@@ -4,6 +4,7 @@ use serde_json::Result as JsonResult;
use crate::slurm::job_request::JobRequest;
/// Fetch and deserialise valid unsubmitted jobs from the database
pub fn get_valid_jobs(conn: &Connection, dry_run: bool) -> Option<Vec<JobRequest>> {
let mut stmt = conn.prepare("SELECT manifest FROM job WHERE valid == 1 AND staged == 0 AND submitted == 0").expect("");
let rows = stmt.query_map([], |row| row.get(0)).expect("");
......@@ -24,6 +25,7 @@ pub fn get_valid_jobs(conn: &Connection, dry_run: bool) -> Option<Vec<JobRequest
}
}
/// Deserialise validated JSON into a [JobRequest]
fn deserialise(json_strings: Vec<String>) -> JsonResult<Vec<JobRequest>> {
let mut jobs: Vec<JobRequest> = Vec::new();
for string in json_strings {
......@@ -33,6 +35,7 @@ fn deserialise(json_strings: Vec<String>) -> JsonResult<Vec<JobRequest>> {
Ok(jobs)
}
/// Release (commit transaction) or rollback (abort transaction) messages to the database
fn release_or_rollback(conn: &Connection, dry_run: bool) {
match dry_run {
true => {
......
......@@ -3,8 +3,14 @@ pub enum JobState {
Submitted
}
/// A simple way to keep track of job state.
///
/// Currently only two states are supported: staged (rendered templates written to disk) and
/// submitted (after sbatch system command exits 0). Other job states could include things like
/// INITIALISED (request received) or PENDING (parsing squeue output) in the future.
impl JobState {
/// db columns are all lower case, enum used in sql statement
/// TODO: migrate to a single enum column called "state"
pub fn to_string(&self) -> &str {
match self {
JobState::Staged => "staged",
......
use std::path::Path;
use std::process::Command;
use log::info;
use rusqlite::Connection;
......@@ -16,8 +17,19 @@ impl JobRequest {
let job_id = self.run_sbatch(job);
info!("SLURM job id: {job_id}");
let state = JobState::Submitted;
// todo: store SLURM job id in table too
self.update(conn, state);
self.update_slurm(conn, job_id).expect("update OK");
}
fn update_slurm(&self, conn: &Connection, slurm_id: String) -> rusqlite::Result<()> {
let id = &self.pipeline_param.id.to_string();
info!("Updating {id} with slurm ID {slurm_id}");
conn
.execute("UPDATE job SET slurm_id = ? WHERE intervene_id = ?",
&[&slurm_id, &id])
.expect("Update");
Ok(())
}
fn update(&self, conn: &Connection, state: JobState) {
......@@ -34,16 +46,18 @@ impl JobRequest {
fn run_sbatch(&self, job_path: JobPath) -> String {
let wd = job_path.path.parent().unwrap();
let output = Command::new("sbatch")
.arg("--parsable")
.arg("--output")
.arg(wd)
.arg("--error")
.arg(wd)
.arg(job_path.path)
.output()
.expect("sbatch");
String::from_utf8(output.stdout).expect("job id")
let output_path = wd.join(Path::new("%j.out"));
let output_str = output_path.to_str().unwrap();
info!("Output path: {output_str}");
let job_script_path = job_path.path.to_str().unwrap();
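// --parsable makes sbatch print only the job id, so stdout can be stored directly in the database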
let arguments = vec!["--parsable", "--output", output_str, job_script_path];
let mut sbatch = Command::new("sbatch");
let cmd = sbatch.args(&arguments);
info!("{:?}", &cmd);
let output = cmd.output().expect("failed to execute process").stdout;
String::from_utf8(output).expect("job id")
}
}
use log::info;
use crate::WorkingDirectory;
/// Open a connection to an existing database, or create a new one if it doesn't exist
pub fn open_db(wd: &WorkingDirectory) -> rusqlite::Result<rusqlite::Connection> {
let path = &wd.path.join("hattivatti.db");
if !path.exists() { info!("Creating new database {}", path.display()) }
let conn = rusqlite::Connection::open(&path)?;
/// A SQLite database schema that stores job status
static SCHEMA: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/db/schema.sql"));
conn.execute(SCHEMA, [], )?;
......