summaryrefslogtreecommitdiff
path: root/libangelshark
diff options
context:
space:
mode:
authorCarpenter, Adam (CORP) <Adam.Carpenter@adp.com>2021-10-12 14:07:10 -0400
committerCarpenter, Adam (CORP) <Adam.Carpenter@adp.com>2021-10-12 14:07:10 -0400
commit2f67f82a64f4eabfbe758655099f48d8afa07fc3 (patch)
tree8aaad4e02f09635930b13db9851b3696ac345343 /libangelshark
parent154367ad2f598b99ca9ccdec175eddcdfa09ef76 (diff)
downloadaltruistic-angelshark-2f67f82a64f4eabfbe758655099f48d8afa07fc3.tar.xz
altruistic-angelshark-2f67f82a64f4eabfbe758655099f48d8afa07fc3.zip
feat: init upload
Diffstat (limited to 'libangelshark')
-rw-r--r--libangelshark/Cargo.toml21
-rw-r--r--libangelshark/src/acm.rs230
-rw-r--r--libangelshark/src/lib.rs7
-rw-r--r--libangelshark/src/message.rs194
-rw-r--r--libangelshark/src/runner.rs77
5 files changed, 529 insertions, 0 deletions
diff --git a/libangelshark/Cargo.toml b/libangelshark/Cargo.toml
new file mode 100644
index 0000000..029bd77
--- /dev/null
+++ b/libangelshark/Cargo.toml
@@ -0,0 +1,21 @@
+[package]
+name = "libangelshark"
+version = "0.1.0"
+edition = "2018"
+authors = ["Adam T. Carpenter <adam.carpenter@adp.com>"]
+description = "A Communication Manager automation library and command runner."
+
+[dependencies.anyhow]
+version = "1"
+
+[dependencies.rayon]
+version = "1"
+
+[dependencies.ssh2]
+version = "0.9"
+features = ["vendored-openssl"]
+
+[dependencies.cached]
+version = "0.25"
+default-features = false
+features = ["proc_macro"]
diff --git a/libangelshark/src/acm.rs b/libangelshark/src/acm.rs
new file mode 100644
index 0000000..a9ffae1
--- /dev/null
+++ b/libangelshark/src/acm.rs
@@ -0,0 +1,230 @@
+use crate::Message;
+use anyhow::{anyhow, Context, Result};
+use cached::{proc_macro::cached, Return, TimedCache};
+use ssh2::{KeyboardInteractivePrompt, Prompt, Session, Stream};
+use std::{
+ fmt::Debug,
+ io::{BufRead, BufReader, Read, Write},
+ net::{Ipv4Addr, TcpStream},
+};
+
+const DEFAULT_PORT: u16 = 5022;
+const OSSI_LOGOFF: &[u8] = b"clogoff\nt\ny\n";
+const OSSI_MAN_TERM: &[u8] = b"ossiem\n";
+const OSSI_TERM: &[u8] = b"ossie\n";
+const TERM: &str = "vt100";
+const TERM_DIMS: (u32, u32, u32, u32) = (81, 25, 0, 0);
+const TIMEOUT_MS: u32 = 30000; // Thirty second read/write timeout.
+
+/// Represents a Communication Manager and its login information. Executes
+/// collections of [Message]s on an ACM over SSH.
+#[derive(Clone)]
+pub struct Acm {
+ addr: Ipv4Addr,
+ port: Option<u16>,
+ user: String,
+ pass: String,
+}
+
+impl Debug for Acm {
+ /// Formats an ACM's configuration information for debugging. Masks passwords.
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Acm")
+ .field("addr", &self.addr)
+ .field("port", &self.port.unwrap_or(DEFAULT_PORT))
+ .field("user", &self.user)
+ .field("pass", &"********")
+ .finish()
+ }
+}
+
+impl Default for Acm {
+ fn default() -> Self {
+ Self {
+ addr: Ipv4Addr::new(127, 0, 0, 1),
+ port: Default::default(),
+ user: Default::default(),
+ pass: Default::default(),
+ }
+ }
+}
+
+impl Acm {
+ /// Adds an IPv4 address to ACM config.
+ pub fn with_addr(&mut self, addr: Ipv4Addr) -> &mut Self {
+ self.addr = addr;
+ self
+ }
+
+ /// Adds a port to ACM config.
+ pub fn with_port(&mut self, port: u16) -> &mut Self {
+ self.port = Some(port);
+ self
+ }
+
+ /// Adds a login usermame to ACM config.
+ pub fn with_user(&mut self, user: &str) -> &mut Self {
+ self.user = user.into();
+ self
+ }
+
+ /// Adds a login password to ACM config.
+ pub fn with_pass(&mut self, pass: &str) -> &mut Self {
+ self.pass = pass.into();
+ self
+ }
+
+ /// Opens a readable and writable SSH stream into an ACM's Site Administration Terminal.
+ fn open_stream(&self, term: &[u8]) -> Result<Stream> {
+ // Initialize SSH session.
+ let stream = TcpStream::connect((self.addr, self.port.unwrap_or(DEFAULT_PORT)))
+ .with_context(|| "Failed to open TCP stream to host. Make sure the config is correct and the host is otherwise reachable.")?;
+ let mut session = Session::new().with_context(|| "Failed to start SSH session.")?;
+ session.set_tcp_stream(stream);
+ session.set_timeout(TIMEOUT_MS);
+ session
+ .handshake()
+ .with_context(|| "SSH handshake failed.")?;
+ session
+ .userauth_keyboard_interactive(&self.user, &mut SshPrompter::new(&self.pass))
+ .with_context(|| "Interactive SSH keyboard authentication failed.")?;
+
+ // Open shell on SSH channel.
+ let mut channel = session
+ .channel_session()
+ .with_context(|| "Failed to open SSH channel on SSH session.")?;
+ channel
+ .request_pty(TERM, None, Some(TERM_DIMS))
+ .with_context(|| "Failed to open PTY on SSH channel.")?;
+ channel
+ .shell()
+ .with_context(|| "Failed to open shell on SSH channel.")?;
+ channel
+ .write_all(term)
+ .with_context(|| "Failed to send OSSI term.")?;
+
+ // Waits until OSSI terminator is read to return channel. If this times
+ // out, something went wrong with login or ACM is unreachable.
+ let mut lines = BufReader::new(channel.stream(0)).lines();
+ while let Some(Ok(line)) = lines.next() {
+ if line == "t" {
+ return Ok(channel.stream(0));
+ }
+ }
+
+ Err(anyhow!("Never reached OSSI term prompt."))
+ }
+
+ /// Runs a given collection of [Message]s (OSSI commands) on the ACM and returns their resulting output.
+ pub fn run(&self, inputs: &[Message]) -> Result<Vec<Message>> {
+ let inputs: String = inputs.iter().map(Message::to_string).collect();
+ let mut stream = self.open_stream(OSSI_TERM)?;
+ write!(stream, "{}", inputs).with_context(|| "Failed to write inputs to OSSI stream.")?;
+ stream
+ .write_all(OSSI_LOGOFF)
+ .with_context(|| "Failed to write LOGOFF to OSSI stream.")?;
+ Message::from_output(stream)
+ }
+
+ /// Like [Self::run], but caches results with a timed cache of thirty minutes.
+ pub fn run_cached(&self, inputs: &[Message]) -> Result<Vec<Message>> {
+ Ok(run_cached(self, inputs)?.value)
+ }
+
+ /// Like [Self::run], but instead of running [Message]s, it returns the manual pages for the provided OSSI commands.
+ pub fn manual(&self, inputs: &[Message]) -> Result<String> {
+ let inputs: String = inputs.iter().map(Message::to_string).collect();
+ let mut stream = self.open_stream(OSSI_MAN_TERM)?;
+ write!(stream, "{}", inputs).with_context(|| "Failed to write inputs to OSSI stream.")?;
+ stream
+ .write_all(OSSI_LOGOFF)
+ .with_context(|| "Failed to write LOGOFF to OSSI stream.")?;
+ let mut output = String::new();
+ stream
+ .read_to_string(&mut output)
+ .with_context(|| "Failed to read manual pages to string.")?;
+ Ok(output)
+ }
+
+ /// Reads from `readable`, parsing lines as `asa.cfg`-formatted ACM logins.
+ /// Returns a collection of the parsed logins and their associated
+ /// names/labels. The spec looks like this:
+ ///
+ /// ```text
+ /// (name user:pass@addr:optional_port)
+ /// ACM01 admin:secret@192.168.1.1:5022
+ /// ACM02 acdmin:secret@192.168.1.2
+ /// ACM03 acdmin:secret@192.168.1.3:5023
+ /// ```
+ ///
+ /// The port is optional. If it is not provided, the default SAT port of
+ /// 5022 will be used.
+ pub fn from_logins(readable: impl Read) -> Result<Vec<(String, Self)>> {
+ let mut acms = Vec::new();
+
+ for line in BufReader::new(readable).lines() {
+ let line = line.with_context(|| "Failed to read line of config.")?;
+ let mut acm = Self::default();
+
+ if let Some((name, config)) = line.split_once(' ') {
+ if let Some((creds, dest)) = config.split_once('@') {
+ if let Some((user, pass)) = creds.split_once(':') {
+ if let Some((addr, port)) = dest.split_once(':') {
+ acm.with_port(
+ port.parse()
+ .with_context(|| "Failed to parse ACM socket port.")?,
+ )
+ .with_addr(
+ addr.parse()
+ .with_context(|| "Failed to parse ACM IP address.")?,
+ );
+ } else {
+ acm.with_addr(
+ dest.parse()
+ .with_context(|| "Failed to parse ACM IP address.")?,
+ );
+ }
+
+ acm.with_user(user).with_pass(pass);
+ acms.push((name.into(), acm));
+ }
+ }
+ }
+ }
+ Ok(acms)
+ }
+}
+
+/// This memoized function is a timed cache where entries are evicted after
+/// thirty minutes. Errors are not cached, only successes.
+#[cached(
+ result = true,
+ type = "TimedCache<String, Return<Vec<Message>>>",
+ create = "{ TimedCache::with_lifespan(1800) }",
+ convert = r#"{ format!("{}{:?}{:?}", acm.addr, acm.port, inputs) }"#
+)]
+fn run_cached(acm: &Acm, inputs: &[Message]) -> Result<Return<Vec<Message>>> {
+ Ok(Return::new(acm.run(inputs)?))
+}
+
+/// Used internally for password-based SSH authentication.
+struct SshPrompter<'a> {
+ pass: &'a str,
+}
+
+impl<'a> SshPrompter<'a> {
+ fn new(pass: &'a str) -> Self {
+ Self { pass }
+ }
+}
+
+impl<'a> KeyboardInteractivePrompt for SshPrompter<'a> {
+ fn prompt<'b>(
+ &mut self,
+ _username: &str,
+ _instructions: &str,
+ _prompts: &[Prompt<'b>],
+ ) -> Vec<String> {
+ vec![self.pass.to_owned()]
+ }
+}
diff --git a/libangelshark/src/lib.rs b/libangelshark/src/lib.rs
new file mode 100644
index 0000000..4afe31a
--- /dev/null
+++ b/libangelshark/src/lib.rs
@@ -0,0 +1,7 @@
+mod acm;
+mod message;
+mod runner;
+
+pub use acm::*;
+pub use message::*;
+pub use runner::*;
diff --git a/libangelshark/src/message.rs b/libangelshark/src/message.rs
new file mode 100644
index 0000000..0dac534
--- /dev/null
+++ b/libangelshark/src/message.rs
@@ -0,0 +1,194 @@
+use anyhow::{Context, Result};
+use std::{
+ fmt::{Display, Formatter, Result as FmtResult},
+ io::{BufRead, BufReader, Read},
+};
+
+/// OSSI Messaging Delimiters
+const ACM_D: &str = "a";
+const COMMAND_D: &str = "c";
+const DATA_D: &str = "d";
+const ERROR_D: &str = "e";
+const FIELD_D: &str = "f";
+const NEW_DATA_D: &str = "n";
+const TERMINATOR_D: &str = "t";
+const TAB: &str = "\t";
+
+/// An OSSI protocol message. Used for input and output to and from the ACM. The
+/// OSSI protocol is proprietary, and little documented. Here is a brief
+/// overview. Every message consists of a single command and a
+/// terminator used to separate messages. The message may also carry optional
+/// tab-separated fields and datas when used for input. When used for output, it may
+/// optionally carry an error. Example:
+///
+/// ```text
+/// (input)
+/// clist object
+/// t
+/// (output)
+/// clist object
+/// fhex_code\thex_code\thex_code
+/// dentry\tentry\tentry
+/// n
+/// dentry\tentry\tentry
+/// t
+/// ```
+#[derive(Debug, Default, Clone)]
+pub struct Message {
+ pub command: String,
+ pub fields: Option<Vec<String>>,
+ pub datas: Option<Vec<Vec<String>>>,
+ pub error: Option<String>,
+}
+
+impl Message {
+ /// Creates a new [Message] from its basic part: the command.
+ pub fn new(command: &str) -> Self {
+ Self {
+ command: command.into(),
+ ..Default::default()
+ }
+ }
+
+ fn add_fields(&mut self, fields: Vec<String>) -> &mut Self {
+ if !fields.is_empty() {
+ if let Some(ref mut existing) = self.fields {
+ existing.extend(fields.into_iter());
+ } else {
+ self.fields = Some(fields);
+ }
+ }
+
+ self
+ }
+
+ fn add_data_entry(&mut self, data: Vec<String>) -> &mut Self {
+ if !data.is_empty() {
+ if let Some(ref mut existing) = self.datas {
+ existing.push(data);
+ } else {
+ self.datas = Some(vec![data]);
+ }
+ }
+
+ self
+ }
+
+ /// Reads from `readable`, parsing lines as Angelshark-formatted OSSI input.
+ /// This closely follows the OSSI spec, but also parses ACM names/labels on
+ /// lines beginning with `'a'`. The spec looks like this:
+ ///
+ /// ```text
+ /// aACM01
+ /// clist object
+ /// fhex_code\thex_code\thex_code
+ /// dentry\tentry\tentry
+ /// t
+ /// ...
+ /// ```
+ pub fn from_input(readable: impl Read) -> Result<Vec<(String, Self)>> {
+ let mut data = Vec::new();
+ let mut input = Self::default();
+ let mut inputs = Vec::new();
+ let mut names: Vec<String> = Vec::new();
+
+ for line in BufReader::new(readable).lines() {
+ let line = line.with_context(|| "Failed to read line of input.")?;
+ let (delim, content) = (line.get(0..1).unwrap_or_default(), line.get(1..));
+
+ match (delim, content) {
+ (ACM_D, Some(a)) => {
+ names.extend(a.split(TAB).map(String::from));
+ }
+ (COMMAND_D, Some(c)) => {
+ input.command = c.into();
+ }
+ (FIELD_D, Some(f)) => {
+ input.add_fields(f.split(TAB).map(String::from).collect());
+ }
+ (DATA_D, Some(d)) => {
+ data.extend(d.split(TAB).map(String::from));
+ }
+ (TERMINATOR_D, _) => {
+ input.add_data_entry(data);
+ inputs.extend(names.into_iter().map(|n| (n, input.clone())));
+ names = Vec::new();
+ input = Message::default();
+ data = Vec::new();
+ }
+ _ => {
+ // Skip blank lines and unknown identifiers.
+ }
+ }
+ }
+
+ Ok(inputs)
+ }
+
+ /// Reads `readable` and parses lines as ACM OSSI output. This should exactly follow the OSSI spec.
+ pub fn from_output(readable: impl Read) -> Result<Vec<Self>> {
+ let mut data = Vec::new();
+ let mut output = Self::default();
+ let mut outputs = Vec::new();
+
+ for line in BufReader::new(readable).lines() {
+ let line = line.with_context(|| "Failed to read line of output.")?;
+ let (delim, content) = (line.get(0..1).unwrap_or_default(), line.get(1..));
+
+ match (delim, content) {
+ (COMMAND_D, Some(c)) => {
+ output.command = c.into();
+ }
+ (ERROR_D, Some(e)) => {
+ output.error = Some(e.into());
+ }
+ (FIELD_D, Some(f)) => {
+ output.add_fields(f.split(TAB).map(String::from).collect());
+ }
+ (DATA_D, Some(d)) => {
+ data.extend(d.split(TAB).map(String::from));
+ }
+ (NEW_DATA_D, _) => {
+ output.add_data_entry(data);
+ data = Vec::new();
+ }
+ (TERMINATOR_D, _) => {
+ output.add_data_entry(data);
+ data = Vec::new();
+ outputs.push(output);
+ output = Self::default();
+ }
+ _ => {
+ // Ignore unknown identifiers and blank lines.
+ }
+ }
+ }
+
+ Ok(outputs)
+ }
+}
+
+impl Display for Message {
+ fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
+ let mut message = format!("c{}\n", self.command);
+
+ if let Some(error) = &self.error {
+ message = message + &format!("e{}\n", error);
+ }
+
+ if let Some(fields) = &self.fields {
+ message = message + &format!("f{}\n", fields.join(TAB));
+ }
+
+ if let Some(datas) = &self.datas {
+ message = message
+ + &datas
+ .iter()
+ .map(|d| format!("d{}\n", d.join(TAB)))
+ .collect::<Vec<String>>()
+ .join("n\n");
+ }
+
+ writeln!(f, "{}\nt", message)
+ }
+}
diff --git a/libangelshark/src/runner.rs b/libangelshark/src/runner.rs
new file mode 100644
index 0000000..6d41bd2
--- /dev/null
+++ b/libangelshark/src/runner.rs
@@ -0,0 +1,77 @@
+use crate::{Acm, Message};
+use anyhow::Result;
+use rayon::iter::IntoParallelIterator;
+pub use rayon::iter::ParallelIterator;
+use std::collections::HashMap;
+
+/// Allows for more convenient running of OSSI [Message]s on one or more [Acm]s,
+/// parallelizing over the ACMs and (optionally) caching results for faster future runs.
+///
+/// This is the intended high-level use of Angelshark. It holds a collection of
+/// "jobs", which are tagged with ACM names/labels and their associated logins ([Acm]s) and [Message]s).
+#[derive(Default, Debug, Clone)]
+pub struct AcmRunner(HashMap<String, (Acm, Vec<Message>)>);
+
+impl AcmRunner {
+ /// Constructs a new [AcmRunner] from tagged [Acm]s and [Message]s.
+ pub fn new(acms: Vec<(String, Acm)>, inputs: Vec<(String, Message)>) -> Self {
+ let mut runner = AcmRunner::default();
+ for (name, acm) in acms {
+ runner.register_acm(&name, acm);
+ }
+ for (name, input) in inputs {
+ runner.queue_input(&name, &input);
+ }
+ runner
+ }
+
+ /// Registers an [Acm] as `job_name` in the runner.
+ pub fn register_acm(&mut self, job_name: &str, acm: Acm) -> &mut Self {
+ self.0.insert(job_name.into(), (acm, Vec::new()));
+ self
+ }
+
+ /// Queues a [Message] to be run on an [Acm] registered as `job_name`.
+ pub fn queue_input(&mut self, job_name: &str, input: &Message) -> &mut Self {
+ if let Some((_, inputs)) = self.0.get_mut(job_name) {
+ inputs.push(input.clone());
+ }
+ self
+ }
+
+ /// Runs the queued [Message] inputs on the registered [Acm]s and returns
+ /// the results. The results are returned as an iterator. The iterator must
+ /// be in some way consumed, collected, or iterated over before the runner
+ /// starts running commands, i.e. it is lazy. Once this begins, results are
+ /// computed in parallel over the ACMs. The order of outputs is undefined.
+ pub fn run(self) -> impl ParallelIterator<Item = RunOutput> {
+ self.0
+ .into_par_iter()
+ .filter(|(_, (_, inputs))| !inputs.is_empty())
+ .map(|(job_name, (acm, inputs))| (job_name, acm.run(&inputs)))
+ }
+
+ /// Functionally equivalent to [Self::run] but caches results for 30 minutes
+ /// to make future lookups faster.
+ pub fn run_cached(self) -> impl ParallelIterator<Item = RunOutput> {
+ self.0
+ .into_par_iter()
+ .filter(|(_, (_, inputs))| !inputs.is_empty())
+ .map(|(job_name, (acm, inputs))| (job_name, acm.run_cached(&inputs)))
+ }
+
+ /// Functionally equivalent to [Self::run] but returns manual pages for
+ /// inputs instead of executing them.
+ pub fn manuals(self) -> impl ParallelIterator<Item = ManualOutput> {
+ self.0
+ .into_par_iter()
+ .filter(|(_, (_, inputs))| !inputs.is_empty())
+ .map(|(job_name, (acm, inputs))| (job_name, acm.manual(&inputs)))
+ }
+}
+
+/// Every resulting entry of [AcmRunner::run]
+pub type RunOutput = (String, Result<Vec<Message>>);
+
+/// Every resulting entry of [AcmRunner::manuals]
+pub type ManualOutput = (String, Result<String>);