Compare commits
No commits in common. "64c840a2fd0f9d999986b277ba2a810f32173ff6" and "400b33adb296ee5260bf732841ae006f555fd4d2" have entirely different histories.
64c840a2fd
...
400b33adb2
5 changed files with 197 additions and 301 deletions
22
Cargo.lock
generated
22
Cargo.lock
generated
|
@ -274,9 +274,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
|
|||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.7.2"
|
||||
version = "1.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
|
||||
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
|
||||
|
||||
[[package]]
|
||||
name = "cbc"
|
||||
|
@ -289,9 +289,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "cc"
|
||||
version = "1.1.21"
|
||||
version = "1.1.18"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0"
|
||||
checksum = "b62ac837cdb5cb22e10a256099b4fc502b1dfe560cb282963a974d7abd80e476"
|
||||
dependencies = [
|
||||
"shlex",
|
||||
]
|
||||
|
@ -1310,9 +1310,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.19.0"
|
||||
version = "1.20.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
|
||||
checksum = "33ea5043e58958ee56f3e15a90aee535795cd7dfd319846288d93c5b57d85cbe"
|
||||
|
||||
[[package]]
|
||||
name = "opaque-debug"
|
||||
|
@ -2258,9 +2258,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "toml_edit"
|
||||
version = "0.22.21"
|
||||
version = "0.22.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3b072cee73c449a636ffd6f32bd8de3a9f7119139aff882f44943ce2986dc5cf"
|
||||
checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d"
|
||||
dependencies = [
|
||||
"indexmap",
|
||||
"serde",
|
||||
|
@ -2391,9 +2391,9 @@ checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe"
|
|||
|
||||
[[package]]
|
||||
name = "unicode-normalization"
|
||||
version = "0.1.24"
|
||||
version = "0.1.23"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956"
|
||||
checksum = "a56d1686db2308d901306f92a263857ef59ea39678a5458e7cb17f01415101f5"
|
||||
dependencies = [
|
||||
"tinyvec",
|
||||
]
|
||||
|
@ -2486,7 +2486,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "vcs-common"
|
||||
version = "0.1.0"
|
||||
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#f1c42e73a701755803c239db905f98aa764ac696"
|
||||
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#14b06e3351761a5eb29b9a2d35b1f5aa4212b82b"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"bincode",
|
||||
|
|
28
src/gst.rs
28
src/gst.rs
|
@ -20,17 +20,9 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
|||
.name("camera_to_rtp_pipeine")
|
||||
.build();
|
||||
|
||||
let source = ElementFactory::make("mfvideosrc")
|
||||
.build()
|
||||
.expect("Could not make mfvideosrc element!");
|
||||
let source = ElementFactory::make("mfvideosrc").build().unwrap();
|
||||
|
||||
let video_convert = ElementFactory::make("videoconvert")
|
||||
.build()
|
||||
.expect("Could not make videoconvert gst element!");
|
||||
|
||||
let video_rate = ElementFactory::make("videorate")
|
||||
.build()
|
||||
.expect("Could not make videoscale gst element!");
|
||||
let video_convert = ElementFactory::make("videoconvert").build().unwrap();
|
||||
|
||||
let video_scale = ElementFactory::make("videoscale")
|
||||
.property("add-borders", true)
|
||||
|
@ -40,18 +32,13 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
|||
let video_scale_caps = gstreamer::Caps::builder("video/x-raw")
|
||||
.field("height", HEIGHT as i32)
|
||||
.field("width", (HEIGHT as f64 * config.aspect_ratio) as i32)
|
||||
// .field("framerate", gstreamer::Fraction::new(30, 1))
|
||||
.build();
|
||||
|
||||
// We are using VP8 because VP9 resulted in much worse video quality
|
||||
// when testing -NY 8/25/2024
|
||||
let vp8enc = ElementFactory::make("vp8enc")
|
||||
.build()
|
||||
.expect("Could not make vp8enc gst element!");
|
||||
let vp8enc = ElementFactory::make("vp8enc").build().unwrap();
|
||||
|
||||
let rtp = ElementFactory::make("rtpvp8pay")
|
||||
.build()
|
||||
.expect("Could not make rtpvp8pay gst element!");
|
||||
let rtp = ElementFactory::make("rtpvp8pay").build().unwrap();
|
||||
|
||||
let app_sink = gst_app::AppSink::builder().build();
|
||||
|
||||
|
@ -59,7 +46,6 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
|||
.add_many([
|
||||
&source,
|
||||
&video_convert,
|
||||
&video_rate,
|
||||
&video_scale,
|
||||
&vp8enc,
|
||||
&rtp,
|
||||
|
@ -67,15 +53,13 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
|||
])
|
||||
.expect("Could not add all the stuff to the pipeline");
|
||||
|
||||
gst::Element::link_many(&[&source, &video_convert, &video_rate, &video_scale])
|
||||
.expect("Could not link source through video scale!");
|
||||
gst::Element::link_many(&[&source, &video_convert, &video_scale]).unwrap();
|
||||
|
||||
video_scale
|
||||
.link_filtered(&vp8enc, &video_scale_caps)
|
||||
.expect("Could not link videoscale to vp8enc with caps!");
|
||||
|
||||
gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()])
|
||||
.expect("Could not gst link vp8enc through appsink!");
|
||||
gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()]).unwrap();
|
||||
|
||||
Pipeline {
|
||||
pipeline,
|
||||
|
|
318
src/main.rs
318
src/main.rs
|
@ -3,12 +3,12 @@ use std::sync::{
|
|||
Arc,
|
||||
};
|
||||
|
||||
use gstreamer::Buffer;
|
||||
use gstreamer::{prelude::ElementExt, State};
|
||||
use gstreamer::{Buffer, FlowError, FlowSuccess};
|
||||
use gstreamer_app as gst_app;
|
||||
use tokio::{net::TcpListener, sync::Notify};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{debug, error, info};
|
||||
use tracing::{error, info, debug};
|
||||
use webrtc::{
|
||||
peer_connection::RTCPeerConnection,
|
||||
track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter},
|
||||
|
@ -22,9 +22,9 @@ mod rig;
|
|||
mod tracker;
|
||||
mod web_rtc;
|
||||
|
||||
use web_rtc::{kickoff_connection, setup_callbacks, listen_for_socket};
|
||||
use rig::{start_event_loop, ApplicationEvent};
|
||||
use tracker::{tracker_loop, TrackerEvent};
|
||||
use web_rtc::{kickoff_connection, listen_for_socket, setup_callbacks};
|
||||
|
||||
const SATELLITE_NAME: &str = "CameraSatellite_1";
|
||||
|
||||
|
@ -57,13 +57,12 @@ async fn main() {
|
|||
|
||||
let rt = tokio::runtime::Handle::current();
|
||||
|
||||
let stream: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Buffer>>> =
|
||||
Arc::new(tokio::sync::Mutex::new(stream));
|
||||
let stream: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Buffer>>> = Arc::new(tokio::sync::Mutex::new(stream));
|
||||
|
||||
let cs1 = cancel_tasks.clone();
|
||||
pipeline.sink.set_callbacks(
|
||||
gst_app::AppSinkCallbacks::builder()
|
||||
.new_sample(move |app_sink| -> Result<FlowSuccess, FlowError> {
|
||||
.new_sample(move |app_sink| {
|
||||
let sample = app_sink
|
||||
.pull_sample()
|
||||
.map_err(|_| gstreamer::FlowError::Eos)?;
|
||||
|
@ -80,7 +79,6 @@ async fn main() {
|
|||
error!("Error sending to stream of buffers, it was closed: {e}");
|
||||
cs1.cancel();
|
||||
to_quit_5.store(true, Ordering::SeqCst);
|
||||
return Err(gstreamer::FlowError::Error);
|
||||
}
|
||||
|
||||
Ok(gstreamer::FlowSuccess::Ok)
|
||||
|
@ -113,7 +111,12 @@ async fn main() {
|
|||
tokio::spawn(start_event_loop(mec, to_mec.clone()));
|
||||
|
||||
let (to_tec, tec) = async_channel::bounded::<TrackerEvent>(20);
|
||||
tokio::spawn(tracker_loop(to_tec.clone(), tec, to_mec.clone()));
|
||||
tokio::spawn(tracker_loop(
|
||||
to_tec.clone(),
|
||||
tec,
|
||||
to_mec.clone(),
|
||||
));
|
||||
|
||||
|
||||
loop {
|
||||
let to_quit_2 = to_quit.clone();
|
||||
|
@ -124,14 +127,13 @@ async fn main() {
|
|||
let reset_connection_3 = reset_connection.clone();
|
||||
|
||||
let video_receiver_stream = stream.clone();
|
||||
let listener = TcpListener::bind("127.0.0.1:8765")
|
||||
.await
|
||||
.expect("Could not bind tcp listener!");
|
||||
let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!");
|
||||
info!("Started listening on 127.0.0.1:8765");
|
||||
|
||||
let (app_sender, to_core_reciever) = async_channel::bounded::<ApplicationMessage>(20);
|
||||
let (to_app_events, app_receiver) = async_channel::bounded::<ApplicationMessage>(20);
|
||||
|
||||
|
||||
if to_quit_3.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
|
@ -149,218 +151,136 @@ async fn main() {
|
|||
|
||||
drop(listener);
|
||||
|
||||
if let Err(e) = app_sender
|
||||
.send(ApplicationMessage::ConnectionSupportsWebRTCRequest)
|
||||
.await
|
||||
{
|
||||
error!("Could not post request webrtc support to async channel: {e}");
|
||||
break;
|
||||
}
|
||||
|
||||
if let Err(e) = to_tec
|
||||
.send(TrackerEvent::ChangeMEC(app_sender.clone()))
|
||||
.await
|
||||
{
|
||||
if let Err(e) = to_tec.send(TrackerEvent::ChangeMEC(app_sender.clone())).await {
|
||||
error!("There was an error sending a message to the TEC! {e}");
|
||||
to_quit_2.store(true, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
let mut do_webrtc = false;
|
||||
match app_receiver.recv().await {
|
||||
Ok(ApplicationMessage::NameRequest(None)) => {
|
||||
if let Err(e) = app_sender
|
||||
.send(ApplicationMessage::NameRequest(Some(
|
||||
SATELLITE_NAME.to_owned(),
|
||||
)))
|
||||
.await
|
||||
{
|
||||
error!("Could not let the remote know my name! {e}");
|
||||
}
|
||||
}
|
||||
Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => {
|
||||
if let Err(e) = app_sender
|
||||
.send(ApplicationMessage::ConnectionSupportsWebRTC(true))
|
||||
.await
|
||||
{
|
||||
error!("Coudl not post request webrtc support=true to async channel, dropping connection: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(ApplicationMessage::ConnectionSupportsWebRTC(state)) => {
|
||||
do_webrtc = state;
|
||||
}
|
||||
info!("Setting up web rtc pc status");
|
||||
// Set up the webrtc peer connection
|
||||
let peer_connection: Arc<RTCPeerConnection>;
|
||||
let video_track: Arc<TrackLocalStaticRTP>;
|
||||
match web_rtc::setup_webrtc(uuid.clone()).await {
|
||||
Err(e) => {
|
||||
error!("Could not receive incoming message from remote during webrtc status check!: {e}");
|
||||
panic!("Could not set up WebRTC connection! {e}");
|
||||
}
|
||||
_ => {
|
||||
info!("Received unhandled message from remote");
|
||||
Ok((pc, vt)) => {
|
||||
peer_connection = pc;
|
||||
video_track = vt;
|
||||
}
|
||||
}
|
||||
|
||||
let mut peer_connection: Option<Arc<RTCPeerConnection>> = None;
|
||||
let video_track: Arc<TrackLocalStaticRTP>;
|
||||
if do_webrtc {
|
||||
info!("Setting up web rtc pc status");
|
||||
// Set up the webrtc peer connection
|
||||
match web_rtc::setup_webrtc(uuid.clone()).await {
|
||||
Err(e) => {
|
||||
panic!("Could not set up WebRTC connection! {e}");
|
||||
|
||||
// Send the local description to the remote
|
||||
if let Some(local_desc) = peer_connection.local_description().await {
|
||||
app_sender
|
||||
.send(ApplicationMessage::WebRTCPacket(local_desc))
|
||||
.await
|
||||
.expect("Could not send message to the socket message channel");
|
||||
}
|
||||
info!("local information sent to the remote");
|
||||
|
||||
let notify_tx = Arc::new(Notify::new());
|
||||
let notify_video = notify_tx.clone();
|
||||
|
||||
setup_callbacks(
|
||||
peer_connection.clone(),
|
||||
to_quit_3,
|
||||
app_sender.clone(),
|
||||
notify_tx,
|
||||
)
|
||||
.await;
|
||||
info!("webrtc callbacks registered");
|
||||
|
||||
tokio::spawn(async move {
|
||||
notify_video.notified().await;
|
||||
|
||||
info!("Starting video stream!");
|
||||
|
||||
let mut stream = video_receiver_stream.lock().await;
|
||||
|
||||
while let Some(map) = stream.recv().await {
|
||||
if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
Ok((pc, vt)) => {
|
||||
peer_connection = Some(pc);
|
||||
video_track = vt;
|
||||
if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
stream.close();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Send the local description to the remote
|
||||
if let Some(local_desc) = peer_connection.as_ref().unwrap().local_description().await {
|
||||
app_sender
|
||||
.send(ApplicationMessage::WebRTCPacket(local_desc))
|
||||
.await
|
||||
.expect("Could not send message to the socket message channel");
|
||||
}
|
||||
info!("local information sent to the remote");
|
||||
|
||||
let notify_tx = Arc::new(Notify::new());
|
||||
let notify_video = notify_tx.clone();
|
||||
|
||||
setup_callbacks(
|
||||
peer_connection.as_mut().unwrap().clone(),
|
||||
to_quit_3,
|
||||
app_sender.clone(),
|
||||
notify_tx,
|
||||
)
|
||||
.await;
|
||||
info!("webrtc callbacks registered");
|
||||
|
||||
tokio::spawn(async move {
|
||||
notify_video.notified().await;
|
||||
|
||||
info!("Starting video stream!");
|
||||
|
||||
let mut stream = video_receiver_stream.lock().await;
|
||||
|
||||
while let Some(map) = stream.recv().await {
|
||||
if reset_connection_2.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
stream.close();
|
||||
break;
|
||||
}
|
||||
if let Ok(buf) = map.map_readable() {
|
||||
info!("Sending bytes: {}", buf.size());
|
||||
if let Err(err) = video_track.write(&buf).await {
|
||||
if webrtc::Error::ErrClosedPipe == err {
|
||||
error!("The peerConnection has been closed.");
|
||||
} else {
|
||||
error!("video_track write err: {}", err);
|
||||
}
|
||||
if let Ok(buf) = map.map_readable() {
|
||||
info!("Sending bytes: {}", buf.size());
|
||||
if let Err(err) = video_track.write(&buf).await {
|
||||
if webrtc::Error::ErrClosedPipe == err {
|
||||
error!("The peerConnection has been closed.");
|
||||
} else {
|
||||
error!("video_track write err: {}", err);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
info!("starting webrtc connection kickoff");
|
||||
// send the offer and trickle ice candidates to the remote, and accept their description
|
||||
if let Err(e) = kickoff_connection(
|
||||
peer_connection.as_ref().unwrap(),
|
||||
app_sender.clone(),
|
||||
&app_receiver,
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("There was an issue with WebRTC setup! Resetting connection: {e}");
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
info!("webrtc kickoff complete, entering watch mode");
|
||||
} // end supports_webrtc portion
|
||||
info!("starting webrtc connection kickoff");
|
||||
// send the offer and trickle ice candidates to the remote, and accept their description
|
||||
if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await {
|
||||
error!("There was an issue with WebRTC setup! Resetting connection: {e}");
|
||||
}
|
||||
|
||||
info!("webrtc kickoff complete, entering watch mode");
|
||||
|
||||
// loop through messages coming from the remote.
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel_tasks.cancelled() => {
|
||||
info!("Event Loop cancelled flag caught");
|
||||
break;
|
||||
while let Ok(msg) = app_receiver.recv().await {
|
||||
if to_quit.load(Ordering::SeqCst) { break; }
|
||||
match msg {
|
||||
ApplicationMessage::WebRTCPacket(_pkt) => {
|
||||
error!("don't know what to do with this packet!");
|
||||
}
|
||||
Ok(msg) = app_receiver.recv() => {
|
||||
if to_quit.load(Ordering::SeqCst) {
|
||||
info!("ToQuit set, breaking app_receiver set");
|
||||
ApplicationMessage::WebRTCIceCandidateInit(pkt) => {
|
||||
if let Err(e) = peer_connection.add_ice_candidate(pkt).await {
|
||||
error!("There was an error adding the trickle ICE candidate! {e}");
|
||||
}
|
||||
}
|
||||
ApplicationMessage::WebRTCIceCandidate(_pkg) => {
|
||||
error!("Unhandled ice candidate!");
|
||||
}
|
||||
ApplicationMessage::NameRequest(Some(name)) => info!("Got a message about '{}'", name),
|
||||
ApplicationMessage::NameRequest(None) => {
|
||||
if let Err(e) = app_sender
|
||||
.send(ApplicationMessage::NameRequest(Some(
|
||||
SATELLITE_NAME.to_owned(),
|
||||
)))
|
||||
.await
|
||||
{
|
||||
error!("Could not let the remote know my name! {e}");
|
||||
}
|
||||
}
|
||||
ApplicationMessage::ChangeTrackingID(id) => {
|
||||
if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await {
|
||||
error!("Could not send message to tracker state! Closing down. {e}");
|
||||
break;
|
||||
}
|
||||
match msg {
|
||||
ApplicationMessage::ConnectionSupportsWebRTC(_) => {},
|
||||
ApplicationMessage::ConnectionSupportsWebRTCRequest => {
|
||||
if let Err(e) = app_sender.send(ApplicationMessage::ConnectionSupportsWebRTC(do_webrtc)).await {
|
||||
error!("Could not post connection supports webrtc status to async channel!: {e}");
|
||||
}
|
||||
}
|
||||
ApplicationMessage::WebRTCPacket(_pkt) => {
|
||||
error!("don't know what to do with this packet!");
|
||||
}
|
||||
ApplicationMessage::WebRTCIceCandidateInit(pkt) => {
|
||||
if peer_connection.is_some() {
|
||||
if let Err(e) = peer_connection.as_ref().unwrap().add_ice_candidate(pkt).await {
|
||||
error!("There was an error adding the trickle ICE candidate! {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
ApplicationMessage::WebRTCIceCandidate(_pkg) => {
|
||||
error!("Unhandled ice candidate!");
|
||||
}
|
||||
ApplicationMessage::NameRequest(Some(name)) => {
|
||||
info!("Got a message about '{}'", name)
|
||||
}
|
||||
ApplicationMessage::NameRequest(None) => {
|
||||
if let Err(e) = app_sender
|
||||
.send(ApplicationMessage::NameRequest(Some(
|
||||
SATELLITE_NAME.to_owned(),
|
||||
)))
|
||||
.await
|
||||
{
|
||||
error!("Could not let the remote know my name! {e}");
|
||||
}
|
||||
}
|
||||
ApplicationMessage::ChangeTrackingID(id) => {
|
||||
if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await {
|
||||
error!("Could not send message to tracker state! Closing down. {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
ApplicationMessage::TrackingBoxes(_) => {
|
||||
error!("I got a tracking boxes message?");
|
||||
}
|
||||
ApplicationMessage::ManualMovementOverride((x, y)) => {
|
||||
if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await {
|
||||
error!("Could not send manual override to state machine! {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
ApplicationMessage::CloseConnection => {
|
||||
info!("Received connection closing, breaking the loop");
|
||||
to_quit.store(true, Ordering::SeqCst);
|
||||
cancel_tasks.cancel();
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
ApplicationMessage::TrackingBoxes(_) => {
|
||||
error!("I got a tracking boxes message?");
|
||||
}
|
||||
ApplicationMessage::ManualMovementOverride((x, y)) => {
|
||||
if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await {
|
||||
error!("Could not send manual override to state machine! {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
ApplicationMessage::CloseConnection => {
|
||||
info!("Received connection closing, breaking the loop");
|
||||
to_quit.store(true, Ordering::SeqCst);
|
||||
cancel_tasks.cancel();
|
||||
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if peer_connection.is_some() {
|
||||
if let Err(e) = peer_connection.as_mut().unwrap().close().await {
|
||||
error!("Got an error while closing the webrtc connection! {}", e);
|
||||
}
|
||||
}
|
||||
app_sender.close();
|
||||
app_receiver.close();
|
||||
info!("Loop is exiting!");
|
||||
}
|
||||
info!(
|
||||
"WebRTC loop exited, should be closing down\nTo_Quit: {}",
|
||||
to_quit.load(Ordering::SeqCst)
|
||||
);
|
||||
info!("WebRTC loop exited, should be closing down\nTo_Quit: {}", to_quit.load(Ordering::SeqCst));
|
||||
if let Err(e) = pipeline.pipeline.set_state(State::Null) {
|
||||
panic!("Could not start pipeline! {e}");
|
||||
}
|
||||
|
|
|
@ -52,8 +52,7 @@ pub async fn tracker_loop(
|
|||
if state.app_sender.is_some() {
|
||||
if let Err(e) = state
|
||||
.app_sender
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.as_ref().unwrap()
|
||||
.send(ApplicationMessage::TrackingBoxes(
|
||||
vcs_common::types::TrackingUpdate {
|
||||
target_id: None,
|
||||
|
|
125
src/web_rtc.rs
125
src/web_rtc.rs
|
@ -3,12 +3,12 @@ use std::sync::{
|
|||
Arc,
|
||||
};
|
||||
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Notify;
|
||||
use tokio_tungstenite::accept_async;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use tracing::{debug, error, info, instrument, warn};
|
||||
use tokio_tungstenite::accept_async;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use tracing::{error, info, debug, warn, instrument};
|
||||
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
|
||||
use webrtc::{
|
||||
api::{
|
||||
|
@ -227,9 +227,7 @@ pub async fn kickoff_connection(
|
|||
}
|
||||
Ok(ApplicationMessage::ChangeTrackingID(_))
|
||||
| Ok(ApplicationMessage::TrackingBoxes(_))
|
||||
| Ok(ApplicationMessage::ManualMovementOverride(_))
|
||||
| Ok(ApplicationMessage::ConnectionSupportsWebRTC(_))
|
||||
| Ok(ApplicationMessage::ConnectionSupportsWebRTCRequest) => {}
|
||||
| Ok(ApplicationMessage::ManualMovementOverride(_)) => {}
|
||||
Err(e) => {
|
||||
panic!("WC channel was closed?! {e}");
|
||||
}
|
||||
|
@ -241,12 +239,7 @@ pub async fn kickoff_connection(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn listen_for_socket(
|
||||
listener: &TcpListener,
|
||||
from_app: AppReceiver,
|
||||
to_ui: AppSender,
|
||||
reset_connection: Arc<AtomicBool>,
|
||||
) {
|
||||
pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender, reset_connection: Arc<AtomicBool>) {
|
||||
if let Ok((stream, _)) = listener.accept().await {
|
||||
match accept_async(stream).await {
|
||||
Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
|
||||
|
@ -294,63 +287,62 @@ pub async fn listen_for_socket(
|
|||
Err(e) => {
|
||||
error!("There was an error getting a message from the remote! {e}");
|
||||
reset_connection.store(true, Ordering::SeqCst);
|
||||
}
|
||||
Ok(msg) => {
|
||||
match msg {
|
||||
Message::Ping(_) | Message::Pong(_) => {}
|
||||
Message::Close(_) => {
|
||||
info!("Received WebSocket close message! Closing the websocket");
|
||||
break;
|
||||
}
|
||||
Message::Frame(_) => {
|
||||
info!("Received a Frame websocket message?");
|
||||
}
|
||||
Message::Text(text) => {
|
||||
debug!("Recieved text from websocket: {text}");
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
match serde_json::from_str(&text) {
|
||||
Ok(msg) => {
|
||||
if let Err(e) = to_ui.send(msg).await {
|
||||
error!("Could not send message from ws to application! Closing and exiting\n{e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
warn!("Recieved a `Text` message from the remote while running in release mode! " +
|
||||
"Was the other endpoint running release mode?\n msg: {text}");
|
||||
}
|
||||
}
|
||||
Message::Binary(msg) => {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
match bincode::deserialize::<ApplicationMessage>(&msg) {
|
||||
Ok(m) => {
|
||||
if let Err(e) = to_ui.send(m).await {
|
||||
error!("Could not send message to application! Closing and exiting\n{e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received a malformed binary message from the websocket!\n{e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
|
||||
"Was the other endpoing running debug mode?");
|
||||
}
|
||||
Ok(msg) => match msg {
|
||||
Message::Ping(_) | Message::Pong(_) => {}
|
||||
Message::Close(_) => {
|
||||
info!("Received WebSocket close message! Closing the websocket");
|
||||
break;
|
||||
}
|
||||
Message::Frame(_) => {
|
||||
info!("Received a Frame websocket message?");
|
||||
}
|
||||
Message::Text(text) => {
|
||||
debug!("Recieved text from websocket: {text}");
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
match serde_json::from_str(&text) {
|
||||
Ok(msg) => {
|
||||
if let Err(e) = to_ui.send(msg).await {
|
||||
error!("Could not send message from ws to application! Closing and exiting\n{e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
warn!("Recieved a `Text` message from the remote while running in release mode! " +
|
||||
"Was the other endpoint running release mode?\n msg: {text}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Message::Binary(msg) => {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
match bincode::deserialize::<ApplicationMessage>(&msg) {
|
||||
Ok(m) => {
|
||||
if let Err(e) = to_ui.send(m).await {
|
||||
error!("Could not send message to application! Closing and exiting\n{e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received a malformed binary message from the websocket!\n{e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
|
||||
"Was the other endpoing running debug mode?");
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
warn!("The websocket listener closed");
|
||||
|
@ -359,3 +351,4 @@ pub async fn listen_for_socket(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue