From 996e991ee4a04c42a780dbf911837e4a91ece74e Mon Sep 17 00:00:00 2001 From: Nickiel12 Date: Wed, 11 Sep 2024 03:58:46 +0000 Subject: [PATCH] moved to recving websocket model --- Cargo.lock | 2 + Cargo.toml | 2 + src/main.rs | 259 +++++++++++++++++++++++++-------------------- src/tracker/mod.rs | 48 +++++---- src/web_rtc.rs | 123 ++++++++++++++++++++- 5 files changed, 294 insertions(+), 140 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbd8aad..6037a8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2463,7 +2463,9 @@ name = "vcs-camera-satellite" version = "0.1.0" dependencies = [ "async-channel", + "bincode", "config", + "futures-util", "gstreamer", "gstreamer-app", "log", diff --git a/Cargo.toml b/Cargo.toml index 24185f6..10ab17c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,5 @@ webrtc = "0.11.0" vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } async-channel = "2.3.1" uuid = "1.10.0" +futures-util = "0.3.30" +bincode = "1.3.3" diff --git a/src/main.rs b/src/main.rs index c6c0000..be116ac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,9 +6,8 @@ use std::sync::{ use gstreamer::Buffer; use gstreamer::{prelude::ElementExt, State}; use gstreamer_app as gst_app; -use tokio::sync::Notify; -use tracing::{error, info}; -use web_rtc::{kickoff_connection, setup_callbacks}; +use tokio::{net::TcpListener, sync::Notify}; +use tracing::{error, info, debug}; use webrtc::{ peer_connection::RTCPeerConnection, track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter}, @@ -22,6 +21,7 @@ mod rig; mod tracker; mod web_rtc; +use web_rtc::{kickoff_connection, setup_callbacks, listen_for_socket}; use rig::{start_event_loop, ApplicationEvent}; use tracker::{tracker_loop, TrackerEvent}; @@ -40,9 +40,8 @@ async fn main() { // EXIT HANDLER let to_quit = Arc::new(AtomicBool::new(false)); - let to_quit_2 = to_quit.clone(); - let to_quit_3 = to_quit.clone(); let to_quit_5 = to_quit.clone(); + let to_quit_6 = to_quit.clone(); // GSTREAMER SETUP if let Err(e) = gstreamer::init() { @@ -52,79 +51,11 @@ async fn main() { let pipeline = gst::new_pipeline(&config); // frame buffers from the gstreamer pipeline to send over the webrtc connection - let (to_stream, mut stream) = tokio::sync::mpsc::channel::(10); + let (to_stream, stream) = tokio::sync::mpsc::channel::(10); let rt = tokio::runtime::Handle::current(); - let app_sender: AppSender; - let app_receiver: AppReceiver; - // connect to remote server - match vcs_common::connect_to_server( - format!("ws://{}:{}", config.destination_ip, config.destination_port), - rt, - ) - .await - { - Err(e) => panic!("There was an error connecting to the remote: {e}"), - Ok((sender, recvr, _ws_open)) => { - app_sender = sender; - app_receiver = recvr; - } - } - - // Set up the webrtc peer connection - let peer_connection: Arc; - let video_track: Arc; - match web_rtc::setup_webrtc(uuid.clone()).await { - Err(e) => { - panic!("Could not set up WebRTC connection! {e}"); - } - Ok((pc, vt)) => { - peer_connection = pc; - video_track = vt; - } - } - - // Send the local description to the remote - if let Some(local_desc) = peer_connection.local_description().await { - app_sender - .send(ApplicationMessage::WebRTCPacket(local_desc)) - .await - .expect("Could not send message to the socket message channel"); - } - - let notify_tx = Arc::new(Notify::new()); - let notify_video = notify_tx.clone(); - - setup_callbacks( - peer_connection.clone(), - to_quit_3, - app_sender.clone(), - notify_tx, - ) - .await; - - tokio::spawn(async move { - notify_video.notified().await; - - info!("Starting video stream!"); - - while let Some(map) = stream.recv().await { - if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { - stream.close(); - break; - } - if let Ok(buf) = map.map_readable() { - if let Err(err) = video_track.write(&buf).await { - if webrtc::Error::ErrClosedPipe == err { - error!("The peerConnection has been closed."); - } else { - error!("video_track write err: {}", err); - } - } - }; - } - }); + let stream: Arc>> = Arc::new(tokio::sync::Mutex::new(stream)); pipeline.sink.set_callbacks( gst_app::AppSinkCallbacks::builder() @@ -142,7 +73,7 @@ async fn main() { })?; if let Err(e) = to_stream.blocking_send(buffer) { - error!("Error sending to stream of buffers: {e}"); + error!("Error sending to stream of buffers, it was closed: {e}"); to_quit_5.store(true, Ordering::SeqCst); } @@ -151,12 +82,6 @@ async fn main() { .build(), ); - // send the offer and trickle ice candidates to the remote, and accept their description - if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await { - to_quit.store(true, Ordering::SeqCst); - panic!("There was an issue with WebRTC setup! {e}"); - } - // Start the gstreamer pipeline if let Err(e) = pipeline.pipeline.set_state(State::Playing) { panic!("Could not start pipeline! {e}"); @@ -166,7 +91,11 @@ async fn main() { tokio::spawn(async move { match tokio::signal::ctrl_c().await { Ok(()) => { - to_quit.store(true, std::sync::atomic::Ordering::SeqCst); + to_quit_6.store(true, std::sync::atomic::Ordering::SeqCst); + // Start the gstreamer pipeline + if let Err(e) = pipeline.pipeline.set_state(State::Null) { + panic!("Could not start pipeline! {e}"); + } } Err(e) => { error!("Could not watch ctrl_c signal! {e}"); @@ -182,47 +111,143 @@ async fn main() { to_tec.clone(), tec, to_mec.clone(), - app_sender.clone(), )); - // loop through messages coming from the remote. - while let Ok(msg) = app_receiver.recv().await { - match msg { - ApplicationMessage::WebRTCPacket(_pkt) => { - error!("don't know what to do with this packet!"); + + + loop { + let to_quit_2 = to_quit.clone(); + let to_quit_3 = to_quit.clone(); + + let video_receiver_stream = stream.clone(); + let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!"); + info!("Started listening on 127.0.0.1:8765"); + + let (app_sender, to_core_reciever) = async_channel::bounded::(20); + let (to_app_events, app_receiver) = async_channel::bounded::(20); + + todo!("Need to find a way to cancel this future if to_quit is set manually"); + listen_for_socket(&listener, to_core_reciever as AppReceiver, to_app_events as AppSender).await; + + if to_quit_3.load(Ordering::SeqCst) { + break; + } + + drop(listener); + + if let Err(e) = to_tec.send(TrackerEvent::ChangeMEC(app_sender.clone())).await { + error!("There was an error sending a message to the TEC! {e}"); + to_quit_2.store(true, Ordering::SeqCst); + } + + info!("Setting up web rtc pc status"); + // Set up the webrtc peer connection + let peer_connection: Arc; + let video_track: Arc; + match web_rtc::setup_webrtc(uuid.clone()).await { + Err(e) => { + panic!("Could not set up WebRTC connection! {e}"); } - ApplicationMessage::WebRTCIceCandidateInit(pkt) => { - if let Err(e) = peer_connection.add_ice_candidate(pkt).await { - error!("There was an error adding the trickle ICE candidate! {e}"); - } + Ok((pc, vt)) => { + peer_connection = pc; + video_track = vt; } - ApplicationMessage::WebRTCIceCandidate(_pkg) => { - error!("Unhandled ice candidate!"); - } - ApplicationMessage::NameRequest(Some(name)) => info!("Got a message about '{}'", name), - ApplicationMessage::NameRequest(None) => { - if let Err(e) = app_sender - .send(ApplicationMessage::NameRequest(Some( - SATELLITE_NAME.to_owned(), - ))) - .await - { - error!("Could not let the remote know my name! {e}"); - } - } - ApplicationMessage::ChangeTrackingID(id) => { - if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await { - error!("Could not send message to tracker state! Closing down. {e}"); + } + + // Send the local description to the remote + if let Some(local_desc) = peer_connection.local_description().await { + app_sender + .send(ApplicationMessage::WebRTCPacket(local_desc)) + .await + .expect("Could not send message to the socket message channel"); + } + info!("local information sent to the remote"); + + let notify_tx = Arc::new(Notify::new()); + let notify_video = notify_tx.clone(); + + setup_callbacks( + peer_connection.clone(), + to_quit_3, + app_sender.clone(), + notify_tx, + ) + .await; + info!("webrtc callbacks registered"); + + tokio::spawn(async move { + notify_video.notified().await; + + info!("Starting video stream!"); + + let mut stream = video_receiver_stream.lock().await; + + while let Some(map) = stream.recv().await { + if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { + stream.close(); break; } + if let Ok(buf) = map.map_readable() { + if let Err(err) = video_track.write(&buf).await { + if webrtc::Error::ErrClosedPipe == err { + error!("The peerConnection has been closed."); + } else { + error!("video_track write err: {}", err); + } + } + }; } - ApplicationMessage::TrackingBoxes(_) => { - error!("I got a tracking boxes message?"); - } - ApplicationMessage::ManualMovementOverride((x, y)) => { - if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await { - error!("Could not send manual override to state machine! {e}"); - break; + }); + + info!("starting webrtc connection kickoff"); + // send the offer and trickle ice candidates to the remote, and accept their description + if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await { + to_quit.store(true, Ordering::SeqCst); + panic!("There was an issue with WebRTC setup! {e}"); + } + + info!("webrtc kickoff complete, entering watch mode"); + + // loop through messages coming from the remote. + while let Ok(msg) = app_receiver.recv().await { + if to_quit.load(Ordering::SeqCst) { break; } + match msg { + ApplicationMessage::WebRTCPacket(_pkt) => { + error!("don't know what to do with this packet!"); + } + ApplicationMessage::WebRTCIceCandidateInit(pkt) => { + if let Err(e) = peer_connection.add_ice_candidate(pkt).await { + error!("There was an error adding the trickle ICE candidate! {e}"); + } + } + ApplicationMessage::WebRTCIceCandidate(_pkg) => { + error!("Unhandled ice candidate!"); + } + ApplicationMessage::NameRequest(Some(name)) => info!("Got a message about '{}'", name), + ApplicationMessage::NameRequest(None) => { + if let Err(e) = app_sender + .send(ApplicationMessage::NameRequest(Some( + SATELLITE_NAME.to_owned(), + ))) + .await + { + error!("Could not let the remote know my name! {e}"); + } + } + ApplicationMessage::ChangeTrackingID(id) => { + if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await { + error!("Could not send message to tracker state! Closing down. {e}"); + break; + } + } + ApplicationMessage::TrackingBoxes(_) => { + error!("I got a tracking boxes message?"); + } + ApplicationMessage::ManualMovementOverride((x, y)) => { + if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await { + error!("Could not send manual override to state machine! {e}"); + break; + } } } } diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs index ef6dbde..ce76f6e 100644 --- a/src/tracker/mod.rs +++ b/src/tracker/mod.rs @@ -6,11 +6,12 @@ use crate::rig::ApplicationEvent; pub enum TrackerEvent { ChangeID(u16), + ChangeMEC(AppSender), } pub struct TrackerState { pub target_id: Option, - pub app_sender: AppSender, + pub app_sender: Option, } impl TrackerState {} @@ -19,11 +20,10 @@ pub async fn tracker_loop( to_tec: Sender, tec: Receiver, to_mec: Sender, - app_sender: AppSender, ) { let mut state = TrackerState { target_id: None, - app_sender, + app_sender: None, }; while let Ok(msg) = tec.recv().await { @@ -31,6 +31,9 @@ pub async fn tracker_loop( TrackerEvent::ChangeID(new_id) => { state.target_id = Some(new_id); } + TrackerEvent::ChangeMEC(new_mec) => { + state.app_sender = Some(new_mec); + } } // Pretend we processed a video frame! @@ -46,24 +49,27 @@ pub async fn tracker_loop( error!("Could not send tracking update to MEC! {e}"); } - if let Err(e) = state - .app_sender - .send(ApplicationMessage::TrackingBoxes( - vcs_common::types::TrackingUpdate { - target_id: None, - boxes: vec![vcs_common::types::NormalizedBoxCoords { - id: 1, - x1: 0.25, - y1: 0.25, - x2: 0.75, - y2: 0.75, - }], - }, - )) - .await - { - error!("Could not send message to remote core! {e}"); - break; + if state.app_sender.is_some() { + if let Err(e) = state + .app_sender + .as_ref().unwrap() + .send(ApplicationMessage::TrackingBoxes( + vcs_common::types::TrackingUpdate { + target_id: None, + boxes: vec![vcs_common::types::NormalizedBoxCoords { + id: 1, + x1: 0.25, + y1: 0.25, + x2: 0.75, + y2: 0.75, + }], + }, + )) + .await + { + error!("Could not send message to remote core! {e}"); + break; + } } } info!("Tracking loop is shutting down"); diff --git a/src/web_rtc.rs b/src/web_rtc.rs index b265683..5636649 100644 --- a/src/web_rtc.rs +++ b/src/web_rtc.rs @@ -4,7 +4,11 @@ use std::sync::{ }; use tokio::sync::Notify; -use tracing::{error, info, instrument}; +use tokio::net::TcpListener; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::accept_async; +use futures_util::{SinkExt, StreamExt}; +use tracing::{error, info, debug, warn, instrument}; use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; use webrtc::{ api::{ @@ -168,6 +172,9 @@ pub async fn kickoff_connection( ) -> Result<(), Error> { info!("Starting WebRTC handshake"); + warn!("Still using ice gathering complete"); + pc.gathering_complete_promise().await; + // Create and store the offer let offer = pc.create_offer(None).await?; @@ -198,7 +205,7 @@ pub async fn kickoff_connection( info!("added ice candidate"); break; } - Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => { + Ok(ApplicationMessage::WebRTCIceCandidate(pkt)) => { error!("Got a non init ice candidate. Now what?"); } Ok(ApplicationMessage::NameRequest(Some(name))) => { @@ -227,3 +234,115 @@ pub async fn kickoff_connection( Ok(()) } + +pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender) { + if let Ok((stream, _)) = listener.accept().await { + match accept_async(stream).await { + Err(e) => error!("Could not convert incoming stream to websocket: {e}"), + Ok(ws_stream) => { + let (mut ws_sender, mut ws_receiver) = ws_stream.split(); + + tokio::spawn(async move { + while let Ok(msg) = from_app.recv().await { + #[cfg(debug_assertions)] + { + // serialized message + match serde_json::to_string(&msg) { + Err(e) => { + error!("Could not serialize ApplicationMessage to JSON! {e}") + } + Ok(msg) => { + if let Err(e) = ws_sender.send(Message::text(msg)).await { + error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}"); + break; + } + } + } + } + + #[cfg(not(debug_assertions))] + { + match bincode::serialize(&msg) { + Err(e) => error!( + "Could not serialize ApplicationMessage into binary! {e}" + ), + Ok(e) => { + if let Err(e) = sender.send(Message::binary(msg)).await { + error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}"); + break; + } + } + } + } + } + }); + + tokio::spawn(async move { + while let Some(msg) = ws_receiver.next().await { + match msg { + Err(e) => { + error!("There was an error getting a message from the remote! {e}"); + } + Ok(msg) => match msg { + Message::Ping(_) | Message::Pong(_) => {} + Message::Close(_) => { + info!("Received WebSocket close message! Closing the websocket"); + break; + } + Message::Frame(_) => { + info!("Received a Frame websocket message?"); + } + Message::Text(text) => { + debug!("Recieved text from websocket: {text}"); + #[cfg(debug_assertions)] + { + match serde_json::from_str(&text) { + Ok(msg) => { + if let Err(e) = to_ui.send(msg).await { + error!("Could not send message from ws to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}"); + } + } + } + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Text` message from the remote while running in release mode! " + + "Was the other endpoint running release mode?\n msg: {text}"); + } + } + Message::Binary(msg) => { + #[cfg(debug_assertions)] + { + match bincode::deserialize::(&msg) { + Ok(m) => { + if let Err(e) = to_ui.send(m).await { + error!("Could not send message to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed binary message from the websocket!\n{e}"); + } + } + } + + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Binary` message from the remote while running in debug mode! " + + "Was the other endpoing running debug mode?"); + } + } + }, + } + } + warn!("The websocket listener closed"); + }); + } + } + } +} +