From 2f67f82a64f4eabfbe758655099f48d8afa07fc3 Mon Sep 17 00:00:00 2001 From: "Carpenter, Adam (CORP)" Date: Tue, 12 Oct 2021 14:07:10 -0400 Subject: feat: init upload --- angelsharkd/Cargo.toml | 30 +++++++++ angelsharkd/src/config.rs | 49 +++++++++++++++ angelsharkd/src/main.rs | 59 ++++++++++++++++++ angelsharkd/src/routes/dtos.rs | 67 ++++++++++++++++++++ angelsharkd/src/routes/mod.rs | 137 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 342 insertions(+) create mode 100644 angelsharkd/Cargo.toml create mode 100644 angelsharkd/src/config.rs create mode 100644 angelsharkd/src/main.rs create mode 100644 angelsharkd/src/routes/dtos.rs create mode 100644 angelsharkd/src/routes/mod.rs (limited to 'angelsharkd') diff --git a/angelsharkd/Cargo.toml b/angelsharkd/Cargo.toml new file mode 100644 index 0000000..9a5c750 --- /dev/null +++ b/angelsharkd/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "angelsharkd" +version = "0.1.0" +edition = "2018" +authors = ["Adam T. Carpenter "] +description = "A HTTP interface into one or more Communication Managers" + +[dependencies.libangelshark] +path = "../libangelshark" + +[dependencies.tokio] +version = "1" +features = ["macros", "rt-multi-thread", "signal"] + +[dependencies.warp] +version = "0.3" +default-features = false + +[dependencies.log] +version = "0.4" + +[dependencies.env_logger] +version = "0.8" + +[dependencies.serde] +version = "1" +features = ["derive"] + +[dependencies.anyhow] +version = "1" diff --git a/angelsharkd/src/config.rs b/angelsharkd/src/config.rs new file mode 100644 index 0000000..b0866be --- /dev/null +++ b/angelsharkd/src/config.rs @@ -0,0 +1,49 @@ +use anyhow::{Context, Result}; +use libangelshark::{Acm, AcmRunner}; +use std::{ + env, + fs::File, + net::{Ipv4Addr, SocketAddrV4}, +}; + +#[derive(Clone)] +pub struct Config { + pub bind_addr: SocketAddrV4, + pub debug_mode: bool, + pub runner: AcmRunner, + pub origin: String, +} + +impl Config { + pub fn init() -> Result { + let debug_mode = cfg!(debug_assertions) || env::var_os("ANGELSHARKD_DEBUG").is_some(); + + let bind_addr: SocketAddrV4 = env::var("ANGELSHARKD_ADDR") + .map(|addr| addr.parse()) + .unwrap_or_else(|_| Ok(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))) + .with_context(|| "Failed to parse socket bind address.")?; + + let origin = env::var("ANGELSHARKD_ORIGIN").unwrap_or_default(); + + let logins = if let Ok(path) = env::var("ANGELSHARKD_LOGINS") { + File::open(path) + } else { + File::open("./asa.cfg") + } + .with_context(|| "Failed to open logins file.")?; + + let mut runner = AcmRunner::default(); + for (job_name, acm) in + Acm::from_logins(logins).with_context(|| "Failed to parse logins.")? + { + runner.register_acm(&job_name, acm); + } + + Ok(Self { + bind_addr, + origin, + debug_mode, + runner, + }) + } +} diff --git a/angelsharkd/src/main.rs b/angelsharkd/src/main.rs new file mode 100644 index 0000000..22e4161 --- /dev/null +++ b/angelsharkd/src/main.rs @@ -0,0 +1,59 @@ +use crate::config::Config; +use anyhow::{Context, Result}; +use log::{debug, error, info, LevelFilter}; +use tokio::{signal, task}; +use warp::{hyper::Method, Filter}; + +mod config; +mod routes; + +#[tokio::main] +async fn main() -> Result<()> { + // Init config. + let config = Config::init().with_context(|| "Failed to initialize config.")?; + + // Init logging. + env_logger::Builder::new() + .filter( + None, + if config.debug_mode { + LevelFilter::Debug + } else { + LevelFilter::Info + }, + ) + .init(); + + if config.debug_mode { + debug!("**** DEBUGGING MODE ENABLED ****"); + } + + let routes = routes::index() + .or(routes::ossi(&config)) + .with(if config.debug_mode { + warp::cors() + .allow_any_origin() + .allow_methods(&[Method::GET, Method::POST]) + } else { + warp::cors() + .allow_origin(config.origin.as_str()) + .allow_methods(&[Method::GET, Method::POST]) + }) + .with(warp::log("angelsharkd")); + + // Create server with shutdown signal. + let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(config.bind_addr, async { + signal::ctrl_c() + .await + .expect("Failed to install CTRL+C signal handler."); + }); + + // Run server to completion. + info!("Starting server on {} ...", addr); + if let Err(e) = task::spawn(server).await { + error!("Server died unexpectedly: {}", e.to_string()); + } + info!("Stopping server..."); + + Ok(()) +} diff --git a/angelsharkd/src/routes/dtos.rs b/angelsharkd/src/routes/dtos.rs new file mode 100644 index 0000000..fe3f184 --- /dev/null +++ b/angelsharkd/src/routes/dtos.rs @@ -0,0 +1,67 @@ +use libangelshark::Message; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Debug)] +pub struct Error { + pub reason: String, +} + +#[derive(Serialize, Debug)] +pub struct Version { + pub daemon_version: &'static str, +} + +#[derive(Deserialize, Debug)] +pub struct Query { + pub no_cache: Option, + pub panicky: Option, +} + +#[derive(Deserialize, Debug)] +pub struct Request { + pub acms: Vec, + pub command: String, + pub fields: Option>, + pub datas: Option>>, +} + +impl From for Vec<(String, Message)> { + fn from(val: Request) -> Self { + val.acms + .iter() + .map(|name| { + ( + name.to_owned(), + Message { + command: val.command.clone(), + fields: val.fields.clone(), + datas: val.datas.clone(), + error: None, + }, + ) + }) + .collect() + } +} + +#[derive(Debug, Serialize)] +pub struct Response { + pub acm: String, + pub command: String, + pub error: String, + pub fields: Vec, + pub datas: Vec>, +} + +impl From<(String, Message)> for Response { + fn from(msg: (String, Message)) -> Self { + let (acm, msg) = msg; + Self { + acm, + command: msg.command, + fields: msg.fields.unwrap_or_default(), + datas: msg.datas.unwrap_or_default(), + error: msg.error.unwrap_or_default(), + } + } +} diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs new file mode 100644 index 0000000..40d024d --- /dev/null +++ b/angelsharkd/src/routes/mod.rs @@ -0,0 +1,137 @@ +use crate::config::Config; +use anyhow::Error as AnyhowError; +use dtos::*; +use libangelshark::{AcmRunner, Message, ParallelIterator}; +use log::debug; +use std::convert::Infallible; +use warp::{ + body, + hyper::{header, StatusCode}, + path, post, + reply::{self, with}, + Filter, Rejection, Reply, +}; + +mod dtos; + +/// GET / -> Name and version # of app. +pub fn index() -> impl Filter + Clone { + path::end().and_then(handle_version) +} + +/// POST /ossi with JSON inputs -> JSON outputs +pub fn ossi(config: &Config) -> impl Filter + Clone { + let runner = config.runner.clone(); + path("ossi") + .and(post()) + .and(warp::query::()) + .and(json_body()) + .and(with_runner(runner)) + .and_then(handle_ossi) + .with(with::header(header::PRAGMA, "no-cache")) + .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) + .with(with::header(header::X_FRAME_OPTIONS, "DENY")) +} + +/// For passing runner to handlers. +fn with_runner( + runner: AcmRunner, +) -> impl Filter + Clone { + warp::any().map(move || runner.clone()) +} + +/// JSON request body filter with content length limit. +fn json_body() -> impl Filter,), Error = Rejection> + Clone { + body::content_length_limit(1024 * 16).and(body::json()) +} + +/// Handle version requests. +async fn handle_version() -> Result { + Ok(reply::json(&Version { + daemon_version: env!("CARGO_PKG_VERSION"), + })) +} + +/// Handle OSSI requests. +async fn handle_ossi( + query: Query, + requests: Vec, + mut runner: AcmRunner, +) -> Result { + debug!("{:?}", query); + debug!("{:?}", requests); + + // Queue request inputs on runner. + for (job_name, input) in requests + .into_iter() + .map(|r| -> Vec<(String, Message)> { r.into() }) + .flatten() + { + runner.queue_input(&job_name, &input); + } + + // Collect runner results and convert to responses. + let responses: Vec, AnyhowError>> = if query.no_cache.unwrap_or_default() { + // Run without cache. + runner + .run() + .map(|(name, output)| { + let output: Vec = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + } else { + // Run with cache. + runner + .run_cached() + .map(|(name, output)| { + let output: Vec = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + }; + + // Handle errors from runner. + if query.panicky.unwrap_or_default() { + // Return an internal error if anything went wrong. + let responses: Result>, _> = responses.into_iter().collect(); + match responses { + Err(e) => Ok(reply::with_status( + reply::json(&Error { + reason: e.to_string(), + }), + StatusCode::INTERNAL_SERVER_ERROR, + )), + Ok(r) => Ok(reply::with_status( + reply::json(&r.into_iter().flatten().collect::>()), + StatusCode::OK, + )), + } + } else { + // Discard errors and return just good data. + let responses: Vec = responses + .into_iter() + .filter_map(|r| r.ok()) + .flatten() + .collect(); + Ok(reply::with_status(reply::json(&responses), StatusCode::OK)) + } +} -- cgit v1.2.3