moved tracker_state lock to coordstate :'(

This commit is contained in:
Nickiel12 2024-06-23 20:17:57 -07:00
parent 8251af515f
commit 362538c001
11 changed files with 234 additions and 208 deletions

View file

@ -16,7 +16,7 @@ use tracing::{debug, error, info, instrument};
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::coordinator::socket_listen; use crate::coordinator::socket_listen;
use crate::remote_sources::TrackerState; use crate::coordinator::tracker_state::TrackerState;
use crate::{gstreamer_pipeline, remote_sources}; use crate::{gstreamer_pipeline, remote_sources};
use crate::{joystick_source::joystick_loop, ui::GuiUpdate}; use crate::{joystick_source::joystick_loop, ui::GuiUpdate};
@ -184,8 +184,6 @@ impl<'a> CoordState<'a> {
self.pipeline.sink_frame.clone(), self.pipeline.sink_frame.clone(),
self.to_mec.clone(), self.to_mec.clone(),
self.tracker_connection_state.clone(), self.tracker_connection_state.clone(),
self.tracker_state.clone(),
self.tracker_metrics.clone(),
)); ));
} }

View file

@ -20,11 +20,12 @@ mod coord_state;
mod perf_state; mod perf_state;
mod process_box_string; mod process_box_string;
mod remote_video_processor; mod remote_video_processor;
pub mod tracker_state;
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::remote_sources::TrackerState; use crate::ui::{GuiUpdate, NormalizedBoxCoords};
use crate::ui::GuiUpdate;
pub use coord_state::{CoordState, SocketState}; pub use coord_state::{CoordState, SocketState};
use tracker_state::TrackerState;
const PRIORITY_TIMEOUT: Duration = Duration::from_secs(2); const PRIORITY_TIMEOUT: Duration = Duration::from_secs(2);
@ -41,10 +42,23 @@ pub enum ConnectionType {
Automated, Automated,
} }
pub enum TrackerUpdate {
Clear,
Fail,
Update(TrackerUpdatePackage),
}
pub struct TrackerUpdatePackage {
boxes: Vec<NormalizedBoxCoords>,
time: Instant,
request_duration: Duration,
}
pub enum ApplicationEvent { pub enum ApplicationEvent {
CameraConnectionPress, CameraConnectionPress,
SocketMessage(Message), SocketMessage(Message),
MoveEvent(MoveEvent, ConnectionType), MoveEvent(MoveEvent, ConnectionType),
TrackerUpdate(TrackerUpdate),
EnableAutomatic(bool), EnableAutomatic(bool),
} }
@ -142,6 +156,60 @@ pub async fn start_coordinator(
} }
} }
} }
ApplicationEvent::TrackerUpdate(update) => match update {
TrackerUpdate::Clear => {
if let Ok(mut ts) = state.tracker_state.lock() {
ts.clear();
}
{
let mut tm = state.tracker_metrics.lock().await;
tm.clear_times();
}
}
TrackerUpdate::Fail => {
let mut tm = state.tracker_metrics.lock().await;
let fail_count: usize = tm.fail_count + 1;
tm.starting_connection(Some(fail_count));
}
TrackerUpdate::Update(update) => {
let mut x_adj: i32 = 0;
let mut y_adj: i32 = 0;
if let Ok(mut ts) = state.tracker_state.lock() {
ts.update_from_boxes(update.boxes);
ts.last_detect = update.time;
match ts.calculate_tracking() {
Ok((x, y, _tracker_enabled)) => {
x_adj = x;
y_adj = y;
}
Err(e) => {
error!("Could not calculate the tracking!: {e}");
}
}
}
let me = MoveEvent { x: x_adj, y: y_adj };
if let Err(e) = state
.to_mec
.send(ApplicationEvent::MoveEvent(
me.clone(),
ConnectionType::Automated,
))
.await
{
error!("Could not send to MEC... even though in the MEC?! {e}");
}
if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(me)).await {
error!("Could not send to MEC... even though in the MEC?! {e}");
}
{
let mut tm = state.tracker_metrics.lock().await;
tm.insert_time(update.request_duration);
}
}
},
} }
} }

