From 400b33adb296ee5260bf732841ae006f555fd4d2 Mon Sep 17 00:00:00 2001 From: Nickiel12 Date: Sun, 15 Sep 2024 04:38:41 +0000 Subject: [PATCH] more robust connection handling --- Cargo.lock | 35 ++++++++++++++++----------------- Cargo.toml | 3 ++- src/main.rs | 52 ++++++++++++++++++++++++++++++++++++++++---------- src/web_rtc.rs | 10 ++++++++-- 4 files changed, 70 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6037a8d..282ed13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -818,9 +818,9 @@ dependencies = [ [[package]] name = "glib" -version = "0.20.2" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c1ea829497810f8e87f5ee6d05c4879af641704add879e6b6080607cceeefe4" +checksum = "95648aac01b75503000bb3bcaa5ec7a7a2dd61e43636b8b1814854de94dd80e4" dependencies = [ "bitflags 2.6.0", "futures-channel", @@ -839,9 +839,9 @@ dependencies = [ [[package]] name = "glib-macros" -version = "0.20.2" +version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "951aa19c5e89555c0ca5e94ee874b24b2594ece8412b387bd84ee3266b8a3ea0" +checksum = "302f1d633c9cdef4350330e7b68fd8016e2834bb106c93fdf9789fcde753c1ab" dependencies = [ "heck", "proc-macro-crate", @@ -1310,9 +1310,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe" [[package]] name = "opaque-debug" @@ -1623,9 +1623,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" +checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853" dependencies = [ "bitflags 2.6.0", ] @@ -1757,9 +1757,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.12" +version = "0.23.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8" dependencies = [ "once_cell", "ring", @@ -1777,9 +1777,9 @@ checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0" [[package]] name = "rustls-webpki" -version = "0.102.7" +version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ "ring", "rustls-pki-types", @@ -2385,9 +2385,9 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" [[package]] name = "unicode-normalization" @@ -2400,9 +2400,9 @@ dependencies = [ [[package]] name = "unicode-segmentation" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" [[package]] name = "unicode-xid" @@ -2474,6 +2474,7 @@ dependencies = [ "snafu", "tokio", "tokio-tungstenite", + "tokio-util", "toml", "tracing", "tracing-subscriber", @@ -2485,7 +2486,7 @@ dependencies = [ [[package]] name = "vcs-common" version = "0.1.0" -source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#01495ff1d8d105de485f0c746251cd471176d1cc" +source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#14b06e3351761a5eb29b9a2d35b1f5aa4212b82b" dependencies = [ "async-channel", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 10ab17c..808114c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ webrtc = "0.11.0" vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } async-channel = "2.3.1" -uuid = "1.10.0" futures-util = "0.3.30" bincode = "1.3.3" +tokio-util = "0.7.12" +uuid = "1.10.0" diff --git a/src/main.rs b/src/main.rs index be116ac..38e5a23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ use gstreamer::Buffer; use gstreamer::{prelude::ElementExt, State}; use gstreamer_app as gst_app; use tokio::{net::TcpListener, sync::Notify}; +use tokio_util::sync::CancellationToken; use tracing::{error, info, debug}; use webrtc::{ peer_connection::RTCPeerConnection, @@ -30,7 +31,6 @@ const SATELLITE_NAME: &str = "CameraSatellite_1"; #[tokio::main] async fn main() { let uuid = uuid::Uuid::new_v4().to_string(); - // TRACING SETUP let _sub = tracing_subscriber::fmt() .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) @@ -43,6 +43,8 @@ async fn main() { let to_quit_5 = to_quit.clone(); let to_quit_6 = to_quit.clone(); + let cancel_tasks = CancellationToken::new(); + // GSTREAMER SETUP if let Err(e) = gstreamer::init() { panic!("Could not start gstreamer! {e}"); @@ -57,6 +59,7 @@ async fn main() { let stream: Arc>> = Arc::new(tokio::sync::Mutex::new(stream)); + let cs1 = cancel_tasks.clone(); pipeline.sink.set_callbacks( gst_app::AppSinkCallbacks::builder() .new_sample(move |app_sink| { @@ -74,6 +77,7 @@ async fn main() { if let Err(e) = to_stream.blocking_send(buffer) { error!("Error sending to stream of buffers, it was closed: {e}"); + cs1.cancel(); to_quit_5.store(true, Ordering::SeqCst); } @@ -87,15 +91,15 @@ async fn main() { panic!("Could not start pipeline! {e}"); } + let cs2 = cancel_tasks.clone(); // set up handler for ctrl_c to quit the application tokio::spawn(async move { match tokio::signal::ctrl_c().await { Ok(()) => { + info!("Ctrl_C handler caught!"); + cs2.cancel(); to_quit_6.store(true, std::sync::atomic::Ordering::SeqCst); - // Start the gstreamer pipeline - if let Err(e) = pipeline.pipeline.set_state(State::Null) { - panic!("Could not start pipeline! {e}"); - } + // remaining cleanup handled at the end of the main loop } Err(e) => { error!("Could not watch ctrl_c signal! {e}"); @@ -113,12 +117,15 @@ async fn main() { to_mec.clone(), )); - loop { let to_quit_2 = to_quit.clone(); let to_quit_3 = to_quit.clone(); + let reset_connection = Arc::new(AtomicBool::new(false)); + let reset_connection_2 = reset_connection.clone(); + let reset_connection_3 = reset_connection.clone(); + let video_receiver_stream = stream.clone(); let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!"); info!("Started listening on 127.0.0.1:8765"); @@ -126,8 +133,17 @@ async fn main() { let (app_sender, to_core_reciever) = async_channel::bounded::(20); let (to_app_events, app_receiver) = async_channel::bounded::(20); - todo!("Need to find a way to cancel this future if to_quit is set manually"); - listen_for_socket(&listener, to_core_reciever as AppReceiver, to_app_events as AppSender).await; + + if to_quit_3.load(Ordering::SeqCst) { + break; + } + + tokio::select! { + _ = listen_for_socket(&listener, to_core_reciever as AppReceiver, to_app_events as AppSender, reset_connection_3) => {} + _ = cancel_tasks.cancelled() => { + info!("Listen_for_socket canclled by cancellation token"); + } + }; if to_quit_3.load(Ordering::SeqCst) { break; @@ -154,6 +170,7 @@ async fn main() { } } + // Send the local description to the remote if let Some(local_desc) = peer_connection.local_description().await { app_sender @@ -183,11 +200,15 @@ async fn main() { let mut stream = video_receiver_stream.lock().await; while let Some(map) = stream.recv().await { + if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) { + break; + } if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { stream.close(); break; } if let Ok(buf) = map.map_readable() { + info!("Sending bytes: {}", buf.size()); if let Err(err) = video_track.write(&buf).await { if webrtc::Error::ErrClosedPipe == err { error!("The peerConnection has been closed."); @@ -202,8 +223,7 @@ async fn main() { info!("starting webrtc connection kickoff"); // send the offer and trickle ice candidates to the remote, and accept their description if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await { - to_quit.store(true, Ordering::SeqCst); - panic!("There was an issue with WebRTC setup! {e}"); + error!("There was an issue with WebRTC setup! Resetting connection: {e}"); } info!("webrtc kickoff complete, entering watch mode"); @@ -249,7 +269,19 @@ async fn main() { break; } } + ApplicationMessage::CloseConnection => { + info!("Received connection closing, breaking the loop"); + to_quit.store(true, Ordering::SeqCst); + cancel_tasks.cancel(); + + + break; + } } } } + info!("WebRTC loop exited, should be closing down\nTo_Quit: {}", to_quit.load(Ordering::SeqCst)); + if let Err(e) = pipeline.pipeline.set_state(State::Null) { + panic!("Could not start pipeline! {e}"); + } } diff --git a/src/web_rtc.rs b/src/web_rtc.rs index 5636649..1b7085a 100644 --- a/src/web_rtc.rs +++ b/src/web_rtc.rs @@ -205,7 +205,7 @@ pub async fn kickoff_connection( info!("added ice candidate"); break; } - Ok(ApplicationMessage::WebRTCIceCandidate(pkt)) => { + Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => { error!("Got a non init ice candidate. Now what?"); } Ok(ApplicationMessage::NameRequest(Some(name))) => { @@ -221,6 +221,10 @@ pub async fn kickoff_connection( error!("Could not let the remote know my name! {e}"); } } + Ok(ApplicationMessage::CloseConnection) => { + info!("Close connection message received from remote, cancelling connection setup"); + return Err(Error::ErrClosedPipe); + } Ok(ApplicationMessage::ChangeTrackingID(_)) | Ok(ApplicationMessage::TrackingBoxes(_)) | Ok(ApplicationMessage::ManualMovementOverride(_)) => {} @@ -235,7 +239,7 @@ pub async fn kickoff_connection( Ok(()) } -pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender) { +pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender, reset_connection: Arc) { if let Ok((stream, _)) = listener.accept().await { match accept_async(stream).await { Err(e) => error!("Could not convert incoming stream to websocket: {e}"), @@ -282,6 +286,8 @@ pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to match msg { Err(e) => { error!("There was an error getting a message from the remote! {e}"); + reset_connection.store(true, Ordering::SeqCst); + } Ok(msg) => match msg { Message::Ping(_) | Message::Pong(_) => {}