diff options
Diffstat (limited to 'angelsharkd/src')
-rw-r--r-- | angelsharkd/src/routes/extensions/mod.rs | 8 | ||||
-rw-r--r-- | angelsharkd/src/routes/extensions/simple_search.rs | 182 |
2 files changed, 153 insertions, 37 deletions
diff --git a/angelsharkd/src/routes/extensions/mod.rs b/angelsharkd/src/routes/extensions/mod.rs index b1c5aa3..adcea3f 100644 --- a/angelsharkd/src/routes/extensions/mod.rs +++ b/angelsharkd/src/routes/extensions/mod.rs @@ -1,5 +1,3 @@ -use std::sync::{Arc, Mutex}; - use crate::config::Config; use warp::{path, Filter, Rejection, Reply}; @@ -15,13 +13,11 @@ pub fn filter(config: &Config) -> impl Filter<Extract = impl Reply, Error = Reje let filters = default().or(default()); #[cfg(feature = "simple_search")] - let runner = config.runner.clone(); - #[cfg(feature = "simple_search")] - let haystack = simple_search::Haystack::new(); + let haystack = simple_search::Haystack::new(config.runner.clone()); #[cfg(feature = "simple_search")] let filters = filters .or(simple_search::search(haystack.clone())) - .or(simple_search::refresh(runner, haystack)); + .or(simple_search::refresh(haystack)); #[cfg(feature = "simple_deprov")] let filters = filters.or(simple_deprov::filter()); diff --git a/angelsharkd/src/routes/extensions/simple_search.rs b/angelsharkd/src/routes/extensions/simple_search.rs index 7b547d3..963cb69 100644 --- a/angelsharkd/src/routes/extensions/simple_search.rs +++ b/angelsharkd/src/routes/extensions/simple_search.rs @@ -1,15 +1,23 @@ -use libangelshark::AcmRunner; +use anyhow::{anyhow, Context, Error}; +use libangelshark::{AcmRunner, Message, ParallelIterator}; +use log::{error, info}; use std::{ + collections::HashMap, convert::Infallible, sync::{Arc, Mutex}, }; use warp::{ body::{content_length_limit, json}, - get, path, post, Filter, Rejection, Reply, + get, + hyper::StatusCode, + path, post, reply, Filter, Rejection, Reply, }; /// Collection of search terms -type Needle = Vec<String>; +type Needles = Vec<String>; + +/// Collection of ACM extension types with ROOMs (if applicable) and ACM names +type HaystackEntries = Vec<Vec<String>>; pub fn search(haystack: Haystack) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { // TODO: discourage caching response thru headers @@ -18,57 +26,169 @@ pub fn search(haystack: Haystack) -> impl Filter<Extract = impl Reply, Error = R .and(post()) .and(content_length_limit(1024 * 16)) .and(json()) - .and_then(move |terms: Needle| handle_search(haystack.clone(), terms)) + .and_then(move |terms: Needles| handle_search(haystack.to_owned(), terms)) } -pub fn refresh( - runner: AcmRunner, - haystack: Haystack, -) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { +pub fn refresh(haystack: Haystack) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone { path!("search" / "refresh") .and(get()) - .and_then(move || handle_refresh(haystack.clone(), runner.clone())) + .and_then(move || handle_refresh(haystack.to_owned())) } -async fn handle_search(haystack: Haystack, needle: Needle) -> Result<impl Reply, Infallible> { - Ok(haystack.search(Vec::new())) +async fn handle_search(haystack: Haystack, needle: Needles) -> Result<impl Reply, Infallible> { + // Ok(haystack.search(Vec::new())?) + // if let Ok(matches = haystack.search(needle); + match haystack.search(needle) { + Ok(matches) => Ok(reply::with_status(reply::json(&matches), StatusCode::OK)), + Err(e) => Ok(reply::with_status( + reply::json(&e.to_string()), + StatusCode::INTERNAL_SERVER_ERROR, + )), + } } -async fn handle_refresh(haystack: Haystack, runner: AcmRunner) -> Result<impl Reply, Infallible> { - haystack.refresh(); +async fn handle_refresh(haystack: Haystack) -> Result<impl Reply, Infallible> { + // Run refresh as a background task and immediately return. + tokio::spawn(async move { + if let Err(e) = haystack.refresh() { + error!("{}", e.to_string()); // TODO: use logger + } else { + info!("Search haystack refreshed."); + } + }); + Ok("Refresh scheduled") } /// A lazy-loaded, asynchronously-refreshed exension-type haystack cache. #[derive(Clone)] pub struct Haystack { - inner: Arc<Mutex<String>>, + entries: Arc<Mutex<HaystackEntries>>, + runner: AcmRunner, } impl Haystack { - pub fn new() -> Self { + pub fn new(runner: AcmRunner) -> Self { Self { - inner: Arc::new(Mutex::new(String::with_capacity(0))), + entries: Arc::new(Mutex::new(Vec::new())), + runner, } } - pub fn search(&self, needle: Needle) -> String { - if let Ok(matches) = self.inner.lock() { - matches.clone() - } else { - String::from("stale") - } + pub fn search(&self, needles: Needles) -> Result<HaystackEntries, Error> { + let needles: Vec<_> = needles.iter().map(|n| n.to_lowercase()).collect(); + + let entries = self + .entries + .lock() + .map_err(|e| anyhow!(e.to_string())) + .with_context(|| "Failed to get haystack inner data lock.")?; + + let matches = entries + .iter() + .filter(|entry| { + let entry_str = entry.join("").to_lowercase(); + needles + .iter() + .all(|needle| entry_str.contains(needle.as_str())) + }) + .cloned() + .collect(); + Ok(matches) } - pub fn refresh(&self) { - let inner = self.inner.clone(); - tokio::spawn(async move { - std::thread::sleep_ms(10000); // slow generation here + pub fn refresh(&self) -> Result<(), Error> { + let mut runner = self.runner.to_owned(); // TODO: This alone allows multiple refreshers to be run at once. Do we want that? + + // Queue jobs in ACM runner + for acm in &[ + "01", "02", "03", "04", "05", "06", "07", "08", "09", "10", "11", + ] { + runner + .queue_input(acm, &Message::new("list extension-type")) + .queue_input( + acm, + &Message { + command: String::from("list station"), + fields: Some(vec![String::from("8005ff00"), String::from("0031ff00")]), + datas: None, + error: None, + }, + ); + } + + // Run jobs and collect output. Filter out uneeded commands, combine errors. + let output: Result<Vec<(String, Vec<Message>)>, Error> = runner + .run() + .map(|(name, output)| { + let output: Vec<Message> = output? + .into_iter() + .filter(|m| m.command != "logoff") + .collect(); + Ok((name, output)) + }) + .collect(); + let output = output.with_context(|| "Failed to run refresh commands on ACM(s).")?; + + // Log any ACM errors encountered + for error in output + .iter() + .map(|(_, messages)| messages) + .flatten() + .filter_map(|m| m.error.as_ref()) + { + error!("ACM error: {}", error); + } + + // Build a map of station number-to-rooms + let rooms: HashMap<String, String> = output + .iter() + .map(|(_, messages)| { + messages + .iter() + .filter(|message| message.command == "list station") + .filter_map(|message| message.datas.to_owned()) + .flatten() + .filter_map(|stat| Some((stat.get(0)?.to_owned(), stat.get(1)?.to_owned()))) + }) + .flatten() + .collect(); + + // Build the haystack from the room map and extension-type output + let haystack: HaystackEntries = output + .into_iter() + .map(|(acm_name, messages)| { + let rooms = &rooms; + let acm_name = format!("CM{}", acm_name); + + messages + .into_iter() + .filter(|message| message.command == "list extension-type") + .filter_map(|message| message.datas) + .flatten() + .map(move |mut extension| { + let room = extension + .get(0) + .map(|num| rooms.get(num)) + .flatten() + .map(|room| room.to_owned()) + .unwrap_or_else(String::new); + + extension.push(acm_name.to_owned()); + extension.push(room); + extension + }) + }) + .flatten() + .collect(); - if let Ok(mut handle) = inner.lock() { - *handle = String::from("fresh"); - eprintln!("evicted"); - } - }); + // Propogate the new data to the shared haystack entries + let mut lock = self + .entries + .lock() + .map_err(|e| anyhow!(e.to_string())) + .with_context(|| "Failed to get haystack inner data lock.")?; + *lock = haystack; + Ok(()) } } |