diff options
Diffstat (limited to 'angelsharkd/src')
| -rw-r--r-- | angelsharkd/src/config.rs | 49 | ||||
| -rw-r--r-- | angelsharkd/src/main.rs | 59 | ||||
| -rw-r--r-- | angelsharkd/src/routes/dtos.rs | 67 | ||||
| -rw-r--r-- | angelsharkd/src/routes/mod.rs | 137 | 
4 files changed, 312 insertions, 0 deletions
| diff --git a/angelsharkd/src/config.rs b/angelsharkd/src/config.rs new file mode 100644 index 0000000..b0866be --- /dev/null +++ b/angelsharkd/src/config.rs @@ -0,0 +1,49 @@ +use anyhow::{Context, Result}; +use libangelshark::{Acm, AcmRunner}; +use std::{ +    env, +    fs::File, +    net::{Ipv4Addr, SocketAddrV4}, +}; + +#[derive(Clone)] +pub struct Config { +    pub bind_addr: SocketAddrV4, +    pub debug_mode: bool, +    pub runner: AcmRunner, +    pub origin: String, +} + +impl Config { +    pub fn init() -> Result<Self> { +        let debug_mode = cfg!(debug_assertions) || env::var_os("ANGELSHARKD_DEBUG").is_some(); + +        let bind_addr: SocketAddrV4 = env::var("ANGELSHARKD_ADDR") +            .map(|addr| addr.parse()) +            .unwrap_or_else(|_| Ok(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))) +            .with_context(|| "Failed to parse socket bind address.")?; + +        let origin = env::var("ANGELSHARKD_ORIGIN").unwrap_or_default(); + +        let logins = if let Ok(path) = env::var("ANGELSHARKD_LOGINS") { +            File::open(path) +        } else { +            File::open("./asa.cfg") +        } +        .with_context(|| "Failed to open logins file.")?; + +        let mut runner = AcmRunner::default(); +        for (job_name, acm) in +            Acm::from_logins(logins).with_context(|| "Failed to parse logins.")? +        { +            runner.register_acm(&job_name, acm); +        } + +        Ok(Self { +            bind_addr, +            origin, +            debug_mode, +            runner, +        }) +    } +} diff --git a/angelsharkd/src/main.rs b/angelsharkd/src/main.rs new file mode 100644 index 0000000..22e4161 --- /dev/null +++ b/angelsharkd/src/main.rs @@ -0,0 +1,59 @@ +use crate::config::Config; +use anyhow::{Context, Result}; +use log::{debug, error, info, LevelFilter}; +use tokio::{signal, task}; +use warp::{hyper::Method, Filter}; + +mod config; +mod routes; + +#[tokio::main] +async fn main() -> Result<()> { +    // Init config. +    let config = Config::init().with_context(|| "Failed to initialize config.")?; + +    // Init logging. +    env_logger::Builder::new() +        .filter( +            None, +            if config.debug_mode { +                LevelFilter::Debug +            } else { +                LevelFilter::Info +            }, +        ) +        .init(); + +    if config.debug_mode { +        debug!("**** DEBUGGING MODE ENABLED ****"); +    } + +    let routes = routes::index() +        .or(routes::ossi(&config)) +        .with(if config.debug_mode { +            warp::cors() +                .allow_any_origin() +                .allow_methods(&[Method::GET, Method::POST]) +        } else { +            warp::cors() +                .allow_origin(config.origin.as_str()) +                .allow_methods(&[Method::GET, Method::POST]) +        }) +        .with(warp::log("angelsharkd")); + +    // Create server with shutdown signal. +    let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(config.bind_addr, async { +        signal::ctrl_c() +            .await +            .expect("Failed to install CTRL+C signal handler."); +    }); + +    // Run server to completion. +    info!("Starting server on {} ...", addr); +    if let Err(e) = task::spawn(server).await { +        error!("Server died unexpectedly: {}", e.to_string()); +    } +    info!("Stopping server..."); + +    Ok(()) +} diff --git a/angelsharkd/src/routes/dtos.rs b/angelsharkd/src/routes/dtos.rs new file mode 100644 index 0000000..fe3f184 --- /dev/null +++ b/angelsharkd/src/routes/dtos.rs @@ -0,0 +1,67 @@ +use libangelshark::Message; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Debug)] +pub struct Error { +    pub reason: String, +} + +#[derive(Serialize, Debug)] +pub struct Version { +    pub daemon_version: &'static str, +} + +#[derive(Deserialize, Debug)] +pub struct Query { +    pub no_cache: Option<bool>, +    pub panicky: Option<bool>, +} + +#[derive(Deserialize, Debug)] +pub struct Request { +    pub acms: Vec<String>, +    pub command: String, +    pub fields: Option<Vec<String>>, +    pub datas: Option<Vec<Vec<String>>>, +} + +impl From<Request> for Vec<(String, Message)> { +    fn from(val: Request) -> Self { +        val.acms +            .iter() +            .map(|name| { +                ( +                    name.to_owned(), +                    Message { +                        command: val.command.clone(), +                        fields: val.fields.clone(), +                        datas: val.datas.clone(), +                        error: None, +                    }, +                ) +            }) +            .collect() +    } +} + +#[derive(Debug, Serialize)] +pub struct Response { +    pub acm: String, +    pub command: String, +    pub error: String, +    pub fields: Vec<String>, +    pub datas: Vec<Vec<String>>, +} + +impl From<(String, Message)> for Response { +    fn from(msg: (String, Message)) -> Self { +        let (acm, msg) = msg; +        Self { +            acm, +            command: msg.command, +            fields: msg.fields.unwrap_or_default(), +            datas: msg.datas.unwrap_or_default(), +            error: msg.error.unwrap_or_default(), +        } +    } +} diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs new file mode 100644 index 0000000..40d024d --- /dev/null +++ b/angelsharkd/src/routes/mod.rs @@ -0,0 +1,137 @@ +use crate::config::Config; +use anyhow::Error as AnyhowError; +use dtos::*; +use libangelshark::{AcmRunner, Message, ParallelIterator}; +use log::debug; +use std::convert::Infallible; +use warp::{ +    body, +    hyper::{header, StatusCode}, +    path, post, +    reply::{self, with}, +    Filter, Rejection, Reply, +}; + +mod dtos; + +/// GET / -> Name and version # of app. +pub fn index() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { +    path::end().and_then(handle_version) +} + +/// POST /ossi with JSON inputs -> JSON outputs +pub fn ossi(config: &Config) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { +    let runner = config.runner.clone(); +    path("ossi") +        .and(post()) +        .and(warp::query::<Query>()) +        .and(json_body()) +        .and(with_runner(runner)) +        .and_then(handle_ossi) +        .with(with::header(header::PRAGMA, "no-cache")) +        .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) +        .with(with::header(header::X_FRAME_OPTIONS, "DENY")) +} + +/// For passing runner to handlers. +fn with_runner( +    runner: AcmRunner, +) -> impl Filter<Extract = (AcmRunner,), Error = Infallible> + Clone { +    warp::any().map(move || runner.clone()) +} + +/// JSON request body filter with content length limit. +fn json_body() -> impl Filter<Extract = (Vec<Request>,), Error = Rejection> + Clone { +    body::content_length_limit(1024 * 16).and(body::json()) +} + +/// Handle version requests. +async fn handle_version() -> Result<impl Reply, Infallible> { +    Ok(reply::json(&Version { +        daemon_version: env!("CARGO_PKG_VERSION"), +    })) +} + +/// Handle OSSI requests. +async fn handle_ossi( +    query: Query, +    requests: Vec<Request>, +    mut runner: AcmRunner, +) -> Result<impl Reply, Infallible> { +    debug!("{:?}", query); +    debug!("{:?}", requests); + +    // Queue request inputs on runner. +    for (job_name, input) in requests +        .into_iter() +        .map(|r| -> Vec<(String, Message)> { r.into() }) +        .flatten() +    { +        runner.queue_input(&job_name, &input); +    } + +    // Collect runner results and convert to responses. +    let responses: Vec<Result<Vec<Response>, AnyhowError>> = if query.no_cache.unwrap_or_default() { +        // Run without cache. +        runner +            .run() +            .map(|(name, output)| { +                let output: Vec<Response> = output? +                    .into_iter() +                    .filter_map(move |o| { +                        if o.command == "logoff" { +                            None +                        } else { +                            Some(Response::from((name.clone(), o))) +                        } +                    }) +                    .collect(); +                Ok(output) +            }) +            .collect() +    } else { +        // Run with cache. +        runner +            .run_cached() +            .map(|(name, output)| { +                let output: Vec<Response> = output? +                    .into_iter() +                    .filter_map(move |o| { +                        if o.command == "logoff" { +                            None +                        } else { +                            Some(Response::from((name.clone(), o))) +                        } +                    }) +                    .collect(); +                Ok(output) +            }) +            .collect() +    }; + +    // Handle errors from runner. +    if query.panicky.unwrap_or_default() { +        // Return an internal error if anything went wrong. +        let responses: Result<Vec<Vec<Response>>, _> = responses.into_iter().collect(); +        match responses { +            Err(e) => Ok(reply::with_status( +                reply::json(&Error { +                    reason: e.to_string(), +                }), +                StatusCode::INTERNAL_SERVER_ERROR, +            )), +            Ok(r) => Ok(reply::with_status( +                reply::json(&r.into_iter().flatten().collect::<Vec<Response>>()), +                StatusCode::OK, +            )), +        } +    } else { +        // Discard errors and return just good data. +        let responses: Vec<Response> = responses +            .into_iter() +            .filter_map(|r| r.ok()) +            .flatten() +            .collect(); +        Ok(reply::with_status(reply::json(&responses), StatusCode::OK)) +    } +} |