diff options
Diffstat (limited to 'libangelshark/src')
| -rw-r--r-- | libangelshark/src/acm.rs | 230 | ||||
| -rw-r--r-- | libangelshark/src/lib.rs | 7 | ||||
| -rw-r--r-- | libangelshark/src/message.rs | 194 | ||||
| -rw-r--r-- | libangelshark/src/runner.rs | 77 | 
4 files changed, 508 insertions, 0 deletions
| diff --git a/libangelshark/src/acm.rs b/libangelshark/src/acm.rs new file mode 100644 index 0000000..a9ffae1 --- /dev/null +++ b/libangelshark/src/acm.rs @@ -0,0 +1,230 @@ +use crate::Message; +use anyhow::{anyhow, Context, Result}; +use cached::{proc_macro::cached, Return, TimedCache}; +use ssh2::{KeyboardInteractivePrompt, Prompt, Session, Stream}; +use std::{ +    fmt::Debug, +    io::{BufRead, BufReader, Read, Write}, +    net::{Ipv4Addr, TcpStream}, +}; + +const DEFAULT_PORT: u16 = 5022; +const OSSI_LOGOFF: &[u8] = b"clogoff\nt\ny\n"; +const OSSI_MAN_TERM: &[u8] = b"ossiem\n"; +const OSSI_TERM: &[u8] = b"ossie\n"; +const TERM: &str = "vt100"; +const TERM_DIMS: (u32, u32, u32, u32) = (81, 25, 0, 0); +const TIMEOUT_MS: u32 = 30000; // Thirty second read/write timeout. + +/// Represents a Communication Manager and its login information. Executes +/// collections of [Message]s on an ACM over SSH. +#[derive(Clone)] +pub struct Acm { +    addr: Ipv4Addr, +    port: Option<u16>, +    user: String, +    pass: String, +} + +impl Debug for Acm { +    /// Formats an ACM's configuration information for debugging. Masks passwords. +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +        f.debug_struct("Acm") +            .field("addr", &self.addr) +            .field("port", &self.port.unwrap_or(DEFAULT_PORT)) +            .field("user", &self.user) +            .field("pass", &"********") +            .finish() +    } +} + +impl Default for Acm { +    fn default() -> Self { +        Self { +            addr: Ipv4Addr::new(127, 0, 0, 1), +            port: Default::default(), +            user: Default::default(), +            pass: Default::default(), +        } +    } +} + +impl Acm { +    /// Adds an IPv4 address to ACM config. +    pub fn with_addr(&mut self, addr: Ipv4Addr) -> &mut Self { +        self.addr = addr; +        self +    } + +    /// Adds a port to ACM config. +    pub fn with_port(&mut self, port: u16) -> &mut Self { +        self.port = Some(port); +        self +    } + +    /// Adds a login usermame to ACM config. +    pub fn with_user(&mut self, user: &str) -> &mut Self { +        self.user = user.into(); +        self +    } + +    /// Adds a login password to ACM config. +    pub fn with_pass(&mut self, pass: &str) -> &mut Self { +        self.pass = pass.into(); +        self +    } + +    /// Opens a readable and writable SSH stream into an ACM's Site Administration Terminal. +    fn open_stream(&self, term: &[u8]) -> Result<Stream> { +        // Initialize SSH session. +        let stream = TcpStream::connect((self.addr, self.port.unwrap_or(DEFAULT_PORT))) +            .with_context(|| "Failed to open TCP stream to host. Make sure the config is correct and the host is otherwise reachable.")?; +        let mut session = Session::new().with_context(|| "Failed to start SSH session.")?; +        session.set_tcp_stream(stream); +        session.set_timeout(TIMEOUT_MS); +        session +            .handshake() +            .with_context(|| "SSH handshake failed.")?; +        session +            .userauth_keyboard_interactive(&self.user, &mut SshPrompter::new(&self.pass)) +            .with_context(|| "Interactive SSH keyboard authentication failed.")?; + +        // Open shell on SSH channel. +        let mut channel = session +            .channel_session() +            .with_context(|| "Failed to open SSH channel on SSH session.")?; +        channel +            .request_pty(TERM, None, Some(TERM_DIMS)) +            .with_context(|| "Failed to open PTY on SSH channel.")?; +        channel +            .shell() +            .with_context(|| "Failed to open shell on SSH channel.")?; +        channel +            .write_all(term) +            .with_context(|| "Failed to send OSSI term.")?; + +        // Waits until OSSI terminator is read to return channel. If this times +        // out, something went wrong with login or ACM is unreachable. +        let mut lines = BufReader::new(channel.stream(0)).lines(); +        while let Some(Ok(line)) = lines.next() { +            if line == "t" { +                return Ok(channel.stream(0)); +            } +        } + +        Err(anyhow!("Never reached OSSI term prompt.")) +    } + +    /// Runs a given collection of [Message]s (OSSI commands) on the ACM and returns their resulting output. +    pub fn run(&self, inputs: &[Message]) -> Result<Vec<Message>> { +        let inputs: String = inputs.iter().map(Message::to_string).collect(); +        let mut stream = self.open_stream(OSSI_TERM)?; +        write!(stream, "{}", inputs).with_context(|| "Failed to write inputs to OSSI stream.")?; +        stream +            .write_all(OSSI_LOGOFF) +            .with_context(|| "Failed to write LOGOFF to OSSI stream.")?; +        Message::from_output(stream) +    } + +    /// Like [Self::run], but caches results with a timed cache of thirty minutes. +    pub fn run_cached(&self, inputs: &[Message]) -> Result<Vec<Message>> { +        Ok(run_cached(self, inputs)?.value) +    } + +    /// Like [Self::run], but instead of running [Message]s, it returns the manual pages for the provided OSSI commands. +    pub fn manual(&self, inputs: &[Message]) -> Result<String> { +        let inputs: String = inputs.iter().map(Message::to_string).collect(); +        let mut stream = self.open_stream(OSSI_MAN_TERM)?; +        write!(stream, "{}", inputs).with_context(|| "Failed to write inputs to OSSI stream.")?; +        stream +            .write_all(OSSI_LOGOFF) +            .with_context(|| "Failed to write LOGOFF to OSSI stream.")?; +        let mut output = String::new(); +        stream +            .read_to_string(&mut output) +            .with_context(|| "Failed to read manual pages to string.")?; +        Ok(output) +    } + +    /// Reads from `readable`, parsing lines as `asa.cfg`-formatted ACM logins. +    /// Returns a collection of the parsed logins and their associated +    /// names/labels. The spec looks like this: +    /// +    /// ```text +    /// (name user:pass@addr:optional_port) +    /// ACM01 admin:secret@192.168.1.1:5022 +    /// ACM02 acdmin:secret@192.168.1.2 +    /// ACM03 acdmin:secret@192.168.1.3:5023 +    /// ``` +    /// +    /// The port is optional. If it is not provided, the default SAT port of +    /// 5022 will be used. +    pub fn from_logins(readable: impl Read) -> Result<Vec<(String, Self)>> { +        let mut acms = Vec::new(); + +        for line in BufReader::new(readable).lines() { +            let line = line.with_context(|| "Failed to read line of config.")?; +            let mut acm = Self::default(); + +            if let Some((name, config)) = line.split_once(' ') { +                if let Some((creds, dest)) = config.split_once('@') { +                    if let Some((user, pass)) = creds.split_once(':') { +                        if let Some((addr, port)) = dest.split_once(':') { +                            acm.with_port( +                                port.parse() +                                    .with_context(|| "Failed to parse ACM socket port.")?, +                            ) +                            .with_addr( +                                addr.parse() +                                    .with_context(|| "Failed to parse ACM IP address.")?, +                            ); +                        } else { +                            acm.with_addr( +                                dest.parse() +                                    .with_context(|| "Failed to parse ACM IP address.")?, +                            ); +                        } + +                        acm.with_user(user).with_pass(pass); +                        acms.push((name.into(), acm)); +                    } +                } +            } +        } +        Ok(acms) +    } +} + +/// This memoized function is a timed cache where entries are evicted after +/// thirty minutes. Errors are not cached, only successes. +#[cached( +    result = true, +    type = "TimedCache<String, Return<Vec<Message>>>", +    create = "{ TimedCache::with_lifespan(1800) }", +    convert = r#"{ format!("{}{:?}{:?}", acm.addr, acm.port, inputs) }"# +)] +fn run_cached(acm: &Acm, inputs: &[Message]) -> Result<Return<Vec<Message>>> { +    Ok(Return::new(acm.run(inputs)?)) +} + +/// Used internally for password-based SSH authentication. +struct SshPrompter<'a> { +    pass: &'a str, +} + +impl<'a> SshPrompter<'a> { +    fn new(pass: &'a str) -> Self { +        Self { pass } +    } +} + +impl<'a> KeyboardInteractivePrompt for SshPrompter<'a> { +    fn prompt<'b>( +        &mut self, +        _username: &str, +        _instructions: &str, +        _prompts: &[Prompt<'b>], +    ) -> Vec<String> { +        vec![self.pass.to_owned()] +    } +} diff --git a/libangelshark/src/lib.rs b/libangelshark/src/lib.rs new file mode 100644 index 0000000..4afe31a --- /dev/null +++ b/libangelshark/src/lib.rs @@ -0,0 +1,7 @@ +mod acm; +mod message; +mod runner; + +pub use acm::*; +pub use message::*; +pub use runner::*; diff --git a/libangelshark/src/message.rs b/libangelshark/src/message.rs new file mode 100644 index 0000000..0dac534 --- /dev/null +++ b/libangelshark/src/message.rs @@ -0,0 +1,194 @@ +use anyhow::{Context, Result}; +use std::{ +    fmt::{Display, Formatter, Result as FmtResult}, +    io::{BufRead, BufReader, Read}, +}; + +/// OSSI Messaging Delimiters +const ACM_D: &str = "a"; +const COMMAND_D: &str = "c"; +const DATA_D: &str = "d"; +const ERROR_D: &str = "e"; +const FIELD_D: &str = "f"; +const NEW_DATA_D: &str = "n"; +const TERMINATOR_D: &str = "t"; +const TAB: &str = "\t"; + +/// An OSSI protocol message. Used for input and output to and from the ACM. The +/// OSSI protocol is proprietary, and little documented. Here is a brief +/// overview. Every message consists of a single command and a +/// terminator used to separate messages. The message may also carry optional +/// tab-separated fields and datas when used for input. When used for output, it may +/// optionally carry an error. Example: +/// +/// ```text +/// (input) +/// clist object +/// t +/// (output) +/// clist object +/// fhex_code\thex_code\thex_code +/// dentry\tentry\tentry +/// n +/// dentry\tentry\tentry +/// t +/// ``` +#[derive(Debug, Default, Clone)] +pub struct Message { +    pub command: String, +    pub fields: Option<Vec<String>>, +    pub datas: Option<Vec<Vec<String>>>, +    pub error: Option<String>, +} + +impl Message { +    /// Creates a new [Message] from its basic part: the command. +    pub fn new(command: &str) -> Self { +        Self { +            command: command.into(), +            ..Default::default() +        } +    } + +    fn add_fields(&mut self, fields: Vec<String>) -> &mut Self { +        if !fields.is_empty() { +            if let Some(ref mut existing) = self.fields { +                existing.extend(fields.into_iter()); +            } else { +                self.fields = Some(fields); +            } +        } + +        self +    } + +    fn add_data_entry(&mut self, data: Vec<String>) -> &mut Self { +        if !data.is_empty() { +            if let Some(ref mut existing) = self.datas { +                existing.push(data); +            } else { +                self.datas = Some(vec![data]); +            } +        } + +        self +    } + +    /// Reads from `readable`, parsing lines as Angelshark-formatted OSSI input. +    /// This closely follows the OSSI spec, but also parses ACM names/labels on +    /// lines beginning with `'a'`. The spec looks like this: +    /// +    /// ```text +    /// aACM01 +    /// clist object +    /// fhex_code\thex_code\thex_code +    /// dentry\tentry\tentry +    /// t +    /// ... +    /// ``` +    pub fn from_input(readable: impl Read) -> Result<Vec<(String, Self)>> { +        let mut data = Vec::new(); +        let mut input = Self::default(); +        let mut inputs = Vec::new(); +        let mut names: Vec<String> = Vec::new(); + +        for line in BufReader::new(readable).lines() { +            let line = line.with_context(|| "Failed to read line of input.")?; +            let (delim, content) = (line.get(0..1).unwrap_or_default(), line.get(1..)); + +            match (delim, content) { +                (ACM_D, Some(a)) => { +                    names.extend(a.split(TAB).map(String::from)); +                } +                (COMMAND_D, Some(c)) => { +                    input.command = c.into(); +                } +                (FIELD_D, Some(f)) => { +                    input.add_fields(f.split(TAB).map(String::from).collect()); +                } +                (DATA_D, Some(d)) => { +                    data.extend(d.split(TAB).map(String::from)); +                } +                (TERMINATOR_D, _) => { +                    input.add_data_entry(data); +                    inputs.extend(names.into_iter().map(|n| (n, input.clone()))); +                    names = Vec::new(); +                    input = Message::default(); +                    data = Vec::new(); +                } +                _ => { +                    // Skip blank lines and unknown identifiers. +                } +            } +        } + +        Ok(inputs) +    } + +    /// Reads `readable` and parses lines as ACM OSSI output. This should exactly follow the OSSI spec. +    pub fn from_output(readable: impl Read) -> Result<Vec<Self>> { +        let mut data = Vec::new(); +        let mut output = Self::default(); +        let mut outputs = Vec::new(); + +        for line in BufReader::new(readable).lines() { +            let line = line.with_context(|| "Failed to read line of output.")?; +            let (delim, content) = (line.get(0..1).unwrap_or_default(), line.get(1..)); + +            match (delim, content) { +                (COMMAND_D, Some(c)) => { +                    output.command = c.into(); +                } +                (ERROR_D, Some(e)) => { +                    output.error = Some(e.into()); +                } +                (FIELD_D, Some(f)) => { +                    output.add_fields(f.split(TAB).map(String::from).collect()); +                } +                (DATA_D, Some(d)) => { +                    data.extend(d.split(TAB).map(String::from)); +                } +                (NEW_DATA_D, _) => { +                    output.add_data_entry(data); +                    data = Vec::new(); +                } +                (TERMINATOR_D, _) => { +                    output.add_data_entry(data); +                    data = Vec::new(); +                    outputs.push(output); +                    output = Self::default(); +                } +                _ => { +                    // Ignore unknown identifiers and blank lines. +                } +            } +        } + +        Ok(outputs) +    } +} + +impl Display for Message { +    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { +        let mut message = format!("c{}\n", self.command); + +        if let Some(error) = &self.error { +            message = message + &format!("e{}\n", error); +        } + +        if let Some(fields) = &self.fields { +            message = message + &format!("f{}\n", fields.join(TAB)); +        } + +        if let Some(datas) = &self.datas { +            message = message +                + &datas +                    .iter() +                    .map(|d| format!("d{}\n", d.join(TAB))) +                    .collect::<Vec<String>>() +                    .join("n\n"); +        } + +        writeln!(f, "{}\nt", message) +    } +} diff --git a/libangelshark/src/runner.rs b/libangelshark/src/runner.rs new file mode 100644 index 0000000..6d41bd2 --- /dev/null +++ b/libangelshark/src/runner.rs @@ -0,0 +1,77 @@ +use crate::{Acm, Message}; +use anyhow::Result; +use rayon::iter::IntoParallelIterator; +pub use rayon::iter::ParallelIterator; +use std::collections::HashMap; + +/// Allows for more convenient running of OSSI [Message]s on one or more [Acm]s, +/// parallelizing over the ACMs and (optionally) caching results for faster future runs. +/// +/// This is the intended high-level use of Angelshark. It holds a collection of +/// "jobs", which are tagged with ACM names/labels and their associated logins ([Acm]s) and [Message]s). +#[derive(Default, Debug, Clone)] +pub struct AcmRunner(HashMap<String, (Acm, Vec<Message>)>); + +impl AcmRunner { +    /// Constructs a new [AcmRunner] from tagged [Acm]s and [Message]s. +    pub fn new(acms: Vec<(String, Acm)>, inputs: Vec<(String, Message)>) -> Self { +        let mut runner = AcmRunner::default(); +        for (name, acm) in acms { +            runner.register_acm(&name, acm); +        } +        for (name, input) in inputs { +            runner.queue_input(&name, &input); +        } +        runner +    } + +    /// Registers an [Acm] as `job_name` in the runner. +    pub fn register_acm(&mut self, job_name: &str, acm: Acm) -> &mut Self { +        self.0.insert(job_name.into(), (acm, Vec::new())); +        self +    } + +    /// Queues a [Message] to be run on an [Acm] registered as `job_name`. +    pub fn queue_input(&mut self, job_name: &str, input: &Message) -> &mut Self { +        if let Some((_, inputs)) = self.0.get_mut(job_name) { +            inputs.push(input.clone()); +        } +        self +    } + +    /// Runs the queued [Message] inputs on the registered [Acm]s and returns +    /// the results. The results are returned as an iterator. The iterator must +    /// be in some way consumed, collected, or iterated over before the runner +    /// starts running commands, i.e. it is lazy. Once this begins, results are +    /// computed in parallel over the ACMs. The order of outputs is undefined. +    pub fn run(self) -> impl ParallelIterator<Item = RunOutput> { +        self.0 +            .into_par_iter() +            .filter(|(_, (_, inputs))| !inputs.is_empty()) +            .map(|(job_name, (acm, inputs))| (job_name, acm.run(&inputs))) +    } + +    /// Functionally equivalent to [Self::run] but caches results for 30 minutes +    /// to make future lookups faster. +    pub fn run_cached(self) -> impl ParallelIterator<Item = RunOutput> { +        self.0 +            .into_par_iter() +            .filter(|(_, (_, inputs))| !inputs.is_empty()) +            .map(|(job_name, (acm, inputs))| (job_name, acm.run_cached(&inputs))) +    } + +    /// Functionally equivalent to [Self::run] but returns manual pages for +    /// inputs instead of executing them. +    pub fn manuals(self) -> impl ParallelIterator<Item = ManualOutput> { +        self.0 +            .into_par_iter() +            .filter(|(_, (_, inputs))| !inputs.is_empty()) +            .map(|(job_name, (acm, inputs))| (job_name, acm.manual(&inputs))) +    } +} + +/// Every resulting entry of [AcmRunner::run] +pub type RunOutput = (String, Result<Vec<Message>>); + +/// Every resulting entry of [AcmRunner::manuals] +pub type ManualOutput = (String, Result<String>); |