View file

@ -6,6 +6,7 @@ const DEGRADED_TRACKER_TIME: u128 = 150;
#[derive(Debug)] #[derive(Debug)]
pub struct TrackerMetrics { pub struct TrackerMetrics {
pub header_text: sync::Arc<sync::RwLock<String>>, pub header_text: sync::Arc<sync::RwLock<String>>,
pub fail_count: usize,
tracker_times: VecDeque<u128>, tracker_times: VecDeque<u128>,
} }
@ -13,6 +14,7 @@ impl TrackerMetrics {
pub fn new(text_reference: sync::Arc<sync::RwLock<String>>) -> Self { pub fn new(text_reference: sync::Arc<sync::RwLock<String>>) -> Self {
let mut ret = TrackerMetrics { let mut ret = TrackerMetrics {
header_text: text_reference, header_text: text_reference,
fail_count: 0,
tracker_times: VecDeque::with_capacity(MAX_RECORDED_TIMES), tracker_times: VecDeque::with_capacity(MAX_RECORDED_TIMES),
}; };

View file

@ -1,12 +1,6 @@
use std::sync::{Arc, Mutex};
use super::TrackerState;
use crate::ui::NormalizedBoxCoords; use crate::ui::NormalizedBoxCoords;
pub fn process_incoming_string( pub fn process_incoming_string(message: String) -> Result<Vec<NormalizedBoxCoords>, String> {
message: String,
identity_boxes: &Arc<Mutex<TrackerState>>, // This goes all the way back to the GUI thread for drawing boxes
) -> core::result::Result<(), String> {
let mut boxes: Vec<NormalizedBoxCoords> = Vec::new(); let mut boxes: Vec<NormalizedBoxCoords> = Vec::new();
for line in message.lines() { for line in message.lines() {
@ -45,18 +39,5 @@ pub fn process_incoming_string(
}); });
} }
// Replace the memory address in the mutex guard with that of the created vec above Ok(boxes)
if let Ok(mut ib) = identity_boxes.lock() {
let mut old_ids: Vec<u32> = ib.identity_boxes.iter().map(|x| x.id).collect();
old_ids.sort();
let mut new_ids: Vec<u32> = boxes.iter().map(|x| x.id).collect();
new_ids.sort();
ib.update_ids = new_ids == old_ids;
// Replace the memory address in the mutex guard with that of the created vec above
ib.identity_boxes = boxes;
}
Ok(())
} }

View file

