diff --git a/Cargo.lock b/Cargo.lock index 7221040..5cb3699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,7 +124,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", "synstructure 0.13.1", ] @@ -147,7 +147,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -170,7 +170,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -295,9 +295,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.7" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" +checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" +dependencies = [ + "shlex", +] [[package]] name = "ccm" @@ -313,9 +316,9 @@ dependencies = [ [[package]] name = "cfg-expr" -version = "0.15.8" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02" +checksum = "345c78335be0624ed29012dc10c49102196c6882c12dde65d9f35b02da2aada8" dependencies = [ "smallvec", "target-lexicon", @@ -327,12 +330,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" - [[package]] name = "cipher" version = "0.4.4" @@ -409,9 +406,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +checksum = "51e852e6dc9a5bed1fae92dd2375037bf2b768725bf3be87811edee3249d09ad" dependencies = [ "libc", ] @@ -475,16 +472,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "ctrlc" -version = "3.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "672465ae37dc1bc6380a6547a8883d5dd397b0f1faaad4f265726cc7042a5345" -dependencies = [ - "nix 0.28.0", - "windows-sys", -] - [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -508,7 +495,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -584,7 +571,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -751,7 +738,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -824,9 +811,9 @@ checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" [[package]] name = "gio-sys" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4feb96b31c32730ea3e1e89aecd2e4e37ecb1c473ad8f685e3430a159419f63" +checksum = "5237611e97e9b86ab5768adc3eef853ae713ea797aa3835404acdfacffc9fb38" dependencies = [ "glib-sys", "gobject-sys", @@ -837,9 +824,9 @@ dependencies = [ [[package]] name = "glib" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fee90a615ce05be7a32932cfb8adf2c4bbb4700e80d37713c981fb24c0c56238" +checksum = "b19429cb83fcbf1f00b31ae3a123fab5cd3761bdd15b0cc07905804742f0d0e4" dependencies = [ "bitflags 2.6.0", "futures-channel", @@ -854,27 +841,26 @@ dependencies = [ "libc", "memchr", "smallvec", - "thiserror", ] [[package]] name = "glib-macros" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4da558d8177c0c8c54368818b508a4244e1286fce2858cef4e547023f0cfa5ef" +checksum = "960349f56469b75794157e93bf04f0bc8a622d0a6612d6a8f8d7eac41e0e1ee1" dependencies = [ "heck", "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] name = "glib-sys" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4958c26e5a01c9af00dea669a97369eccbec29a8e6d125c24ea2d85ee7467b60" +checksum = "44edae63bea922f18f7e63977ee60a257ec27c4613aff1a6a9bb572ad0d88269" dependencies = [ "libc", "system-deps", @@ -882,9 +868,9 @@ dependencies = [ [[package]] name = "gobject-sys" -version = "0.20.0" +version = "0.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6908864f5ffff15b56df7e90346863904f49b949337ed0456b9287af61903b8" +checksum = "fa3d1dcd8a1eb2e7c22be3d5e792b14b186f3524f79b25631730f9a8c169d49a" dependencies = [ "glib-sys", "libc", @@ -1071,9 +1057,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0" +checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c" dependencies = [ "equivalent", "hashbrown 0.14.5", @@ -1149,9 +1135,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.155" +version = "0.2.158" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" [[package]] name = "linked-hash-map" @@ -1217,9 +1203,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ "hermit-abi", "libc", @@ -1246,18 +1232,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "nix" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "nom" version = "7.1.3" @@ -1324,9 +1298,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.2" +version = "0.36.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" +checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9" dependencies = [ "memchr", ] @@ -1498,7 +1472,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -1864,29 +1838,29 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.204" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12" +checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.204" +version = "1.0.208" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" +checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] name = "serde_json" -version = "1.0.122" +version = "1.0.125" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" dependencies = [ "itoa", "memchr", @@ -1934,6 +1908,21 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -1986,7 +1975,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2062,9 +2051,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.72" +version = "2.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" dependencies = [ "proc-macro2", "quote", @@ -2091,14 +2080,14 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] name = "system-deps" -version = "7.0.1" +version = "7.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c81f13d9a334a6c242465140bd262fae382b752ff2011c4f7419919a9c97922" +checksum = "070a0a5e7da2d24be457809c4b3baa57a835fd2829ad8b86f9a049052fe71031" dependencies = [ "cfg-expr", "heck", @@ -2130,7 +2119,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2200,9 +2189,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.39.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5" dependencies = [ "backtrace", "bytes", @@ -2210,6 +2199,7 @@ dependencies = [ "mio", "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", @@ -2223,7 +2213,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2315,7 +2305,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2433,9 +2423,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" [[package]] name = "unicode-xid" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +checksum = "229730647fbc343e3a80e463c1db7f78f3855d3f3739bee0dda773c9a037c90a" [[package]] name = "universal-hash" @@ -2492,7 +2482,6 @@ dependencies = [ "anyhow", "async-channel", "config", - "ctrlc", "gstreamer", "gstreamer-app", "log", @@ -2511,7 +2500,18 @@ dependencies = [ [[package]] name = "vcs-common" version = "0.1.0" -source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#898d5181e08ed7cbfef745bf9036eae71a08be52" +source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#1e37189866720e8d99b22492a78f47d220742831" +dependencies = [ + "async-channel", + "bincode", + "futures-util", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite", + "tracing", + "webrtc", +] [[package]] name = "version-compare" @@ -2741,7 +2741,7 @@ dependencies = [ "lazy_static", "libc", "log", - "nix 0.26.4", + "nix", "portable-atomic", "rand", "thiserror", @@ -2928,7 +2928,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] [[package]] @@ -2948,5 +2948,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.72", + "syn 2.0.75", ] diff --git a/Cargo.toml b/Cargo.toml index ac3638e..a201576 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ edition = "2021" [dependencies] anyhow = "1.0.86" config = "0.14.0" -ctrlc = "3.4.4" gstreamer = { version = "0.23.0", features = ["v1_22"] } gstreamer-app = { version = "0.23.0", features = ["v1_22"] } @@ -16,7 +15,7 @@ log = "0.4.22" serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.122" snafu = "0.8.4" -tokio = "1.39.2" +tokio = { version = "1.39.2", features = ["signal"] } tokio-tungstenite = "0.23.1" toml = "0.8.19" tracing = "0.1.40" diff --git a/src/gst.rs b/src/gst.rs new file mode 100644 index 0000000..0efd2e6 --- /dev/null +++ b/src/gst.rs @@ -0,0 +1,45 @@ +use gstreamer::{ + self as gst, + prelude::{Cast, GstBinExtManual}, + ElementFactory, +}; +use gstreamer_app as gst_app; + +pub struct Pipeline { + pub pipeline: gst::Pipeline, + + pub sink: gst_app::AppSink, +} + +pub fn new_pipeline() -> Pipeline { + let pipeline = gst::Pipeline::builder() + .name("camera_to_rtp_pipeine") + .build(); + + let source = ElementFactory::make("v4l2src").build().unwrap(); + + let video_convert = ElementFactory::make("videoconvert").build().unwrap(); + + let vp8enc = ElementFactory::make("vp8enc").build().unwrap(); + + let rtp = ElementFactory::make("rtpvp8pay").build().unwrap(); + + let app_sink = gst_app::AppSink::builder().build(); + + pipeline + .add_many([ + &source, + &video_convert, + &vp8enc, + &rtp, + app_sink.upcast_ref(), + ]) + .expect("Could not add all the stuff to the pipeline"); + + gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp]).unwrap(); + + Pipeline { + pipeline, + sink: app_sink, + } +} diff --git a/src/main.rs b/src/main.rs index c398123..1888488 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,29 @@ -use std::sync::{atomic::AtomicBool, Arc}; +use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; use anyhow::Error; -use gstreamer::{prelude::*, Element, ElementFactory, Pipeline, State}; +use gstreamer::{prelude::ElementExt, State}; +use gstreamer::Buffer; +use gstreamer_app as gst_app; +use tokio::sync::Notify; +use tracing::{error, info}; +use webrtc::{ + api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9}, + ice_transport::ice_connection_state::RTCIceConnectionState, + peer_connection::{peer_connection_state::RTCPeerConnectionState, sdp::session_description::RTCSessionDescription}, + rtp_transceiver::rtp_codec::RTCRtpCodecCapability, + track::track_local::{ + track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter, + }, +}; + +use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; mod config; -mod webrtc; +mod gst; +mod web_rtc; -fn main() -> Result<(), Error> { +#[tokio::main] +async fn main() -> Result<(), Error> { // TRACING SETUP let sub = tracing_subscriber::FmtSubscriber::new(); if let Err(e) = tracing::subscriber::set_global_default(sub) { @@ -18,24 +35,185 @@ fn main() -> Result<(), Error> { // EXIT HANDLER let to_quit = Arc::new(AtomicBool::new(false)); let to_quit_2 = to_quit.clone(); - - ctrlc::set_handler(move || { - to_quit_2.store(true, std::sync::atomic::Ordering::SeqCst); - })?; + let to_quit_3 = to_quit.clone(); + let to_quit_4 = to_quit.clone(); + let to_quit_5 = to_quit.clone(); // GSTREAMER SETUP gstreamer::init()?; - loop { - if to_quit.load(std::sync::atomic::Ordering::SeqCst) { - println!("Recieved Ctrl+C, stopping"); + let pipeline = gst::new_pipeline(); - break; + let (to_stream, mut stream) = tokio::sync::mpsc::channel::(10); + + let (send_oneshot, recv_oneshot) = tokio::sync::oneshot::channel::< + Result<(AppSender, AppReceiver), tokio_tungstenite::tungstenite::Error>, + >(); + + // connect to remote server + tokio::spawn(vcs_common::connect_to_server(format!("ws://{}:{}", config.destination_ip, config.destination_port), send_oneshot)); + + let app_sender: AppSender; + let app_receiver: AppReceiver; + match recv_oneshot.await { + Err(e) => { + panic!("Could not get connection status from oneshot!! {e}"); + } + Ok(Err(e)) => { + panic!("Could not connect to vcs websocket! {e}"); + } + Ok(Ok((sender, recvr))) => { + app_sender = sender; + app_receiver = recvr; } - std::thread::sleep(std::time::Duration::from_millis(200)); } - println!("Success!"); + + + let peer_connection = web_rtc::setup_webrtc().await; + + + + while let Ok(msg) = app_receiver.recv().await { + match msg { + ApplicationMessage::WebRTCPacket(pkt) => { + peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}"); + break; + } + } + } + + let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection"); + + let mut gather_complete = peer_connection.gathering_complete_promise().await; + + peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection"); + + let _ = gather_complete.recv().await; + + if let Some(local_desc) = peer_connection.local_description().await { + app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel"); + } + + + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_ice_connection_state_change(Box::new( + move |connection_state: RTCIceConnectionState| { + println!("Connection State has changed {connection_state}"); + if connection_state == RTCIceConnectionState::Failed { + to_quit_3.store(true, Ordering::SeqCst); + } + Box::pin(async {}) + }, + )); + + // Set the handler for Peer connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + println!("Peer Connection State has changed: {s}"); + + if s == RTCPeerConnectionState::Failed { + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. + println!("Peer Connection has gone to failed exiting: Done forwarding"); + to_quit_4.store(true, Ordering::SeqCst) + } + + Box::pin(async {}) + })); + + let notify_tx = Arc::new(Notify::new()); + let notify_video = notify_tx.clone(); + + let video_track = Arc::new(TrackLocalStaticRTP::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP8.to_owned(), + ..Default::default() + }, + "video_test".to_owned(), + "webrtc_test".to_owned(), + )); + + let rtp_sender = peer_connection + .add_track(Arc::clone(&video_track) as Arc) + .await + .unwrap(); + + tokio::spawn(async move { + // handle incoming packets that have been processed by the + // interceptors + let mut rtcp_buf = vec![0u8; 1500]; + while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {} + webrtc::util::Result::<()>::Ok(()) + }); + + tokio::spawn(async move { + notify_video.notified().await; + + info!("Starting video stream!"); + + // It is important to use a time.Ticker instead of time.Sleep because + // * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data + // * works around latency issues with Sleep + // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. + // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. + + while let Some(map) = stream.recv().await { + if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { + stream.close(); + break; + } + if let Ok(buf) = map.map_readable() { + if let Err(err) = video_track.write(&buf).await { + if webrtc::Error::ErrClosedPipe == err { + println!("The peerConnection has been closed."); + } else { + println!("video_track write err: {}", err); + } + } + }; + } + }); + + pipeline.sink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |app_sink| { + let sample = app_sink + .pull_sample() + .map_err(|_| gstreamer::FlowError::Eos)?; + let buffer = sample.buffer_owned().ok_or_else(|| { + gstreamer::element_error!( + app_sink, + gstreamer::ResourceError::Failed, + ("Failed to get buffer from appsink") + ); + gstreamer::FlowError::Error + })?; + + if let Err(e) = to_stream.blocking_send(buffer) { + error!("Error sending to stream of buffers: {e}"); + to_quit_5.store(true, Ordering::SeqCst); + } + + Ok(gstreamer::FlowSuccess::Ok) + }) + .build(), + ); + + if let Err(e) = pipeline.pipeline.set_state(State::Playing) { + panic!("Could not start pipeline! {e}"); + } + + tokio::spawn(async move { + match tokio::signal::ctrl_c().await { + Ok(()) => to_quit.store(true, std::sync::atomic::Ordering::SeqCst), + Err(e) => { + error!("Could not watch ctrl_c signal! {e}"); + } + } + }); Ok(()) } diff --git a/src/web_rtc.rs b/src/web_rtc.rs new file mode 100644 index 0000000..f95dc81 --- /dev/null +++ b/src/web_rtc.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use tracing::info; +use webrtc::{ + api::{ + interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder, + }, + ice_transport::ice_server::RTCIceServer, + interceptor::registry::Registry, + peer_connection::{configuration::RTCConfiguration, RTCPeerConnection}, +}; + +pub async fn setup_webrtc() -> Arc { + // WebRTC stuff + let mut m = MediaEngine::default(); + + if let Err(e) = m.register_default_codecs() { + // replace this with VP8 specific? + panic!("Could not register default codecs for webrtc! {e}"); + } + + info!("Default codecs registered"); + + // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. + // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` + // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry + // for each PeerConnection. + let mut registry = Registry::new(); // may be able to prune for + + registry = register_default_interceptors(registry, &mut m).unwrap(); + + let api = APIBuilder::new() + .with_media_engine(m) + .with_interceptor_registry(registry) + .build(); + + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:stun.l.google.com:19302".to_owned()], // you can set username and + // password info here too + ..Default::default() + }], + ..Default::default() + }; + + // TODO: remove this unwrap + // this is the offering for the remove device + return Arc::new(api.new_peer_connection(config).await.unwrap()); +} diff --git a/src/webrtc.rs b/src/webrtc.rs deleted file mode 100644 index 1702055..0000000 --- a/src/webrtc.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::{sync::Arc, time::Duration}; -use std::io::Write; - -use gstreamer::Buffer; - -use tokio::sync::Notify; -use tracing::{error, info}; -use webrtc::{ - api::{ - interceptor_registry::register_default_interceptors, - media_engine::{MediaEngine, MIME_TYPE_VP9}, - APIBuilder, - }, - ice_transport::ice_server::RTCIceServer, - interceptor::registry::Registry, - peer_connection::configuration::RTCConfiguration, - rtp_transceiver::rtp_codec::RTCRtpCodecCapability, - track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter}, -}; - -pub async fn setup_webrtc() { - let mut m = MediaEngine::default(); - - if let Err(e) = m.register_default_codecs() { - // replace this with VP8 specific? - panic!("Could not register default codecs for webrtc!"); - } - - info!("Default codecs registered"); - - // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. - // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` - // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry - // for each PeerConnection. - let mut registry = Registry::new(); // may be able to prune for - - registry = register_default_interceptors(registry, &mut m).unwrap(); - - let api = APIBuilder::new() - .with_media_engine(m) - .with_interceptor_registry(registry) - .build(); - - let config = RTCConfiguration { - ice_servers: vec![RTCIceServer { - urls: vec!["stun:stun.l.google.com:19302".to_owned()], // you can set username and - // password info here too - ..Default::default() - }], - ..Default::default() - }; - - // TODO: remove this unwrap - // this is the offering for the remove device - let peer_connection = Arc::new(api.new_peer_connection(config).await.unwrap()); - - let notify_tx = Arc::new(Notify::new()); - let notify_video = notify_tx.clone(); - let notify_audio = notify_tx.clone(); - - let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); - let video_done_tx = done_tx.clone(); - let audio_done_tx = done_tx.clone(); - - let video_track = Arc::new(TrackLocalStaticRTP::new( - RTCRtpCodecCapability { - mime_type: MIME_TYPE_VP9.to_owned(), - ..Default::default() - }, - "video_test".to_owned(), - "webrtc_test".to_owned(), - )); - - let rtp_sender = peer_connection - .add_track(Arc::clone(&video_track) as Arc) - .await - .unwrap(); - - tokio::spawn(async move { - // handle incoming packets that have been processed by the - // interceptors - let mut rtcp_buf = vec![0u8; 1500]; - while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {} - webrtc::util::Result::<()>::Ok(()) - }); - - let (to_stream, mut stream) = tokio::sync::mpsc::channel::(10); - - tokio::spawn(async move { - notify_video.notified().await; - - info!("Starting video stream!"); - - // It is important to use a time.Ticker instead of time.Sleep because - // * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data - // * works around latency issues with Sleep - // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. - // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. - - while let Some(map) = stream.recv().await { - if let Ok(buf) = map.map_readable() { - if let Err(err) = video_track.write(&buf).await { - if webrtc::Error::ErrClosedPipe == err { - println!("The peerConnection has been closed."); - } else { - println!("video_track write err: {}", err); - } - } - }; - } - }); - - // TODO: Initialize the gstreamer app, and start sending frames to the to_stream mpsc -}