added can webrtc communication

This commit is contained in:
Nickiel12 2024-09-18 18:31:30 +00:00
parent 98c1a66f95
commit 64c840a2fd
3 changed files with 137 additions and 80 deletions

22
Cargo.lock generated
View file

@ -274,9 +274,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.7.1" version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
[[package]] [[package]]
name = "cbc" name = "cbc"
@ -289,9 +289,9 @@ dependencies = [
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.1.18" version = "1.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476" checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0"
dependencies = [ dependencies = [
"shlex", "shlex",
] ]
@ -1310,9 +1310,9 @@ dependencies = [
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.20.0" version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]] [[package]]
name = "opaque-debug" name = "opaque-debug"
@ -2258,9 +2258,9 @@ dependencies = [
[[package]] [[package]]
name = "toml_edit" name = "toml_edit"
version = "0.22.20" version = "0.22.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d" checksum = "3b072cee73c449a636ffd6f32bd8de3a9f7119139aff882f44943ce2986dc5cf"
dependencies = [ dependencies = [
"indexmap", "indexmap",
"serde", "serde",
@ -2391,9 +2391,9 @@ checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
[[package]] [[package]]
name = "unicode-normalization" name = "unicode-normalization"
version = "0.1.23" version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5" checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
dependencies = [ dependencies = [
"tinyvec", "tinyvec",
] ]
@ -2486,7 +2486,7 @@ dependencies = [
[[package]] [[package]]
name = "vcs-common" name = "vcs-common"
version = "0.1.0" version = "0.1.0"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#14b06e3351761a5eb29b9a2d35b1f5aa4212b82b" source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#f1c42e73a701755803c239db905f98aa764ac696"
dependencies = [ dependencies = [
"async-channel", "async-channel",
"bincode", "bincode",

View file

@ -3,8 +3,8 @@ use std::sync::{
Arc, Arc,
}; };
use gstreamer::{Buffer, FlowError, FlowSuccess};
use gstreamer::{prelude::ElementExt, State}; use gstreamer::{prelude::ElementExt, State};
use gstreamer::{Buffer, FlowError, FlowSuccess};
use gstreamer_app as gst_app; use gstreamer_app as gst_app;
use tokio::{net::TcpListener, sync::Notify}; use tokio::{net::TcpListener, sync::Notify};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@ -149,6 +149,14 @@ async fn main() {
drop(listener); drop(listener);
if let Err(e) = app_sender
.send(ApplicationMessage::ConnectionSupportsWebRTCRequest)
.await
{
error!("Could not post request webrtc support to async channel: {e}");
break;
}
if let Err(e) = to_tec if let Err(e) = to_tec
.send(TrackerEvent::ChangeMEC(app_sender.clone())) .send(TrackerEvent::ChangeMEC(app_sender.clone()))
.await .await
@ -157,78 +165,116 @@ async fn main() {
to_quit_2.store(true, Ordering::SeqCst); to_quit_2.store(true, Ordering::SeqCst);
} }
info!("Setting up web rtc pc status"); let mut do_webrtc = false;
// Set up the webrtc peer connection match app_receiver.recv().await {
let peer_connection: Arc<RTCPeerConnection>; Ok(ApplicationMessage::NameRequest(None)) => {
let video_track: Arc<TrackLocalStaticRTP>; if let Err(e) = app_sender
match web_rtc::setup_webrtc(uuid.clone()).await { .send(ApplicationMessage::NameRequest(Some(
SATELLITE_NAME.to_owned(),
)))
.await
{
error!("Could not let the remote know my name! {e}");
}
}
Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => {
if let Err(e) = app_sender
.send(ApplicationMessage::ConnectionSupportsWebRTC(true))
.await
{
error!("Coudl not post request webrtc support=true to async channel, dropping connection: {e}");
break;
}
}
Ok(ApplicationMessage::ConnectionSupportsWebRTC(state)) => {
do_webrtc = state;
}
Err(e) => { Err(e) => {
panic!("Could not set up WebRTC connection! {e}"); error!("Could not receive incoming message from remote during webrtc status check!: {e}");
} }
Ok((pc, vt)) => { _ => {
peer_connection = pc; info!("Received unhandled message from remote");
video_track = vt;
} }
} }
// Send the local description to the remote let mut peer_connection: Option<Arc<RTCPeerConnection>> = None;
if let Some(local_desc) = peer_connection.local_description().await { let video_track: Arc<TrackLocalStaticRTP>;
app_sender if do_webrtc {
.send(ApplicationMessage::WebRTCPacket(local_desc)) info!("Setting up web rtc pc status");
.await // Set up the webrtc peer connection
.expect("Could not send message to the socket message channel"); match web_rtc::setup_webrtc(uuid.clone()).await {
} Err(e) => {
info!("local information sent to the remote"); panic!("Could not set up WebRTC connection! {e}");
let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone();
setup_callbacks(
peer_connection.clone(),
to_quit_3,
app_sender.clone(),
notify_tx,
)
.await;
info!("webrtc callbacks registered");
tokio::spawn(async move {
notify_video.notified().await;
info!("Starting video stream!");
let mut stream = video_receiver_stream.lock().await;
while let Some(map) = stream.recv().await {
if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) {
break;
} }
if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { Ok((pc, vt)) => {
stream.close(); peer_connection = Some(pc);
break; video_track = vt;
} }
if let Ok(buf) = map.map_readable() { }
info!("Sending bytes: {}", buf.size());
if let Err(err) = video_track.write(&buf).await { // Send the local description to the remote
if webrtc::Error::ErrClosedPipe == err { if let Some(local_desc) = peer_connection.as_ref().unwrap().local_description().await {
error!("The peerConnection has been closed."); app_sender
} else { .send(ApplicationMessage::WebRTCPacket(local_desc))
error!("video_track write err: {}", err); .await
} .expect("Could not send message to the socket message channel");
}
info!("local information sent to the remote");
let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone();
setup_callbacks(
peer_connection.as_mut().unwrap().clone(),
to_quit_3,
app_sender.clone(),
notify_tx,
)
.await;
info!("webrtc callbacks registered");
tokio::spawn(async move {
notify_video.notified().await;
info!("Starting video stream!");
let mut stream = video_receiver_stream.lock().await;
while let Some(map) = stream.recv().await {
if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) {
break;
} }
}; if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) {
stream.close();
break;
}
if let Ok(buf) = map.map_readable() {
info!("Sending bytes: {}", buf.size());
if let Err(err) = video_track.write(&buf).await {
if webrtc::Error::ErrClosedPipe == err {
error!("The peerConnection has been closed.");
} else {
error!("video_track write err: {}", err);
}
}
};
}
});
info!("starting webrtc connection kickoff");
// send the offer and trickle ice candidates to the remote, and accept their description
if let Err(e) = kickoff_connection(
peer_connection.as_ref().unwrap(),
app_sender.clone(),
&app_receiver,
)
.await
{
error!("There was an issue with WebRTC setup! Resetting connection: {e}");
} }
});
info!("starting webrtc connection kickoff"); info!("webrtc kickoff complete, entering watch mode");
// send the offer and trickle ice candidates to the remote, and accept their description } // end supports_webrtc portion
if let Err(e) =
kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await
{
error!("There was an issue with WebRTC setup! Resetting connection: {e}");
}
info!("webrtc kickoff complete, entering watch mode");
// loop through messages coming from the remote. // loop through messages coming from the remote.
loop { loop {
@ -243,12 +289,20 @@ async fn main() {
break; break;
} }
match msg { match msg {
ApplicationMessage::ConnectionSupportsWebRTC(_) => {},
ApplicationMessage::ConnectionSupportsWebRTCRequest => {
if let Err(e) = app_sender.send(ApplicationMessage::ConnectionSupportsWebRTC(do_webrtc)).await {
error!("Could not post connection supports webrtc status to async channel!: {e}");
}
}
ApplicationMessage::WebRTCPacket(_pkt) => { ApplicationMessage::WebRTCPacket(_pkt) => {
error!("don't know what to do with this packet!"); error!("don't know what to do with this packet!");
} }
ApplicationMessage::WebRTCIceCandidateInit(pkt) => { ApplicationMessage::WebRTCIceCandidateInit(pkt) => {
if let Err(e) = peer_connection.add_ice_candidate(pkt).await { if peer_connection.is_some() {
error!("There was an error adding the trickle ICE candidate! {e}"); if let Err(e) = peer_connection.as_ref().unwrap().add_ice_candidate(pkt).await {
error!("There was an error adding the trickle ICE candidate! {e}");
}
} }
} }
ApplicationMessage::WebRTCIceCandidate(_pkg) => { ApplicationMessage::WebRTCIceCandidate(_pkg) => {
@ -294,13 +348,14 @@ async fn main() {
} }
} }
if let Err(e) = peer_connection.close().await { if peer_connection.is_some() {
error!("Got an error while closing the webrtc connection! {}", e); if let Err(e) = peer_connection.as_mut().unwrap().close().await {
error!("Got an error while closing the webrtc connection! {}", e);
}
} }
app_sender.close(); app_sender.close();
app_receiver.close(); app_receiver.close();
info!("Loop is exiting!"); info!("Loop is exiting!");
} }
info!( info!(
"WebRTC loop exited, should be closing down\nTo_Quit: {}", "WebRTC loop exited, should be closing down\nTo_Quit: {}",

View file

@ -227,7 +227,9 @@ pub async fn kickoff_connection(
} }
Ok(ApplicationMessage::ChangeTrackingID(_)) Ok(ApplicationMessage::ChangeTrackingID(_))
| Ok(ApplicationMessage::TrackingBoxes(_)) | Ok(ApplicationMessage::TrackingBoxes(_))
| Ok(ApplicationMessage::ManualMovementOverride(_)) => {} | Ok(ApplicationMessage::ManualMovementOverride(_))
| Ok(ApplicationMessage::ConnectionSupportsWebRTC(_))
| Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => {}
Err(e) => { Err(e) => {
panic!("WC channel was closed?! {e}"); panic!("WC channel was closed?! {e}");
} }