diff --git a/src/gst.rs b/src/gst.rs index 2358132..4e581b8 100644 --- a/src/gst.rs +++ b/src/gst.rs @@ -20,9 +20,17 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { .name("camera_to_rtp_pipeine") .build(); - let source = ElementFactory::make("mfvideosrc").build().unwrap(); + let source = ElementFactory::make("mfvideosrc") + .build() + .expect("Could not make mfvideosrc element!"); - let video_convert = ElementFactory::make("videoconvert").build().unwrap(); + let video_convert = ElementFactory::make("videoconvert") + .build() + .expect("Could not make videoconvert gst element!"); + + let video_rate = ElementFactory::make("videorate") + .build() + .expect("Could not make videoscale gst element!"); let video_scale = ElementFactory::make("videoscale") .property("add-borders", true) @@ -32,13 +40,18 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { let video_scale_caps = gstreamer::Caps::builder("video/x-raw") .field("height", HEIGHT as i32) .field("width", (HEIGHT as f64 * config.aspect_ratio) as i32) + // .field("framerate", gstreamer::Fraction::new(30, 1)) .build(); // We are using VP8 because VP9 resulted in much worse video quality // when testing -NY 8/25/2024 - let vp8enc = ElementFactory::make("vp8enc").build().unwrap(); + let vp8enc = ElementFactory::make("vp8enc") + .build() + .expect("Could not make vp8enc gst element!"); - let rtp = ElementFactory::make("rtpvp8pay").build().unwrap(); + let rtp = ElementFactory::make("rtpvp8pay") + .build() + .expect("Could not make rtpvp8pay gst element!"); let app_sink = gst_app::AppSink::builder().build(); @@ -46,6 +59,7 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { .add_many([ &source, &video_convert, + &video_rate, &video_scale, &vp8enc, &rtp, @@ -53,13 +67,15 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline { ]) .expect("Could not add all the stuff to the pipeline"); - gst::Element::link_many(&[&source, &video_convert, &video_scale]).unwrap(); + gst::Element::link_many(&[&source, &video_convert, &video_rate, &video_scale]) + .expect("Could not link source through video scale!"); video_scale .link_filtered(&vp8enc, &video_scale_caps) .expect("Could not link videoscale to vp8enc with caps!"); - gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()]).unwrap(); + gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()]) + .expect("Could not gst link vp8enc through appsink!"); Pipeline { pipeline, diff --git a/src/main.rs b/src/main.rs index 38e5a23..a7767e5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,12 +3,12 @@ use std::sync::{ Arc, }; -use gstreamer::Buffer; +use gstreamer::{Buffer, FlowError, FlowSuccess}; use gstreamer::{prelude::ElementExt, State}; use gstreamer_app as gst_app; use tokio::{net::TcpListener, sync::Notify}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, debug}; +use tracing::{debug, error, info}; use webrtc::{ peer_connection::RTCPeerConnection, track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter}, @@ -22,9 +22,9 @@ mod rig; mod tracker; mod web_rtc; -use web_rtc::{kickoff_connection, setup_callbacks, listen_for_socket}; use rig::{start_event_loop, ApplicationEvent}; use tracker::{tracker_loop, TrackerEvent}; +use web_rtc::{kickoff_connection, listen_for_socket, setup_callbacks}; const SATELLITE_NAME: &str = "CameraSatellite_1"; @@ -57,12 +57,13 @@ async fn main() { let rt = tokio::runtime::Handle::current(); - let stream: Arc>> = Arc::new(tokio::sync::Mutex::new(stream)); + let stream: Arc>> = + Arc::new(tokio::sync::Mutex::new(stream)); let cs1 = cancel_tasks.clone(); pipeline.sink.set_callbacks( gst_app::AppSinkCallbacks::builder() - .new_sample(move |app_sink| { + .new_sample(move |app_sink| -> Result { let sample = app_sink .pull_sample() .map_err(|_| gstreamer::FlowError::Eos)?; @@ -79,6 +80,7 @@ async fn main() { error!("Error sending to stream of buffers, it was closed: {e}"); cs1.cancel(); to_quit_5.store(true, Ordering::SeqCst); + return Err(gstreamer::FlowError::Error); } Ok(gstreamer::FlowSuccess::Ok) @@ -111,12 +113,7 @@ async fn main() { tokio::spawn(start_event_loop(mec, to_mec.clone())); let (to_tec, tec) = async_channel::bounded::(20); - tokio::spawn(tracker_loop( - to_tec.clone(), - tec, - to_mec.clone(), - )); - + tokio::spawn(tracker_loop(to_tec.clone(), tec, to_mec.clone())); loop { let to_quit_2 = to_quit.clone(); @@ -127,13 +124,14 @@ async fn main() { let reset_connection_3 = reset_connection.clone(); let video_receiver_stream = stream.clone(); - let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!"); + let listener = TcpListener::bind("127.0.0.1:8765") + .await + .expect("Could not bind tcp listener!"); info!("Started listening on 127.0.0.1:8765"); let (app_sender, to_core_reciever) = async_channel::bounded::(20); let (to_app_events, app_receiver) = async_channel::bounded::(20); - if to_quit_3.load(Ordering::SeqCst) { break; } @@ -148,10 +146,13 @@ async fn main() { if to_quit_3.load(Ordering::SeqCst) { break; } - + drop(listener); - if let Err(e) = to_tec.send(TrackerEvent::ChangeMEC(app_sender.clone())).await { + if let Err(e) = to_tec + .send(TrackerEvent::ChangeMEC(app_sender.clone())) + .await + { error!("There was an error sending a message to the TEC! {e}"); to_quit_2.store(true, Ordering::SeqCst); } @@ -170,7 +171,6 @@ async fn main() { } } - // Send the local description to the remote if let Some(local_desc) = peer_connection.local_description().await { app_sender @@ -222,65 +222,90 @@ async fn main() { info!("starting webrtc connection kickoff"); // send the offer and trickle ice candidates to the remote, and accept their description - if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await { + if let Err(e) = + kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await + { error!("There was an issue with WebRTC setup! Resetting connection: {e}"); } info!("webrtc kickoff complete, entering watch mode"); // loop through messages coming from the remote. - while let Ok(msg) = app_receiver.recv().await { - if to_quit.load(Ordering::SeqCst) { break; } - match msg { - ApplicationMessage::WebRTCPacket(_pkt) => { - error!("don't know what to do with this packet!"); - } - ApplicationMessage::WebRTCIceCandidateInit(pkt) => { - if let Err(e) = peer_connection.add_ice_candidate(pkt).await { - error!("There was an error adding the trickle ICE candidate! {e}"); - } - } - ApplicationMessage::WebRTCIceCandidate(_pkg) => { - error!("Unhandled ice candidate!"); - } - ApplicationMessage::NameRequest(Some(name)) => info!("Got a message about '{}'", name), - ApplicationMessage::NameRequest(None) => { - if let Err(e) = app_sender - .send(ApplicationMessage::NameRequest(Some( - SATELLITE_NAME.to_owned(), - ))) - .await - { - error!("Could not let the remote know my name! {e}"); - } - } - ApplicationMessage::ChangeTrackingID(id) => { - if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await { - error!("Could not send message to tracker state! Closing down. {e}"); - break; - } - } - ApplicationMessage::TrackingBoxes(_) => { - error!("I got a tracking boxes message?"); - } - ApplicationMessage::ManualMovementOverride((x, y)) => { - if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await { - error!("Could not send manual override to state machine! {e}"); - break; - } - } - ApplicationMessage::CloseConnection => { - info!("Received connection closing, breaking the loop"); - to_quit.store(true, Ordering::SeqCst); - cancel_tasks.cancel(); - - + loop { + tokio::select! { + _ = cancel_tasks.cancelled() => { + info!("Event Loop cancelled flag caught"); break; } + Ok(msg) = app_receiver.recv() => { + if to_quit.load(Ordering::SeqCst) { + info!("ToQuit set, breaking app_receiver set"); + break; + } + match msg { + ApplicationMessage::WebRTCPacket(_pkt) => { + error!("don't know what to do with this packet!"); + } + ApplicationMessage::WebRTCIceCandidateInit(pkt) => { + if let Err(e) = peer_connection.add_ice_candidate(pkt).await { + error!("There was an error adding the trickle ICE candidate! {e}"); + } + } + ApplicationMessage::WebRTCIceCandidate(_pkg) => { + error!("Unhandled ice candidate!"); + } + ApplicationMessage::NameRequest(Some(name)) => { + info!("Got a message about '{}'", name) + } + ApplicationMessage::NameRequest(None) => { + if let Err(e) = app_sender + .send(ApplicationMessage::NameRequest(Some( + SATELLITE_NAME.to_owned(), + ))) + .await + { + error!("Could not let the remote know my name! {e}"); + } + } + ApplicationMessage::ChangeTrackingID(id) => { + if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await { + error!("Could not send message to tracker state! Closing down. {e}"); + break; + } + } + ApplicationMessage::TrackingBoxes(_) => { + error!("I got a tracking boxes message?"); + } + ApplicationMessage::ManualMovementOverride((x, y)) => { + if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await { + error!("Could not send manual override to state machine! {e}"); + break; + } + } + ApplicationMessage::CloseConnection => { + info!("Received connection closing, breaking the loop"); + to_quit.store(true, Ordering::SeqCst); + cancel_tasks.cancel(); + + break; + } + } + } } } + + if let Err(e) = peer_connection.close().await { + error!("Got an error while closing the webrtc connection! {}", e); + } + app_sender.close(); + app_receiver.close(); + info!("Loop is exiting!"); + } - info!("WebRTC loop exited, should be closing down\nTo_Quit: {}", to_quit.load(Ordering::SeqCst)); + info!( + "WebRTC loop exited, should be closing down\nTo_Quit: {}", + to_quit.load(Ordering::SeqCst) + ); if let Err(e) = pipeline.pipeline.set_state(State::Null) { panic!("Could not start pipeline! {e}"); } diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index ce76f6e..37160f8 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -52,7 +52,8 @@ pub async fn tracker_loop( if state.app_sender.is_some() { if let Err(e) = state .app_sender - .as_ref().unwrap() + .as_ref() + .unwrap() .send(ApplicationMessage::TrackingBoxes( vcs_common::types::TrackingUpdate { target_id: None, diff --git a/src/web_rtc.rs b/src/web_rtc.rs index 1b7085a..c540e65 100644 --- a/src/web_rtc.rs +++ b/src/web_rtc.rs @@ -3,12 +3,12 @@ use std::sync::{ Arc, }; -use tokio::sync::Notify; -use tokio::net::TcpListener; -use tokio_tungstenite::tungstenite::Message; -use tokio_tungstenite::accept_async; use futures_util::{SinkExt, StreamExt}; -use tracing::{error, info, debug, warn, instrument}; +use tokio::net::TcpListener; +use tokio::sync::Notify; +use tokio_tungstenite::accept_async; +use tokio_tungstenite::tungstenite::Message; +use tracing::{debug, error, info, instrument, warn}; use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; use webrtc::{ api::{ @@ -239,7 +239,12 @@ pub async fn kickoff_connection( Ok(()) } -pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender, reset_connection: Arc) { +pub async fn listen_for_socket( + listener: &TcpListener, + from_app: AppReceiver, + to_ui: AppSender, + reset_connection: Arc, +) { if let Ok((stream, _)) = listener.accept().await { match accept_async(stream).await { Err(e) => error!("Could not convert incoming stream to websocket: {e}"), @@ -287,62 +292,63 @@ pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to Err(e) => { error!("There was an error getting a message from the remote! {e}"); reset_connection.store(true, Ordering::SeqCst); - } - Ok(msg) => match msg { - Message::Ping(_) | Message::Pong(_) => {} - Message::Close(_) => { - info!("Received WebSocket close message! Closing the websocket"); - break; - } - Message::Frame(_) => { - info!("Received a Frame websocket message?"); - } - Message::Text(text) => { - debug!("Recieved text from websocket: {text}"); - #[cfg(debug_assertions)] - { - match serde_json::from_str(&text) { - Ok(msg) => { - if let Err(e) = to_ui.send(msg).await { - error!("Could not send message from ws to application! Closing and exiting\n{e}"); - break; + Ok(msg) => { + match msg { + Message::Ping(_) | Message::Pong(_) => {} + Message::Close(_) => { + info!("Received WebSocket close message! Closing the websocket"); + break; + } + Message::Frame(_) => { + info!("Received a Frame websocket message?"); + } + Message::Text(text) => { + debug!("Recieved text from websocket: {text}"); + #[cfg(debug_assertions)] + { + match serde_json::from_str(&text) { + Ok(msg) => { + if let Err(e) = to_ui.send(msg).await { + error!("Could not send message from ws to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}"); } } - Err(e) => { - error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}"); - } } - } - #[cfg(not(debug_assertions))] - { - warn!("Recieved a `Text` message from the remote while running in release mode! " + + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Text` message from the remote while running in release mode! " + "Was the other endpoint running release mode?\n msg: {text}"); - } - } - Message::Binary(msg) => { - #[cfg(debug_assertions)] - { - match bincode::deserialize::(&msg) { - Ok(m) => { - if let Err(e) = to_ui.send(m).await { - error!("Could not send message to application! Closing and exiting\n{e}"); - break; - } - } - Err(e) => { - error!("Received a malformed binary message from the websocket!\n{e}"); - } } } + Message::Binary(msg) => { + #[cfg(debug_assertions)] + { + match bincode::deserialize::(&msg) { + Ok(m) => { + if let Err(e) = to_ui.send(m).await { + error!("Could not send message to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed binary message from the websocket!\n{e}"); + } + } + } - #[cfg(not(debug_assertions))] - { - warn!("Recieved a `Binary` message from the remote while running in debug mode! " + + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Binary` message from the remote while running in debug mode! " + "Was the other endpoing running debug mode?"); + } } } - }, + } } } warn!("The websocket listener closed"); @@ -351,4 +357,3 @@ pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to } } } -