From 64c840a2fd0f9d999986b277ba2a810f32173ff6 Mon Sep 17 00:00:00 2001 From: Nickiel12 Date: Wed, 18 Sep 2024 18:31:30 +0000 Subject: [PATCH] added can webrtc communication --- Cargo.lock | 22 +++--- src/main.rs | 191 +++++++++++++++++++++++++++++++------------------ src/web_rtc.rs | 4 +- 3 files changed, 137 insertions(+), 80 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 282ed13..f0b292d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -274,9 +274,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" [[package]] name = "cbc" @@ -289,9 +289,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.18" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476" +checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" dependencies = [ "shlex", ] @@ -1310,9 +1310,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.20.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "opaque-debug" @@ -2258,9 +2258,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.22.20" +version = "0.22.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" +checksum = "3b072cee73c449a636ffd6f32bd8de3a9f7119139aff882f44943ce2986dc5cf" dependencies = [ "indexmap", "serde", @@ -2391,9 +2391,9 @@ checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" [[package]] name = "unicode-normalization" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" +checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" dependencies = [ "tinyvec", ] @@ -2486,7 +2486,7 @@ dependencies = [ [[package]] name = "vcs-common" version = "0.1.0" -source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#14b06e3351761a5eb29b9a2d35b1f5aa4212b82b" +source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#f1c42e73a701755803c239db905f98aa764ac696" dependencies = [ "async-channel", "bincode", diff --git a/src/main.rs b/src/main.rs index a7767e5..21f3be7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,8 +3,8 @@ use std::sync::{ Arc, }; -use gstreamer::{Buffer, FlowError, FlowSuccess}; use gstreamer::{prelude::ElementExt, State}; +use gstreamer::{Buffer, FlowError, FlowSuccess}; use gstreamer_app as gst_app; use tokio::{net::TcpListener, sync::Notify}; use tokio_util::sync::CancellationToken; @@ -149,6 +149,14 @@ async fn main() { drop(listener); + if let Err(e) = app_sender + .send(ApplicationMessage::ConnectionSupportsWebRTCRequest) + .await + { + error!("Could not post request webrtc support to async channel: {e}"); + break; + } + if let Err(e) = to_tec .send(TrackerEvent::ChangeMEC(app_sender.clone())) .await @@ -157,78 +165,116 @@ async fn main() { to_quit_2.store(true, Ordering::SeqCst); } - info!("Setting up web rtc pc status"); - // Set up the webrtc peer connection - let peer_connection: Arc; - let video_track: Arc; - match web_rtc::setup_webrtc(uuid.clone()).await { + let mut do_webrtc = false; + match app_receiver.recv().await { + Ok(ApplicationMessage::NameRequest(None)) => { + if let Err(e) = app_sender + .send(ApplicationMessage::NameRequest(Some( + SATELLITE_NAME.to_owned(), + ))) + .await + { + error!("Could not let the remote know my name! {e}"); + } + } + Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => { + if let Err(e) = app_sender + .send(ApplicationMessage::ConnectionSupportsWebRTC(true)) + .await + { + error!("Coudl not post request webrtc support=true to async channel, dropping connection: {e}"); + break; + } + } + Ok(ApplicationMessage::ConnectionSupportsWebRTC(state)) => { + do_webrtc = state; + } Err(e) => { - panic!("Could not set up WebRTC connection! {e}"); + error!("Could not receive incoming message from remote during webrtc status check!: {e}"); } - Ok((pc, vt)) => { - peer_connection = pc; - video_track = vt; + _ => { + info!("Received unhandled message from remote"); } } - // Send the local description to the remote - if let Some(local_desc) = peer_connection.local_description().await { - app_sender - .send(ApplicationMessage::WebRTCPacket(local_desc)) - .await - .expect("Could not send message to the socket message channel"); - } - info!("local information sent to the remote"); - - let notify_tx = Arc::new(Notify::new()); - let notify_video = notify_tx.clone(); - - setup_callbacks( - peer_connection.clone(), - to_quit_3, - app_sender.clone(), - notify_tx, - ) - .await; - info!("webrtc callbacks registered"); - - tokio::spawn(async move { - notify_video.notified().await; - - info!("Starting video stream!"); - - let mut stream = video_receiver_stream.lock().await; - - while let Some(map) = stream.recv().await { - if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) { - break; + let mut peer_connection: Option> = None; + let video_track: Arc; + if do_webrtc { + info!("Setting up web rtc pc status"); + // Set up the webrtc peer connection + match web_rtc::setup_webrtc(uuid.clone()).await { + Err(e) => { + panic!("Could not set up WebRTC connection! {e}"); } - if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { - stream.close(); - break; + Ok((pc, vt)) => { + peer_connection = Some(pc); + video_track = vt; } - if let Ok(buf) = map.map_readable() { - info!("Sending bytes: {}", buf.size()); - if let Err(err) = video_track.write(&buf).await { - if webrtc::Error::ErrClosedPipe == err { - error!("The peerConnection has been closed."); - } else { - error!("video_track write err: {}", err); - } + } + + // Send the local description to the remote + if let Some(local_desc) = peer_connection.as_ref().unwrap().local_description().await { + app_sender + .send(ApplicationMessage::WebRTCPacket(local_desc)) + .await + .expect("Could not send message to the socket message channel"); + } + info!("local information sent to the remote"); + + let notify_tx = Arc::new(Notify::new()); + let notify_video = notify_tx.clone(); + + setup_callbacks( + peer_connection.as_mut().unwrap().clone(), + to_quit_3, + app_sender.clone(), + notify_tx, + ) + .await; + info!("webrtc callbacks registered"); + + tokio::spawn(async move { + notify_video.notified().await; + + info!("Starting video stream!"); + + let mut stream = video_receiver_stream.lock().await; + + while let Some(map) = stream.recv().await { + if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) { + break; } - }; + if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { + stream.close(); + break; + } + if let Ok(buf) = map.map_readable() { + info!("Sending bytes: {}", buf.size()); + if let Err(err) = video_track.write(&buf).await { + if webrtc::Error::ErrClosedPipe == err { + error!("The peerConnection has been closed."); + } else { + error!("video_track write err: {}", err); + } + } + }; + } + }); + + info!("starting webrtc connection kickoff"); + // send the offer and trickle ice candidates to the remote, and accept their description + if let Err(e) = kickoff_connection( + peer_connection.as_ref().unwrap(), + app_sender.clone(), + &app_receiver, + ) + .await + { + error!("There was an issue with WebRTC setup! Resetting connection: {e}"); } - }); - info!("starting webrtc connection kickoff"); - // send the offer and trickle ice candidates to the remote, and accept their description - if let Err(e) = - kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await - { - error!("There was an issue with WebRTC setup! Resetting connection: {e}"); - } - - info!("webrtc kickoff complete, entering watch mode"); + info!("webrtc kickoff complete, entering watch mode"); + } // end supports_webrtc portion // loop through messages coming from the remote. loop { @@ -243,12 +289,20 @@ async fn main() { break; } match msg { + ApplicationMessage::ConnectionSupportsWebRTC(_) => {}, + ApplicationMessage::ConnectionSupportsWebRTCRequest => { + if let Err(e) = app_sender.send(ApplicationMessage::ConnectionSupportsWebRTC(do_webrtc)).await { + error!("Could not post connection supports webrtc status to async channel!: {e}"); + } + } ApplicationMessage::WebRTCPacket(_pkt) => { error!("don't know what to do with this packet!"); } ApplicationMessage::WebRTCIceCandidateInit(pkt) => { - if let Err(e) = peer_connection.add_ice_candidate(pkt).await { - error!("There was an error adding the trickle ICE candidate! {e}"); + if peer_connection.is_some() { + if let Err(e) = peer_connection.as_ref().unwrap().add_ice_candidate(pkt).await { + error!("There was an error adding the trickle ICE candidate! {e}"); + } } } ApplicationMessage::WebRTCIceCandidate(_pkg) => { @@ -294,13 +348,14 @@ async fn main() { } } - if let Err(e) = peer_connection.close().await { - error!("Got an error while closing the webrtc connection! {}", e); + if peer_connection.is_some() { + if let Err(e) = peer_connection.as_mut().unwrap().close().await { + error!("Got an error while closing the webrtc connection! {}", e); + } } app_sender.close(); app_receiver.close(); info!("Loop is exiting!"); - } info!( "WebRTC loop exited, should be closing down\nTo_Quit: {}", diff --git a/src/web_rtc.rs b/src/web_rtc.rs index c540e65..487aa3a 100644 --- a/src/web_rtc.rs +++ b/src/web_rtc.rs @@ -227,7 +227,9 @@ pub async fn kickoff_connection( } Ok(ApplicationMessage::ChangeTrackingID(_)) | Ok(ApplicationMessage::TrackingBoxes(_)) - | Ok(ApplicationMessage::ManualMovementOverride(_)) => {} + | Ok(ApplicationMessage::ManualMovementOverride(_)) + | Ok(ApplicationMessage::ConnectionSupportsWebRTC(_)) + | Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => {} Err(e) => { panic!("WC channel was closed?! {e}"); }