diff options
Diffstat (limited to 'angelsharkd/src/routes/mod.rs')
-rw-r--r-- | angelsharkd/src/routes/mod.rs | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs new file mode 100644 index 0000000..40d024d --- /dev/null +++ b/angelsharkd/src/routes/mod.rs @@ -0,0 +1,137 @@ +use crate::config::Config; +use anyhow::Error as AnyhowError; +use dtos::*; +use libangelshark::{AcmRunner, Message, ParallelIterator}; +use log::debug; +use std::convert::Infallible; +use warp::{ + body, + hyper::{header, StatusCode}, + path, post, + reply::{self, with}, + Filter, Rejection, Reply, +}; + +mod dtos; + +/// GET / -> Name and version # of app. +pub fn index() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { + path::end().and_then(handle_version) +} + +/// POST /ossi with JSON inputs -> JSON outputs +pub fn ossi(config: &Config) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { + let runner = config.runner.clone(); + path("ossi") + .and(post()) + .and(warp::query::<Query>()) + .and(json_body()) + .and(with_runner(runner)) + .and_then(handle_ossi) + .with(with::header(header::PRAGMA, "no-cache")) + .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) + .with(with::header(header::X_FRAME_OPTIONS, "DENY")) +} + +/// For passing runner to handlers. +fn with_runner( + runner: AcmRunner, +) -> impl Filter<Extract = (AcmRunner,), Error = Infallible> + Clone { + warp::any().map(move || runner.clone()) +} + +/// JSON request body filter with content length limit. +fn json_body() -> impl Filter<Extract = (Vec<Request>,), Error = Rejection> + Clone { + body::content_length_limit(1024 * 16).and(body::json()) +} + +/// Handle version requests. +async fn handle_version() -> Result<impl Reply, Infallible> { + Ok(reply::json(&Version { + daemon_version: env!("CARGO_PKG_VERSION"), + })) +} + +/// Handle OSSI requests. +async fn handle_ossi( + query: Query, + requests: Vec<Request>, + mut runner: AcmRunner, +) -> Result<impl Reply, Infallible> { + debug!("{:?}", query); + debug!("{:?}", requests); + + // Queue request inputs on runner. + for (job_name, input) in requests + .into_iter() + .map(|r| -> Vec<(String, Message)> { r.into() }) + .flatten() + { + runner.queue_input(&job_name, &input); + } + + // Collect runner results and convert to responses. + let responses: Vec<Result<Vec<Response>, AnyhowError>> = if query.no_cache.unwrap_or_default() { + // Run without cache. + runner + .run() + .map(|(name, output)| { + let output: Vec<Response> = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + } else { + // Run with cache. + runner + .run_cached() + .map(|(name, output)| { + let output: Vec<Response> = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + }; + + // Handle errors from runner. + if query.panicky.unwrap_or_default() { + // Return an internal error if anything went wrong. + let responses: Result<Vec<Vec<Response>>, _> = responses.into_iter().collect(); + match responses { + Err(e) => Ok(reply::with_status( + reply::json(&Error { + reason: e.to_string(), + }), + StatusCode::INTERNAL_SERVER_ERROR, + )), + Ok(r) => Ok(reply::with_status( + reply::json(&r.into_iter().flatten().collect::<Vec<Response>>()), + StatusCode::OK, + )), + } + } else { + // Discard errors and return just good data. + let responses: Vec<Response> = responses + .into_iter() + .filter_map(|r| r.ok()) + .flatten() + .collect(); + Ok(reply::with_status(reply::json(&responses), StatusCode::OK)) + } +} |