From 8993750febc59b7b0e20fbeb8d38ae25b1b82af5 Mon Sep 17 00:00:00 2001 From: Nickiel12 Date: Mon, 26 Aug 2024 00:17:40 +0000 Subject: [PATCH] improved code modularity and comments --- Cargo.lock | 10 +-- Cargo.toml | 2 +- src/config.rs | 3 + src/gst.rs | 29 ++++++- src/main.rs | 225 ++++++++++++++----------------------------------- src/web_rtc.rs | 198 +++++++++++++++++++++++++++++++++++++++---- 6 files changed, 273 insertions(+), 194 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1163457..97c8283 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,12 +61,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "anyhow" -version = "1.0.86" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" - [[package]] name = "arc-swap" version = "1.7.1" @@ -2479,7 +2473,6 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" name = "vcs-camera-satellite" version = "0.1.0" dependencies = [ - "anyhow", "async-channel", "config", "gstreamer", @@ -2493,6 +2486,7 @@ dependencies = [ "toml", "tracing", "tracing-subscriber", + "uuid", "vcs-common", "webrtc", ] @@ -2500,7 +2494,7 @@ dependencies = [ [[package]] name = "vcs-common" version = "0.1.0" -source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#59b0b88c53032514bf5ba68efe17b6f089734bfe" +source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#a89e970be35e01eb8a59c368e7b817e93984c137" dependencies = [ "async-channel", "bincode", diff --git a/Cargo.toml b/Cargo.toml index a201576..24185f6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -anyhow = "1.0.86" config = "0.14.0" gstreamer = { version = "0.23.0", features = ["v1_22"] } @@ -24,3 +23,4 @@ webrtc = "0.11.0" vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } async-channel = "2.3.1" +uuid = "1.10.0" diff --git a/src/config.rs b/src/config.rs index fb0eb81..67725bd 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,6 +9,7 @@ use tracing::{info, instrument}; pub struct AppConfig { pub destination_ip: String, pub destination_port: u32, + pub aspect_ratio: f64, } impl Default for AppConfig { @@ -16,6 +17,8 @@ impl Default for AppConfig { AppConfig { destination_ip: "localhost".to_string(), destination_port: 7891, + + aspect_ratio: 16.0 / 9.0, } } } diff --git a/src/gst.rs b/src/gst.rs index 2d094e6..2358132 100644 --- a/src/gst.rs +++ b/src/gst.rs @@ -1,17 +1,21 @@ use gstreamer::{ self as gst, - prelude::{Cast, GstBinExtManual}, + prelude::{Cast, ElementExtManual, GstBinExtManual}, ElementFactory, }; use gstreamer_app as gst_app; +use crate::config::AppConfig; + pub struct Pipeline { pub pipeline: gst::Pipeline, pub sink: gst_app::AppSink, } -pub fn new_pipeline() -> Pipeline { +const HEIGHT: usize = 480; + +pub fn new_pipeline(config: &AppConfig) -> Pipeline { let pipeline = gst::Pipeline::builder() .name("camera_to_rtp_pipeine") .build(); @@ -20,6 +24,18 @@ pub fn new_pipeline() -> Pipeline { let video_convert = ElementFactory::make("videoconvert").build().unwrap(); + let video_scale = ElementFactory::make("videoscale") + .property("add-borders", true) + .build() + .unwrap(); + + let video_scale_caps = gstreamer::Caps::builder("video/x-raw") + .field("height", HEIGHT as i32) + .field("width", (HEIGHT as f64 * config.aspect_ratio) as i32) + .build(); + + // We are using VP8 because VP9 resulted in much worse video quality + // when testing -NY 8/25/2024 let vp8enc = ElementFactory::make("vp8enc").build().unwrap(); let rtp = ElementFactory::make("rtpvp8pay").build().unwrap(); @@ -30,13 +46,20 @@ pub fn new_pipeline() -> Pipeline { .add_many([ &source, &video_convert, + &video_scale, &vp8enc, &rtp, app_sink.upcast_ref(), ]) .expect("Could not add all the stuff to the pipeline"); - gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp, app_sink.upcast_ref()]).unwrap(); + gst::Element::link_many(&[&source, &video_convert, &video_scale]).unwrap(); + + video_scale + .link_filtered(&vp8enc, &video_scale_caps) + .expect("Could not link videoscale to vp8enc with caps!"); + + gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()]).unwrap(); Pipeline { pipeline, diff --git a/src/main.rs b/src/main.rs index 9046686..f900b67 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,15 +1,17 @@ -use std::{sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration}; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; -use anyhow::Error; -use gstreamer::{prelude::ElementExt, State}; use gstreamer::Buffer; +use gstreamer::{prelude::ElementExt, State}; use gstreamer_app as gst_app; use tokio::sync::Notify; use tracing::{error, info}; +use web_rtc::{kickoff_connection, setup_callbacks}; use webrtc::{ - api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9}, ice_transport::{ice_candidate::RTCIceCandidate, ice_connection_state::RTCIceConnectionState}, media::audio::buffer::info, peer_connection::peer_connection_state::RTCPeerConnectionState, rtp_transceiver::rtp_codec::RTCRtpCodecCapability, track::track_local::{ - track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter, - } + peer_connection::RTCPeerConnection, + track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter}, }; use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; @@ -19,7 +21,9 @@ mod gst; mod web_rtc; #[tokio::main] -async fn main() -> Result<(), Error> { +async fn main() { + let uuid = uuid::Uuid::new_v4().to_string(); + // TRACING SETUP let _sub = tracing_subscriber::fmt() .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) @@ -31,138 +35,73 @@ async fn main() -> Result<(), Error> { let to_quit = Arc::new(AtomicBool::new(false)); let to_quit_2 = to_quit.clone(); let to_quit_3 = to_quit.clone(); - let to_quit_4 = to_quit.clone(); let to_quit_5 = to_quit.clone(); // GSTREAMER SETUP - gstreamer::init()?; + if let Err(e) = gstreamer::init() { + panic!("Could not start gstreamer! {e}"); + } - let pipeline = gst::new_pipeline(); + let pipeline = gst::new_pipeline(&config); + // frame buffers from the gstreamer pipeline to send over the webrtc connection let (to_stream, mut stream) = tokio::sync::mpsc::channel::(10); - let (send_oneshot, recv_oneshot) = tokio::sync::oneshot::channel::< - Result<(AppSender, AppReceiver), tokio_tungstenite::tungstenite::Error>, - >(); - let rt = tokio::runtime::Handle::current(); - - // connect to remote server - tokio::spawn(vcs_common::connect_to_server(format!("ws://{}:{}", config.destination_ip, config.destination_port), send_oneshot, rt)); - let app_sender: AppSender; let app_receiver: AppReceiver; - match recv_oneshot.await { - Err(e) => { - panic!("Could not get connection status from oneshot!! {e}"); - } - Ok(Err(e)) => { - panic!("Could not connect to vcs websocket! {e}"); - } - Ok(Ok((sender, recvr))) => { + + // connect to remote server + match vcs_common::connect_to_server( + format!("ws://{}:{}", config.destination_ip, config.destination_port), + rt, + ) + .await + { + Err(e) => panic!("There was an error connecting to the remote: {e}"), + Ok((sender, recvr)) => { app_sender = sender; app_receiver = recvr; } } - - - let peer_connection = Arc::new(web_rtc::setup_webrtc().await); - - let video_track = Arc::new(TrackLocalStaticRTP::new( - RTCRtpCodecCapability { - mime_type: MIME_TYPE_VP8.to_owned(), - ..Default::default() - }, - "video_test".to_owned(), - "webrtc_test".to_owned(), - )); - - let rtp_sender = peer_connection - .add_track(Arc::clone(&video_track) as Arc) - .await - .unwrap(); - - info!("VP8 track has been added to peer connection"); - - - if let Some(local_desc) = peer_connection.local_description().await { - app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel"); + // Set up the webrtc peer connection + let peer_connection: Arc; + let video_track: Arc; + match web_rtc::setup_webrtc(uuid.clone()).await { + Err(e) => { + panic!("Could not set up WebRTC connection! {e}"); + } + Ok((pc, vt)) => { + peer_connection = pc; + video_track = vt; + } } - - // Set the handler for ICE connection state - // This will notify you when the peer has connected/disconnected - peer_connection.on_ice_connection_state_change(Box::new( - move |connection_state: RTCIceConnectionState| { - info!("Connection State has changed {connection_state}"); - if connection_state == RTCIceConnectionState::Failed { - error!("On Ice Candidate called for quit!"); - to_quit_3.store(true, Ordering::SeqCst); - } - Box::pin(async {}) - }, - )); - - let pc = Arc::downgrade(&peer_connection); - let to_wc_2 = app_sender.clone(); - peer_connection.on_ice_candidate(Box::new(move |c: Option| { - //println!("on_ice_candidate {:?}", c); - let to_wc_3 = to_wc_2.clone(); - - Box::pin(async move { - if c.is_some() { - info!("trickle ICE notification"); - if let Err(e) = to_wc_3.send_blocking(ApplicationMessage::WebRTCIceCandidate(c.unwrap())) { - error!("Could not send ice candidate to other end! {:?}", e); - } - } - }) - })); - + // Send the local description to the remote + if let Some(local_desc) = peer_connection.local_description().await { + app_sender + .send(ApplicationMessage::WebRTCPacket(local_desc)) + .await + .expect("Could not send message to the socket message channel"); + } let notify_tx = Arc::new(Notify::new()); let notify_video = notify_tx.clone(); - // Set the handler for Peer connection state - // This will notify you when the peer has connected/disconnected - peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - info!("Peer Connection State has changed: {s}"); - - if s == RTCPeerConnectionState::Connected { - notify_tx.notify_one(); - } - - if s == RTCPeerConnectionState::Failed { - // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. - // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. - // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. - error!("Peer Connection has gone to failed exiting: Done forwarding"); - to_quit_4.store(true, Ordering::SeqCst) - } - - Box::pin(async {}) - })); - - tokio::spawn(async move { - // handle incoming packets that have been processed by the - // interceptors - let mut rtcp_buf = vec![0u8; 1500]; - while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {} - webrtc::util::Result::<()>::Ok(()) - }); + setup_callbacks( + peer_connection.clone(), + to_quit_3, + app_sender.clone(), + notify_tx, + ) + .await; tokio::spawn(async move { notify_video.notified().await; info!("Starting video stream!"); - // It is important to use a time.Ticker instead of time.Sleep because - // * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data - // * works around latency issues with Sleep - // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. - // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. - while let Some(map) = stream.recv().await { if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { stream.close(); @@ -205,61 +144,18 @@ async fn main() -> Result<(), Error> { .build(), ); - - - - - info!("Starting WebRTC handshake"); - let offer = peer_connection.create_offer(None).await.unwrap(); - - peer_connection.set_local_description(offer.clone()).await.expect("Could not set local description from offer"); - - if let Err(e) = app_sender.send(ApplicationMessage::WebRTCPacket(offer)).await { - error!("Could not send offer to app_sender!"); - panic!("{}", e); + // send the offer and trickle ice candidates to the remote, and accept their description + if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await { + to_quit.store(true, Ordering::SeqCst); + panic!("There was an issue with WebRTC setup! {e}"); } - info!("Offer sent!"); - - loop { - match app_receiver.recv().await { - Ok(ApplicationMessage::WebRTCPacket(pkt)) => { - info!("Recieved response package! {:?}", pkt); - peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}"); - error!("Remote Description has been registered!"); - } - Ok(ApplicationMessage::WebRTCIceCandidateInit(pkt)) => { - peer_connection.add_ice_candidate(pkt).await.expect("Could not assign new ice candidate"); - info!("added ice candidate"); - break; - } - Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => { - error!("Got a non init ice candidate. Now what?"); - } - Err(e) => { - panic!("Channel was closed?!"); - } - } - } - - info!("Get a response assumedly"); - - tokio::time::sleep(Duration::from_millis(5000)).await; - - // let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection"); - - // peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection"); - - - - - - - + // Start the gstreamer pipeline if let Err(e) = pipeline.pipeline.set_state(State::Playing) { panic!("Could not start pipeline! {e}"); } + // set up handler for ctrl_c to quit the application tokio::spawn(async move { match tokio::signal::ctrl_c().await { Ok(()) => { @@ -271,19 +167,20 @@ async fn main() -> Result<(), Error> { } }); + // loop through messages coming from the remote. while let Ok(msg) = app_receiver.recv().await { match msg { ApplicationMessage::WebRTCPacket(pkt) => { error!("don't know what to do with this packet!"); } ApplicationMessage::WebRTCIceCandidateInit(pkt) => { - error!("Unhandled ice init candidate!"); + if let Err(e) = peer_connection.add_ice_candidate(pkt).await { + error!("There was an error adding the trickle ICE candidate! {e}"); + } } ApplicationMessage::WebRTCIceCandidate(pkg) => { error!("Unhandled ice candidate!"); } } } - - Ok(()) } diff --git a/src/web_rtc.rs b/src/web_rtc.rs index f95dc81..3c0c2e7 100644 --- a/src/web_rtc.rs +++ b/src/web_rtc.rs @@ -1,17 +1,38 @@ -use std::sync::Arc; - -use tracing::info; -use webrtc::{ - api::{ - interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, - }, - ice_transport::ice_server::RTCIceServer, - interceptor::registry::Registry, - peer_connection::{configuration::RTCConfiguration, RTCPeerConnection}, +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, }; -pub async fn setup_webrtc() -> Arc { - // WebRTC stuff +use tokio::sync::Notify; +use tracing::{error, info, instrument}; +use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; +use webrtc::{ + api::{ + interceptor_registry::register_default_interceptors, + media_engine::{MediaEngine, MIME_TYPE_VP8}, + APIBuilder, + }, + ice_transport::{ + ice_candidate::RTCIceCandidate, ice_connection_state::RTCIceConnectionState, + ice_server::RTCIceServer, + }, + interceptor::registry::Registry, + peer_connection::{ + configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState, + RTCPeerConnection, + }, + rtp_transceiver::rtp_codec::RTCRtpCodecCapability, + track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal}, + Error, +}; + +/// Boilerplate for creating the initial Peer Connection +/// Takes a unique ID representing this instance to name the tracks and streams +#[instrument] +pub async fn setup_webrtc( + machine_uuid: String, +) -> Result<(Arc, Arc), Error> { + info!("Setting up webrtc engine and peer connection"); let mut m = MediaEngine::default(); if let Err(e) = m.register_default_codecs() { @@ -19,21 +40,20 @@ pub async fn setup_webrtc() -> Arc { panic!("Could not register default codecs for webrtc! {e}"); } - info!("Default codecs registered"); - // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry // for each PeerConnection. let mut registry = Registry::new(); // may be able to prune for - registry = register_default_interceptors(registry, &mut m).unwrap(); + registry = register_default_interceptors(registry, &mut m)?; let api = APIBuilder::new() .with_media_engine(m) .with_interceptor_registry(registry) .build(); + // TODO figure out a STUN server or lack of one let config = RTCConfiguration { ice_servers: vec![RTCIceServer { urls: vec!["stun:stun.l.google.com:19302".to_owned()], // you can set username and @@ -43,7 +63,149 @@ pub async fn setup_webrtc() -> Arc { ..Default::default() }; - // TODO: remove this unwrap - // this is the offering for the remove device - return Arc::new(api.new_peer_connection(config).await.unwrap()); + // the connection object that stores all information and callbacks + // for a connection + let peer_connection = Arc::new(api.new_peer_connection(config).await?); + + info!("Peer Connection initialized"); + + let video_track = Arc::new(TrackLocalStaticRTP::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + machine_uuid.clone() + "_track", + machine_uuid.clone() + "_video_stream", + )); + + let rtp_sender = peer_connection + .add_track(Arc::clone(&video_track) as Arc) + .await?; + + info!("Video track has been added to peer connection"); + + tokio::spawn(async move { + // handle incoming packets that have been processed by the + // interceptors + let mut rtcp_buf = vec![0u8; 1500]; + while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {} + webrtc::util::Result::<()>::Ok(()) + }); + + return Ok((peer_connection, video_track)); +} + +/// Register callbacks for the Peer Connection +/// sets up on_ice_connection_state_change (live), on_ice_candidate (local) +/// and on_peer_connection_state_change (live) +/// takes the peer connection, the flag to indicate if the program should quit +/// a reference to the Sender Channel to the remote, and a notifier for when +/// the video should begin being sent +#[instrument(skip_all)] +pub async fn setup_callbacks( + pc: Arc, + to_quit: Arc, + app_sender: AppSender, + notify_video: Arc, +) { + let to_quit_3 = to_quit.clone(); + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + pc.on_ice_connection_state_change(Box::new(move |connection_state: RTCIceConnectionState| { + info!("Connection State has changed {connection_state}"); + if connection_state == RTCIceConnectionState::Failed { + error!("On Ice Candidate called for quit!"); + to_quit_3.store(true, Ordering::SeqCst); + } + Box::pin(async {}) + })); + + let to_wc_2 = app_sender.clone(); + pc.on_ice_candidate(Box::new(move |c: Option| { + //println!("on_ice_candidate {:?}", c); + let to_wc_3 = to_wc_2.clone(); + + Box::pin(async move { + if let Some(c) = c { + if let Err(e) = to_wc_3.send_blocking(ApplicationMessage::WebRTCIceCandidate(c)) { + error!("Could not send ice candidate to other end! {:?}", e); + } + info!("New trickle ICE packet sent"); + } + }) + })); + + let to_quit_4 = to_quit.clone(); + // Set the handler for Peer connection state + // This will notify you when the peer has connected/disconnected + pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + info!("Peer Connection State has changed: {s}"); + + if s == RTCPeerConnectionState::Connected { + notify_video.notify_one(); + } + + if s == RTCPeerConnectionState::Failed { + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. + error!("Peer Connection has gone to failed exiting: Done forwarding"); + to_quit_4.store(true, Ordering::SeqCst) + } + + Box::pin(async {}) + })); +} + +/// WebRTC steps for offering a connection, trickle ICE, and +/// finish negotiion +pub async fn kickoff_connection( + pc: &Arc, + app_sender: AppSender, + app_receiver: &AppReceiver, +) -> Result<(), Error> { + info!("Starting WebRTC handshake"); + + // Create and store the offer + let offer = pc.create_offer(None).await?; + + pc.set_local_description(offer.clone()).await?; + + // Send the offer to the remote + if let Err(e) = app_sender + .send(ApplicationMessage::WebRTCPacket(offer)) + .await + { + error!("Could not send offer to app_sender!"); + panic!("{}", e); + } + + info!("WebRTC offer sent!"); + + // Wait for the response. The RemoteDescription will come first,followed by several + // trickle ICE candidates. Waiting for only one trickle ICE candidate is probably bad + // practice, but it's working right now + loop { + match app_receiver.recv().await { + Ok(ApplicationMessage::WebRTCPacket(pkt)) => { + pc.set_remote_description(pkt).await?; + info!("Remote Description has been registered!"); + } + Ok(ApplicationMessage::WebRTCIceCandidateInit(pkt)) => { + pc.add_ice_candidate(pkt).await?; + info!("added ice candidate"); + break; + } + Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => { + error!("Got a non init ice candidate. Now what?"); + } + Err(e) => { + panic!("WC channel was closed?! {e}"); + } + } + } + + info!("WebRTC Handshake complete; remote description and ice candidates selected"); + + Ok(()) }