@ -1,21 +1,18 @@
use std::{ use std::{
cmp::{max, min},
sync::{atomic::Ordering, Arc, Mutex}, sync::{atomic::Ordering, Arc, Mutex},
time::Duration, time::{Duration, Instant},
}; };
use async_channel::Sender; use async_channel::Sender;
use futures_util::{SinkExt, StreamExt, TryStreamExt}; use futures_util::{SinkExt, StreamExt, TryStreamExt};
use gstreamer_app::AppSink; use gstreamer_app::AppSink;
use tokio::time::{sleep_until, Instant}; use tokio::time::sleep_until;
use tokio_tungstenite::{connect_async, tungstenite::Message}; use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{error, info, instrument}; use tracing::{error, info, instrument, warn};
use crate::remote_sources::TrackerState;
use super::{ use super::{
perf_state::TrackerMetrics, process_box_string::process_incoming_string, ApplicationEvent, process_box_string::process_incoming_string, ApplicationEvent, SocketState, TrackerUpdate,
SocketState, TrackerUpdatePackage,
}; };
#[instrument(skip_all)] #[instrument(skip_all)]
@ -24,37 +21,23 @@ pub async fn remote_video_loop(
appsink: Arc<Mutex<AppSink>>, appsink: Arc<Mutex<AppSink>>,
to_mec: Sender<ApplicationEvent>, to_mec: Sender<ApplicationEvent>,
socket_state: Arc<SocketState>, socket_state: Arc<SocketState>,
tracker_state: Arc<Mutex<TrackerState>>,
tracker_metrics: Arc<tokio::sync::Mutex<TrackerMetrics>>,
) { ) {
info!( info!(
"Starting remote tracker processing connection to: {}", "Starting remote tracker processing connection to: {}",
conn_string conn_string
); );
let mut fail_count = 0;
{
let mut tm = tracker_metrics.lock().await;
tm.starting_connection(None);
}
loop {
socket_state.is_connected.store(true, Ordering::SeqCst); socket_state.is_connected.store(true, Ordering::SeqCst);
match connect_async(&conn_string).await { match connect_async(&conn_string).await {
Err(e) => { Err(e) => {
fail_count += 1; warn!("Could not connect to remote computer: {e}");
if let Err(e) = to_mec
.send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Fail))
.await
{ {
let mut tm = tracker_metrics.lock().await; error!("Could not send message to MEC! {e}");
tm.starting_connection(Some(fail_count));
} }
if fail_count >= 5 {
break;
}
error!("Could not connect to remote video loop! Trying again in 1 seconds: {e}");
sleep_until(Instant::now() + Duration::from_secs(1)).await;
} }
Ok((connection, _)) => { Ok((connection, _)) => {
let (mut sender, mut recvr) = connection.split(); let (mut sender, mut recvr) = connection.split();
@ -86,7 +69,6 @@ pub async fn remote_video_loop(
} }
}; };
if let Err(e) = sender.send(image_message).await { if let Err(e) = sender.send(image_message).await {
error!("There was an error sending the video frame to the server: {e}"); error!("There was an error sending the video frame to the server: {e}");
if let Err(e) = sender.close().await { if let Err(e) = sender.close().await {
@ -99,29 +81,26 @@ pub async fn remote_video_loop(
match recvr.try_next().await { match recvr.try_next().await {
Ok(Some(message)) => { Ok(Some(message)) => {
let (x_off, y_off, _do_send) = match process_incoming_string(message.to_string()) {
process_incoming_string(message.to_string(), &tracker_state) Ok(v) => {
.and_then(|_| calculate_tracking(&tracker_state))
.unwrap_or((0, 0, false));
let do_send = true;
// For some reason, this do_send is inverted from what it should be
// info!("Do Send is: {}", do_send.to_string());
if do_send {
if let Err(e) = to_mec if let Err(e) = to_mec
.send(ApplicationEvent::MoveEvent( .send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Update(
super::MoveEvent { x: x_off, y: y_off }, TrackerUpdatePackage {
super::ConnectionType::Automated, boxes: v,
)) time: Instant::now(),
request_duration: Instant::now() - last_iter,
},
)))
.await .await
{ {
error!("Could not send message to MEC, assuming critical failure: {e}"); error!("Could not send to MEC! {e}");
socket_state.is_connected.store(false, Ordering::SeqCst); break;
socket_state.stay_connected.store(false, Ordering::SeqCst);
return;
} }
} }
Err(e) => {
error!("Could not parse incoming string! {e}");
}
};
} }
Ok(None) => { Ok(None) => {
info!("Recieved an empty message from the remote computer: Aborting"); info!("Recieved an empty message from the remote computer: Aborting");
@ -137,31 +116,22 @@ pub async fn remote_video_loop(
break; break;
} }
{
let mut tm = tracker_metrics.lock().await;
tm.insert_time(Instant::now() - last_iter);
}
// rate limit updates // rate limit updates
// prevent starving the GUI thread's lock on the tracker state // prevent starving the GUI thread's lock on the tracker state
sleep_until(Instant::now() + Duration::from_millis(10)).await; sleep_until(tokio::time::Instant::now() + Duration::from_millis(10)).await;
} }
} }
} }
if !socket_state.stay_connected.load(Ordering::SeqCst) {
break;
}
}
info!("Shutting down remote video loop"); info!("Shutting down remote video loop");
if let Err(e) = to_mec
.send(ApplicationEvent::TrackerUpdate(TrackerUpdate::Clear))
.await
{ {
let mut tm = tracker_metrics.lock().await; error!("Could not send message to MEC! {e}");
tm.clear_times();
} }
{ {
if let Ok(mut ts) = tracker_state.lock() {
ts.clear();
}
// This message forces a redraw after clearing the queue // This message forces a redraw after clearing the queue
if let Err(e) = to_mec if let Err(e) = to_mec
.send(ApplicationEvent::MoveEvent( .send(ApplicationEvent::MoveEvent(
@ -189,49 +159,3 @@ fn get_video_frame(appsink: &AppSink) -> Result<Message, String> {
.map_err(|e| format!("Could not get readable map: {e}"))?; .map_err(|e| format!("Could not get readable map: {e}"))?;
Ok(Message::binary(map.to_vec())) Ok(Message::binary(map.to_vec()))
} }
fn calculate_tracking(
tracker_state: &Arc<Mutex<TrackerState>>,
) -> core::result::Result<(i32, i32, bool), String> {
#[cfg(feature = "tracker-state-debug")]
debug!("Getting lock on tracker state for caculate tracking");
if let Ok(mut ts) = tracker_state.lock() {
// if ts.last_detect + Duration::from_secs(2) < Instant::now() && !ts.identity_boxes.is_empty() {
// info!("Setting new target: {}", ts.identity_boxes[0].id);
// ts.tracking_id = ts.identity_boxes[0].id;
// }
if let Some(target_box) = ts.identity_boxes.iter().find(|e| e.id == ts.tracking_id) {
let x_adjust = calc_x_adjust(target_box.x1, target_box.x2);
let y_adjust = calc_y_adjust(target_box.y1);
ts.last_detect = std::time::Instant::now();
Ok((x_adjust, y_adjust, ts.enabled))
} else {
Err("Couldn't find target in results".to_string())
}
} else {
Err("Couldn't lock tracker state".to_string())
}
}
fn calc_x_adjust(x1: f32, x2: f32) -> i32 {
let dist_from_center = ((x1 + x2) / 2.0) - 0.5;
let mut x_adjust = ((dist_from_center / 0.5 * 2.0) * 100.0) as i32;
if x_adjust < 15 && x_adjust > -15 {
x_adjust = 0;
}
min(max(x_adjust, -100), 100)
}
fn calc_y_adjust(y1: f32) -> i32 {
// All values are normalized, then multiplied by 1000. 500 == 50% of the screen
let mut y_adjust = ((y1 - 0.1) * 250.0) as i32;
if y_adjust < 0 {
y_adjust -= 20;
} else if y_adjust < 30 {
y_adjust = 0;
} else {
y_adjust = (y_adjust as f32 * 0.75) as i32;
}
min(max(y_adjust, -100), 100)
}

View file

@ -0,0 +1,77 @@
use std::{
cmp::{max, min},
time::Instant,
};
use crate::ui::NormalizedBoxCoords;
#[derive(Debug)]
pub struct TrackerState {
pub tracking_id: u32,
pub highlighted_id: Option<u32>,
pub last_detect: Instant,
pub enabled: bool,
pub update_ids: bool,
pub identity_boxes: Vec<NormalizedBoxCoords>,
}
impl TrackerState {
pub fn clear(&mut self) {
self.tracking_id = 0;
self.highlighted_id = None;
self.last_detect = Instant::now();
self.enabled = false;
self.update_ids = false;
self.identity_boxes.clear();
}
pub fn update_from_boxes(&mut self, new_boxes: Vec<NormalizedBoxCoords>) {
let mut old_ids: Vec<u32> = self.identity_boxes.iter().map(|x| x.id).collect();
old_ids.sort();
let mut new_ids: Vec<u32> = new_boxes.iter().map(|x| x.id).collect();
new_ids.sort();
self.update_ids = new_ids == old_ids;
self.identity_boxes = new_boxes;
}
pub fn calculate_tracking(&mut self) -> core::result::Result<(i32, i32, bool), String> {
if let Some(target_box) = self
.identity_boxes
.iter()
.find(|e| e.id == self.tracking_id)
{
let x_adjust = calc_x_adjust(target_box.x1, target_box.x2);
let y_adjust = calc_y_adjust(target_box.y1);
self.last_detect = std::time::Instant::now();
Ok((x_adjust, y_adjust, self.enabled))
} else {
Err("Couldn't find target in results".to_string())
}
}
}
fn calc_x_adjust(x1: f32, x2: f32) -> i32 {
let dist_from_center = ((x1 + x2) / 2.0) - 0.5;
let mut x_adjust = ((dist_from_center / 0.5 * 2.0) * 100.0) as i32;
if x_adjust < 15 && x_adjust > -15 {
x_adjust = 0;
}
min(max(x_adjust, -100), 100)
}
fn calc_y_adjust(y1: f32) -> i32 {
// All values are normalized, then multiplied by 1000. 500 == 50% of the screen
let mut y_adjust = ((y1 - 0.1) * 250.0) as i32;
if y_adjust < 0 {
y_adjust -= 20;
} else if y_adjust < 30 {
y_adjust = 0;
} else {
y_adjust = (y_adjust as f32 * 0.75) as i32;
}
min(max(y_adjust, -100), 100)
}

View file

@ -122,7 +122,8 @@ impl WebcamPipeline {
// -- BEGIN PAINTABLE SINK PIPELINE // -- BEGIN PAINTABLE SINK PIPELINE
let tee_caps = let tee_caps =
gstreamer::caps::Caps::from_str("video/x-raw,framerate=15/1").context(BuildSnafu { // gstreamer::caps::Caps::from_str("video/x-raw,framerate=15/1").context(BuildSnafu {
gstreamer::caps::Caps::from_str("video/x-raw").context(BuildSnafu {
element: "tee caps", element: "tee caps",
})?; })?;

View file

@ -1,7 +1,6 @@
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
sync::{atomic::Ordering, Arc, Mutex}, sync::{atomic::Ordering, Arc, Mutex},
time::Instant,
}; };
use async_channel::Sender; use async_channel::Sender;
@ -20,34 +19,10 @@ use tracing::instrument;
mod remote_source; mod remote_source;
use crate::{ use crate::coordinator::{
coordinator::{ApplicationEvent, ConnectionType, SocketState}, tracker_state::TrackerState, ApplicationEvent, ConnectionType, SocketState,
ui::NormalizedBoxCoords,
}; };
#[derive(Debug)]
pub struct TrackerState {
pub tracking_id: u32,
pub highlighted_id: Option<u32>,
pub last_detect: Instant,
pub enabled: bool,
pub update_ids: bool,
pub identity_boxes: Vec<NormalizedBoxCoords>,
}
impl TrackerState {
pub fn clear(&mut self) {
self.tracking_id = 0;
self.highlighted_id = None;
self.last_detect = Instant::now();
self.enabled = false;
self.update_ids = false;
self.identity_boxes.clear();
}
}
#[instrument(skip(rt, mec))] #[instrument(skip(rt, mec))]
pub async fn start_socketserver( pub async fn start_socketserver(
rt: Handle, rt: Handle,

View file

@ -14,7 +14,7 @@ use tracing::{error, event, span, Level};
#[cfg(feature = "tracker-state-debug")] #[cfg(feature = "tracker-state-debug")]
use tracing::debug; use tracing::debug;
use crate::{coordinator::ApplicationEvent, remote_sources::TrackerState}; use crate::coordinator::{tracker_state::TrackerState, ApplicationEvent};
#[derive(Debug)] #[derive(Debug)]
pub struct ControlPanel { pub struct ControlPanel {

View file

@ -9,7 +9,7 @@ use gtk::{
AspectFrame, Box, DrawingArea, EventControllerMotion, GestureClick, Label, Overlay, Picture, AspectFrame, Box, DrawingArea, EventControllerMotion, GestureClick, Label, Overlay, Picture,
}; };
use crate::remote_sources::TrackerState; use crate::coordinator::tracker_state::TrackerState;
use super::NormalizedBoxCoords; use super::NormalizedBoxCoords;

View file

@ -12,8 +12,8 @@ use tokio::runtime::Handle;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::coordinator::tracker_state::TrackerState;
use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent}; use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent};
use crate::remote_sources::TrackerState;
mod control_panel; mod control_panel;
mod liveview_panel; mod liveview_panel;