diff --git a/Cargo.lock b/Cargo.lock index e634a47..7221040 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -150,6 +150,18 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-trait" version = "0.1.81" @@ -331,6 +343,15 @@ dependencies = [ "inout", ] +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "config" version = "0.14.0" @@ -410,6 +431,12 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + [[package]] name = "crunchy" version = "0.2.2" @@ -616,6 +643,27 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "event-listener" +version = "5.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "ff" version = "0.13.0" @@ -1353,6 +1401,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.3" @@ -2436,6 +2490,7 @@ name = "vcs-camera-satellite" version = "0.1.0" dependencies = [ "anyhow", + "async-channel", "config", "ctrlc", "gstreamer", diff --git a/Cargo.toml b/Cargo.toml index 7e23297..ac3638e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,3 +24,4 @@ tracing-subscriber = "0.3.18" webrtc = "0.11.0" vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } +async-channel = "2.3.1" diff --git a/src/main.rs b/src/main.rs index 396ff05..c398123 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,6 +4,7 @@ use anyhow::Error; use gstreamer::{prelude::*, Element, ElementFactory, Pipeline, State}; mod config; +mod webrtc; fn main() -> Result<(), Error> { // TRACING SETUP @@ -24,61 +25,6 @@ fn main() -> Result<(), Error> { // GSTREAMER SETUP gstreamer::init()?; -<<<<<<< HEAD - gstrswebrtc::plugin_register_static()?; - - gstrsrtp::plugin_register_static()?; - // gstrsrtsp::plugin_register_static()?; - // gstwebrtchttp::plugin_register_static()?; - // gsthlssink3::plugin_register_static()?; - - // working pipeline - // gst-launch-1.0 videotestsrc \ - // ! video/x-raw,width=1920,height=1080,format=I420 \ - // ! x264enc speed-preset=ultrafast bitrate=2000 \ - // ! video/x-h264,profile=baseline \ - // ! whipclientsink signaller::whip-endpoint=http://localhost:8889/mystream/whip - - let pipeline = Pipeline::with_name("rstp-pipeline"); - - let videotestsrc = ElementFactory::make("videotestsrc").build().unwrap(); - let x264enc = ElementFactory::make("x264enc").build().unwrap(); - let whipclientsink = ElementFactory::make("whipclientsink").build().unwrap(); - - // x264enc.set_property("speed-preset", &"ultrafast"); - x264enc.set_property("bitrate", 2000 as u32); - - if let Some(whipsink) = whipclientsink.dynamic_cast_ref::() { - let signaller = whipsink.property::("signaller"); - signaller.set_property_from_str( - "whip-endpoint", - "http://localhost:8080/whip" - ); - } - - for i in whipclientsink.list_properties() { - println!("{:?}", i.type_()); - } - - pipeline - .add_many(&[&videotestsrc, &x264enc, &whipclientsink]) - .unwrap(); - Element::link_many(&[&videotestsrc, &x264enc, &whipclientsink]) - .unwrap(); - - // PIPELINE START - pipeline.set_state(State::Playing).unwrap(); - - let bus = pipeline.bus().unwrap(); - - let _ = bus.timed_pop_filtered( - gstreamer::ClockTime::NONE, - &[gstreamer::MessageType::Eos] - ); -======= - - - loop { if to_quit.load(std::sync::atomic::Ordering::SeqCst) { @@ -88,7 +34,6 @@ fn main() -> Result<(), Error> { } std::thread::sleep(std::time::Duration::from_millis(200)); } ->>>>>>> 29a8ccf (added vcs-common) println!("Success!"); diff --git a/src/webrtc.rs b/src/webrtc.rs new file mode 100644 index 0000000..1702055 --- /dev/null +++ b/src/webrtc.rs @@ -0,0 +1,114 @@ +use std::{sync::Arc, time::Duration}; +use std::io::Write; + +use gstreamer::Buffer; + +use tokio::sync::Notify; +use tracing::{error, info}; +use webrtc::{ + api::{ + interceptor_registry::register_default_interceptors, + media_engine::{MediaEngine, MIME_TYPE_VP9}, + APIBuilder, + }, + ice_transport::ice_server::RTCIceServer, + interceptor::registry::Registry, + peer_connection::configuration::RTCConfiguration, + rtp_transceiver::rtp_codec::RTCRtpCodecCapability, + track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter}, +}; + +pub async fn setup_webrtc() { + let mut m = MediaEngine::default(); + + if let Err(e) = m.register_default_codecs() { + // replace this with VP8 specific? + panic!("Could not register default codecs for webrtc!"); + } + + info!("Default codecs registered"); + + // Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline. + // This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection` + // this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry + // for each PeerConnection. + let mut registry = Registry::new(); // may be able to prune for + + registry = register_default_interceptors(registry, &mut m).unwrap(); + + let api = APIBuilder::new() + .with_media_engine(m) + .with_interceptor_registry(registry) + .build(); + + let config = RTCConfiguration { + ice_servers: vec![RTCIceServer { + urls: vec!["stun:stun.l.google.com:19302".to_owned()], // you can set username and + // password info here too + ..Default::default() + }], + ..Default::default() + }; + + // TODO: remove this unwrap + // this is the offering for the remove device + let peer_connection = Arc::new(api.new_peer_connection(config).await.unwrap()); + + let notify_tx = Arc::new(Notify::new()); + let notify_video = notify_tx.clone(); + let notify_audio = notify_tx.clone(); + + let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1); + let video_done_tx = done_tx.clone(); + let audio_done_tx = done_tx.clone(); + + let video_track = Arc::new(TrackLocalStaticRTP::new( + RTCRtpCodecCapability { + mime_type: MIME_TYPE_VP9.to_owned(), + ..Default::default() + }, + "video_test".to_owned(), + "webrtc_test".to_owned(), + )); + + let rtp_sender = peer_connection + .add_track(Arc::clone(&video_track) as Arc) + .await + .unwrap(); + + tokio::spawn(async move { + // handle incoming packets that have been processed by the + // interceptors + let mut rtcp_buf = vec![0u8; 1500]; + while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {} + webrtc::util::Result::<()>::Ok(()) + }); + + let (to_stream, mut stream) = tokio::sync::mpsc::channel::(10); + + tokio::spawn(async move { + notify_video.notified().await; + + info!("Starting video stream!"); + + // It is important to use a time.Ticker instead of time.Sleep because + // * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data + // * works around latency issues with Sleep + // Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as. + // This isn't required since the video is timestamped, but we will such much higher loss if we send all at once. + + while let Some(map) = stream.recv().await { + if let Ok(buf) = map.map_readable() { + if let Err(err) = video_track.write(&buf).await { + if webrtc::Error::ErrClosedPipe == err { + println!("The peerConnection has been closed."); + } else { + println!("video_track write err: {}", err); + } + } + }; + } + }); + + // TODO: Initialize the gstreamer app, and start sending frames to the to_stream mpsc +}