more robust connection handling

This commit is contained in:
Nickiel12 2024-09-15 04:38:41 +00:00
parent 996e991ee4
commit 400b33adb2
4 changed files with 70 additions and 30 deletions

35
Cargo.lock generated
View file

@ -818,9 +818,9 @@ dependencies = [
[[package]] [[package]]
name = "glib" name = "glib"
version = "0.20.2" version = "0.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c1ea829497810f8e87f5ee6d05c4879af641704add879e6b6080607cceeefe4" checksum = "95648aac01b75503000bb3bcaa5ec7a7a2dd61e43636b8b1814854de94dd80e4"
dependencies = [ dependencies = [
"bitflags 2.6.0", "bitflags 2.6.0",
"futures-channel", "futures-channel",
@ -839,9 +839,9 @@ dependencies = [
[[package]] [[package]]
name = "glib-macros" name = "glib-macros"
version = "0.20.2" version = "0.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "951aa19c5e89555c0ca5e94ee874b24b2594ece8412b387bd84ee3266b8a3ea0" checksum = "302f1d633c9cdef4350330e7b68fd8016e2834bb106c93fdf9789fcde753c1ab"
dependencies = [ dependencies = [
"heck", "heck",
"proc-macro-crate", "proc-macro-crate",
@ -1310,9 +1310,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.19.0" version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe"
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
@ -1623,9 +1623,9 @@ dependencies = [
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.5.3" version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" checksum = "0884ad60e090bf1345b93da0a5de8923c93884cd03f40dfcfddd3b4bee661853"
dependencies = [ dependencies = [
"bitflags 2.6.0", "bitflags 2.6.0",
] ]
@ -1757,9 +1757,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.23.12" version = "0.23.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" checksum = "f2dabaac7466917e566adb06783a81ca48944c6898a1b08b9374106dd671f4c8"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"ring", "ring",
@ -1777,9 +1777,9 @@ checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.102.7" version = "0.102.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56" checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9"
dependencies = [ dependencies = [
"ring", "ring",
"rustls-pki-types", "rustls-pki-types",
@ -2385,9 +2385,9 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75"
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.12" version = "1.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]] [[package]]
name = "unicode-normalization" name = "unicode-normalization"
@ -2400,9 +2400,9 @@ dependencies = [
[[package]] [[package]]
name = "unicode-segmentation" name = "unicode-segmentation"
version = "1.11.0" version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493"
[[package]] [[package]]
name = "unicode-xid" name = "unicode-xid"
@ -2474,6 +2474,7 @@ dependencies = [
"snafu", "snafu",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
"tokio-util",
"toml", "toml",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
@ -2485,7 +2486,7 @@ dependencies = [
[[package]] [[package]]
name = "vcs-common" name = "vcs-common"
version = "0.1.0" version = "0.1.0"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#01495ff1d8d105de485f0c746251cd471176d1cc" source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#14b06e3351761a5eb29b9a2d35b1f5aa4212b82b"
dependencies = [ dependencies = [
"async-channel", "async-channel",
"bincode", "bincode",

View file

@ -23,6 +23,7 @@ webrtc = "0.11.0"
vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" }
async-channel = "2.3.1" async-channel = "2.3.1"
uuid = "1.10.0"
futures-util = "0.3.30" futures-util = "0.3.30"
bincode = "1.3.3" bincode = "1.3.3"
tokio-util = "0.7.12"
uuid = "1.10.0"

View file

@ -7,6 +7,7 @@ use gstreamer::Buffer;
use gstreamer::{prelude::ElementExt, State}; use gstreamer::{prelude::ElementExt, State};
use gstreamer_app as gst_app; use gstreamer_app as gst_app;
use tokio::{net::TcpListener, sync::Notify}; use tokio::{net::TcpListener, sync::Notify};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, debug}; use tracing::{error, info, debug};
use webrtc::{ use webrtc::{
peer_connection::RTCPeerConnection, peer_connection::RTCPeerConnection,
@ -30,7 +31,6 @@ const SATELLITE_NAME: &str = "CameraSatellite_1";
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let uuid = uuid::Uuid::new_v4().to_string(); let uuid = uuid::Uuid::new_v4().to_string();
// TRACING SETUP // TRACING SETUP
let _sub = tracing_subscriber::fmt() let _sub = tracing_subscriber::fmt()
.with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG)
@ -43,6 +43,8 @@ async fn main() {
let to_quit_5 = to_quit.clone(); let to_quit_5 = to_quit.clone();
let to_quit_6 = to_quit.clone(); let to_quit_6 = to_quit.clone();
let cancel_tasks = CancellationToken::new();
// GSTREAMER SETUP // GSTREAMER SETUP
if let Err(e) = gstreamer::init() { if let Err(e) = gstreamer::init() {
panic!("Could not start gstreamer! {e}"); panic!("Could not start gstreamer! {e}");
@ -57,6 +59,7 @@ async fn main() {
let stream: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Buffer>>> = Arc::new(tokio::sync::Mutex::new(stream)); let stream: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Buffer>>> = Arc::new(tokio::sync::Mutex::new(stream));
let cs1 = cancel_tasks.clone();
pipeline.sink.set_callbacks( pipeline.sink.set_callbacks(
gst_app::AppSinkCallbacks::builder() gst_app::AppSinkCallbacks::builder()
.new_sample(move |app_sink| { .new_sample(move |app_sink| {
@ -74,6 +77,7 @@ async fn main() {
if let Err(e) = to_stream.blocking_send(buffer) { if let Err(e) = to_stream.blocking_send(buffer) {
error!("Error sending to stream of buffers, it was closed: {e}"); error!("Error sending to stream of buffers, it was closed: {e}");
cs1.cancel();
to_quit_5.store(true, Ordering::SeqCst); to_quit_5.store(true, Ordering::SeqCst);
} }
@ -87,15 +91,15 @@ async fn main() {
panic!("Could not start pipeline! {e}"); panic!("Could not start pipeline! {e}");
} }
let cs2 = cancel_tasks.clone();
// set up handler for ctrl_c to quit the application // set up handler for ctrl_c to quit the application
tokio::spawn(async move { tokio::spawn(async move {
match tokio::signal::ctrl_c().await { match tokio::signal::ctrl_c().await {
Ok(()) => { Ok(()) => {
info!("Ctrl_C handler caught!");
cs2.cancel();
to_quit_6.store(true, std::sync::atomic::Ordering::SeqCst); to_quit_6.store(true, std::sync::atomic::Ordering::SeqCst);
// Start the gstreamer pipeline // remaining cleanup handled at the end of the main loop
if let Err(e) = pipeline.pipeline.set_state(State::Null) {
panic!("Could not start pipeline! {e}");
}
} }
Err(e) => { Err(e) => {
error!("Could not watch ctrl_c signal! {e}"); error!("Could not watch ctrl_c signal! {e}");
@ -114,11 +118,14 @@ async fn main() {
)); ));
loop { loop {
let to_quit_2 = to_quit.clone(); let to_quit_2 = to_quit.clone();
let to_quit_3 = to_quit.clone(); let to_quit_3 = to_quit.clone();
let reset_connection = Arc::new(AtomicBool::new(false));
let reset_connection_2 = reset_connection.clone();
let reset_connection_3 = reset_connection.clone();
let video_receiver_stream = stream.clone(); let video_receiver_stream = stream.clone();
let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!"); let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!");
info!("Started listening on 127.0.0.1:8765"); info!("Started listening on 127.0.0.1:8765");
@ -126,8 +133,17 @@ async fn main() {
let (app_sender, to_core_reciever) = async_channel::bounded::<ApplicationMessage>(20); let (app_sender, to_core_reciever) = async_channel::bounded::<ApplicationMessage>(20);
let (to_app_events, app_receiver) = async_channel::bounded::<ApplicationMessage>(20); let (to_app_events, app_receiver) = async_channel::bounded::<ApplicationMessage>(20);
todo!("Need to find a way to cancel this future if to_quit is set manually");
listen_for_socket(&listener, to_core_reciever as AppReceiver, to_app_events as AppSender).await; if to_quit_3.load(Ordering::SeqCst) {
break;
}
tokio::select! {
_ = listen_for_socket(&listener, to_core_reciever as AppReceiver, to_app_events as AppSender, reset_connection_3) => {}
_ = cancel_tasks.cancelled() => {
info!("Listen_for_socket canclled by cancellation token");
}
};
if to_quit_3.load(Ordering::SeqCst) { if to_quit_3.load(Ordering::SeqCst) {
break; break;
@ -154,6 +170,7 @@ async fn main() {
} }
} }
// Send the local description to the remote // Send the local description to the remote
if let Some(local_desc) = peer_connection.local_description().await { if let Some(local_desc) = peer_connection.local_description().await {
app_sender app_sender
@ -183,11 +200,15 @@ async fn main() {
let mut stream = video_receiver_stream.lock().await; let mut stream = video_receiver_stream.lock().await;
while let Some(map) = stream.recv().await { while let Some(map) = stream.recv().await {
if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) {
break;
}
if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) {
stream.close(); stream.close();
break; break;
} }
if let Ok(buf) = map.map_readable() { if let Ok(buf) = map.map_readable() {
info!("Sending bytes: {}", buf.size());
if let Err(err) = video_track.write(&buf).await { if let Err(err) = video_track.write(&buf).await {
if webrtc::Error::ErrClosedPipe == err { if webrtc::Error::ErrClosedPipe == err {
error!("The peerConnection has been closed."); error!("The peerConnection has been closed.");
@ -202,8 +223,7 @@ async fn main() {
info!("starting webrtc connection kickoff"); info!("starting webrtc connection kickoff");
// send the offer and trickle ice candidates to the remote, and accept their description // send the offer and trickle ice candidates to the remote, and accept their description
if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await { if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await {
to_quit.store(true, Ordering::SeqCst); error!("There was an issue with WebRTC setup! Resetting connection: {e}");
panic!("There was an issue with WebRTC setup! {e}");
} }
info!("webrtc kickoff complete, entering watch mode"); info!("webrtc kickoff complete, entering watch mode");
@ -249,7 +269,19 @@ async fn main() {
break; break;
} }
} }
ApplicationMessage::CloseConnection => {
info!("Received connection closing, breaking the loop");
to_quit.store(true, Ordering::SeqCst);
cancel_tasks.cancel();
break;
} }
} }
} }
} }
info!("WebRTC loop exited, should be closing down\nTo_Quit: {}", to_quit.load(Ordering::SeqCst));
if let Err(e) = pipeline.pipeline.set_state(State::Null) {
panic!("Could not start pipeline! {e}");
}
}

View file

@ -205,7 +205,7 @@ pub async fn kickoff_connection(
info!("added ice candidate"); info!("added ice candidate");
break; break;
} }
Ok(ApplicationMessage::WebRTCIceCandidate(pkt)) => { Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => {
error!("Got a non init ice candidate. Now what?"); error!("Got a non init ice candidate. Now what?");
} }
Ok(ApplicationMessage::NameRequest(Some(name))) => { Ok(ApplicationMessage::NameRequest(Some(name))) => {
@ -221,6 +221,10 @@ pub async fn kickoff_connection(
error!("Could not let the remote know my name! {e}"); error!("Could not let the remote know my name! {e}");
} }
} }
Ok(ApplicationMessage::CloseConnection) => {
info!("Close connection message received from remote, cancelling connection setup");
return Err(Error::ErrClosedPipe);
}
Ok(ApplicationMessage::ChangeTrackingID(_)) Ok(ApplicationMessage::ChangeTrackingID(_))
| Ok(ApplicationMessage::TrackingBoxes(_)) | Ok(ApplicationMessage::TrackingBoxes(_))
| Ok(ApplicationMessage::ManualMovementOverride(_)) => {} | Ok(ApplicationMessage::ManualMovementOverride(_)) => {}
@ -235,7 +239,7 @@ pub async fn kickoff_connection(
Ok(()) Ok(())
} }
pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender) { pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender, reset_connection: Arc<AtomicBool>) {
if let Ok((stream, _)) = listener.accept().await { if let Ok((stream, _)) = listener.accept().await {
match accept_async(stream).await { match accept_async(stream).await {
Err(e) => error!("Could not convert incoming stream to websocket: {e}"), Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
@ -282,6 +286,8 @@ pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to
match msg { match msg {
Err(e) => { Err(e) => {
error!("There was an error getting a message from the remote! {e}"); error!("There was an error getting a message from the remote! {e}");
reset_connection.store(true, Ordering::SeqCst);
} }
Ok(msg) => match msg { Ok(msg) => match msg {
Message::Ping(_) | Message::Pong(_) => {} Message::Ping(_) | Message::Pong(_) => {}