From 0edcb7438bac442bb24a55f25974ca225e4d2e97 Mon Sep 17 00:00:00 2001 From: "Carpenter, Adam (CORP)" Date: Tue, 30 Nov 2021 12:02:38 -0500 Subject: feat: impl searching extension, tested, working --- angelsharkd/src/routes/extensions/mod.rs | 8 +- angelsharkd/src/routes/extensions/simple_search.rs | 182 +++++++++++++++++---- 2 files changed, 153 insertions(+), 37 deletions(-) (limited to 'angelsharkd/src/routes/extensions') diff --git a/angelsharkd/src/routes/extensions/mod.rs b/angelsharkd/src/routes/extensions/mod.rs index b1c5aa3..adcea3f 100644 --- a/angelsharkd/src/routes/extensions/mod.rs +++ b/angelsharkd/src/routes/extensions/mod.rs @@ -1,5 +1,3 @@ -use std::sync::{Arc, Mutex}; - use crate::config::Config; use warp::{path, Filter, Rejection, Reply}; @@ -15,13 +13,11 @@ pub fn filter(config: &Config) -> impl Filter; +type Needles = Vec; + +/// Collection of ACM extension types with ROOMs (if applicable) and ACM names +type HaystackEntries = Vec>; pub fn search(haystack: Haystack) -> impl Filter + Clone { // TODO: discourage caching response thru headers @@ -18,57 +26,169 @@ pub fn search(haystack: Haystack) -> impl Filter impl Filter + Clone { +pub fn refresh(haystack: Haystack) -> impl Filter + Clone { path!("search" / "refresh") .and(get()) - .and_then(move || handle_refresh(haystack.clone(), runner.clone())) + .and_then(move || handle_refresh(haystack.to_owned())) } -async fn handle_search(haystack: Haystack, needle: Needle) -> Result { - Ok(haystack.search(Vec::new())) +async fn handle_search(haystack: Haystack, needle: Needles) -> Result { + // Ok(haystack.search(Vec::new())?) + // if let Ok(matches = haystack.search(needle); + match haystack.search(needle) { + Ok(matches) => Ok(reply::with_status(reply::json(&matches), StatusCode::OK)), + Err(e) => Ok(reply::with_status( + reply::json(&e.to_string()), + StatusCode::INTERNAL_SERVER_ERROR, + )), + } } -async fn handle_refresh(haystack: Haystack, runner: AcmRunner) -> Result { - haystack.refresh(); +async fn handle_refresh(haystack: Haystack) -> Result { + // Run refresh as a background task and immediately return. + tokio::spawn(async move { + if let Err(e) = haystack.refresh() { + error!("{}", e.to_string()); // TODO: use logger + } else { + info!("Search haystack refreshed."); + } + }); + Ok("Refresh scheduled") } /// A lazy-loaded, asynchronously-refreshed exension-type haystack cache. #[derive(Clone)] pub struct Haystack { - inner: Arc>, + entries: Arc>, + runner: AcmRunner, } impl Haystack { - pub fn new() -> Self { + pub fn new(runner: AcmRunner) -> Self { Self { - inner: Arc::new(Mutex::new(String::with_capacity(0))), + entries: Arc::new(Mutex::new(Vec::new())), + runner, } } - pub fn search(&self, needle: Needle) -> String { - if let Ok(matches) = self.inner.lock() { - matches.clone() - } else { - String::from("stale") - } + pub fn search(&self, needles: Needles) -> Result { + let needles: Vec<_> = needles.iter().map(|n| n.to_lowercase()).collect(); + + let entries = self + .entries + .lock() + .map_err(|e| anyhow!(e.to_string())) + .with_context(|| "Failed to get haystack inner data lock.")?; + + let matches = entries + .iter() + .filter(|entry| { + let entry_str = entry.join("").to_lowercase(); + needles + .iter() + .all(|needle| entry_str.contains(needle.as_str())) + }) + .cloned() + .collect(); + Ok(matches) } - pub fn refresh(&self) { - let inner = self.inner.clone(); - tokio::spawn(async move { - std::thread::sleep_ms(10000); // slow generation here + pub fn refresh(&self) -> Result<(), Error> { + let mut runner = self.runner.to_owned(); // TODO: This alone allows multiple refreshers to be run at once. Do we want that? + + // Queue jobs in ACM runner + for acm in &[ + "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", + ] { + runner + .queue_input(acm, &Message::new("list extension-type")) + .queue_input( + acm, + &Message { + command: String::from("list station"), + fields: Some(vec![String::from("8005ff00"), String::from("0031ff00")]), + datas: None, + error: None, + }, + ); + } + + // Run jobs and collect output. Filter out uneeded commands, combine errors. + let output: Result)>, Error> = runner + .run() + .map(|(name, output)| { + let output: Vec = output? + .into_iter() + .filter(|m| m.command != "logoff") + .collect(); + Ok((name, output)) + }) + .collect(); + let output = output.with_context(|| "Failed to run refresh commands on ACM(s).")?; + + // Log any ACM errors encountered + for error in output + .iter() + .map(|(_, messages)| messages) + .flatten() + .filter_map(|m| m.error.as_ref()) + { + error!("ACM error: {}", error); + } + + // Build a map of station number-to-rooms + let rooms: HashMap = output + .iter() + .map(|(_, messages)| { + messages + .iter() + .filter(|message| message.command == "list station") + .filter_map(|message| message.datas.to_owned()) + .flatten() + .filter_map(|stat| Some((stat.get(0)?.to_owned(), stat.get(1)?.to_owned()))) + }) + .flatten() + .collect(); + + // Build the haystack from the room map and extension-type output + let haystack: HaystackEntries = output + .into_iter() + .map(|(acm_name, messages)| { + let rooms = &rooms; + let acm_name = format!("CM{}", acm_name); + + messages + .into_iter() + .filter(|message| message.command == "list extension-type") + .filter_map(|message| message.datas) + .flatten() + .map(move |mut extension| { + let room = extension + .get(0) + .map(|num| rooms.get(num)) + .flatten() + .map(|room| room.to_owned()) + .unwrap_or_else(String::new); + + extension.push(acm_name.to_owned()); + extension.push(room); + extension + }) + }) + .flatten() + .collect(); - if let Ok(mut handle) = inner.lock() { - *handle = String::from("fresh"); - eprintln!("evicted"); - } - }); + // Propogate the new data to the shared haystack entries + let mut lock = self + .entries + .lock() + .map_err(|e| anyhow!(e.to_string())) + .with_context(|| "Failed to get haystack inner data lock.")?; + *lock = haystack; + Ok(()) } } -- cgit v1.2.3