moved to channels instead of mutexes for tracker state updates

This commit is contained in:
Nickiel12 2024-06-27 19:56:11 -07:00
parent 362538c001
commit c28ffbaa17
6 changed files with 159 additions and 138 deletions

View file

@ -1,5 +1,4 @@
use std::pin::Pin;
use std::sync::Mutex;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -33,7 +32,7 @@ pub struct SocketState {
#[derive(Debug)]
pub struct CoordState<'a> {
pub settings: Arc<RwLock<AppConfig>>,
pub tracker_metrics: Arc<tokio::sync::Mutex<TrackerMetrics>>,
pub tracker_metrics: TrackerMetrics,
pub sck_outbound: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
pub remote_sources_state: Arc<SocketState>,
@ -50,7 +49,7 @@ pub struct CoordState<'a> {
pub pipeline: gstreamer_pipeline::WebcamPipeline,
pub tracker_state: Arc<Mutex<TrackerState>>,
pub tracker_state: TrackerState,
pub tracker_connection_state: Arc<SocketState>,
}
@ -60,13 +59,11 @@ impl<'a> CoordState<'a> {
to_mec: Sender<ApplicationEvent>,
to_gui: Sender<GuiUpdate>,
rt: Handle,
tracker_state: Arc<Mutex<TrackerState>>,
settings: Arc<RwLock<AppConfig>>,
tracker_header: Arc<std::sync::RwLock<String>>,
) -> Self {
let this = CoordState {
settings,
tracker_metrics: Arc::new(tokio::sync::Mutex::new(TrackerMetrics::new(tracker_header))),
tracker_metrics: TrackerMetrics::new(to_gui.clone()),
sck_outbound: None,
stay_alive_sck_recvr: Arc::new(AtomicBool::new(false)),
@ -86,7 +83,15 @@ impl<'a> CoordState<'a> {
pipeline: gstreamer_pipeline::WebcamPipeline::new().unwrap(),
tracker_state,
tracker_state: TrackerState{
tracking_id: 0,
highlighted_id: None,
last_detect: Instant::now(),
enabled: true,
identity_boxes: vec![],
update_ids: false,
},
tracker_connection_state: Arc::new(SocketState {
stay_connected: AtomicBool::new(false),
is_connected: AtomicBool::new(false),
@ -225,7 +230,6 @@ impl<'a> CoordState<'a> {
self.rt.clone(),
self.to_mec.clone(),
self.remote_sources_state.clone(),
self.tracker_state.clone(),
));
}

View file

@ -1,5 +1,4 @@
use std::pin::pin;
use std::sync::Mutex;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -25,7 +24,6 @@ pub mod tracker_state;
use crate::config::AppConfig;
use crate::ui::{GuiUpdate, NormalizedBoxCoords};
pub use coord_state::{CoordState, SocketState};
use tracker_state::TrackerState;
const PRIORITY_TIMEOUT: Duration = Duration::from_secs(2);
@ -46,12 +44,14 @@ pub enum TrackerUpdate {
Clear,
Fail,
Update(TrackerUpdatePackage),
HeaderUpdate(String),
}
#[derive(Clone)]
pub struct TrackerUpdatePackage {
boxes: Vec<NormalizedBoxCoords>,
time: Instant,
request_duration: Duration,
pub boxes: Vec<NormalizedBoxCoords>,
pub time: Instant,
pub request_duration: Duration,
}
pub enum ApplicationEvent {
@ -59,6 +59,7 @@ pub enum ApplicationEvent {
SocketMessage(Message),
MoveEvent(MoveEvent, ConnectionType),
TrackerUpdate(TrackerUpdate),
ChangeTracking(u32),
EnableAutomatic(bool),
}
@ -69,9 +70,7 @@ pub async fn start_coordinator(
to_mec: Sender<ApplicationEvent>,
to_gui: Sender<GuiUpdate>,
runtime: Handle,
tracker_state: Arc<Mutex<TrackerState>>,
settings: Arc<RwLock<AppConfig>>,
tracker_header: Arc<std::sync::RwLock<String>>,
) {
info!("Starting coordinator!");
@ -82,9 +81,7 @@ pub async fn start_coordinator(
to_mec,
to_gui,
runtime,
tracker_state,
settings,
tracker_header,
);
state
@ -119,16 +116,16 @@ pub async fn start_coordinator(
ApplicationEvent::SocketMessage(socket_message) => {
state.socket_send(socket_message).await;
}
ApplicationEvent::ChangeTracking(new_id) => {
state.tracker_state.tracking_id = new_id;
}
ApplicationEvent::EnableAutomatic(do_enable) => {
#[cfg(feature = "tracker-state-debug")]
debug!("Trying to get lock on tracker_state for enable automatic");
if let Ok(mut ts) = state.tracker_state.lock() {
ts.enabled = do_enable;
state
.tracker_connection_state
.stay_connected
.store(do_enable, Ordering::SeqCst);
}
state.tracker_state.enabled = do_enable;
state
.tracker_connection_state
.stay_connected
.store(do_enable, Ordering::SeqCst);
state.check_states().await;
}
ApplicationEvent::MoveEvent(coord, priority) => {
@ -157,36 +154,43 @@ pub async fn start_coordinator(
}
}
ApplicationEvent::TrackerUpdate(update) => match update {
TrackerUpdate::HeaderUpdate(_) => {}
TrackerUpdate::Clear => {
if let Ok(mut ts) = state.tracker_state.lock() {
ts.clear();
}
{
let mut tm = state.tracker_metrics.lock().await;
tm.clear_times();
state.tracker_state.clear();
state.tracker_metrics.clear_times();
if let Err(e) = state.to_gui.send(GuiUpdate::TrackerUpdate(TrackerUpdate::Clear)).await {
error!("Could not send message to GUI: {e}");
break;
}
}
TrackerUpdate::Fail => {
let mut tm = state.tracker_metrics.lock().await;
let fail_count: usize = tm.fail_count + 1;
tm.starting_connection(Some(fail_count));
let fail_count: usize = state.tracker_metrics.fail_count + 1;
state.tracker_metrics.starting_connection(Some(fail_count));
}
TrackerUpdate::Update(update) => {
let mut x_adj: i32 = 0;
let mut y_adj: i32 = 0;
if let Ok(mut ts) = state.tracker_state.lock() {
ts.update_from_boxes(update.boxes);
ts.last_detect = update.time;
if let Err(e) = state.to_gui
.send(GuiUpdate::TrackerUpdate(TrackerUpdate::Update(
update.clone(),
)))
.await
{
error!("Could not send message to the GUI: {e}");
break;
}
match ts.calculate_tracking() {
Ok((x, y, _tracker_enabled)) => {
x_adj = x;
y_adj = y;
}
Err(e) => {
error!("Could not calculate the tracking!: {e}");
}
state.tracker_state.update_from_boxes(update.boxes);
state.tracker_state.last_detect = update.time;
match state.tracker_state.calculate_tracking() {
Ok((x, y, _tracker_enabled)) => {
x_adj = x;
y_adj = y;
}
Err(e) => {
error!("Could not calculate the tracking!: {e}");
}
}
@ -204,10 +208,7 @@ pub async fn start_coordinator(
if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(me)).await {
error!("Could not send to MEC... even though in the MEC?! {e}");
}
{
let mut tm = state.tracker_metrics.lock().await;
tm.insert_time(update.request_duration);
}
state.tracker_metrics.insert_time(update.request_duration);
}
},
}

View file

@ -1,48 +1,59 @@
use std::{collections::VecDeque, sync, time::Duration};
use std::{collections::VecDeque, time::Duration};
use async_channel::Sender;
use tracing::error;
use crate::ui::GuiUpdate;
const MAX_RECORDED_TIMES: usize = 10;
const DEGRADED_TRACKER_TIME: u128 = 150;
#[derive(Debug)]
pub struct TrackerMetrics {
pub header_text: sync::Arc<sync::RwLock<String>>,
pub header_text: String,
pub fail_count: usize,
tracker_times: VecDeque<u128>,
to_gui: Sender<GuiUpdate>,
}
impl TrackerMetrics {
pub fn new(text_reference: sync::Arc<sync::RwLock<String>>) -> Self {
pub fn new(to_gui: Sender<GuiUpdate>) -> Self {
let mut ret = TrackerMetrics {
header_text: text_reference,
header_text: String::from(""),
fail_count: 0,
tracker_times: VecDeque::with_capacity(MAX_RECORDED_TIMES),
to_gui,
};
ret.clear_times();
ret
}
fn update_gui(&mut self) {
if let Err(e) = self.to_gui.send_blocking(GuiUpdate::TrackerUpdate(super::TrackerUpdate::HeaderUpdate(self.header_text.clone()))) {
error!("TrackerMetrics couldnt' send update to GUI: {e}");
}
}
pub fn starting_connection(&mut self, fail_count: Option<usize>) {
self.clear_times();
if let Ok(mut writer) = self.header_text.write() {
writer.clear();
match fail_count {
None => writer.push_str("Status: Connecting ..."),
Some(v) => writer.push_str(&format!("Status: Attempt {}/5", v)),
}
self.header_text.clear();
match fail_count {
None => self.header_text.push_str("Status: Connecting ..."),
Some(v) => self.header_text.push_str(&format!("Status: Attempt {}/5", v)),
}
self.update_gui();
}
pub fn clear_times(&mut self) {
for _ in 0..10 {
self.tracker_times.pop_front();
}
// self.insert_time(Duration::new(0, 0));
if let Ok(mut writer) = self.header_text.write() {
writer.clear();
writer.push_str("Status: Disconnected");
}
self.header_text = "Status: Disconnected".to_string();
self.update_gui();
}
pub fn insert_time(&mut self, new_measurement: Duration) {
@ -54,31 +65,24 @@ impl TrackerMetrics {
let avg_time = self.tracker_times.iter().sum::<u128>() / self.tracker_times.len() as u128;
if avg_time == 0 {
if let Ok(mut writer) = self.header_text.write() {
writer.clear();
writer.push_str(&format!(
"Status: Failed Avg Response: {} ms",
avg_time.to_string()
));
}
self.header_text = format!(
"Status: Failed Avg Response: {} ms",
avg_time.to_string()
);
}
if avg_time > DEGRADED_TRACKER_TIME {
if let Ok(mut writer) = self.header_text.write() {
writer.clear();
writer.push_str(&format!(
"Status: Degraded Avg Response: {} ms",
avg_time.to_string()
));
}
self.header_text = format!(
"Status: Degraded Avg Response: {} ms",
avg_time.to_string()
);
} else {
if let Ok(mut writer) = self.header_text.write() {
writer.clear();
writer.push_str(&format!(
"Status: Nominal Avg Response: {} ms",
avg_time.to_string()
));
}
self.header_text = format!(
"Status: Nominal Avg Response: {} ms",
avg_time.to_string()
);
}
self.update_gui();
}
}

View file

@ -20,7 +20,7 @@ use tracing::instrument;
mod remote_source;
use crate::coordinator::{
tracker_state::TrackerState, ApplicationEvent, ConnectionType, SocketState,
ApplicationEvent, ConnectionType, SocketState,
};
#[instrument(skip(rt, mec))]
@ -28,7 +28,6 @@ pub async fn start_socketserver(
rt: Handle,
mec: Sender<ApplicationEvent>,
connection_state: Arc<SocketState>,
tracker_state: Arc<Mutex<TrackerState>>,
) {
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
@ -49,7 +48,6 @@ pub async fn start_socketserver(
peer,
stream,
mec.clone(),
tracker_state.clone(),
));
}
@ -61,9 +59,8 @@ async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
) {
if let Err(e) = handle_connection(peer, stream, mec.clone(), tracker_state).await {
if let Err(e) = handle_connection(peer, stream, mec.clone()).await {
match e {
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
err => error!("Error processing connection: {}", err),
@ -76,7 +73,6 @@ async fn handle_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
) -> Result<()> {
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);

View file

@ -3,13 +3,14 @@ use std::{
sync::{Arc, Mutex},
};
use async_channel::Sender;
use gtk::{
gdk::Paintable,
prelude::{BoxExt, GestureExt, WidgetExt},
AspectFrame, Box, DrawingArea, EventControllerMotion, GestureClick, Label, Overlay, Picture,
};
use crate::coordinator::tracker_state::TrackerState;
use crate::coordinator::{tracker_state::TrackerState, ApplicationEvent};
use super::NormalizedBoxCoords;
@ -24,7 +25,7 @@ pub struct LiveViewPanel {
}
impl LiveViewPanel {
pub fn new(tracker_state: Arc<Mutex<TrackerState>>) -> Self {
pub fn new(tracker_state: Arc<Mutex<TrackerState>>, to_mec: Sender<ApplicationEvent>) -> Self {
let right_box = gtk::Box::builder()
.orientation(gtk::Orientation::Vertical)
.hexpand(true)
@ -74,7 +75,7 @@ impl LiveViewPanel {
click_handler.connect_pressed(move |gesture, _id, x, y| {
gesture.set_state(gtk::EventSequenceState::Claimed);
LiveViewPanel::click_gesture_callback(&handler_picture, &tracker_state, x, y)
LiveViewPanel::click_gesture_callback(&handler_picture, &tracker_state, &to_mec, x, y)
});
overlay_box.add_controller(click_handler);
@ -104,6 +105,7 @@ impl LiveViewPanel {
fn click_gesture_callback(
overlay: &Picture,
tracker_state: &Arc<Mutex<TrackerState>>,
to_mec: &Sender<ApplicationEvent>,
x_coord: f64,
y_coord: f64,
) {
@ -115,6 +117,9 @@ impl LiveViewPanel {
if let Ok(mut ts) = tracker_state.lock() {
if let Some(v) = calc_box_under_mouse(&ts.identity_boxes, x_coord, y_coord) {
ts.tracking_id = v;
if let Err(e) = to_mec.send_blocking(ApplicationEvent::ChangeTracking(v)) {
panic!("Could not send message to MEC, unrecoverable: {e}");
}
}
}
}

View file

@ -13,7 +13,7 @@ use tokio::sync::RwLock;
use crate::config::AppConfig;
use crate::coordinator::tracker_state::TrackerState;
use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent};
use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent, TrackerUpdate};
mod control_panel;
mod liveview_panel;
@ -28,6 +28,7 @@ pub enum GuiUpdate {
SocketConnected,
MoveEvent(MoveEvent),
UpdatePaintable(gstreamer::Element),
TrackerUpdate(TrackerUpdate),
}
#[derive(Debug, Clone, Copy)]
@ -158,16 +159,12 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
let coord_config = config.clone();
let tracker_header = Arc::new(std::sync::RwLock::new(String::new()));
runtime.spawn(start_coordinator(
mec,
to_mec.clone(),
to_gui,
runtime.clone(),
tracker_state.clone(),
coord_config,
tracker_header.clone(),
));
let control_panel = ControlPanel::new(tracker_state.clone());
@ -177,7 +174,7 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
main_box.append(&left_box);
let liveview_panel = LiveViewPanel::new(tracker_state.clone());
let liveview_panel = LiveViewPanel::new(tracker_state.clone(), to_mec.clone());
main_box.append(liveview_panel.get_top_level());
let drawable = gtk::DrawingArea::builder().build();
@ -196,6 +193,8 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
.model()
.expect("The list view should have a model!");
let tracker_state_2 = tracker_state.clone();
glib::timeout_add_local(Duration::from_millis(500), move || {
#[cfg(feature = "tracker-state-debug")]
debug!("Getting lock on tracker state for checking identity boxes");
@ -259,47 +258,12 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
glib::ControlFlow::Continue
});
let tracker_status_label = liveview_panel.tracker_status_label.clone();
let tracker_enable_toggle = control_panel
let mut tracker_status_label = liveview_panel.tracker_status_label.clone();
let mut tracker_enable_toggle = control_panel
.connection_buttons
.tracker_enable_toggle
.clone();
glib::timeout_add_seconds_local(1, move || {
if let Ok(reader) = tracker_header.read() {
tracker_status_label.set_text(reader.as_str());
if reader.contains("Failed") || reader.contains("Disconnected") {
tracker_status_label.set_css_classes(&["NoConnection"]);
tracker_enable_toggle.set_label("Connect to Tracker Computer");
tracker_enable_toggle.set_active(false);
tracker_enable_toggle.set_sensitive(true);
} else if reader.contains("Degraded") {
tracker_status_label.set_css_classes(&["LoadingConnection"]);
tracker_enable_toggle.set_label("Disconnect Tracker Computer");
tracker_enable_toggle.set_active(true);
tracker_enable_toggle.set_sensitive(true);
} else if reader.contains("Connecting") {
tracker_status_label.set_css_classes(&["LoadingConnection"]);
tracker_enable_toggle.set_label("Please Wait");
tracker_enable_toggle.set_active(false);
tracker_enable_toggle.set_sensitive(false);
} else if reader.contains("Nominal") {
tracker_status_label.set_css_classes(&["YesConnection"]);
tracker_enable_toggle.set_label("Disconnect Tracker Computer");
tracker_enable_toggle.set_active(true);
tracker_enable_toggle.set_sensitive(true);
}
glib::ControlFlow::Continue
} else {
error!("Couldn't get rwlock on metrics");
glib::ControlFlow::Break
}
});
glib::spawn_future_local(glib::clone!(@weak drawable => async move {
while let Ok(d) = gui_recv.recv().await {
drawable.queue_draw();
@ -337,6 +301,23 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
liveview_panel.set_paintable(&paintable);
}
GuiUpdate::TrackerUpdate(update) => match update {
TrackerUpdate::Fail => {}
TrackerUpdate::HeaderUpdate(new_header) => {
update_tracker_header(new_header, &mut tracker_status_label, &mut tracker_enable_toggle)
}
TrackerUpdate::Clear => {
if let Ok(mut ts) = tracker_state_2.lock() {
ts.clear();
}
}
TrackerUpdate::Update(update) => {
if let Ok(mut ts) = tracker_state_2.lock() {
ts.update_from_boxes(update.boxes);
ts.last_detect = update.time;
}
}
}
}
}
info!("Closing update loop");
@ -348,6 +329,36 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
window.present();
}
fn update_tracker_header(new_header: String, tracker_status_label: &mut gtk::Label, tracker_enable_toggle: &mut gtk::ToggleButton) {
tracker_status_label.set_text(&new_header);
if new_header.contains("Failed") || new_header.contains("Disconnected") {
tracker_status_label.set_css_classes(&["NoConnection"]);
tracker_enable_toggle.set_label("Connect to Tracker Computer");
tracker_enable_toggle.set_active(false);
tracker_enable_toggle.set_sensitive(true);
} else if new_header.contains("Degraded") {
tracker_status_label.set_css_classes(&["LoadingConnection"]);
tracker_enable_toggle.set_label("Disconnect Tracker Computer");
tracker_enable_toggle.set_active(true);
tracker_enable_toggle.set_sensitive(true);
} else if new_header.contains("Connecting") {
tracker_status_label.set_css_classes(&["LoadingConnection"]);
tracker_enable_toggle.set_label("Please Wait");
tracker_enable_toggle.set_active(false);
tracker_enable_toggle.set_sensitive(false);
} else if new_header.contains("Nominal") {
tracker_status_label.set_css_classes(&["YesConnection"]);
tracker_enable_toggle.set_label("Disconnect Tracker Computer");
tracker_enable_toggle.set_active(true);
tracker_enable_toggle.set_sensitive(true);
}
}
fn draw_boxes(width: i32, height: i32, ctx: &Context, tracker_state: &Arc<Mutex<TrackerState>>) {
ctx.set_line_width(5.0);
ctx.select_font_face(