summary refs log tree commit diff
path: root/angelsharkd/src/routes
diff options
context:
space:
mode:
Diffstat (limited to 'angelsharkd/src/routes')
-rw-r--r--  angelsharkd/src/routes/dtos.rs   67
-rw-r--r--  angelsharkd/src/routes/mod.rs   137
2 files changed, 204 insertions, 0 deletions
diff --git a/angelsharkd/src/routes/dtos.rs b/angelsharkd/src/routes/dtos.rs
new file mode 100644
index 0000000..fe3f184
--- /dev/null
+++ b/angelsharkd/src/routes/dtos.rs
@@ -0,0 +1,67 @@
+use libangelshark::Message;
+use serde::{Deserialize, Serialize};
+
/// Serializable error body returned to API clients when a request fails.
#[derive(Serialize, Debug)]
pub struct Error {
    /// Human-readable description of what went wrong.
    pub reason: String,
}
+
/// Response body for `GET /`: reports the daemon's version.
#[derive(Serialize, Debug)]
pub struct Version {
    /// Version string compiled in from `CARGO_PKG_VERSION`.
    pub daemon_version: &'static str,
}
+
/// Optional query-string flags accepted by `POST /ossi`.
#[derive(Deserialize, Debug)]
pub struct Query {
    /// When true, run against the ACMs directly instead of the runner's cache.
    pub no_cache: Option<bool>,
    /// When true, any single job error fails the whole request with HTTP 500;
    /// otherwise failed jobs are silently dropped from the output.
    pub panicky: Option<bool>,
}
+
/// A single OSSI request: one command fanned out to one or more ACMs.
#[derive(Deserialize, Debug)]
pub struct Request {
    /// Names of the ACMs this command should be run against.
    pub acms: Vec<String>,
    /// The OSSI command to execute.
    pub command: String,
    // NOTE(review): fields/datas look like OSSI field identifiers and the
    // corresponding data rows — TODO confirm against libangelshark::Message.
    pub fields: Option<Vec<String>>,
    pub datas: Option<Vec<Vec<String>>>,
}
+
+impl From<Request> for Vec<(String, Message)> {
+ fn from(val: Request) -> Self {
+ val.acms
+ .iter()
+ .map(|name| {
+ (
+ name.to_owned(),
+ Message {
+ command: val.command.clone(),
+ fields: val.fields.clone(),
+ datas: val.datas.clone(),
+ error: None,
+ },
+ )
+ })
+ .collect()
+ }
+}
+
/// Serializable result of running one command against one ACM.
#[derive(Debug, Serialize)]
pub struct Response {
    /// Name of the ACM that produced this output.
    pub acm: String,
    /// The command that was executed.
    pub command: String,
    /// Error text from the message, or empty when there was none.
    pub error: String,
    pub fields: Vec<String>,
    pub datas: Vec<Vec<String>>,
}
+
+impl From<(String, Message)> for Response {
+ fn from(msg: (String, Message)) -> Self {
+ let (acm, msg) = msg;
+ Self {
+ acm,
+ command: msg.command,
+ fields: msg.fields.unwrap_or_default(),
+ datas: msg.datas.unwrap_or_default(),
+ error: msg.error.unwrap_or_default(),
+ }
+ }
+}
diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs
new file mode 100644
index 0000000..40d024d
--- /dev/null
+++ b/angelsharkd/src/routes/mod.rs
@@ -0,0 +1,137 @@
+use crate::config::Config;
+use anyhow::Error as AnyhowError;
+use dtos::*;
+use libangelshark::{AcmRunner, Message, ParallelIterator};
+use log::debug;
+use std::convert::Infallible;
+use warp::{
+ body,
+ hyper::{header, StatusCode},
+ path, post,
+ reply::{self, with},
+ Filter, Rejection, Reply,
+};
+
+mod dtos;
+
+/// GET / -> Name and version # of app.
+pub fn index() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+ path::end().and_then(handle_version)
+}
+
+/// POST /ossi with JSON inputs -> JSON outputs
/// POST /ossi with JSON inputs -> JSON outputs
///
/// Builds the filter chain for the OSSI endpoint: parse the query-string
/// flags, decode the JSON request list (size-limited), inject a clone of the
/// configured ACM runner, and dispatch to `handle_ossi`. Responses carry
/// no-cache and anti-framing headers.
pub fn ossi(config: &Config) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
    let runner = config.runner.clone();
    path("ossi")
        .and(post())
        .and(warp::query::<Query>())
        .and(json_body())
        .and(with_runner(runner))
        .and_then(handle_ossi)
        // Tell clients and proxies never to cache these responses, and
        // browsers never to render them inside a frame.
        .with(with::header(header::PRAGMA, "no-cache"))
        .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0"))
        .with(with::header(header::X_FRAME_OPTIONS, "DENY"))
}
+
+/// For passing runner to handlers.
+fn with_runner(
+ runner: AcmRunner,
+) -> impl Filter<Extract = (AcmRunner,), Error = Infallible> + Clone {
+ warp::any().map(move || runner.clone())
+}
+
+/// JSON request body filter with content length limit.
+fn json_body() -> impl Filter<Extract = (Vec<Request>,), Error = Rejection> + Clone {
+ body::content_length_limit(1024 * 16).and(body::json())
+}
+
+/// Handle version requests.
+async fn handle_version() -> Result<impl Reply, Infallible> {
+ Ok(reply::json(&Version {
+ daemon_version: env!("CARGO_PKG_VERSION"),
+ }))
+}
+
+/// Handle OSSI requests.
+async fn handle_ossi(
+ query: Query,
+ requests: Vec<Request>,
+ mut runner: AcmRunner,
+) -> Result<impl Reply, Infallible> {
+ debug!("{:?}", query);
+ debug!("{:?}", requests);
+
+ // Queue request inputs on runner.
+ for (job_name, input) in requests
+ .into_iter()
+ .map(|r| -> Vec<(String, Message)> { r.into() })
+ .flatten()
+ {
+ runner.queue_input(&job_name, &input);
+ }
+
+ // Collect runner results and convert to responses.
+ let responses: Vec<Result<Vec<Response>, AnyhowError>> = if query.no_cache.unwrap_or_default() {
+ // Run without cache.
+ runner
+ .run()
+ .map(|(name, output)| {
+ let output: Vec<Response> = output?
+ .into_iter()
+ .filter_map(move |o| {
+ if o.command == "logoff" {
+ None
+ } else {
+ Some(Response::from((name.clone(), o)))
+ }
+ })
+ .collect();
+ Ok(output)
+ })
+ .collect()
+ } else {
+ // Run with cache.
+ runner
+ .run_cached()
+ .map(|(name, output)| {
+ let output: Vec<Response> = output?
+ .into_iter()
+ .filter_map(move |o| {
+ if o.command == "logoff" {
+ None
+ } else {
+ Some(Response::from((name.clone(), o)))
+ }
+ })
+ .collect();
+ Ok(output)
+ })
+ .collect()
+ };
+
+ // Handle errors from runner.
+ if query.panicky.unwrap_or_default() {
+ // Return an internal error if anything went wrong.
+ let responses: Result<Vec<Vec<Response>>, _> = responses.into_iter().collect();
+ match responses {
+ Err(e) => Ok(reply::with_status(
+ reply::json(&Error {
+ reason: e.to_string(),
+ }),
+ StatusCode::INTERNAL_SERVER_ERROR,
+ )),
+ Ok(r) => Ok(reply::with_status(
+ reply::json(&r.into_iter().flatten().collect::<Vec<Response>>()),
+ StatusCode::OK,
+ )),
+ }
+ } else {
+ // Discard errors and return just good data.
+ let responses: Vec<Response> = responses
+ .into_iter()
+ .filter_map(|r| r.ok())
+ .flatten()
+ .collect();
+ Ok(reply::with_status(reply::json(&responses), StatusCode::OK))
+ }
+}