Compare commits

...

5 commits

Author SHA1 Message Date
Nickiel12
a3d8a6fd06 bumped up degraded time 2024-06-03 11:11:02 -07:00
Nickiel12
c226d10d58 fixed issue causing degraded to disable button 2024-06-03 11:10:28 -07:00
Nickiel12
8b397a9b05 re-tied tracker loop to send/listen cycle 2024-06-03 11:10:15 -07:00
Nickiel12
b9969f746c removed queue from appsink 2024-06-03 11:09:31 -07:00
Nickiel12
e9a125500e added todo list 2024-06-03 09:29:51 -07:00
6 changed files with 83 additions and 89 deletions

View file

@ -186,7 +186,6 @@ impl<'a> CoordState<'a> {
self.tracker_connection_state.clone(), self.tracker_connection_state.clone(),
self.tracker_state.clone(), self.tracker_state.clone(),
self.tracker_metrics.clone(), self.tracker_metrics.clone(),
self.rt.clone(),
)); ));
} }

View file

@ -1,7 +1,7 @@
use std::{collections::VecDeque, sync, time::Duration}; use std::{collections::VecDeque, sync, time::Duration};
const MAX_RECORDED_TIMES: usize = 10; const MAX_RECORDED_TIMES: usize = 10;
const DEGRADED_TRACKER_TIME: u128 = 100; const DEGRADED_TRACKER_TIME: u128 = 150;
#[derive(Debug)] #[derive(Debug)]
pub struct TrackerMetrics { pub struct TrackerMetrics {

View file

@ -5,15 +5,11 @@ use std::{
}; };
use async_channel::Sender; use async_channel::Sender;
use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt}; use futures_util::{SinkExt, StreamExt, TryStreamExt};
use gstreamer_app::AppSink; use gstreamer_app::AppSink;
use gstreamer_video::{video_frame::Readable, VideoFrame, VideoInfo}; use gstreamer_video::{video_frame::Readable, VideoFrame, VideoInfo};
use tokio::{ use tokio::time::{sleep_until, Instant};
net::TcpStream, use tokio_tungstenite::{connect_async, tungstenite::Message};
runtime::Handle,
time::{sleep_until, Instant},
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{error, info, instrument}; use tracing::{error, info, instrument};
use crate::remote_sources::TrackerState; use crate::remote_sources::TrackerState;
@ -31,7 +27,6 @@ pub async fn remote_video_loop(
socket_state: Arc<SocketState>, socket_state: Arc<SocketState>,
tracker_state: Arc<Mutex<TrackerState>>, tracker_state: Arc<Mutex<TrackerState>>,
tracker_metrics: Arc<tokio::sync::Mutex<TrackerMetrics>>, tracker_metrics: Arc<tokio::sync::Mutex<TrackerMetrics>>,
runtime: Handle,
) { ) {
info!( info!(
"Starting remote tracker processing connection to: {}", "Starting remote tracker processing connection to: {}",
@ -68,14 +63,14 @@ pub async fn remote_video_loop(
sleep_until(Instant::now() + Duration::from_secs(1)).await; sleep_until(Instant::now() + Duration::from_secs(1)).await;
} }
Ok((connection, _)) => { Ok((connection, _)) => {
let (mut sender, recvr) = connection.split(); let (mut sender, mut recvr) = connection.split();
runtime.spawn(listen_to_messages( // runtime.spawn(listen_to_messages(
recvr, // recvr,
to_mec.clone(), // to_mec.clone(),
tracker_state.clone(), // tracker_state.clone(),
socket_state.clone(), // socket_state.clone(),
)); // ));
let mut last_iter: Instant; let mut last_iter: Instant;
@ -100,7 +95,7 @@ pub async fn remote_video_loop(
error!("Could not close socket to remote computer: {e}") error!("Could not close socket to remote computer: {e}")
} }
socket_state.is_connected.store(false, Ordering::SeqCst); socket_state.is_connected.store(false, Ordering::SeqCst);
return; break;
} }
}; };
@ -118,64 +113,9 @@ pub async fn remote_video_loop(
} }
socket_state.is_connected.store(false, Ordering::SeqCst); socket_state.is_connected.store(false, Ordering::SeqCst);
socket_state.stay_connected.store(false, Ordering::SeqCst); socket_state.stay_connected.store(false, Ordering::SeqCst);
return;
}
if !socket_state.stay_connected.load(Ordering::SeqCst) {
info!("Shutting down remote video loop");
break; break;
} }
{
let mut tm = tracker_metrics.lock().await;
tm.insert_time(Instant::now() - last_iter);
}
// rate limit updates
sleep_until(Instant::now() + Duration::from_millis(50)).await;
}
}
}
if !socket_state.stay_connected.load(Ordering::SeqCst) {
break;
}
}
info!("Shutting down remote video loop");
{
let mut tm = tracker_metrics.lock().await;
tm.clear_times();
}
{
if let Ok(mut ts) = tracker_state.lock() {
ts.clear();
}
// This message forces a redraw after clearing the queue
if let Err(e) = to_mec
.send(ApplicationEvent::MoveEvent(
crate::coordinator::MoveEvent { x: 0, y: 0 },
crate::coordinator::ConnectionType::Automated,
))
.await
{
error!(
"Error sending message to MEC during shutdown of tracker thread: {}",
e
);
}
}
socket_state.is_connected.store(false, Ordering::SeqCst);
}
#[instrument]
async fn listen_to_messages(
mut recvr: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
to_mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
socket_state: Arc<SocketState>,
) {
info!("Starting tracker connection listen");
while socket_state.stay_connected.load(Ordering::SeqCst) {
match recvr.try_next().await { match recvr.try_next().await {
Ok(Some(message)) => { Ok(Some(message)) => {
let (x_off, y_off, _do_send) = let (x_off, y_off, _do_send) =
@ -210,13 +150,54 @@ async fn listen_to_messages(
error!("Got an error on while recieving from remote computer: {e}"); error!("Got an error on while recieving from remote computer: {e}");
} }
} }
if !socket_state.stay_connected.load(Ordering::SeqCst) {
info!("Shutting down remote video loop");
break;
} }
info!(
"Stopping tracker listen connection with keep alive: {}", {
socket_state.stay_connected.load(Ordering::SeqCst) let mut tm = tracker_metrics.lock().await;
tm.insert_time(Instant::now() - last_iter);
}
// rate limit updates
// sleep_until(Instant::now() + Duration::from_millis(100)).await;
}
}
}
if !socket_state.stay_connected.load(Ordering::SeqCst) {
break;
}
}
info!("Shutting down remote video loop");
{
let mut tm = tracker_metrics.lock().await;
tm.clear_times();
}
{
if let Ok(mut ts) = tracker_state.lock() {
ts.clear();
}
// This message forces a redraw after clearing the queue
if let Err(e) = to_mec
.send(ApplicationEvent::MoveEvent(
crate::coordinator::MoveEvent { x: 0, y: 0 },
crate::coordinator::ConnectionType::Automated,
))
.await
{
error!(
"Error sending message to MEC during shutdown of tracker thread: {}",
e
); );
}
}
socket_state.is_connected.store(false, Ordering::SeqCst);
} }
fn get_video_frame( fn get_video_frame(
appsink: &AppSink, appsink: &AppSink,
video_info: &VideoInfo, video_info: &VideoInfo,

View file

@ -16,7 +16,6 @@ pub struct WebcamPipeline {
pub queue_app: Element, pub queue_app: Element,
pub sink_paintable: Element, pub sink_paintable: Element,
pub queue: Element,
pub resize: Element, pub resize: Element,
pub sink_frame: Arc<Mutex<AppSink>>, pub sink_frame: Arc<Mutex<AppSink>>,
} }
@ -47,7 +46,12 @@ impl WebcamPipeline {
.build() .build()
.context(BuildSnafu { element: "tee" })?; .context(BuildSnafu { element: "tee" })?;
let queue_app = ElementFactory::make("queue").build().context(BuildSnafu { let queue_app = ElementFactory::make("queue")
.property("max-size-time", 1u64)
.property("max-size-buffers", 0u32)
.property("max-size-bytes", 0u32)
.build()
.context(BuildSnafu {
element: "paintable queue", element: "paintable queue",
})?; })?;
let sink_paintable = ElementFactory::make("gtk4paintablesink") let sink_paintable = ElementFactory::make("gtk4paintablesink")
@ -57,9 +61,9 @@ impl WebcamPipeline {
element: "gtkpaintablesink", element: "gtkpaintablesink",
})?; })?;
let queue = ElementFactory::make("queue").build().context(BuildSnafu { // queue.connect_closure("overrun", false, glib::closure!(|queue: Element| {
element: "appsink queue", // println!("The queue is full!");
})?; // }));
let resize = ElementFactory::make("videoscale") let resize = ElementFactory::make("videoscale")
.build() .build()
@ -67,7 +71,7 @@ impl WebcamPipeline {
element: "videoscale", element: "videoscale",
})?; })?;
let caps_string = "video/x-raw,format=RGB,width=640,height=480,max-buffers=1,drop=true"; let caps_string = "video/x-raw,format=RGB,width=640,height=480";
// let caps_string = String::from("video/x-raw,format=RGB,max-buffers=1,drop=true"); // let caps_string = String::from("video/x-raw,format=RGB,max-buffers=1,drop=true");
let appsrc_caps = gstreamer::Caps::from_str(caps_string).context(BuildSnafu { let appsrc_caps = gstreamer::Caps::from_str(caps_string).context(BuildSnafu {
element: "appsink caps", element: "appsink caps",
@ -76,7 +80,7 @@ impl WebcamPipeline {
let sink_frame = AppSink::builder() let sink_frame = AppSink::builder()
.name("frame_output") .name("frame_output")
.sync(false) .sync(false)
.max_buffers(1u32) .max_buffers(3u32)
.drop(true) .drop(true)
.caps(&appsrc_caps) .caps(&appsrc_caps)
.build(); .build();
@ -92,7 +96,6 @@ impl WebcamPipeline {
&queue_app, &queue_app,
&sink_paintable, &sink_paintable,
&resize, &resize,
&queue,
&sink_frame.upcast_ref(), &sink_frame.upcast_ref(),
]) ])
.context(LinkSnafu { .context(LinkSnafu {
@ -147,23 +150,19 @@ impl WebcamPipeline {
.ok_or(PipelineError::PadRequestError { .ok_or(PipelineError::PadRequestError {
element: "tee pad 2".to_string(), element: "tee pad 2".to_string(),
})?; })?;
let appsink_queue_sinkpad = let appsink_resize_sinkpad =
queue resize
.static_pad("sink") .static_pad("sink")
.ok_or(PipelineError::PadRequestError { .ok_or(PipelineError::PadRequestError {
element: "appsink queue".to_string(), element: "appsink queue".to_string(),
})?; })?;
tee_src_2 tee_src_2
.link(&appsink_queue_sinkpad) .link(&appsink_resize_sinkpad)
.context(PadLinkSnafu { .context(PadLinkSnafu {
from: "tee src pad 2", from: "tee src pad 2",
to: "appsink queue sinkpad", to: "appsink queue sinkpad",
})?; })?;
queue.link(&resize).context(LinkSnafu {
from: "appsink queue",
to: "videoscale",
})?;
resize.link(&sink_frame).context(LinkSnafu { resize.link(&sink_frame).context(LinkSnafu {
from: "videoscale", from: "videoscale",
to: "appsink", to: "appsink",
@ -177,7 +176,6 @@ impl WebcamPipeline {
queue_app, queue_app,
sink_paintable, sink_paintable,
resize, resize,
queue,
sink_frame: Arc::new(Mutex::new(sink_frame)), sink_frame: Arc::new(Mutex::new(sink_frame)),
}) })
} }

View file

@ -274,7 +274,13 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
tracker_enable_toggle.set_label("Connect to Tracker Computer"); tracker_enable_toggle.set_label("Connect to Tracker Computer");
tracker_enable_toggle.set_active(false); tracker_enable_toggle.set_active(false);
tracker_enable_toggle.set_sensitive(true); tracker_enable_toggle.set_sensitive(true);
} else if reader.contains("Degraded") || reader.contains("Connecting") { } else if reader.contains("Degraded") {
tracker_status_label.set_css_classes(&["LoadingConnection"]);
tracker_enable_toggle.set_label("Disconnect Tracker Computer");
tracker_enable_toggle.set_active(true);
tracker_enable_toggle.set_sensitive(true);
} else if reader.contains("Connecting") {
tracker_status_label.set_css_classes(&["LoadingConnection"]); tracker_status_label.set_css_classes(&["LoadingConnection"]);
tracker_enable_toggle.set_label("Please Wait"); tracker_enable_toggle.set_label("Please Wait");
@ -354,7 +360,6 @@ fn draw_boxes(width: i32, height: i32, ctx: &Context, tracker_state: &Arc<Mutex<
#[cfg(feature = "tracker-state-debug")] #[cfg(feature = "tracker-state-debug")]
debug!("Getting tracker state for drawing boxes"); debug!("Getting tracker state for drawing boxes");
if let Ok(ts) = tracker_state.lock() { if let Ok(ts) = tracker_state.lock() {
println!("ts currently tracking: {}", ts.tracking_id);
let active = ts.tracking_id; let active = ts.tracking_id;
let highlighted_id = ts.highlighted_id.unwrap_or(0); let highlighted_id = ts.highlighted_id.unwrap_or(0);
for nb in ts.identity_boxes.iter() { for nb in ts.identity_boxes.iter() {

11
todo.md Normal file
View file

@ -0,0 +1,11 @@
# Functional
- Up-direction maxes at -50 instead of 100
- Tracking lag real issue
## QoL
- Fine tuning the tracking speeds to be non-linear, make sure the pi doesn't have that speed cap (remember could be expecting 6v max speed).
- During connection waits, add loading dots
## Future ideas
- person recognition