author     Carpenter, Adam (CORP) <Adam.Carpenter@adp.com>    2021-10-12 14:07:10 -0400
committer  Carpenter, Adam (CORP) <Adam.Carpenter@adp.com>    2021-10-12 14:07:10 -0400
commit     2f67f82a64f4eabfbe758655099f48d8afa07fc3 (patch)
tree       8aaad4e02f09635930b13db9851b3696ac345343 /angelsharkd
parent     154367ad2f598b99ca9ccdec175eddcdfa09ef76 (diff)
feat: init upload
Diffstat (limited to 'angelsharkd')
-rw-r--r--  angelsharkd/Cargo.toml            30
-rw-r--r--  angelsharkd/src/config.rs         49
-rw-r--r--  angelsharkd/src/main.rs           59
-rw-r--r--  angelsharkd/src/routes/dtos.rs    67
-rw-r--r--  angelsharkd/src/routes/mod.rs    137
5 files changed, 342 insertions, 0 deletions
diff --git a/angelsharkd/Cargo.toml b/angelsharkd/Cargo.toml
new file mode 100644
index 0000000..9a5c750
--- /dev/null
+++ b/angelsharkd/Cargo.toml
@@ -0,0 +1,30 @@
+[package]
+name = "angelsharkd"
+version = "0.1.0"
+edition = "2018"
+authors = ["Adam T. Carpenter <adam.carpenter@adp.com>"]
+description = "An HTTP interface into one or more Communication Managers"
+
+[dependencies.libangelshark]
+path = "../libangelshark"
+
+[dependencies.tokio]
+version = "1"
+features = ["macros", "rt-multi-thread", "signal"]
+
+[dependencies.warp]
+version = "0.3"
+default-features = false
+
+[dependencies.log]
+version = "0.4"
+
+[dependencies.env_logger]
+version = "0.8"
+
+[dependencies.serde]
+version = "1"
+features = ["derive"]
+
+[dependencies.anyhow]
+version = "1"
diff --git a/angelsharkd/src/config.rs b/angelsharkd/src/config.rs
new file mode 100644
index 0000000..b0866be
--- /dev/null
+++ b/angelsharkd/src/config.rs
@@ -0,0 +1,49 @@
+use anyhow::{Context, Result};
+use libangelshark::{Acm, AcmRunner};
+use std::{
+ env,
+ fs::File,
+ net::{Ipv4Addr, SocketAddrV4},
+};
+
+#[derive(Clone)]
+pub struct Config {
+ pub bind_addr: SocketAddrV4,
+ pub debug_mode: bool,
+ pub runner: AcmRunner,
+ pub origin: String,
+}
+
+impl Config {
+ pub fn init() -> Result<Self> {
+ let debug_mode = cfg!(debug_assertions) || env::var_os("ANGELSHARKD_DEBUG").is_some();
+
+ let bind_addr: SocketAddrV4 = env::var("ANGELSHARKD_ADDR")
+ .map(|addr| addr.parse())
+ .unwrap_or_else(|_| Ok(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080)))
+ .with_context(|| "Failed to parse socket bind address.")?;
+
+ let origin = env::var("ANGELSHARKD_ORIGIN").unwrap_or_default();
+
+ let logins = if let Ok(path) = env::var("ANGELSHARKD_LOGINS") {
+ File::open(path)
+ } else {
+ File::open("./asa.cfg")
+ }
+ .with_context(|| "Failed to open logins file.")?;
+
+ let mut runner = AcmRunner::default();
+ for (job_name, acm) in
+ Acm::from_logins(logins).with_context(|| "Failed to parse logins.")?
+ {
+ runner.register_acm(&job_name, acm);
+ }
+
+ Ok(Self {
+ bind_addr,
+ origin,
+ debug_mode,
+ runner,
+ })
+ }
+}
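
Config::init is driven entirely by environment variables; anything left unset falls back to the defaults hard-coded above. A minimal sketch of exercising it from inside the crate, assuming a valid logins file exists at the hypothetical path /etc/angelshark/asa.cfg (the variable names match config.rs; all values are illustrative):

    use crate::config::Config;
    use anyhow::Result;

    fn init_from_env() -> Result<Config> {
        // Hypothetical values; unset variables fall back to the defaults in config.rs.
        std::env::set_var("ANGELSHARKD_ADDR", "0.0.0.0:8080");              // default: 127.0.0.1:8080
        std::env::set_var("ANGELSHARKD_LOGINS", "/etc/angelshark/asa.cfg"); // default: ./asa.cfg
        std::env::set_var("ANGELSHARKD_ORIGIN", "https://example.com");     // CORS origin outside debug mode
        std::env::set_var("ANGELSHARKD_DEBUG", "1");                        // presence alone enables debug mode
        Config::init()
    }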
diff --git a/angelsharkd/src/main.rs b/angelsharkd/src/main.rs
new file mode 100644
index 0000000..22e4161
--- /dev/null
+++ b/angelsharkd/src/main.rs
@@ -0,0 +1,59 @@
+use crate::config::Config;
+use anyhow::{Context, Result};
+use log::{debug, error, info, LevelFilter};
+use tokio::{signal, task};
+use warp::{hyper::Method, Filter};
+
+mod config;
+mod routes;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ // Init config.
+ let config = Config::init().with_context(|| "Failed to initialize config.")?;
+
+ // Init logging.
+ env_logger::Builder::new()
+ .filter(
+ None,
+ if config.debug_mode {
+ LevelFilter::Debug
+ } else {
+ LevelFilter::Info
+ },
+ )
+ .init();
+
+ if config.debug_mode {
+ debug!("**** DEBUGGING MODE ENABLED ****");
+ }
+
+ let routes = routes::index()
+ .or(routes::ossi(&config))
+ .with(if config.debug_mode {
+ warp::cors()
+ .allow_any_origin()
+ .allow_methods(&[Method::GET, Method::POST])
+ } else {
+ warp::cors()
+ .allow_origin(config.origin.as_str())
+ .allow_methods(&[Method::GET, Method::POST])
+ })
+ .with(warp::log("angelsharkd"));
+
+ // Create server with shutdown signal.
+ let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(config.bind_addr, async {
+ signal::ctrl_c()
+ .await
+ .expect("Failed to install CTRL+C signal handler.");
+ });
+
+ // Run server to completion.
+ info!("Starting server on {} ...", addr);
+ if let Err(e) = task::spawn(server).await {
+ error!("Server died unexpectedly: {}", e.to_string());
+ }
+ info!("Stopping server...");
+
+ Ok(())
+}
diff --git a/angelsharkd/src/routes/dtos.rs b/angelsharkd/src/routes/dtos.rs
new file mode 100644
index 0000000..fe3f184
--- /dev/null
+++ b/angelsharkd/src/routes/dtos.rs
@@ -0,0 +1,67 @@
+use libangelshark::Message;
+use serde::{Deserialize, Serialize};
+
+#[derive(Serialize, Debug)]
+pub struct Error {
+ pub reason: String,
+}
+
+#[derive(Serialize, Debug)]
+pub struct Version {
+ pub daemon_version: &'static str,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Query {
+ pub no_cache: Option<bool>,
+ pub panicky: Option<bool>,
+}
+
+#[derive(Deserialize, Debug)]
+pub struct Request {
+ pub acms: Vec<String>,
+ pub command: String,
+ pub fields: Option<Vec<String>>,
+ pub datas: Option<Vec<Vec<String>>>,
+}
+
+impl From<Request> for Vec<(String, Message)> {
+ fn from(val: Request) -> Self {
+ val.acms
+ .iter()
+ .map(|name| {
+ (
+ name.to_owned(),
+ Message {
+ command: val.command.clone(),
+ fields: val.fields.clone(),
+ datas: val.datas.clone(),
+ error: None,
+ },
+ )
+ })
+ .collect()
+ }
+}
+
+#[derive(Debug, Serialize)]
+pub struct Response {
+ pub acm: String,
+ pub command: String,
+ pub error: String,
+ pub fields: Vec<String>,
+ pub datas: Vec<Vec<String>>,
+}
+
+impl From<(String, Message)> for Response {
+ fn from(msg: (String, Message)) -> Self {
+ let (acm, msg) = msg;
+ Self {
+ acm,
+ command: msg.command,
+ fields: msg.fields.unwrap_or_default(),
+ datas: msg.datas.unwrap_or_default(),
+ error: msg.error.unwrap_or_default(),
+ }
+ }
+}
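
The From<Request> impl fans a single request out into one OSSI Message per named ACM. A minimal sketch of that conversion as it might appear in a unit test inside this module, with hypothetical ACM names and command (field names match the structs above):

    use libangelshark::Message;

    fn fan_out_example() {
        let request = Request {
            acms: vec!["CM01".into(), "CM02".into()], // hypothetical ACM job names
            command: "list stations".into(),          // hypothetical OSSI command
            fields: None,
            datas: None,
        };

        // One (job name, Message) pair per ACM, ready to be queued on an AcmRunner.
        let messages: Vec<(String, Message)> = request.into();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].0, "CM01");
        assert_eq!(messages[0].1.command, "list stations");
    }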
diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs
new file mode 100644
index 0000000..40d024d
--- /dev/null
+++ b/angelsharkd/src/routes/mod.rs
@@ -0,0 +1,137 @@
+use crate::config::Config;
+use anyhow::Error as AnyhowError;
+use dtos::*;
+use libangelshark::{AcmRunner, Message, ParallelIterator};
+use log::debug;
+use std::convert::Infallible;
+use warp::{
+ body,
+ hyper::{header, StatusCode},
+ path, post,
+ reply::{self, with},
+ Filter, Rejection, Reply,
+};
+
+mod dtos;
+
+/// GET / -> Name and version # of app.
+pub fn index() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+ path::end().and_then(handle_version)
+}
+
+/// POST /ossi with JSON inputs -> JSON outputs
+pub fn ossi(config: &Config) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+ let runner = config.runner.clone();
+ path("ossi")
+ .and(post())
+ .and(warp::query::<Query>())
+ .and(json_body())
+ .and(with_runner(runner))
+ .and_then(handle_ossi)
+ .with(with::header(header::PRAGMA, "no-cache"))
+ .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0"))
+ .with(with::header(header::X_FRAME_OPTIONS, "DENY"))
+}
+
+/// For passing runner to handlers.
+fn with_runner(
+ runner: AcmRunner,
+) -> impl Filter<Extract = (AcmRunner,), Error = Infallible> + Clone {
+ warp::any().map(move || runner.clone())
+}
+
+/// JSON request body filter with content length limit.
+fn json_body() -> impl Filter<Extract = (Vec<Request>,), Error = Rejection> + Clone {
+ body::content_length_limit(1024 * 16).and(body::json())
+}
+
+/// Handle version requests.
+async fn handle_version() -> Result<impl Reply, Infallible> {
+ Ok(reply::json(&Version {
+ daemon_version: env!("CARGO_PKG_VERSION"),
+ }))
+}
+
+/// Handle OSSI requests.
+async fn handle_ossi(
+ query: Query,
+ requests: Vec<Request>,
+ mut runner: AcmRunner,
+) -> Result<impl Reply, Infallible> {
+ debug!("{:?}", query);
+ debug!("{:?}", requests);
+
+ // Queue request inputs on runner.
+ for (job_name, input) in requests
+ .into_iter()
+ .map(|r| -> Vec<(String, Message)> { r.into() })
+ .flatten()
+ {
+ runner.queue_input(&job_name, &input);
+ }
+
+ // Collect runner results and convert to responses.
+ let responses: Vec<Result<Vec<Response>, AnyhowError>> = if query.no_cache.unwrap_or_default() {
+ // Run without cache.
+ runner
+ .run()
+ .map(|(name, output)| {
+ let output: Vec<Response> = output?
+ .into_iter()
+ .filter_map(move |o| {
+ if o.command == "logoff" {
+ None
+ } else {
+ Some(Response::from((name.clone(), o)))
+ }
+ })
+ .collect();
+ Ok(output)
+ })
+ .collect()
+ } else {
+ // Run with cache.
+ runner
+ .run_cached()
+ .map(|(name, output)| {
+ let output: Vec<Response> = output?
+ .into_iter()
+ .filter_map(move |o| {
+ if o.command == "logoff" {
+ None
+ } else {
+ Some(Response::from((name.clone(), o)))
+ }
+ })
+ .collect();
+ Ok(output)
+ })
+ .collect()
+ };
+
+ // Handle errors from runner.
+ if query.panicky.unwrap_or_default() {
+ // Return an internal error if anything went wrong.
+ let responses: Result<Vec<Vec<Response>>, _> = responses.into_iter().collect();
+ match responses {
+ Err(e) => Ok(reply::with_status(
+ reply::json(&Error {
+ reason: e.to_string(),
+ }),
+ StatusCode::INTERNAL_SERVER_ERROR,
+ )),
+ Ok(r) => Ok(reply::with_status(
+ reply::json(&r.into_iter().flatten().collect::<Vec<Response>>()),
+ StatusCode::OK,
+ )),
+ }
+ } else {
+ // Discard errors and return just good data.
+ let responses: Vec<Response> = responses
+ .into_iter()
+ .filter_map(|r| r.ok())
+ .flatten()
+ .collect();
+ Ok(reply::with_status(reply::json(&responses), StatusCode::OK))
+ }
+}
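
Taken together, these routes define the daemon's HTTP surface: GET / returns the crate version, and POST /ossi accepts a JSON array of Request objects, fans them out across the named ACMs, and returns a JSON array of Response objects (entries for the trailing logoff command are always dropped). The no_cache query flag bypasses the runner's cache, and panicky turns any runner error into a 500 with a reason body instead of silently discarding the failed results. A hypothetical exchange, with all names and values invented for illustration:

    POST /ossi?no_cache=true&panicky=true
    [
        { "acms": ["CM01"], "command": "list stations", "fields": null, "datas": null }
    ]

    200 OK
    [
        { "acm": "CM01", "command": "list stations", "error": "",
          "fields": ["8005ff00"], "datas": [["1000", "station-user"]] }
    ]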