From 2f67f82a64f4eabfbe758655099f48d8afa07fc3 Mon Sep 17 00:00:00 2001 From: "Carpenter, Adam (CORP)" Date: Tue, 12 Oct 2021 14:07:10 -0400 Subject: feat: init upload --- angelsharkd/src/routes/dtos.rs | 67 ++++++++++++++++++++ angelsharkd/src/routes/mod.rs | 137 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 angelsharkd/src/routes/dtos.rs create mode 100644 angelsharkd/src/routes/mod.rs (limited to 'angelsharkd/src/routes') diff --git a/angelsharkd/src/routes/dtos.rs b/angelsharkd/src/routes/dtos.rs new file mode 100644 index 0000000..fe3f184 --- /dev/null +++ b/angelsharkd/src/routes/dtos.rs @@ -0,0 +1,67 @@ +use libangelshark::Message; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Debug)] +pub struct Error { + pub reason: String, +} + +#[derive(Serialize, Debug)] +pub struct Version { + pub daemon_version: &'static str, +} + +#[derive(Deserialize, Debug)] +pub struct Query { + pub no_cache: Option, + pub panicky: Option, +} + +#[derive(Deserialize, Debug)] +pub struct Request { + pub acms: Vec, + pub command: String, + pub fields: Option>, + pub datas: Option>>, +} + +impl From for Vec<(String, Message)> { + fn from(val: Request) -> Self { + val.acms + .iter() + .map(|name| { + ( + name.to_owned(), + Message { + command: val.command.clone(), + fields: val.fields.clone(), + datas: val.datas.clone(), + error: None, + }, + ) + }) + .collect() + } +} + +#[derive(Debug, Serialize)] +pub struct Response { + pub acm: String, + pub command: String, + pub error: String, + pub fields: Vec, + pub datas: Vec>, +} + +impl From<(String, Message)> for Response { + fn from(msg: (String, Message)) -> Self { + let (acm, msg) = msg; + Self { + acm, + command: msg.command, + fields: msg.fields.unwrap_or_default(), + datas: msg.datas.unwrap_or_default(), + error: msg.error.unwrap_or_default(), + } + } +} diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs new file mode 100644 index 0000000..40d024d --- /dev/null +++ b/angelsharkd/src/routes/mod.rs @@ -0,0 +1,137 @@ +use crate::config::Config; +use anyhow::Error as AnyhowError; +use dtos::*; +use libangelshark::{AcmRunner, Message, ParallelIterator}; +use log::debug; +use std::convert::Infallible; +use warp::{ + body, + hyper::{header, StatusCode}, + path, post, + reply::{self, with}, + Filter, Rejection, Reply, +}; + +mod dtos; + +/// GET / -> Name and version # of app. +pub fn index() -> impl Filter + Clone { + path::end().and_then(handle_version) +} + +/// POST /ossi with JSON inputs -> JSON outputs +pub fn ossi(config: &Config) -> impl Filter + Clone { + let runner = config.runner.clone(); + path("ossi") + .and(post()) + .and(warp::query::()) + .and(json_body()) + .and(with_runner(runner)) + .and_then(handle_ossi) + .with(with::header(header::PRAGMA, "no-cache")) + .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) + .with(with::header(header::X_FRAME_OPTIONS, "DENY")) +} + +/// For passing runner to handlers. +fn with_runner( + runner: AcmRunner, +) -> impl Filter + Clone { + warp::any().map(move || runner.clone()) +} + +/// JSON request body filter with content length limit. +fn json_body() -> impl Filter,), Error = Rejection> + Clone { + body::content_length_limit(1024 * 16).and(body::json()) +} + +/// Handle version requests. +async fn handle_version() -> Result { + Ok(reply::json(&Version { + daemon_version: env!("CARGO_PKG_VERSION"), + })) +} + +/// Handle OSSI requests. +async fn handle_ossi( + query: Query, + requests: Vec, + mut runner: AcmRunner, +) -> Result { + debug!("{:?}", query); + debug!("{:?}", requests); + + // Queue request inputs on runner. + for (job_name, input) in requests + .into_iter() + .map(|r| -> Vec<(String, Message)> { r.into() }) + .flatten() + { + runner.queue_input(&job_name, &input); + } + + // Collect runner results and convert to responses. + let responses: Vec, AnyhowError>> = if query.no_cache.unwrap_or_default() { + // Run without cache. + runner + .run() + .map(|(name, output)| { + let output: Vec = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + } else { + // Run with cache. + runner + .run_cached() + .map(|(name, output)| { + let output: Vec = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + }; + + // Handle errors from runner. + if query.panicky.unwrap_or_default() { + // Return an internal error if anything went wrong. + let responses: Result>, _> = responses.into_iter().collect(); + match responses { + Err(e) => Ok(reply::with_status( + reply::json(&Error { + reason: e.to_string(), + }), + StatusCode::INTERNAL_SERVER_ERROR, + )), + Ok(r) => Ok(reply::with_status( + reply::json(&r.into_iter().flatten().collect::>()), + StatusCode::OK, + )), + } + } else { + // Discard errors and return just good data. + let responses: Vec = responses + .into_iter() + .filter_map(|r| r.ok()) + .flatten() + .collect(); + Ok(reply::with_status(reply::json(&responses), StatusCode::OK)) + } +} -- cgit v1.2.3