use crate::config::Config; use anyhow::Error as AnyhowError; use dtos::*; use libangelshark::{AcmRunner, Message, ParallelIterator}; use log::debug; use std::convert::Infallible; use warp::{ body, hyper::{header, StatusCode}, path, post, reply::{self, with}, Filter, Rejection, Reply, }; mod dtos; pub mod extensions; /// GET / -> Name and version # of app. pub fn index() -> impl Filter + Clone { path::end().and_then(handle_version) } /// POST /ossi with JSON inputs -> JSON outputs pub fn ossi(config: &Config) -> impl Filter + Clone { let runner = config.runner.clone(); path("ossi") .and(post()) .and(warp::query::()) .and(json_body()) .and(with_runner(runner)) .and_then(handle_ossi) .with(with::header(header::PRAGMA, "no-cache")) .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) .with(with::header(header::X_FRAME_OPTIONS, "DENY")) } /// For passing runner to handlers. fn with_runner( runner: AcmRunner, ) -> impl Filter + Clone { warp::any().map(move || runner.clone()) } /// JSON request body filter with content length limit. fn json_body() -> impl Filter,), Error = Rejection> + Clone { body::content_length_limit(1024 * 16).and(body::json()) } /// Handle version requests. async fn handle_version() -> Result { Ok(reply::json(&Version { daemon_version: env!("CARGO_PKG_VERSION"), })) } /// Handle OSSI requests. async fn handle_ossi( query: Query, requests: Vec, mut runner: AcmRunner, ) -> Result { debug!("{:?}", query); debug!("{:?}", requests); // Queue request inputs on runner. for (job_name, input) in requests .into_iter() .map(|r| -> Vec<(String, Message)> { r.into() }) .flatten() { runner.queue_input(&job_name, &input); } // Collect runner results and convert to responses. let responses: Vec, AnyhowError>> = if query.no_cache.unwrap_or_default() { // Run without cache. runner .run() .map(|(name, output)| { let output: Vec = output? .into_iter() .filter_map(move |o| { if o.command == "logoff" { None } else { Some(Response::from((name.clone(), o))) } }) .collect(); Ok(output) }) .collect() } else { // Run with cache. runner .run_cached() .map(|(name, output)| { let output: Vec = output? .into_iter() .filter_map(move |o| { if o.command == "logoff" { None } else { Some(Response::from((name.clone(), o))) } }) .collect(); Ok(output) }) .collect() }; // Handle errors from runner. if query.panicky.unwrap_or_default() { // Return an internal error if anything went wrong. let responses: Result>, _> = responses.into_iter().collect(); match responses { Err(e) => Ok(reply::with_status( reply::json(&Error { reason: e.to_string(), }), StatusCode::INTERNAL_SERVER_ERROR, )), Ok(r) => Ok(reply::with_status( reply::json(&r.into_iter().flatten().collect::>()), StatusCode::OK, )), } } else { // Discard errors and return just good data. let responses: Vec = responses .into_iter() .filter_map(|r| r.ok()) .flatten() .collect(); Ok(reply::with_status(reply::json(&responses), StatusCode::OK)) } }