diff --git a/src/coordinator/coord_state.rs b/src/coordinator/coord_state.rs index 3f43353..0b5502b 100644 --- a/src/coordinator/coord_state.rs +++ b/src/coordinator/coord_state.rs @@ -16,7 +16,7 @@ use tracing::{debug, error, info, instrument}; use crate::config::AppConfig; use crate::coordinator::socket_listen; -use crate::remote_sources::TrackerState; +use crate::coordinator::tracker_state::TrackerState; use crate::{gstreamer_pipeline, remote_sources}; use crate::{joystick_source::joystick_loop, ui::GuiUpdate}; @@ -184,8 +184,6 @@ impl<'a> CoordState<'a> { self.pipeline.sink_frame.clone(), self.to_mec.clone(), self.tracker_connection_state.clone(), - self.tracker_state.clone(), - self.tracker_metrics.clone(), )); } diff --git a/src/coordinator/mod.rs b/src/coordinator/mod.rs index c3fc3d0..8c7babd 100644 --- a/src/coordinator/mod.rs +++ b/src/coordinator/mod.rs @@ -20,11 +20,12 @@ mod coord_state; mod perf_state; mod process_box_string; mod remote_video_processor; +pub mod tracker_state; use crate::config::AppConfig; -use crate::remote_sources::TrackerState; -use crate::ui::GuiUpdate; +use crate::ui::{GuiUpdate, NormalizedBoxCoords}; pub use coord_state::{CoordState, SocketState}; +use tracker_state::TrackerState; const PRIORITY_TIMEOUT: Duration = Duration::from_secs(2); @@ -41,10 +42,23 @@ pub enum ConnectionType { Automated, } +pub enum TrackerUpdate { + Clear, + Fail, + Update(TrackerUpdatePackage), +} + +pub struct TrackerUpdatePackage { + boxes: Vec, + time: Instant, + request_duration: Duration, +} + pub enum ApplicationEvent { CameraConnectionPress, SocketMessage(Message), MoveEvent(MoveEvent, ConnectionType), + TrackerUpdate(TrackerUpdate), EnableAutomatic(bool), } @@ -142,6 +156,60 @@ pub async fn start_coordinator( } } } + ApplicationEvent::TrackerUpdate(update) => match update { + TrackerUpdate::Clear => { + if let Ok(mut ts) = state.tracker_state.lock() { + ts.clear(); + } + { + let mut tm = state.tracker_metrics.lock().await; + tm.clear_times(); + } + } + TrackerUpdate::Fail => { + let mut tm = state.tracker_metrics.lock().await; + let fail_count: usize = tm.fail_count + 1; + tm.starting_connection(Some(fail_count)); + } + TrackerUpdate::Update(update) => { + let mut x_adj: i32 = 0; + let mut y_adj: i32 = 0; + + if let Ok(mut ts) = state.tracker_state.lock() { + ts.update_from_boxes(update.boxes); + ts.last_detect = update.time; + + match ts.calculate_tracking() { + Ok((x, y, _tracker_enabled)) => { + x_adj = x; + y_adj = y; + } + Err(e) => { + error!("Could not calculate the tracking!: {e}"); + } + } + } + + let me = MoveEvent { x: x_adj, y: y_adj }; + if let Err(e) = state + .to_mec + .send(ApplicationEvent::MoveEvent( + me.clone(), + ConnectionType::Automated, + )) + .await + { + error!("Could not send to MEC... even though in the MEC?! {e}"); + } + if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(me)).await { + error!("Could not send to MEC... even though in the MEC?! {e}"); + } + { + let mut tm = state.tracker_metrics.lock().await; + tm.insert_time(update.request_duration); + } + } + }, } } diff --git a/src/coordinator/perf_state.rs b/src/coordinator/perf_state.rs index ed89aca..ab53040 100644 --- a/src/coordinator/perf_state.rs +++ b/src/coordinator/perf_state.rs @@ -6,6 +6,7 @@ const DEGRADED_TRACKER_TIME: u128 = 150; #[derive(Debug)] pub struct TrackerMetrics { pub header_text: sync::Arc>, + pub fail_count: usize, tracker_times: VecDeque, } @@ -13,6 +14,7 @@ impl TrackerMetrics { pub fn new(text_reference: sync::Arc>) -> Self { let mut ret = TrackerMetrics { header_text: text_reference, + fail_count: 0, tracker_times: VecDeque::with_capacity(MAX_RECORDED_TIMES), }; diff --git a/src/coordinator/process_box_string.rs b/src/coordinator/process_box_string.rs index 340b585..6e8866a 100644 --- a/src/coordinator/process_box_string.rs +++ b/src/coordinator/process_box_string.rs @@ -1,12 +1,6 @@ -use std::sync::{Arc, Mutex}; - -use super::TrackerState; use crate::ui::NormalizedBoxCoords; -pub fn process_incoming_string( - message: String, - identity_boxes: &Arc>, // This goes all the way back to the GUI thread for drawing boxes -) -> core::result::Result<(), String> { +pub fn process_incoming_string(message: String) -> Result, String> { let mut boxes: Vec = Vec::new(); for line in message.lines() { @@ -45,18 +39,5 @@ pub fn process_incoming_string( }); } - // Replace the memory address in the mutex guard with that of the created vec above - if let Ok(mut ib) = identity_boxes.lock() { - let mut old_ids: Vec = ib.identity_boxes.iter().map(|x| x.id).collect(); - old_ids.sort(); - let mut new_ids: Vec = boxes.iter().map(|x| x.id).collect(); - new_ids.sort(); - - ib.update_ids = new_ids == old_ids; - - // Replace the memory address in the mutex guard with that of the created vec above - ib.identity_boxes = boxes; - } - - Ok(()) + Ok(boxes) } diff --git a/src/coordinator/remote_video_processor.rs b/src/coordinator/remote_video_processor.rs index 759757b..199f95a 100644 --- a/src/coordinator/remote_video_processor.rs +++ b/src/coordinator/remote_video_processor.rs @@ -1,21 +1,18 @@ use std::{ - cmp::{max, min}, sync::{atomic::Ordering, Arc, Mutex}, - time::Duration, + time::{Duration, Instant}, }; use async_channel::Sender; use futures_util::{SinkExt, StreamExt, TryStreamExt}; use gstreamer_app::AppSink; -use tokio::time::{sleep_until, Instant}; +use tokio::time::sleep_until; use tokio_tungstenite::{connect_async, tungstenite::Message}; -use tracing::{error, info, instrument}; - -use crate::remote_sources::TrackerState; +use tracing::{error, info, instrument, warn}; use super::{ - perf_state::TrackerMetrics, process_box_string::process_incoming_string, ApplicationEvent, - SocketState, + process_box_string::process_incoming_string, ApplicationEvent, SocketState, TrackerUpdate, + TrackerUpdatePackage, }; #[instrument(skip_all)] @@ -24,144 +21,117 @@ pub async fn remote_video_loop( appsink: Arc>, to_mec: Sender, socket_state: Arc, - tracker_state: Arc>, - tracker_metrics: Arc>, ) { info!( "Starting remote tracker processing connection to: {}", conn_string ); - let mut fail_count = 0; - { - let mut tm = tracker_metrics.lock().await; - tm.starting_connection(None); - } + socket_state.is_connected.store(true, Ordering::SeqCst); - loop { - socket_state.is_connected.store(true, Ordering::SeqCst); - - match connect_async(&conn_string).await { - Err(e) => { - fail_count += 1; - { - let mut tm = tracker_metrics.lock().await; - tm.starting_connection(Some(fail_count)); - } - - if fail_count >= 5 { - break; - } - - error!("Could not connect to remote video loop! Trying again in 1 seconds: {e}"); - sleep_until(Instant::now() + Duration::from_secs(1)).await; + match connect_async(&conn_string).await { + Err(e) => { + warn!("Could not connect to remote computer: {e}"); + if let Err(e) = to_mec + .send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Fail)) + .await + { + error!("Could not send message to MEC! {e}"); } - Ok((connection, _)) => { - let (mut sender, mut recvr) = connection.split(); + } + Ok((connection, _)) => { + let (mut sender, mut recvr) = connection.split(); - let mut last_iter: Instant; + let mut last_iter: Instant; - loop { - last_iter = Instant::now(); - // Do this in an encloser to not keep a lock on the appsink - let image_message = match { - let appsnk = match appsink.lock() { - Ok(e) => e, - Err(e) => { - error!("Unrecoverable error: Could not get a lock on the appsink in remote video loop {e}"); - return; - } - }; - - get_video_frame(&appsnk) - } { + loop { + last_iter = Instant::now(); + // Do this in an encloser to not keep a lock on the appsink + let image_message = match { + let appsnk = match appsink.lock() { Ok(e) => e, Err(e) => { - error!("Could not get video frame! {e}"); - if let Err(e) = sender.close().await { - error!("Could not close socket to remote computer: {e}") - } - socket_state.is_connected.store(false, Ordering::SeqCst); - break; + error!("Unrecoverable error: Could not get a lock on the appsink in remote video loop {e}"); + return; } }; - - if let Err(e) = sender.send(image_message).await { - error!("There was an error sending the video frame to the server: {e}"); + get_video_frame(&appsnk) + } { + Ok(e) => e, + Err(e) => { + error!("Could not get video frame! {e}"); if let Err(e) = sender.close().await { error!("Could not close socket to remote computer: {e}") } socket_state.is_connected.store(false, Ordering::SeqCst); - socket_state.stay_connected.store(false, Ordering::SeqCst); break; } + }; - match recvr.try_next().await { - Ok(Some(message)) => { - let (x_off, y_off, _do_send) = - process_incoming_string(message.to_string(), &tracker_state) - .and_then(|_| calculate_tracking(&tracker_state)) - .unwrap_or((0, 0, false)); + if let Err(e) = sender.send(image_message).await { + error!("There was an error sending the video frame to the server: {e}"); + if let Err(e) = sender.close().await { + error!("Could not close socket to remote computer: {e}") + } + socket_state.is_connected.store(false, Ordering::SeqCst); + socket_state.stay_connected.store(false, Ordering::SeqCst); + break; + } - let do_send = true; - - // For some reason, this do_send is inverted from what it should be - // info!("Do Send is: {}", do_send.to_string()); - if do_send { + match recvr.try_next().await { + Ok(Some(message)) => { + match process_incoming_string(message.to_string()) { + Ok(v) => { if let Err(e) = to_mec - .send(ApplicationEvent::MoveEvent( - super::MoveEvent { x: x_off, y: y_off }, - super::ConnectionType::Automated, - )) + .send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Update( + TrackerUpdatePackage { + boxes: v, + time: Instant::now(), + request_duration: Instant::now() - last_iter, + }, + ))) .await { - error!("Could not send message to MEC, assuming critical failure: {e}"); - socket_state.is_connected.store(false, Ordering::SeqCst); - socket_state.stay_connected.store(false, Ordering::SeqCst); - return; + error!("Could not send to MEC! {e}"); + break; } } - } - Ok(None) => { - info!("Recieved an empty message from the remote computer: Aborting"); - break; - } - Err(e) => { - error!("Got an error on while recieving from remote computer: {e}"); - } + Err(e) => { + error!("Could not parse incoming string! {e}"); + } + }; } - - if !socket_state.stay_connected.load(Ordering::SeqCst) { - info!("Shutting down remote video loop"); + Ok(None) => { + info!("Recieved an empty message from the remote computer: Aborting"); break; } - - { - let mut tm = tracker_metrics.lock().await; - tm.insert_time(Instant::now() - last_iter); + Err(e) => { + error!("Got an error on while recieving from remote computer: {e}"); } - - // rate limit updates - // prevent starving the GUI thread's lock on the tracker state - sleep_until(Instant::now() + Duration::from_millis(10)).await; } + + if !socket_state.stay_connected.load(Ordering::SeqCst) { + info!("Shutting down remote video loop"); + break; + } + + // rate limit updates + // prevent starving the GUI thread's lock on the tracker state + sleep_until(tokio::time::Instant::now() + Duration::from_millis(10)).await; } } - if !socket_state.stay_connected.load(Ordering::SeqCst) { - break; - } } info!("Shutting down remote video loop"); + if let Err(e) = to_mec + .send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Clear)) + .await { - let mut tm = tracker_metrics.lock().await; - tm.clear_times(); + error!("Could not send message to MEC! {e}"); } + { - if let Ok(mut ts) = tracker_state.lock() { - ts.clear(); - } // This message forces a redraw after clearing the queue if let Err(e) = to_mec .send(ApplicationEvent::MoveEvent( @@ -189,49 +159,3 @@ fn get_video_frame(appsink: &AppSink) -> Result { .map_err(|e| format!("Could not get readable map: {e}"))?; Ok(Message::binary(map.to_vec())) } - -fn calculate_tracking( - tracker_state: &Arc>, -) -> core::result::Result<(i32, i32, bool), String> { - #[cfg(feature = "tracker-state-debug")] - debug!("Getting lock on tracker state for caculate tracking"); - if let Ok(mut ts) = tracker_state.lock() { - // if ts.last_detect + Duration::from_secs(2) < Instant::now() && !ts.identity_boxes.is_empty() { - // info!("Setting new target: {}", ts.identity_boxes[0].id); - // ts.tracking_id = ts.identity_boxes[0].id; - // } - - if let Some(target_box) = ts.identity_boxes.iter().find(|e| e.id == ts.tracking_id) { - let x_adjust = calc_x_adjust(target_box.x1, target_box.x2); - let y_adjust = calc_y_adjust(target_box.y1); - ts.last_detect = std::time::Instant::now(); - Ok((x_adjust, y_adjust, ts.enabled)) - } else { - Err("Couldn't find target in results".to_string()) - } - } else { - Err("Couldn't lock tracker state".to_string()) - } -} - -fn calc_x_adjust(x1: f32, x2: f32) -> i32 { - let dist_from_center = ((x1 + x2) / 2.0) - 0.5; - let mut x_adjust = ((dist_from_center / 0.5 * 2.0) * 100.0) as i32; - if x_adjust < 15 && x_adjust > -15 { - x_adjust = 0; - } - min(max(x_adjust, -100), 100) -} - -fn calc_y_adjust(y1: f32) -> i32 { - // All values are normalized, then multiplied by 1000. 500 == 50% of the screen - let mut y_adjust = ((y1 - 0.1) * 250.0) as i32; - if y_adjust < 0 { - y_adjust -= 20; - } else if y_adjust < 30 { - y_adjust = 0; - } else { - y_adjust = (y_adjust as f32 * 0.75) as i32; - } - min(max(y_adjust, -100), 100) -} diff --git a/src/coordinator/tracker_state.rs b/src/coordinator/tracker_state.rs new file mode 100644 index 0000000..55e6078 --- /dev/null +++ b/src/coordinator/tracker_state.rs @@ -0,0 +1,77 @@ +use std::{ + cmp::{max, min}, + time::Instant, +}; + +use crate::ui::NormalizedBoxCoords; + +#[derive(Debug)] +pub struct TrackerState { + pub tracking_id: u32, + pub highlighted_id: Option, + pub last_detect: Instant, + pub enabled: bool, + + pub update_ids: bool, + + pub identity_boxes: Vec, +} + +impl TrackerState { + pub fn clear(&mut self) { + self.tracking_id = 0; + self.highlighted_id = None; + self.last_detect = Instant::now(); + self.enabled = false; + self.update_ids = false; + self.identity_boxes.clear(); + } + + pub fn update_from_boxes(&mut self, new_boxes: Vec) { + let mut old_ids: Vec = self.identity_boxes.iter().map(|x| x.id).collect(); + old_ids.sort(); + let mut new_ids: Vec = new_boxes.iter().map(|x| x.id).collect(); + new_ids.sort(); + + self.update_ids = new_ids == old_ids; + + self.identity_boxes = new_boxes; + } + + pub fn calculate_tracking(&mut self) -> core::result::Result<(i32, i32, bool), String> { + if let Some(target_box) = self + .identity_boxes + .iter() + .find(|e| e.id == self.tracking_id) + { + let x_adjust = calc_x_adjust(target_box.x1, target_box.x2); + let y_adjust = calc_y_adjust(target_box.y1); + self.last_detect = std::time::Instant::now(); + Ok((x_adjust, y_adjust, self.enabled)) + } else { + Err("Couldn't find target in results".to_string()) + } + } +} + +fn calc_x_adjust(x1: f32, x2: f32) -> i32 { + let dist_from_center = ((x1 + x2) / 2.0) - 0.5; + let mut x_adjust = ((dist_from_center / 0.5 * 2.0) * 100.0) as i32; + if x_adjust < 15 && x_adjust > -15 { + x_adjust = 0; + } + min(max(x_adjust, -100), 100) +} + +fn calc_y_adjust(y1: f32) -> i32 { + // All values are normalized, then multiplied by 1000. 500 == 50% of the screen + let mut y_adjust = ((y1 - 0.1) * 250.0) as i32; + if y_adjust < 0 { + y_adjust -= 20; + } else if y_adjust < 30 { + y_adjust = 0; + } else { + y_adjust = (y_adjust as f32 * 0.75) as i32; + } + min(max(y_adjust, -100), 100) +} diff --git a/src/gstreamer_pipeline.rs b/src/gstreamer_pipeline.rs index fa73188..ff18c35 100644 --- a/src/gstreamer_pipeline.rs +++ b/src/gstreamer_pipeline.rs @@ -122,7 +122,8 @@ impl WebcamPipeline { // -- BEGIN PAINTABLE SINK PIPELINE let tee_caps = - gstreamer::caps::Caps::from_str("video/x-raw,framerate=15/1").context(BuildSnafu { + // gstreamer::caps::Caps::from_str("video/x-raw,framerate=15/1").context(BuildSnafu { + gstreamer::caps::Caps::from_str("video/x-raw").context(BuildSnafu { element: "tee caps", })?; diff --git a/src/remote_sources/mod.rs b/src/remote_sources/mod.rs index c4bc86f..09bdcfd 100644 --- a/src/remote_sources/mod.rs +++ b/src/remote_sources/mod.rs @@ -1,7 +1,6 @@ use std::{ net::SocketAddr, sync::{atomic::Ordering, Arc, Mutex}, - time::Instant, }; use async_channel::Sender; @@ -20,34 +19,10 @@ use tracing::instrument; mod remote_source; -use crate::{ - coordinator::{ApplicationEvent, ConnectionType, SocketState}, - ui::NormalizedBoxCoords, +use crate::coordinator::{ + tracker_state::TrackerState, ApplicationEvent, ConnectionType, SocketState, }; -#[derive(Debug)] -pub struct TrackerState { - pub tracking_id: u32, - pub highlighted_id: Option, - pub last_detect: Instant, - pub enabled: bool, - - pub update_ids: bool, - - pub identity_boxes: Vec, -} - -impl TrackerState { - pub fn clear(&mut self) { - self.tracking_id = 0; - self.highlighted_id = None; - self.last_detect = Instant::now(); - self.enabled = false; - self.update_ids = false; - self.identity_boxes.clear(); - } -} - #[instrument(skip(rt, mec))] pub async fn start_socketserver( rt: Handle, diff --git a/src/ui/control_panel.rs b/src/ui/control_panel.rs index ae344dd..3b2a22d 100644 --- a/src/ui/control_panel.rs +++ b/src/ui/control_panel.rs @@ -14,7 +14,7 @@ use tracing::{error, event, span, Level}; #[cfg(feature = "tracker-state-debug")] use tracing::debug; -use crate::{coordinator::ApplicationEvent, remote_sources::TrackerState}; +use crate::coordinator::{tracker_state::TrackerState, ApplicationEvent}; #[derive(Debug)] pub struct ControlPanel { diff --git a/src/ui/liveview_panel.rs b/src/ui/liveview_panel.rs index f1ff004..268d79c 100644 --- a/src/ui/liveview_panel.rs +++ b/src/ui/liveview_panel.rs @@ -9,7 +9,7 @@ use gtk::{ AspectFrame, Box, DrawingArea, EventControllerMotion, GestureClick, Label, Overlay, Picture, }; -use crate::remote_sources::TrackerState; +use crate::coordinator::tracker_state::TrackerState; use super::NormalizedBoxCoords; diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 1d9944c..515b1a1 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -12,8 +12,8 @@ use tokio::runtime::Handle; use tokio::sync::RwLock; use crate::config::AppConfig; +use crate::coordinator::tracker_state::TrackerState; use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent}; -use crate::remote_sources::TrackerState; mod control_panel; mod liveview_panel;