From cd9f750990166dfa9b9fc0276749be307e26d008 Mon Sep 17 00:00:00 2001 From: "Carpenter, Adam (CORP)" Date: Tue, 30 Nov 2021 15:22:37 -0500 Subject: refactor: module structure and comments --- angelsharkd/src/routes/extensions/mod.rs | 9 +- angelsharkd/src/routes/extensions/simple_search.rs | 197 --------------------- .../src/routes/extensions/simple_search/README.md | 3 + .../src/routes/extensions/simple_search/mod.rs | 66 +++++++ .../src/routes/extensions/simple_search/types.rs | 166 +++++++++++++++++ 5 files changed, 242 insertions(+), 199 deletions(-) delete mode 100644 angelsharkd/src/routes/extensions/simple_search.rs create mode 100644 angelsharkd/src/routes/extensions/simple_search/README.md create mode 100644 angelsharkd/src/routes/extensions/simple_search/mod.rs create mode 100644 angelsharkd/src/routes/extensions/simple_search/types.rs (limited to 'angelsharkd/src/routes/extensions') diff --git a/angelsharkd/src/routes/extensions/mod.rs b/angelsharkd/src/routes/extensions/mod.rs index adcea3f..7f217d6 100644 --- a/angelsharkd/src/routes/extensions/mod.rs +++ b/angelsharkd/src/routes/extensions/mod.rs @@ -6,18 +6,22 @@ mod simple_deprov; #[cfg(feature = "simple_search")] mod simple_search; +/// The extension filter; consists of all compiled optional Angelshark extension +/// filters combined under `/extensions`. pub fn filter(config: &Config) -> impl Filter + Clone { // Note: this next line deals with the common edge case of having no // extensions loaded with feature flags. It ensures that the the type // checking is right when the return `.and()` is called below. let filters = default().or(default()); + // Block to enable simple_search extension feature. Instantiates a + // searchable haystack and configures filters to handle search requests. #[cfg(feature = "simple_search")] let haystack = simple_search::Haystack::new(config.runner.clone()); #[cfg(feature = "simple_search")] let filters = filters - .or(simple_search::search(haystack.clone())) - .or(simple_search::refresh(haystack)); + .or(simple_search::search_filter(haystack.clone())) + .or(simple_search::refresh_filter(haystack)); #[cfg(feature = "simple_deprov")] let filters = filters.or(simple_deprov::filter()); @@ -25,6 +29,7 @@ pub fn filter(config: &Config) -> impl Filter impl Filter + Clone { warp::path::end().map(|| "Angelshark extension route index. Enable extensions with feature switches and access them at `/extensions/`.") } diff --git a/angelsharkd/src/routes/extensions/simple_search.rs b/angelsharkd/src/routes/extensions/simple_search.rs deleted file mode 100644 index b728c1e..0000000 --- a/angelsharkd/src/routes/extensions/simple_search.rs +++ /dev/null @@ -1,197 +0,0 @@ -use anyhow::{anyhow, Context, Error}; -use libangelshark::{AcmRunner, Message, ParallelIterator}; -use log::{error, info}; -use std::{ - collections::HashMap, - convert::Infallible, - sync::{Arc, Mutex}, -}; -use warp::{ - body::{content_length_limit, json}, - get, - hyper::{header, StatusCode}, - path, post, - reply::{self, with}, - Filter, Rejection, Reply, -}; - -/// Collection of search terms -type Needles = Vec; - -/// Collection of ACM extension types with ROOMs (if applicable) and ACM names -type HaystackEntries = Vec>; - -pub fn search(haystack: Haystack) -> impl Filter + Clone { - path("search") - .and(post()) - .and(content_length_limit(1024 * 16)) - .and(json()) - .and_then(move |terms: Needles| handle_search(haystack.to_owned(), terms)) - .with(with::header(header::PRAGMA, "no-cache")) - .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) - .with(with::header(header::X_FRAME_OPTIONS, "DENY")) -} - -pub fn refresh(haystack: Haystack) -> impl Filter + Clone { - path!("search" / "refresh") - .and(get()) - .and_then(move || handle_refresh(haystack.to_owned())) -} - -async fn handle_search(haystack: Haystack, needle: Needles) -> Result { - // Ok(haystack.search(Vec::new())?) - // if let Ok(matches = haystack.search(needle); - match haystack.search(needle) { - Ok(matches) => Ok(reply::with_status(reply::json(&matches), StatusCode::OK)), - Err(e) => Ok(reply::with_status( - reply::json(&e.to_string()), - StatusCode::INTERNAL_SERVER_ERROR, - )), - } -} - -async fn handle_refresh(haystack: Haystack) -> Result { - // Run refresh as a background task and immediately return. - tokio::spawn(async move { - if let Err(e) = haystack.refresh() { - error!("{}", e.to_string()); // TODO: use logger - } else { - info!("Search haystack refreshed."); - } - }); - - Ok("Refresh scheduled") -} - -/// A lazy-loaded, asynchronously-refreshed exension-type haystack cache. -#[derive(Clone)] -pub struct Haystack { - entries: Arc>, - runner: AcmRunner, -} - -impl Haystack { - pub fn new(runner: AcmRunner) -> Self { - Self { - entries: Arc::new(Mutex::new(Vec::new())), - runner, - } - } - - pub fn search(&self, needles: Needles) -> Result { - let needles: Vec<_> = needles.iter().map(|n| n.to_lowercase()).collect(); - - let entries = self - .entries - .lock() - .map_err(|e| anyhow!(e.to_string())) - .with_context(|| "Failed to get haystack inner data lock.")?; - - let matches = entries - .iter() - .filter(|entry| { - let entry_str = entry.join("").to_lowercase(); - needles - .iter() - .all(|needle| entry_str.contains(needle.as_str())) - }) - .cloned() - .collect(); - Ok(matches) - } - - pub fn refresh(&self) -> Result<(), Error> { - let mut runner = self.runner.to_owned(); // TODO: This alone allows multiple refreshers to be run at once. Do we want that? - - // Queue jobs in ACM runner - for acm in &[ - "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", - ] { - runner - .queue_input(acm, &Message::new("list extension-type")) - .queue_input( - acm, - &Message { - command: String::from("list station"), - fields: Some(vec![String::from("8005ff00"), String::from("0031ff00")]), - datas: None, - error: None, - }, - ); - } - - // Run jobs and collect output. Filter out uneeded commands, combine errors. - let output: Result)>, Error> = runner - .run() - .map(|(name, output)| { - let output: Vec = output? - .into_iter() - .filter(|m| m.command != "logoff") - .collect(); - Ok((name, output)) - }) - .collect(); - let output = output.with_context(|| "Failed to run refresh commands on ACM(s).")?; - - // Log any ACM errors encountered - for error in output - .iter() - .map(|(_, messages)| messages) - .flatten() - .filter_map(|m| m.error.as_ref()) - { - error!("ACM error: {}", error); - } - - // Build a map of station number-to-rooms - let rooms: HashMap = output - .iter() - .map(|(_, messages)| { - messages - .iter() - .filter(|message| message.command == "list station") - .filter_map(|message| message.datas.to_owned()) - .flatten() - .filter_map(|stat| Some((stat.get(0)?.to_owned(), stat.get(1)?.to_owned()))) - }) - .flatten() - .collect(); - - // Build the haystack from the room map and extension-type output - let haystack: HaystackEntries = output - .into_iter() - .map(|(acm_name, messages)| { - let rooms = &rooms; - let acm_name = format!("CM{}", acm_name); - - messages - .into_iter() - .filter(|message| message.command == "list extension-type") - .filter_map(|message| message.datas) - .flatten() - .map(move |mut extension| { - let room = extension - .get(0) - .map(|num| rooms.get(num)) - .flatten() - .map(|room| room.to_owned()) - .unwrap_or_else(String::new); - - extension.push(acm_name.to_owned()); - extension.push(room); - extension - }) - }) - .flatten() - .collect(); - - // Propogate the new data to the shared haystack entries - let mut lock = self - .entries - .lock() - .map_err(|e| anyhow!(e.to_string())) - .with_context(|| "Failed to get haystack inner data lock.")?; - *lock = haystack; - Ok(()) - } -} diff --git a/angelsharkd/src/routes/extensions/simple_search/README.md b/angelsharkd/src/routes/extensions/simple_search/README.md new file mode 100644 index 0000000..527f686 --- /dev/null +++ b/angelsharkd/src/routes/extensions/simple_search/README.md @@ -0,0 +1,3 @@ +# Angelshark Simple Extension Search + +This extension implements very simple extension searching for end users. It allows for multi-keyword searches on a pre-downloaded collection of CM extension-type and station data. This data can be refreshed on-demand or periodically. Searches always return immediately, even when no data has been downloaded yet. diff --git a/angelsharkd/src/routes/extensions/simple_search/mod.rs b/angelsharkd/src/routes/extensions/simple_search/mod.rs new file mode 100644 index 0000000..5b5dc8e --- /dev/null +++ b/angelsharkd/src/routes/extensions/simple_search/mod.rs @@ -0,0 +1,66 @@ +use log::{error, info}; +use std::convert::Infallible; +pub use types::Haystack; +use types::*; +use warp::{ + body::{content_length_limit, json}, + get, + hyper::{header, StatusCode}, + path, post, + reply::{self, with}, + Filter, Rejection, Reply, +}; + +mod types; + +/// Returns a warp filter to handle HTTP POSTs for searching the haystack. +pub fn search_filter( + haystack: Haystack, +) -> impl Filter + Clone { + path("search") + .and(post()) + .and(content_length_limit(1024 * 16)) + .and(json()) + .and_then(move |terms: Needles| search(haystack.to_owned(), terms)) + .with(with::header(header::PRAGMA, "no-cache")) + .with(with::header(header::CACHE_CONTROL, "no-store, max-age=0")) + .with(with::header(header::X_FRAME_OPTIONS, "DENY")) +} + +/// Returns a warp filter to handle HTTP GETs for refreshing the haystack. +pub fn refresh_filter( + haystack: Haystack, +) -> impl Filter + Clone { + path!("search" / "refresh") + .and(get()) + .and_then(move || refresh(haystack.to_owned())) +} + +/// Runs the search request to find all needles in the haystack and converts the +/// results into a reply. +async fn search(haystack: Haystack, needles: Needles) -> Result { + // Ok(haystack.search(Vec::new())?) + // if let Ok(matches = haystack.search(needle); + match haystack.search(needles) { + Ok(matches) => Ok(reply::with_status(reply::json(&matches), StatusCode::OK)), + Err(e) => Ok(reply::with_status( + reply::json(&e.to_string()), + StatusCode::INTERNAL_SERVER_ERROR, + )), + } +} + +/// Immediately returns. Spawns an asynchronous task to complete the haystack +/// refresh in the background. +async fn refresh(haystack: Haystack) -> Result { + // Run refresh as a background task and immediately return. + tokio::spawn(async move { + if let Err(e) = haystack.refresh() { + error!("{}", e.to_string()); // TODO: use logger + } else { + info!("Search haystack refreshed."); + } + }); + + Ok("Refresh scheduled") +} diff --git a/angelsharkd/src/routes/extensions/simple_search/types.rs b/angelsharkd/src/routes/extensions/simple_search/types.rs new file mode 100644 index 0000000..184b1a6 --- /dev/null +++ b/angelsharkd/src/routes/extensions/simple_search/types.rs @@ -0,0 +1,166 @@ +use anyhow::{anyhow, Context, Error}; +use libangelshark::{AcmRunner, Message, ParallelIterator}; +use log::error; +use std::{ + collections::HashMap, + env, + sync::{Arc, Mutex}, +}; + +const ANGELSHARKD_EXT_SEARCH_ACMS: &str = "ANGELSHARKD_EXT_SEARCH_ACMS"; +const OSSI_STAT_NUMBER_FIELD: &str = "8005ff00"; +const OSSI_STAT_ROOM_FIELD: &str = "0031ff00"; +const OSSI_LIST_STAT_CMD: &str = "list station"; +const OSSI_LIST_EXT_CMD: &str = "list extension-type"; + +/// Collection of search terms +pub type Needles = Vec; + +/// Collection of ACM extension types with ROOMs (if applicable) and ACM names +type HaystackEntries = Vec>; + +/// Represents a searchable, refreshable collection of ACM extension data. +#[derive(Clone)] +pub struct Haystack { + entries: Arc>, + runner: AcmRunner, +} + +impl Haystack { + pub fn new(runner: AcmRunner) -> Self { + Self { + entries: Arc::new(Mutex::new(Vec::new())), + runner, + } + } + + /// Searches for haystack entries that contain all given needles and returns them. + pub fn search(&self, needles: Needles) -> Result { + let needles: Vec<_> = needles.iter().map(|n| n.to_lowercase()).collect(); + + let entries = self + .entries + .lock() + .map_err(|e| anyhow!(e.to_string())) + .with_context(|| "Failed to get haystack inner data lock.")?; + + let matches = entries + .iter() + .filter(|entry| { + let entry_str = entry.join("").to_lowercase(); + needles + .iter() + .all(|needle| entry_str.contains(needle.as_str())) + }) + .cloned() + .collect(); + Ok(matches) + } + + /// Refreshes the haystack data by running relevant commands on a runner, + /// parsing the results, and updating the entries field with the fresh data. + /// TODO: Do we want simultaneous refreshes to be possible? + pub fn refresh(&self) -> Result<(), Error> { + let mut runner = self.runner.to_owned(); + + // Queue jobs in ACM runner + let configured_acms = env::var(ANGELSHARKD_EXT_SEARCH_ACMS).with_context(|| { + format!( + "{} var missing. Cannot refresh haystack.", + ANGELSHARKD_EXT_SEARCH_ACMS + ) + })?; + + // Generate jobs and queue on runner. + for acm in configured_acms.split_whitespace() { + runner + .queue_input(acm, &Message::new(OSSI_LIST_EXT_CMD)) + .queue_input( + acm, + &Message { + command: String::from(OSSI_LIST_STAT_CMD), + fields: Some(vec![ + String::from(OSSI_STAT_NUMBER_FIELD), + String::from(OSSI_STAT_ROOM_FIELD), + ]), + datas: None, + error: None, + }, + ); + } + + // Run jobs and collect output. Filter out uneeded commands, combine errors. + let output: Result)>, Error> = runner + .run() + .map(|(name, output)| { + let output: Vec = output? + .into_iter() + .filter(|m| m.command != "logoff") + .collect(); + Ok((name, output)) + }) + .collect(); + let output = output.with_context(|| "Failed to run refresh commands on ACM(s).")?; + + // Log any ACM errors encountered + for error in output + .iter() + .map(|(_, messages)| messages) + .flatten() + .filter_map(|m| m.error.as_ref()) + { + error!("ACM error: {}", error); + } + + // Build a map of station number-to-rooms + let rooms: HashMap = output + .iter() + .map(|(_, messages)| { + messages + .iter() + .filter(|message| message.command == OSSI_LIST_STAT_CMD) + .filter_map(|message| message.datas.to_owned()) + .flatten() + .filter_map(|stat| Some((stat.get(0)?.to_owned(), stat.get(1)?.to_owned()))) + }) + .flatten() + .collect(); + + // Build the haystack from the room map and extension-type output + let haystack: HaystackEntries = output + .into_iter() + .map(|(acm_name, messages)| { + let rooms = &rooms; + let acm_name = format!("CM{}", acm_name); + + messages + .into_iter() + .filter(|message| message.command == OSSI_LIST_EXT_CMD) + .filter_map(|message| message.datas) + .flatten() + .map(move |mut extension| { + let room = extension + .get(0) + .map(|num| rooms.get(num)) + .flatten() + .map(|room| room.to_owned()) + .unwrap_or_else(String::new); + + extension.push(acm_name.to_owned()); + extension.push(room); + extension + }) + }) + .flatten() + .collect(); + + // Overwrite shared haystack entries with new data. + let mut lock = self + .entries + .lock() + .map_err(|e| anyhow!(e.to_string())) + .with_context(|| "Failed to get haystack inner data lock.")?; + *lock = haystack; + Ok(()) + } +} -- cgit v1.2.3