summaryrefslogtreecommitdiff
path: root/angelsharkd/src/routes/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'angelsharkd/src/routes/mod.rs')
-rw-r--r--angelsharkd/src/routes/mod.rs137
1 files changed, 137 insertions, 0 deletions
diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs
new file mode 100644
index 0000000..40d024d
--- /dev/null
+++ b/angelsharkd/src/routes/mod.rs
@@ -0,0 +1,137 @@
+use crate::config::Config;
+use anyhow::Error as AnyhowError;
+use dtos::*;
+use libangelshark::{AcmRunner, Message, ParallelIterator};
+use log::debug;
+use std::convert::Infallible;
+use warp::{
+ body,
+ hyper::{header, StatusCode},
+ path, post,
+ reply::{self, with},
+ Filter, Rejection, Reply,
+};
+
+mod dtos;
+
+/// GET / -> Name and version # of app.
+pub fn index() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+ path::end().and_then(handle_version)
+}
+
+/// POST /ossi with JSON inputs -> JSON outputs
+pub fn ossi(config: &Config) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+ let runner = config.runner.clone();
+ path("ossi")
+ .and(post())
+ .and(warp::query::<Query>())
+ .and(json_body())
+ .and(with_runner(runner))
+ .and_then(handle_ossi)
+ .with(with::header(header::PRAGMA, "no-cache"))
+ .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0"))
+ .with(with::header(header::X_FRAME_OPTIONS, "DENY"))
+}
+
+/// For passing runner to handlers.
+fn with_runner(
+ runner: AcmRunner,
+) -> impl Filter<Extract = (AcmRunner,), Error = Infallible> + Clone {
+ warp::any().map(move || runner.clone())
+}
+
+/// JSON request body filter with content length limit.
+fn json_body() -> impl Filter<Extract = (Vec<Request>,), Error = Rejection> + Clone {
+ body::content_length_limit(1024 * 16).and(body::json())
+}
+
+/// Handle version requests.
+async fn handle_version() -> Result<impl Reply, Infallible> {
+ Ok(reply::json(&Version {
+ daemon_version: env!("CARGO_PKG_VERSION"),
+ }))
+}
+
+/// Handle OSSI requests.
+async fn handle_ossi(
+ query: Query,
+ requests: Vec<Request>,
+ mut runner: AcmRunner,
+) -> Result<impl Reply, Infallible> {
+ debug!("{:?}", query);
+ debug!("{:?}", requests);
+
+ // Queue request inputs on runner.
+ for (job_name, input) in requests
+ .into_iter()
+ .map(|r| -> Vec<(String, Message)> { r.into() })
+ .flatten()
+ {
+ runner.queue_input(&job_name, &input);
+ }
+
+ // Collect runner results and convert to responses.
+ let responses: Vec<Result<Vec<Response>, AnyhowError>> = if query.no_cache.unwrap_or_default() {
+ // Run without cache.
+ runner
+ .run()
+ .map(|(name, output)| {
+ let output: Vec<Response> = output?
+ .into_iter()
+ .filter_map(move |o| {
+ if o.command == "logoff" {
+ None
+ } else {
+ Some(Response::from((name.clone(), o)))
+ }
+ })
+ .collect();
+ Ok(output)
+ })
+ .collect()
+ } else {
+ // Run with cache.
+ runner
+ .run_cached()
+ .map(|(name, output)| {
+ let output: Vec<Response> = output?
+ .into_iter()
+ .filter_map(move |o| {
+ if o.command == "logoff" {
+ None
+ } else {
+ Some(Response::from((name.clone(), o)))
+ }
+ })
+ .collect();
+ Ok(output)
+ })
+ .collect()
+ };
+
+ // Handle errors from runner.
+ if query.panicky.unwrap_or_default() {
+ // Return an internal error if anything went wrong.
+ let responses: Result<Vec<Vec<Response>>, _> = responses.into_iter().collect();
+ match responses {
+ Err(e) => Ok(reply::with_status(
+ reply::json(&Error {
+ reason: e.to_string(),
+ }),
+ StatusCode::INTERNAL_SERVER_ERROR,
+ )),
+ Ok(r) => Ok(reply::with_status(
+ reply::json(&r.into_iter().flatten().collect::<Vec<Response>>()),
+ StatusCode::OK,
+ )),
+ }
+ } else {
+ // Discard errors and return just good data.
+ let responses: Vec<Response> = responses
+ .into_iter()
+ .filter_map(|r| r.ok())
+ .flatten()
+ .collect();
+ Ok(reply::with_status(reply::json(&responses), StatusCode::OK))
+ }
+}