use std::{sync::Arc, time::Duration};
use std::io::Write;

use gstreamer::Buffer;
use tokio::sync::Notify;
use tracing::{error, info};
use webrtc::{
    api::{
        interceptor_registry::register_default_interceptors,
        media_engine::{MediaEngine, MIME_TYPE_VP9},
        APIBuilder,
    },
    ice_transport::ice_server::RTCIceServer,
    interceptor::registry::Registry,
    peer_connection::configuration::RTCConfiguration,
    rtp_transceiver::rtp_codec::RTCRtpCodecCapability,
    track::track_local::{
        track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter,
    },
};

pub async fn setup_webrtc() {
    let mut m = MediaEngine::default();
    // TODO: replace this with a VP9-specific registration?
    if let Err(e) = m.register_default_codecs() {
        panic!("Could not register default codecs for webrtc: {}", e);
    }
    info!("Default codecs registered");

    // Create an InterceptorRegistry. This is the user-configurable RTP/RTCP pipeline,
    // which provides NACKs, RTCP reports and other features. If you use
    // `webrtc.NewPeerConnection` this is enabled by default. If you are managing things
    // manually, you MUST create an InterceptorRegistry for each PeerConnection.
    let mut registry = Registry::new();
    // We may be able to prune this default set later.
    registry = register_default_interceptors(registry, &mut m).unwrap();

    let api = APIBuilder::new()
        .with_media_engine(m)
        .with_interceptor_registry(registry)
        .build();

    let config = RTCConfiguration {
        ice_servers: vec![RTCIceServer {
            urls: vec!["stun:stun.l.google.com:19302".to_owned()],
            // you can set username and password info here too
            ..Default::default()
        }],
        ..Default::default()
    };

    // TODO: remove this unwrap
    // This peer connection produces the offer for the remote device.
    let peer_connection = Arc::new(api.new_peer_connection(config).await.unwrap());

    let notify_tx = Arc::new(Notify::new());
    let notify_video = notify_tx.clone();
    let notify_audio = notify_tx.clone();

    let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
    let video_done_tx = done_tx.clone();
    let audio_done_tx = done_tx.clone();

    let video_track = Arc::new(TrackLocalStaticRTP::new(
        RTCRtpCodecCapability {
            mime_type: MIME_TYPE_VP9.to_owned(),
            ..Default::default()
        },
        "video_test".to_owned(),
        "webrtc_test".to_owned(),
    ));

    let rtp_sender = peer_connection
        .add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
        .await
        .unwrap();

    tokio::spawn(async move {
        // Read incoming RTCP packets: they must be drained so the interceptors can process
        // them (needed for features such as NACK).
        let mut rtcp_buf = vec![0u8; 1500];
        while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
        webrtc::util::Result::<()>::Ok(())
    });

    let (to_stream, mut stream) = tokio::sync::mpsc::channel::<Buffer>(10);

    tokio::spawn(async move {
        notify_video.notified().await;
        info!("Starting video stream!");

        // Frames arrive from the GStreamer pipeline already paced at playback speed, so no
        // extra ticker/sleep is needed here. Pacing matters: the RTP is timestamped, but
        // sending everything at once would cause much higher loss.
        while let Some(buffer) = stream.recv().await {
            if let Ok(map) = buffer.map_readable() {
                if let Err(err) = video_track.write(&map).await {
                    if webrtc::Error::ErrClosedPipe == err {
                        error!("The peerConnection has been closed.");
                    } else {
                        error!("video_track write err: {}", err);
                    }
                }
            };
        }
    });

    // TODO: Initialize the gstreamer app, and start sending frames to the to_stream mpsc
}
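
// A minimal sketch of what that GStreamer side could look like, included only as an
// illustration: it assumes the `gstreamer-app` crate is available and that a test pipeline
// ending in `rtpvp9pay ! appsink name=sink` produces RTP-packetized VP9 buffers. The function
// name `start_gstreamer`, the pipeline string, and the element name "sink" are assumptions,
// not part of the original design.
//
// Example call from the end of `setup_webrtc`:
//     let _pipeline = start_gstreamer(to_stream, notify_tx.clone()).expect("gstreamer setup");
#[allow(dead_code)]
fn start_gstreamer(
    to_stream: tokio::sync::mpsc::Sender<Buffer>,
    notify_start: Arc<Notify>,
) -> Result<gstreamer::Pipeline, gstreamer::glib::Error> {
    use gstreamer::prelude::*;

    gstreamer::init()?;

    // Self-contained test pipeline; a real application would swap videotestsrc for its
    // actual capture source.
    let pipeline = gstreamer::parse_launch(
        "videotestsrc is-live=true ! vp9enc deadline=1 ! rtpvp9pay ! appsink name=sink",
    )?
    .downcast::<gstreamer::Pipeline>()
    .expect("parse_launch should return a Pipeline");

    let appsink = pipeline
        .by_name("sink")
        .expect("appsink named `sink` should exist")
        .downcast::<gstreamer_app::AppSink>()
        .expect("element should be an AppSink");

    // Forward every sample's Buffer into the channel drained by the video-forwarding task
    // in `setup_webrtc`.
    appsink.set_callbacks(
        gstreamer_app::AppSinkCallbacks::builder()
            .new_sample(move |sink| {
                let sample = sink.pull_sample().map_err(|_| gstreamer::FlowError::Eos)?;
                let buffer = sample.buffer_owned().ok_or(gstreamer::FlowError::Error)?;
                // blocking_send is acceptable here: this callback runs on a GStreamer
                // streaming thread, not on the tokio runtime.
                to_stream
                    .blocking_send(buffer)
                    .map_err(|_| gstreamer::FlowError::Error)?;
                Ok(gstreamer::FlowSuccess::Ok)
            })
            .build(),
    );

    pipeline
        .set_state(gstreamer::State::Playing)
        .expect("pipeline should reach Playing");

    // Wake the forwarding task; notify_one stores a permit, so the wake-up is not lost even
    // if the task has not reached `notified().await` yet.
    notify_start.notify_one();

    // Returning the pipeline lets the caller keep it alive and set it back to Null on shutdown.
    Ok(pipeline)
}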