diff --git a/Cargo.lock b/Cargo.lock index 3e6471d..1163457 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,7 +124,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", "synstructure 0.13.1", ] @@ -147,7 +147,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -170,7 +170,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.13" +version = "1.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48" +checksum = "50d2eb3cd3d1bf4529e31c215ee6f93ec5a3d536d9f578f93d9d33ee19562932" dependencies = [ "shlex", ] @@ -495,7 +495,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -571,7 +571,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -738,7 +738,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -853,7 +853,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -1472,7 +1472,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -1576,9 +1576,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.36" +version = "1.0.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" dependencies = [ "proc-macro2", ] @@ -1838,29 +1838,29 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.208" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" +checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.208" +version = "1.0.209" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" +checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] name = "serde_json" -version = "1.0.125" +version = "1.0.127" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" +checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad" dependencies = [ "itoa", "memchr", @@ -1975,7 +1975,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -2051,9 +2051,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.75" +version = "2.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" +checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525" dependencies = [ "proc-macro2", "quote", @@ -2080,7 +2080,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -2119,7 +2119,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -2213,7 +2213,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -2305,7 +2305,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -2500,7 +2500,7 @@ dependencies = [ [[package]] name = "vcs-common" version = "0.1.0" -source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#0765f70fa773261f38dddb1819aaad47f88e12d7" +source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#59b0b88c53032514bf5ba68efe17b6f089734bfe" dependencies = [ "async-channel", "bincode", @@ -2928,7 +2928,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] [[package]] @@ -2948,5 +2948,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.75", + "syn 2.0.76", ] diff --git a/src/gst.rs b/src/gst.rs index 0efd2e6..2d094e6 100644 --- a/src/gst.rs +++ b/src/gst.rs @@ -16,7 +16,7 @@ pub fn new_pipeline() -> Pipeline { .name("camera_to_rtp_pipeine") .build(); - let source = ElementFactory::make("v4l2src").build().unwrap(); + let source = ElementFactory::make("mfvideosrc").build().unwrap(); let video_convert = ElementFactory::make("videoconvert").build().unwrap(); @@ -36,7 +36,7 @@ pub fn new_pipeline() -> Pipeline { ]) .expect("Could not add all the stuff to the pipeline"); - gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp]).unwrap(); + gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp, app_sink.upcast_ref()]).unwrap(); Pipeline { pipeline, diff --git a/src/main.rs b/src/main.rs index d102fc2..9046686 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; +use std::{sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration}; use anyhow::Error; use gstreamer::{prelude::ElementExt, State}; @@ -7,13 +7,9 @@ use gstreamer_app as gst_app; use tokio::sync::Notify; use tracing::{error, info}; use webrtc::{ - api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9}, - ice_transport::ice_connection_state::RTCIceConnectionState, - peer_connection::peer_connection_state::RTCPeerConnectionState, - rtp_transceiver::rtp_codec::RTCRtpCodecCapability, - track::track_local::{ + api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9}, ice_transport::{ice_candidate::RTCIceCandidate, ice_connection_state::RTCIceConnectionState}, media::audio::buffer::info, peer_connection::peer_connection_state::RTCPeerConnectionState, rtp_transceiver::rtp_codec::RTCRtpCodecCapability, track::track_local::{ track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter, - }, + } }; use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; @@ -26,7 +22,7 @@ mod web_rtc; async fn main() -> Result<(), Error> { // TRACING SETUP let _sub = tracing_subscriber::fmt() - .with_max_level(tracing_subscriber::filter::LevelFilter::TRACE) + .with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG) .init(); let config = config::load_config(); @@ -70,79 +66,8 @@ async fn main() -> Result<(), Error> { } - info!("Freshly delivered sender is closed?: {:?}", app_sender.is_closed()); - - let peer_connection = web_rtc::setup_webrtc().await; - - let offer = peer_connection.create_offer(None).await.unwrap(); - - info!("Freshly delivered sender is closed?: {:?}", app_sender.is_closed()); - if let Err(e) = app_sender.send(ApplicationMessage::WebRTCPacket(offer)).await { - error!("Could not send offer to app_sender!"); - panic!("{}", e); - } - - info!("Offer sent!"); - - loop { - match app_receiver.recv().await { - Ok(ApplicationMessage::WebRTCPacket(pkt)) => { - info!("Recieved response package!"); - peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}"); - break; - } - Err(e) => { - panic!("Channel was closed?!"); - } - } - } - - info!("Get a response assumedly"); - - let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection"); - - let mut gather_complete = peer_connection.gathering_complete_promise().await; - - peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection"); - - let _ = gather_complete.recv().await; - - if let Some(local_desc) = peer_connection.local_description().await { - app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel"); - } - - - // Set the handler for ICE connection state - // This will notify you when the peer has connected/disconnected - peer_connection.on_ice_connection_state_change(Box::new( - move |connection_state: RTCIceConnectionState| { - println!("Connection State has changed {connection_state}"); - if connection_state == RTCIceConnectionState::Failed { - to_quit_3.store(true, Ordering::SeqCst); - } - Box::pin(async {}) - }, - )); - - // Set the handler for Peer connection state - // This will notify you when the peer has connected/disconnected - peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { - println!("Peer Connection State has changed: {s}"); - - if s == RTCPeerConnectionState::Failed { - // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. - // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. - // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. - println!("Peer Connection has gone to failed exiting: Done forwarding"); - to_quit_4.store(true, Ordering::SeqCst) - } - - Box::pin(async {}) - })); - - let notify_tx = Arc::new(Notify::new()); - let notify_video = notify_tx.clone(); + let peer_connection = Arc::new(web_rtc::setup_webrtc().await); let video_track = Arc::new(TrackLocalStaticRTP::new( RTCRtpCodecCapability { @@ -158,6 +83,67 @@ async fn main() -> Result<(), Error> { .await .unwrap(); + info!("VP8 track has been added to peer connection"); + + + if let Some(local_desc) = peer_connection.local_description().await { + app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel"); + } + + + // Set the handler for ICE connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_ice_connection_state_change(Box::new( + move |connection_state: RTCIceConnectionState| { + info!("Connection State has changed {connection_state}"); + if connection_state == RTCIceConnectionState::Failed { + error!("On Ice Candidate called for quit!"); + to_quit_3.store(true, Ordering::SeqCst); + } + Box::pin(async {}) + }, + )); + + let pc = Arc::downgrade(&peer_connection); + let to_wc_2 = app_sender.clone(); + peer_connection.on_ice_candidate(Box::new(move |c: Option| { + //println!("on_ice_candidate {:?}", c); + let to_wc_3 = to_wc_2.clone(); + + Box::pin(async move { + if c.is_some() { + info!("trickle ICE notification"); + if let Err(e) = to_wc_3.send_blocking(ApplicationMessage::WebRTCIceCandidate(c.unwrap())) { + error!("Could not send ice candidate to other end! {:?}", e); + } + } + }) + })); + + + let notify_tx = Arc::new(Notify::new()); + let notify_video = notify_tx.clone(); + + // Set the handler for Peer connection state + // This will notify you when the peer has connected/disconnected + peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| { + info!("Peer Connection State has changed: {s}"); + + if s == RTCPeerConnectionState::Connected { + notify_tx.notify_one(); + } + + if s == RTCPeerConnectionState::Failed { + // Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart. + // Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout. + // Note that the PeerConnection may come back from PeerConnectionStateDisconnected. + error!("Peer Connection has gone to failed exiting: Done forwarding"); + to_quit_4.store(true, Ordering::SeqCst) + } + + Box::pin(async {}) + })); + tokio::spawn(async move { // handle incoming packets that have been processed by the // interceptors @@ -185,9 +171,9 @@ async fn main() -> Result<(), Error> { if let Ok(buf) = map.map_readable() { if let Err(err) = video_track.write(&buf).await { if webrtc::Error::ErrClosedPipe == err { - println!("The peerConnection has been closed."); + error!("The peerConnection has been closed."); } else { - println!("video_track write err: {}", err); + error!("video_track write err: {}", err); } } }; @@ -219,18 +205,85 @@ async fn main() -> Result<(), Error> { .build(), ); + + + + + info!("Starting WebRTC handshake"); + let offer = peer_connection.create_offer(None).await.unwrap(); + + peer_connection.set_local_description(offer.clone()).await.expect("Could not set local description from offer"); + + if let Err(e) = app_sender.send(ApplicationMessage::WebRTCPacket(offer)).await { + error!("Could not send offer to app_sender!"); + panic!("{}", e); + } + + info!("Offer sent!"); + + loop { + match app_receiver.recv().await { + Ok(ApplicationMessage::WebRTCPacket(pkt)) => { + info!("Recieved response package! {:?}", pkt); + peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}"); + error!("Remote Description has been registered!"); + } + Ok(ApplicationMessage::WebRTCIceCandidateInit(pkt)) => { + peer_connection.add_ice_candidate(pkt).await.expect("Could not assign new ice candidate"); + info!("added ice candidate"); + break; + } + Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => { + error!("Got a non init ice candidate. Now what?"); + } + Err(e) => { + panic!("Channel was closed?!"); + } + } + } + + info!("Get a response assumedly"); + + tokio::time::sleep(Duration::from_millis(5000)).await; + + // let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection"); + + // peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection"); + + + + + + + if let Err(e) = pipeline.pipeline.set_state(State::Playing) { panic!("Could not start pipeline! {e}"); } tokio::spawn(async move { match tokio::signal::ctrl_c().await { - Ok(()) => to_quit.store(true, std::sync::atomic::Ordering::SeqCst), + Ok(()) => { + to_quit.store(true, std::sync::atomic::Ordering::SeqCst); + } Err(e) => { error!("Could not watch ctrl_c signal! {e}"); } } }); + while let Ok(msg) = app_receiver.recv().await { + match msg { + ApplicationMessage::WebRTCPacket(pkt) => { + error!("don't know what to do with this packet!"); + } + ApplicationMessage::WebRTCIceCandidateInit(pkt) => { + error!("Unhandled ice init candidate!"); + } + ApplicationMessage::WebRTCIceCandidate(pkg) => { + error!("Unhandled ice candidate!"); + } + } + } + Ok(()) }