diff options
author | Carpenter, Adam (CORP) <Adam.Carpenter@adp.com> | 2021-10-12 14:07:10 -0400 |
---|---|---|
committer | Carpenter, Adam (CORP) <Adam.Carpenter@adp.com> | 2021-10-12 14:07:10 -0400 |
commit | 2f67f82a64f4eabfbe758655099f48d8afa07fc3 (patch) | |
tree | 8aaad4e02f09635930b13db9851b3696ac345343 | |
parent | 154367ad2f598b99ca9ccdec175eddcdfa09ef76 (diff) | |
download | altruistic-angelshark-2f67f82a64f4eabfbe758655099f48d8afa07fc3.tar.xz altruistic-angelshark-2f67f82a64f4eabfbe758655099f48d8afa07fc3.zip |
feat: init upload
-rw-r--r-- | .gitignore | 8 | ||||
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | angelsharkcli/Cargo.toml | 22 | ||||
-rw-r--r-- | angelsharkcli/src/main.rs | 165 | ||||
-rw-r--r-- | angelsharkd/Cargo.toml | 30 | ||||
-rw-r--r-- | angelsharkd/src/config.rs | 49 | ||||
-rw-r--r-- | angelsharkd/src/main.rs | 59 | ||||
-rw-r--r-- | angelsharkd/src/routes/dtos.rs | 67 | ||||
-rw-r--r-- | angelsharkd/src/routes/mod.rs | 137 | ||||
-rw-r--r-- | libangelshark/Cargo.toml | 21 | ||||
-rw-r--r-- | libangelshark/src/acm.rs | 230 | ||||
-rw-r--r-- | libangelshark/src/lib.rs | 7 | ||||
-rw-r--r-- | libangelshark/src/message.rs | 194 | ||||
-rw-r--r-- | libangelshark/src/runner.rs | 77 |
14 files changed, 1068 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..92bb828 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +*.cfg +*.csv +*.json +*.tsv +*.txt +/target +Cargo.lock +angelshark.toml
\ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..c8a87b2 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,2 @@ +[workspace] +members = ["libangelshark", "angelsharkcli", "angelsharkd"]
\ No newline at end of file diff --git a/angelsharkcli/Cargo.toml b/angelsharkcli/Cargo.toml new file mode 100644 index 0000000..5e399f8 --- /dev/null +++ b/angelsharkcli/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "angelsharkcli" +version = "0.1.0" +edition = "2018" +authors = ["Adam T. Carpenter <adam.carpenter@adp.com>"] +description = "A command-line interface into one or more Communication Managers" + +[dependencies.libangelshark] +path = "../libangelshark" + +[dependencies.csv] +version = "1" + +[dependencies.serde_json] +version = "1" + +[dependencies.clap] +version = "2" +default-features = false + +[dependencies.anyhow] +version = "1" diff --git a/angelsharkcli/src/main.rs b/angelsharkcli/src/main.rs new file mode 100644 index 0000000..9d1ac36 --- /dev/null +++ b/angelsharkcli/src/main.rs @@ -0,0 +1,165 @@ +use anyhow::{anyhow, Context, Error, Result}; +use clap::{App, Arg, ArgMatches, SubCommand}; +use csv::{QuoteStyle, WriterBuilder}; +use libangelshark::{Acm, AcmRunner, Message, ParallelIterator}; +use std::{ + fs::File, + io::{stdin, stdout, BufWriter, Write}, +}; + +fn main() -> Result<()> { + // Parse arguments. + let args = parse_args(); + + // Collect logins. + let path = args.value_of("config").unwrap_or("./asa.cfg"); + let logins_file = + File::open(path).with_context(|| format!("Failed to open logins file: {}", path))?; + let acms = Acm::from_logins(logins_file).with_context(|| "Failed to parse logins.")?; + let inputs = Message::from_input(stdin()).with_context(|| "Failed to read input.")?; + + match args.subcommand() { + ("test", _) => { + // Echo the parsed logins and input. + println!("{:?}", acms); + println!("{:?}", inputs); + } + ("man", _) => { + // Print manual pages for the given input. + AcmRunner::new(acms, inputs) + .manuals() + .for_each(|(name, output)| match output { + Err(e) => eprintln!( + "{}", + anyhow!(e).context(format!("angelsharkcli: manual ({})", name)) + ), + Ok(o) => println!("{}", o), + }); + } + ("print", Some(args)) => { + // Run the input and print the output. + let format = args.value_of("format").unwrap_or("tsv"); + let header_row = args.is_present("header_row"); + let prefix = args.value_of("prefix").unwrap_or_default(); + let to_file = args.is_present("to_file"); + + AcmRunner::new(acms, inputs) + .run() + .filter_map(|(name, output)| match output { + Err(e) => { + eprintln!("angelsharkcli: runner ({}): {}", name, e); + None + } + Ok(messages) => Some((name, messages)), + }) + .try_for_each(|(name, outputs)| { + for (command, datas) in outputs.into_iter().filter_map(|message| { + if message.command == "logoff" { + None + } else if let Some(error) = &message.error { + eprintln!("angelsharkcli: ossi ({}): {}", name, error); + None + } else if let Some(datas) = message.datas { + if header_row { + Some(( + message.command, + vec![message.fields.unwrap_or_default()] + .into_iter() + .chain(datas.into_iter()) + .collect(), + )) + } else { + Some((message.command, datas)) + } + } else { + None + } + }) { + let writer: BufWriter<Box<dyn Write>> = BufWriter::new(if to_file { + let filename = format!( + "./{}angelshark -- {} -- {}.{}", + prefix, name, command, format + ); + let file = File::create(&filename).with_context(|| { + format!("Failed to create output file: {}", filename) + })?; + Box::new(file) + } else { + Box::new(stdout()) + }); + + match format { + "json" => { + serde_json::to_writer_pretty(writer, &datas) + .with_context(|| "Failed to write JSON.")?; + } + "csv" => { + let mut writer = WriterBuilder::new() + .quote_style(QuoteStyle::Always) + .from_writer(writer); + + for data in datas { + writer + .write_record(&data) + .with_context(|| "Failed to write CSV.")?; + } + } + _ => { + let mut writer = WriterBuilder::new() + .delimiter(b'\t') + .quote_style(QuoteStyle::Never) + .from_writer(writer); + + for data in datas { + writer + .write_record(&data) + .with_context(|| "Failed to write TSV.")?; + } + } + } + } + + Result::<(), Error>::Ok(()) + })?; + } + _ => { + // Just run the input and print any errors encountered. + AcmRunner::new(acms, inputs) + .run() + .for_each(|(name, output)| match output { + Err(e) => { + eprintln!( + "{}", + anyhow!(e).context(format!("angelsharkcli: runner ({})", name)) + ); + } + Ok(o) => { + for msg in o { + if let Some(e) = msg.error { + eprintln!( + "{}", + anyhow!(e) + .context(format!("angelsharkcli: ossi ({})", name)) + ); + } + } + } + }); + } + } + + Ok(()) +} + +fn parse_args() -> ArgMatches<'static> { + let app = App::new("Altruistic Angelshark CLI") + .author(env!("CARGO_PKG_AUTHORS")) + .about(env!("CARGO_PKG_DESCRIPTION")) + .version(env!("CARGO_PKG_VERSION")) + .long_about("\nReads STDIN and parses all lines as commands to be fed to one or more ACMs. When it reaches EOF, it stops parsing and starts executing the command(s) on the ACM(s). What it does with the output can be configured with subcommands and flags. If you like keeping your commands in a file, consider using the `<` to read it on STDIN. The default behavior is to run commands but print no output (for quick changes). Errors are printed on STDERR.") + .arg(Arg::with_name("config").long("login-file").short("l").default_value("./asa.cfg").help("Set ACM login configuration file")) + .subcommand(SubCommand::with_name("test").about("Prints parsed logins and inputs but does not run anything").long_about("Does not execute commands entered, instead prints out the ACM logins and inputs it read (useful for debugging)")) + .subcommand(SubCommand::with_name("man").about("Prints command manual pages via `ossim` term").long_about("Reads commands on STDIN and prints their SAT manual pages on STDOUT")) + .subcommand(SubCommand::with_name("print").about("Prints command output to STDOUT (or files) in a useful format").long_about("Runs commands on input and writes *their data entries* to STDOUT in variety of formats (and optionally to files)").arg(Arg::with_name("prefix").long("prefix").short("p").takes_value(true).requires("to_file").help("Prepend a prefix to all output filenames")).arg(Arg::with_name("to_file").short("t").long("to-file").help("Write output to separate files instead of STDOUT")).arg(Arg::with_name("header_row").short("h").long("header-row").help("Prepend header entry of hexadecimal field addresses to output")).arg(Arg::with_name("format").short("f").long("format").possible_values(&["csv", "json", "tsv"]).default_value("tsv").help("Format data should be printed in"))); + app.get_matches_safe().unwrap_or_else(|e| e.exit()) +} diff --git a/angelsharkd/Cargo.toml b/angelsharkd/Cargo.toml new file mode 100644 index 0000000..9a5c750 --- /dev/null +++ b/angelsharkd/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "angelsharkd" +version = "0.1.0" +edition = "2018" +authors = ["Adam T. Carpenter <adam.carpenter@adp.com>"] +description = "A HTTP interface into one or more Communication Managers" + +[dependencies.libangelshark] +path = "../libangelshark" + +[dependencies.tokio] +version = "1" +features = ["macros", "rt-multi-thread", "signal"] + +[dependencies.warp] +version = "0.3" +default-features = false + +[dependencies.log] +version = "0.4" + +[dependencies.env_logger] +version = "0.8" + +[dependencies.serde] +version = "1" +features = ["derive"] + +[dependencies.anyhow] +version = "1" diff --git a/angelsharkd/src/config.rs b/angelsharkd/src/config.rs new file mode 100644 index 0000000..b0866be --- /dev/null +++ b/angelsharkd/src/config.rs @@ -0,0 +1,49 @@ +use anyhow::{Context, Result}; +use libangelshark::{Acm, AcmRunner}; +use std::{ + env, + fs::File, + net::{Ipv4Addr, SocketAddrV4}, +}; + +#[derive(Clone)] +pub struct Config { + pub bind_addr: SocketAddrV4, + pub debug_mode: bool, + pub runner: AcmRunner, + pub origin: String, +} + +impl Config { + pub fn init() -> Result<Self> { + let debug_mode = cfg!(debug_assertions) || env::var_os("ANGELSHARKD_DEBUG").is_some(); + + let bind_addr: SocketAddrV4 = env::var("ANGELSHARKD_ADDR") + .map(|addr| addr.parse()) + .unwrap_or_else(|_| Ok(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080))) + .with_context(|| "Failed to parse socket bind address.")?; + + let origin = env::var("ANGELSHARKD_ORIGIN").unwrap_or_default(); + + let logins = if let Ok(path) = env::var("ANGELSHARKD_LOGINS") { + File::open(path) + } else { + File::open("./asa.cfg") + } + .with_context(|| "Failed to open logins file.")?; + + let mut runner = AcmRunner::default(); + for (job_name, acm) in + Acm::from_logins(logins).with_context(|| "Failed to parse logins.")? + { + runner.register_acm(&job_name, acm); + } + + Ok(Self { + bind_addr, + origin, + debug_mode, + runner, + }) + } +} diff --git a/angelsharkd/src/main.rs b/angelsharkd/src/main.rs new file mode 100644 index 0000000..22e4161 --- /dev/null +++ b/angelsharkd/src/main.rs @@ -0,0 +1,59 @@ +use crate::config::Config; +use anyhow::{Context, Result}; +use log::{debug, error, info, LevelFilter}; +use tokio::{signal, task}; +use warp::{hyper::Method, Filter}; + +mod config; +mod routes; + +#[tokio::main] +async fn main() -> Result<()> { + // Init config. + let config = Config::init().with_context(|| "Failed to initialize config.")?; + + // Init logging. + env_logger::Builder::new() + .filter( + None, + if config.debug_mode { + LevelFilter::Debug + } else { + LevelFilter::Info + }, + ) + .init(); + + if config.debug_mode { + debug!("**** DEBUGGING MODE ENABLED ****"); + } + + let routes = routes::index() + .or(routes::ossi(&config)) + .with(if config.debug_mode { + warp::cors() + .allow_any_origin() + .allow_methods(&[Method::GET, Method::POST]) + } else { + warp::cors() + .allow_origin(config.origin.as_str()) + .allow_methods(&[Method::GET, Method::POST]) + }) + .with(warp::log("angelsharkd")); + + // Create server with shutdown signal. + let (addr, server) = warp::serve(routes).bind_with_graceful_shutdown(config.bind_addr, async { + signal::ctrl_c() + .await + .expect("Failed to install CTRL+C signal handler."); + }); + + // Run server to completion. + info!("Starting server on {} ...", addr); + if let Err(e) = task::spawn(server).await { + error!("Server died unexpectedly: {}", e.to_string()); + } + info!("Stopping server..."); + + Ok(()) +} diff --git a/angelsharkd/src/routes/dtos.rs b/angelsharkd/src/routes/dtos.rs new file mode 100644 index 0000000..fe3f184 --- /dev/null +++ b/angelsharkd/src/routes/dtos.rs @@ -0,0 +1,67 @@ +use libangelshark::Message; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Debug)] +pub struct Error { + pub reason: String, +} + +#[derive(Serialize, Debug)] +pub struct Version { + pub daemon_version: &'static str, +} + +#[derive(Deserialize, Debug)] +pub struct Query { + pub no_cache: Option<bool>, + pub panicky: Option<bool>, +} + +#[derive(Deserialize, Debug)] +pub struct Request { + pub acms: Vec<String>, + pub command: String, + pub fields: Option<Vec<String>>, + pub datas: Option<Vec<Vec<String>>>, +} + +impl From<Request> for Vec<(String, Message)> { + fn from(val: Request) -> Self { + val.acms + .iter() + .map(|name| { + ( + name.to_owned(), + Message { + command: val.command.clone(), + fields: val.fields.clone(), + datas: val.datas.clone(), + error: None, + }, + ) + }) + .collect() + } +} + +#[derive(Debug, Serialize)] +pub struct Response { + pub acm: String, + pub command: String, + pub error: String, + pub fields: Vec<String>, + pub datas: Vec<Vec<String>>, +} + +impl From<(String, Message)> for Response { + fn from(msg: (String, Message)) -> Self { + let (acm, msg) = msg; + Self { + acm, + command: msg.command, + fields: msg.fields.unwrap_or_default(), + datas: msg.datas.unwrap_or_default(), + error: msg.error.unwrap_or_default(), + } + } +} diff --git a/angelsharkd/src/routes/mod.rs b/angelsharkd/src/routes/mod.rs new file mode 100644 index 0000000..40d024d --- /dev/null +++ b/angelsharkd/src/routes/mod.rs @@ -0,0 +1,137 @@ +use crate::config::Config; +use anyhow::Error as AnyhowError; +use dtos::*; +use libangelshark::{AcmRunner, Message, ParallelIterator}; +use log::debug; +use std::convert::Infallible; +use warp::{ + body, + hyper::{header, StatusCode}, + path, post, + reply::{self, with}, + Filter, Rejection, Reply, +}; + +mod dtos; + +/// GET / -> Name and version # of app. +pub fn index() -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { + path::end().and_then(handle_version) +} + +/// POST /ossi with JSON inputs -> JSON outputs +pub fn ossi(config: &Config) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { + let runner = config.runner.clone(); + path("ossi") + .and(post()) + .and(warp::query::<Query>()) + .and(json_body()) + .and(with_runner(runner)) + .and_then(handle_ossi) + .with(with::header(header::PRAGMA, "no-cache")) + .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) + .with(with::header(header::X_FRAME_OPTIONS, "DENY")) +} + +/// For passing runner to handlers. +fn with_runner( + runner: AcmRunner, +) -> impl Filter<Extract = (AcmRunner,), Error = Infallible> + Clone { + warp::any().map(move || runner.clone()) +} + +/// JSON request body filter with content length limit. +fn json_body() -> impl Filter<Extract = (Vec<Request>,), Error = Rejection> + Clone { + body::content_length_limit(1024 * 16).and(body::json()) +} + +/// Handle version requests. +async fn handle_version() -> Result<impl Reply, Infallible> { + Ok(reply::json(&Version { + daemon_version: env!("CARGO_PKG_VERSION"), + })) +} + +/// Handle OSSI requests. +async fn handle_ossi( + query: Query, + requests: Vec<Request>, + mut runner: AcmRunner, +) -> Result<impl Reply, Infallible> { + debug!("{:?}", query); + debug!("{:?}", requests); + + // Queue request inputs on runner. + for (job_name, input) in requests + .into_iter() + .map(|r| -> Vec<(String, Message)> { r.into() }) + .flatten() + { + runner.queue_input(&job_name, &input); + } + + // Collect runner results and convert to responses. + let responses: Vec<Result<Vec<Response>, AnyhowError>> = if query.no_cache.unwrap_or_default() { + // Run without cache. + runner + .run() + .map(|(name, output)| { + let output: Vec<Response> = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + } else { + // Run with cache. + runner + .run_cached() + .map(|(name, output)| { + let output: Vec<Response> = output? + .into_iter() + .filter_map(move |o| { + if o.command == "logoff" { + None + } else { + Some(Response::from((name.clone(), o))) + } + }) + .collect(); + Ok(output) + }) + .collect() + }; + + // Handle errors from runner. + if query.panicky.unwrap_or_default() { + // Return an internal error if anything went wrong. + let responses: Result<Vec<Vec<Response>>, _> = responses.into_iter().collect(); + match responses { + Err(e) => Ok(reply::with_status( + reply::json(&Error { + reason: e.to_string(), + }), + StatusCode::INTERNAL_SERVER_ERROR, + )), + Ok(r) => Ok(reply::with_status( + reply::json(&r.into_iter().flatten().collect::<Vec<Response>>()), + StatusCode::OK, + )), + } + } else { + // Discard errors and return just good data. + let responses: Vec<Response> = responses + .into_iter() + .filter_map(|r| r.ok()) + .flatten() + .collect(); + Ok(reply::with_status(reply::json(&responses), StatusCode::OK)) + } +} diff --git a/libangelshark/Cargo.toml b/libangelshark/Cargo.toml new file mode 100644 index 0000000..029bd77 --- /dev/null +++ b/libangelshark/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "libangelshark" +version = "0.1.0" +edition = "2018" +authors = ["Adam T. Carpenter <adam.carpenter@adp.com>"] +description = "A Communication Manager automation library and command runner." + +[dependencies.anyhow] +version = "1" + +[dependencies.rayon] +version = "1" + +[dependencies.ssh2] +version = "0.9" +features = ["vendored-openssl"] + +[dependencies.cached] +version = "0.25" +default-features = false +features = ["proc_macro"] diff --git a/libangelshark/src/acm.rs b/libangelshark/src/acm.rs new file mode 100644 index 0000000..a9ffae1 --- /dev/null +++ b/libangelshark/src/acm.rs @@ -0,0 +1,230 @@ +use crate::Message; +use anyhow::{anyhow, Context, Result}; +use cached::{proc_macro::cached, Return, TimedCache}; +use ssh2::{KeyboardInteractivePrompt, Prompt, Session, Stream}; +use std::{ + fmt::Debug, + io::{BufRead, BufReader, Read, Write}, + net::{Ipv4Addr, TcpStream}, +}; + +const DEFAULT_PORT: u16 = 5022; +const OSSI_LOGOFF: &[u8] = b"clogoff\nt\ny\n"; +const OSSI_MAN_TERM: &[u8] = b"ossiem\n"; +const OSSI_TERM: &[u8] = b"ossie\n"; +const TERM: &str = "vt100"; +const TERM_DIMS: (u32, u32, u32, u32) = (81, 25, 0, 0); +const TIMEOUT_MS: u32 = 30000; // Thirty second read/write timeout. + +/// Represents a Communication Manager and its login information. Executes +/// collections of [Message]s on an ACM over SSH. +#[derive(Clone)] +pub struct Acm { + addr: Ipv4Addr, + port: Option<u16>, + user: String, + pass: String, +} + +impl Debug for Acm { + /// Formats an ACM's configuration information for debugging. Masks passwords. + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Acm") + .field("addr", &self.addr) + .field("port", &self.port.unwrap_or(DEFAULT_PORT)) + .field("user", &self.user) + .field("pass", &"********") + .finish() + } +} + +impl Default for Acm { + fn default() -> Self { + Self { + addr: Ipv4Addr::new(127, 0, 0, 1), + port: Default::default(), + user: Default::default(), + pass: Default::default(), + } + } +} + +impl Acm { + /// Adds an IPv4 address to ACM config. + pub fn with_addr(&mut self, addr: Ipv4Addr) -> &mut Self { + self.addr = addr; + self + } + + /// Adds a port to ACM config. + pub fn with_port(&mut self, port: u16) -> &mut Self { + self.port = Some(port); + self + } + + /// Adds a login usermame to ACM config. + pub fn with_user(&mut self, user: &str) -> &mut Self { + self.user = user.into(); + self + } + + /// Adds a login password to ACM config. + pub fn with_pass(&mut self, pass: &str) -> &mut Self { + self.pass = pass.into(); + self + } + + /// Opens a readable and writable SSH stream into an ACM's Site Administration Terminal. + fn open_stream(&self, term: &[u8]) -> Result<Stream> { + // Initialize SSH session. + let stream = TcpStream::connect((self.addr, self.port.unwrap_or(DEFAULT_PORT))) + .with_context(|| "Failed to open TCP stream to host. Make sure the config is correct and the host is otherwise reachable.")?; + let mut session = Session::new().with_context(|| "Failed to start SSH session.")?; + session.set_tcp_stream(stream); + session.set_timeout(TIMEOUT_MS); + session + .handshake() + .with_context(|| "SSH handshake failed.")?; + session + .userauth_keyboard_interactive(&self.user, &mut SshPrompter::new(&self.pass)) + .with_context(|| "Interactive SSH keyboard authentication failed.")?; + + // Open shell on SSH channel. + let mut channel = session + .channel_session() + .with_context(|| "Failed to open SSH channel on SSH session.")?; + channel + .request_pty(TERM, None, Some(TERM_DIMS)) + .with_context(|| "Failed to open PTY on SSH channel.")?; + channel + .shell() + .with_context(|| "Failed to open shell on SSH channel.")?; + channel + .write_all(term) + .with_context(|| "Failed to send OSSI term.")?; + + // Waits until OSSI terminator is read to return channel. If this times + // out, something went wrong with login or ACM is unreachable. + let mut lines = BufReader::new(channel.stream(0)).lines(); + while let Some(Ok(line)) = lines.next() { + if line == "t" { + return Ok(channel.stream(0)); + } + } + + Err(anyhow!("Never reached OSSI term prompt.")) + } + + /// Runs a given collection of [Message]s (OSSI commands) on the ACM and returns their resulting output. + pub fn run(&self, inputs: &[Message]) -> Result<Vec<Message>> { + let inputs: String = inputs.iter().map(Message::to_string).collect(); + let mut stream = self.open_stream(OSSI_TERM)?; + write!(stream, "{}", inputs).with_context(|| "Failed to write inputs to OSSI stream.")?; + stream + .write_all(OSSI_LOGOFF) + .with_context(|| "Failed to write LOGOFF to OSSI stream.")?; + Message::from_output(stream) + } + + /// Like [Self::run], but caches results with a timed cache of thirty minutes. + pub fn run_cached(&self, inputs: &[Message]) -> Result<Vec<Message>> { + Ok(run_cached(self, inputs)?.value) + } + + /// Like [Self::run], but instead of running [Message]s, it returns the manual pages for the provided OSSI commands. + pub fn manual(&self, inputs: &[Message]) -> Result<String> { + let inputs: String = inputs.iter().map(Message::to_string).collect(); + let mut stream = self.open_stream(OSSI_MAN_TERM)?; + write!(stream, "{}", inputs).with_context(|| "Failed to write inputs to OSSI stream.")?; + stream + .write_all(OSSI_LOGOFF) + .with_context(|| "Failed to write LOGOFF to OSSI stream.")?; + let mut output = String::new(); + stream + .read_to_string(&mut output) + .with_context(|| "Failed to read manual pages to string.")?; + Ok(output) + } + + /// Reads from `readable`, parsing lines as `asa.cfg`-formatted ACM logins. + /// Returns a collection of the parsed logins and their associated + /// names/labels. The spec looks like this: + /// + /// ```text + /// (name user:pass@addr:optional_port) + /// ACM01 admin:secret@192.168.1.1:5022 + /// ACM02 acdmin:secret@192.168.1.2 + /// ACM03 acdmin:secret@192.168.1.3:5023 + /// ``` + /// + /// The port is optional. If it is not provided, the default SAT port of + /// 5022 will be used. + pub fn from_logins(readable: impl Read) -> Result<Vec<(String, Self)>> { + let mut acms = Vec::new(); + + for line in BufReader::new(readable).lines() { + let line = line.with_context(|| "Failed to read line of config.")?; + let mut acm = Self::default(); + + if let Some((name, config)) = line.split_once(' ') { + if let Some((creds, dest)) = config.split_once('@') { + if let Some((user, pass)) = creds.split_once(':') { + if let Some((addr, port)) = dest.split_once(':') { + acm.with_port( + port.parse() + .with_context(|| "Failed to parse ACM socket port.")?, + ) + .with_addr( + addr.parse() + .with_context(|| "Failed to parse ACM IP address.")?, + ); + } else { + acm.with_addr( + dest.parse() + .with_context(|| "Failed to parse ACM IP address.")?, + ); + } + + acm.with_user(user).with_pass(pass); + acms.push((name.into(), acm)); + } + } + } + } + Ok(acms) + } +} + +/// This memoized function is a timed cache where entries are evicted after +/// thirty minutes. Errors are not cached, only successes. +#[cached( + result = true, + type = "TimedCache<String, Return<Vec<Message>>>", + create = "{ TimedCache::with_lifespan(1800) }", + convert = r#"{ format!("{}{:?}{:?}", acm.addr, acm.port, inputs) }"# +)] +fn run_cached(acm: &Acm, inputs: &[Message]) -> Result<Return<Vec<Message>>> { + Ok(Return::new(acm.run(inputs)?)) +} + +/// Used internally for password-based SSH authentication. +struct SshPrompter<'a> { + pass: &'a str, +} + +impl<'a> SshPrompter<'a> { + fn new(pass: &'a str) -> Self { + Self { pass } + } +} + +impl<'a> KeyboardInteractivePrompt for SshPrompter<'a> { + fn prompt<'b>( + &mut self, + _username: &str, + _instructions: &str, + _prompts: &[Prompt<'b>], + ) -> Vec<String> { + vec![self.pass.to_owned()] + } +} diff --git a/libangelshark/src/lib.rs b/libangelshark/src/lib.rs new file mode 100644 index 0000000..4afe31a --- /dev/null +++ b/libangelshark/src/lib.rs @@ -0,0 +1,7 @@ +mod acm; +mod message; +mod runner; + +pub use acm::*; +pub use message::*; +pub use runner::*; diff --git a/libangelshark/src/message.rs b/libangelshark/src/message.rs new file mode 100644 index 0000000..0dac534 --- /dev/null +++ b/libangelshark/src/message.rs @@ -0,0 +1,194 @@ +use anyhow::{Context, Result}; +use std::{ + fmt::{Display, Formatter, Result as FmtResult}, + io::{BufRead, BufReader, Read}, +}; + +/// OSSI Messaging Delimiters +const ACM_D: &str = "a"; +const COMMAND_D: &str = "c"; +const DATA_D: &str = "d"; +const ERROR_D: &str = "e"; +const FIELD_D: &str = "f"; +const NEW_DATA_D: &str = "n"; +const TERMINATOR_D: &str = "t"; +const TAB: &str = "\t"; + +/// An OSSI protocol message. Used for input and output to and from the ACM. The +/// OSSI protocol is proprietary, and little documented. Here is a brief +/// overview. Every message consists of a single command and a +/// terminator used to separate messages. The message may also carry optional +/// tab-separated fields and datas when used for input. When used for output, it may +/// optionally carry an error. Example: +/// +/// ```text +/// (input) +/// clist object +/// t +/// (output) +/// clist object +/// fhex_code\thex_code\thex_code +/// dentry\tentry\tentry +/// n +/// dentry\tentry\tentry +/// t +/// ``` +#[derive(Debug, Default, Clone)] +pub struct Message { + pub command: String, + pub fields: Option<Vec<String>>, + pub datas: Option<Vec<Vec<String>>>, + pub error: Option<String>, +} + +impl Message { + /// Creates a new [Message] from its basic part: the command. + pub fn new(command: &str) -> Self { + Self { + command: command.into(), + ..Default::default() + } + } + + fn add_fields(&mut self, fields: Vec<String>) -> &mut Self { + if !fields.is_empty() { + if let Some(ref mut existing) = self.fields { + existing.extend(fields.into_iter()); + } else { + self.fields = Some(fields); + } + } + + self + } + + fn add_data_entry(&mut self, data: Vec<String>) -> &mut Self { + if !data.is_empty() { + if let Some(ref mut existing) = self.datas { + existing.push(data); + } else { + self.datas = Some(vec![data]); + } + } + + self + } + + /// Reads from `readable`, parsing lines as Angelshark-formatted OSSI input. + /// This closely follows the OSSI spec, but also parses ACM names/labels on + /// lines beginning with `'a'`. The spec looks like this: + /// + /// ```text + /// aACM01 + /// clist object + /// fhex_code\thex_code\thex_code + /// dentry\tentry\tentry + /// t + /// ... + /// ``` + pub fn from_input(readable: impl Read) -> Result<Vec<(String, Self)>> { + let mut data = Vec::new(); + let mut input = Self::default(); + let mut inputs = Vec::new(); + let mut names: Vec<String> = Vec::new(); + + for line in BufReader::new(readable).lines() { + let line = line.with_context(|| "Failed to read line of input.")?; + let (delim, content) = (line.get(0..1).unwrap_or_default(), line.get(1..)); + + match (delim, content) { + (ACM_D, Some(a)) => { + names.extend(a.split(TAB).map(String::from)); + } + (COMMAND_D, Some(c)) => { + input.command = c.into(); + } + (FIELD_D, Some(f)) => { + input.add_fields(f.split(TAB).map(String::from).collect()); + } + (DATA_D, Some(d)) => { + data.extend(d.split(TAB).map(String::from)); + } + (TERMINATOR_D, _) => { + input.add_data_entry(data); + inputs.extend(names.into_iter().map(|n| (n, input.clone()))); + names = Vec::new(); + input = Message::default(); + data = Vec::new(); + } + _ => { + // Skip blank lines and unknown identifiers. + } + } + } + + Ok(inputs) + } + + /// Reads `readable` and parses lines as ACM OSSI output. This should exactly follow the OSSI spec. + pub fn from_output(readable: impl Read) -> Result<Vec<Self>> { + let mut data = Vec::new(); + let mut output = Self::default(); + let mut outputs = Vec::new(); + + for line in BufReader::new(readable).lines() { + let line = line.with_context(|| "Failed to read line of output.")?; + let (delim, content) = (line.get(0..1).unwrap_or_default(), line.get(1..)); + + match (delim, content) { + (COMMAND_D, Some(c)) => { + output.command = c.into(); + } + (ERROR_D, Some(e)) => { + output.error = Some(e.into()); + } + (FIELD_D, Some(f)) => { + output.add_fields(f.split(TAB).map(String::from).collect()); + } + (DATA_D, Some(d)) => { + data.extend(d.split(TAB).map(String::from)); + } + (NEW_DATA_D, _) => { + output.add_data_entry(data); + data = Vec::new(); + } + (TERMINATOR_D, _) => { + output.add_data_entry(data); + data = Vec::new(); + outputs.push(output); + output = Self::default(); + } + _ => { + // Ignore unknown identifiers and blank lines. + } + } + } + + Ok(outputs) + } +} + +impl Display for Message { + fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { + let mut message = format!("c{}\n", self.command); + + if let Some(error) = &self.error { + message = message + &format!("e{}\n", error); + } + + if let Some(fields) = &self.fields { + message = message + &format!("f{}\n", fields.join(TAB)); + } + + if let Some(datas) = &self.datas { + message = message + + &datas + .iter() + .map(|d| format!("d{}\n", d.join(TAB))) + .collect::<Vec<String>>() + .join("n\n"); + } + + writeln!(f, "{}\nt", message) + } +} diff --git a/libangelshark/src/runner.rs b/libangelshark/src/runner.rs new file mode 100644 index 0000000..6d41bd2 --- /dev/null +++ b/libangelshark/src/runner.rs @@ -0,0 +1,77 @@ +use crate::{Acm, Message}; +use anyhow::Result; +use rayon::iter::IntoParallelIterator; +pub use rayon::iter::ParallelIterator; +use std::collections::HashMap; + +/// Allows for more convenient running of OSSI [Message]s on one or more [Acm]s, +/// parallelizing over the ACMs and (optionally) caching results for faster future runs. +/// +/// This is the intended high-level use of Angelshark. It holds a collection of +/// "jobs", which are tagged with ACM names/labels and their associated logins ([Acm]s) and [Message]s). +#[derive(Default, Debug, Clone)] +pub struct AcmRunner(HashMap<String, (Acm, Vec<Message>)>); + +impl AcmRunner { + /// Constructs a new [AcmRunner] from tagged [Acm]s and [Message]s. + pub fn new(acms: Vec<(String, Acm)>, inputs: Vec<(String, Message)>) -> Self { + let mut runner = AcmRunner::default(); + for (name, acm) in acms { + runner.register_acm(&name, acm); + } + for (name, input) in inputs { + runner.queue_input(&name, &input); + } + runner + } + + /// Registers an [Acm] as `job_name` in the runner. + pub fn register_acm(&mut self, job_name: &str, acm: Acm) -> &mut Self { + self.0.insert(job_name.into(), (acm, Vec::new())); + self + } + + /// Queues a [Message] to be run on an [Acm] registered as `job_name`. + pub fn queue_input(&mut self, job_name: &str, input: &Message) -> &mut Self { + if let Some((_, inputs)) = self.0.get_mut(job_name) { + inputs.push(input.clone()); + } + self + } + + /// Runs the queued [Message] inputs on the registered [Acm]s and returns + /// the results. The results are returned as an iterator. The iterator must + /// be in some way consumed, collected, or iterated over before the runner + /// starts running commands, i.e. it is lazy. Once this begins, results are + /// computed in parallel over the ACMs. The order of outputs is undefined. + pub fn run(self) -> impl ParallelIterator<Item = RunOutput> { + self.0 + .into_par_iter() + .filter(|(_, (_, inputs))| !inputs.is_empty()) + .map(|(job_name, (acm, inputs))| (job_name, acm.run(&inputs))) + } + + /// Functionally equivalent to [Self::run] but caches results for 30 minutes + /// to make future lookups faster. + pub fn run_cached(self) -> impl ParallelIterator<Item = RunOutput> { + self.0 + .into_par_iter() + .filter(|(_, (_, inputs))| !inputs.is_empty()) + .map(|(job_name, (acm, inputs))| (job_name, acm.run_cached(&inputs))) + } + + /// Functionally equivalent to [Self::run] but returns manual pages for + /// inputs instead of executing them. + pub fn manuals(self) -> impl ParallelIterator<Item = ManualOutput> { + self.0 + .into_par_iter() + .filter(|(_, (_, inputs))| !inputs.is_empty()) + .map(|(job_name, (acm, inputs))| (job_name, acm.manual(&inputs))) + } +} + +/// Every resulting entry of [AcmRunner::run] +pub type RunOutput = (String, Result<Vec<Message>>); + +/// Every resulting entry of [AcmRunner::manuals] +pub type ManualOutput = (String, Result<String>); |