re-tied tracker loop to send/listen cycle

This commit is contained in:
Nickiel12 2024-06-03 11:10:15 -07:00
parent b9969f746c
commit 8b397a9b05
2 changed files with 48 additions and 68 deletions

View file

@ -186,7 +186,6 @@ impl<'a> CoordState<'a> {
self.tracker_connection_state.clone(),
self.tracker_state.clone(),
self.tracker_metrics.clone(),
self.rt.clone(),
));
}

View file

@ -5,15 +5,11 @@ use std::{
};
use async_channel::Sender;
use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt};
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use gstreamer_app::AppSink;
use gstreamer_video::{video_frame::Readable, VideoFrame, VideoInfo};
use tokio::{
net::TcpStream,
runtime::Handle,
time::{sleep_until, Instant},
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tokio::time::{sleep_until, Instant};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{error, info, instrument};
use crate::remote_sources::TrackerState;
@ -31,7 +27,6 @@ pub async fn remote_video_loop(
socket_state: Arc<SocketState>,
tracker_state: Arc<Mutex<TrackerState>>,
tracker_metrics: Arc<tokio::sync::Mutex<TrackerMetrics>>,
runtime: Handle,
) {
info!(
"Starting remote tracker processing connection to: {}",
@ -68,14 +63,14 @@ pub async fn remote_video_loop(
sleep_until(Instant::now() + Duration::from_secs(1)).await;
}
Ok((connection, _)) => {
let (mut sender, recvr) = connection.split();
let (mut sender, mut recvr) = connection.split();
runtime.spawn(listen_to_messages(
recvr,
to_mec.clone(),
tracker_state.clone(),
socket_state.clone(),
));
// runtime.spawn(listen_to_messages(
// recvr,
// to_mec.clone(),
// tracker_state.clone(),
// socket_state.clone(),
// ));
let mut last_iter: Instant;
@ -100,7 +95,7 @@ pub async fn remote_video_loop(
error!("Could not close socket to remote computer: {e}")
}
socket_state.is_connected.store(false, Ordering::SeqCst);
return;
break;
}
};
@ -118,7 +113,42 @@ pub async fn remote_video_loop(
}
socket_state.is_connected.store(false, Ordering::SeqCst);
socket_state.stay_connected.store(false, Ordering::SeqCst);
return;
break;
}
match recvr.try_next().await {
Ok(Some(message)) => {
let (x_off, y_off, _do_send) =
process_incoming_string(message.to_string(), &tracker_state)
.and_then(|_| calculate_tracking(&tracker_state))
.unwrap_or((0, 0, false));
let do_send = true;
// For some reason, this do_send is inverted from what it should be
// info!("Do Send is: {}", do_send.to_string());
if do_send {
if let Err(e) = to_mec
.send(ApplicationEvent::MoveEvent(
super::MoveEvent { x: x_off, y: y_off },
super::ConnectionType::Automated,
))
.await
{
error!("Could not send message to MEC, assuming critical failure: {e}");
socket_state.is_connected.store(false, Ordering::SeqCst);
socket_state.stay_connected.store(false, Ordering::SeqCst);
return;
}
}
}
Ok(None) => {
info!("Recieved an empty message from the remote computer: Aborting");
break;
}
Err(e) => {
error!("Got an error on while recieving from remote computer: {e}");
}
}
if !socket_state.stay_connected.load(Ordering::SeqCst) {
@ -132,7 +162,7 @@ pub async fn remote_video_loop(
}
// rate limit updates
sleep_until(Instant::now() + Duration::from_millis(50)).await;
// sleep_until(Instant::now() + Duration::from_millis(100)).await;
}
}
}
@ -167,55 +197,6 @@ pub async fn remote_video_loop(
socket_state.is_connected.store(false, Ordering::SeqCst);
}
#[instrument]
async fn listen_to_messages(
mut recvr: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
to_mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
socket_state: Arc<SocketState>,
) {
info!("Starting tracker connection listen");
while socket_state.stay_connected.load(Ordering::SeqCst) {
match recvr.try_next().await {
Ok(Some(message)) => {
let (x_off, y_off, _do_send) =
process_incoming_string(message.to_string(), &tracker_state)
.and_then(|_| calculate_tracking(&tracker_state))
.unwrap_or((0, 0, false));
let do_send = true;
// For some reason, this do_send is inverted from what it should be
// info!("Do Send is: {}", do_send.to_string());
if do_send {
if let Err(e) = to_mec
.send(ApplicationEvent::MoveEvent(
super::MoveEvent { x: x_off, y: y_off },
super::ConnectionType::Automated,
))
.await
{
error!("Could not send message to MEC, assuming critical failure: {e}");
socket_state.is_connected.store(false, Ordering::SeqCst);
socket_state.stay_connected.store(false, Ordering::SeqCst);
return;
}
}
}
Ok(None) => {
info!("Recieved an empty message from the remote computer: Aborting");
break;
}
Err(e) => {
error!("Got an error on while recieving from remote computer: {e}");
}
}
}
info!(
"Stopping tracker listen connection with keep alive: {}",
socket_state.stay_connected.load(Ordering::SeqCst)
);
}
fn get_video_frame(
appsink: &AppSink,