working windows webrtc

This commit is contained in:
Nickiel12 2024-08-24 22:06:37 +00:00
parent 1dd6d1a06a
commit f2f6e5a817
3 changed files with 167 additions and 114 deletions

58
Cargo.lock generated
View file

@ -124,7 +124,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
"synstructure 0.13.1",
]
@ -147,7 +147,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -170,7 +170,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -295,9 +295,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.1.13"
version = "1.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48"
checksum = "50d2eb3cd3d1bf4529e31c215ee6f93ec5a3d536d9f578f93d9d33ee19562932"
dependencies = [
"shlex",
]
@ -495,7 +495,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -571,7 +571,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -738,7 +738,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -853,7 +853,7 @@ dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -1472,7 +1472,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -1576,9 +1576,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.36"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7"
checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af"
dependencies = [
"proc-macro2",
]
@ -1838,29 +1838,29 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "serde"
version = "1.0.208"
version = "1.0.209"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2"
checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.208"
version = "1.0.209"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf"
checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
name = "serde_json"
version = "1.0.125"
version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed"
checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad"
dependencies = [
"itoa",
"memchr",
@ -1975,7 +1975,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -2051,9 +2051,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.75"
version = "2.0.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9"
checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525"
dependencies = [
"proc-macro2",
"quote",
@ -2080,7 +2080,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -2119,7 +2119,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -2213,7 +2213,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -2305,7 +2305,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -2500,7 +2500,7 @@ dependencies = [
[[package]]
name = "vcs-common"
version = "0.1.0"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#0765f70fa773261f38dddb1819aaad47f88e12d7"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#59b0b88c53032514bf5ba68efe17b6f089734bfe"
dependencies = [
"async-channel",
"bincode",
@ -2928,7 +2928,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]
[[package]]
@ -2948,5 +2948,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.75",
"syn 2.0.76",
]

View file

@ -16,7 +16,7 @@ pub fn new_pipeline() -> Pipeline {
.name("camera_to_rtp_pipeine")
.build();
let source = ElementFactory::make("v4l2src").build().unwrap();
let source = ElementFactory::make("mfvideosrc").build().unwrap();
let video_convert = ElementFactory::make("videoconvert").build().unwrap();
@ -36,7 +36,7 @@ pub fn new_pipeline() -> Pipeline {
])
.expect("Could not add all the stuff to the pipeline");
gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp]).unwrap();
gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp, app_sink.upcast_ref()]).unwrap();
Pipeline {
pipeline,

View file

@ -1,4 +1,4 @@
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
use std::{sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};
use anyhow::Error;
use gstreamer::{prelude::ElementExt, State};
@ -7,13 +7,9 @@ use gstreamer_app as gst_app;
use tokio::sync::Notify;
use tracing::{error, info};
use webrtc::{
api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9},
ice_transport::ice_connection_state::RTCIceConnectionState,
peer_connection::peer_connection_state::RTCPeerConnectionState,
rtp_transceiver::rtp_codec::RTCRtpCodecCapability,
track::track_local::{
api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9}, ice_transport::{ice_candidate::RTCIceCandidate, ice_connection_state::RTCIceConnectionState}, media::audio::buffer::info, peer_connection::peer_connection_state::RTCPeerConnectionState, rtp_transceiver::rtp_codec::RTCRtpCodecCapability, track::track_local::{
track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter,
},
}
};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
@ -26,7 +22,7 @@ mod web_rtc;
async fn main() -> Result<(), Error> {
// TRACING SETUP
let _sub = tracing_subscriber::fmt()
.with_max_level(tracing_subscriber::filter::LevelFilter::TRACE)
.with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG)
.init();
let config = config::load_config();
@ -70,79 +66,8 @@ async fn main() -> Result<(), Error> {
}
info!("Freshly delivered sender is closed?: {:?}", app_sender.is_closed());
let peer_connection = web_rtc::setup_webrtc().await;
let offer = peer_connection.create_offer(None).await.unwrap();
info!("Freshly delivered sender is closed?: {:?}", app_sender.is_closed());
if let Err(e) = app_sender.send(ApplicationMessage::WebRTCPacket(offer)).await {
error!("Could not send offer to app_sender!");
panic!("{}", e);
}
info!("Offer sent!");
loop {
match app_receiver.recv().await {
Ok(ApplicationMessage::WebRTCPacket(pkt)) => {
info!("Recieved response package!");
peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}");
break;
}
Err(e) => {
panic!("Channel was closed?!");
}
}
}
info!("Get a response assumedly");
let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection");
let mut gather_complete = peer_connection.gathering_complete_promise().await;
peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection");
let _ = gather_complete.recv().await;
if let Some(local_desc) = peer_connection.local_description().await {
app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel");
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
println!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Failed {
to_quit_3.store(true, Ordering::SeqCst);
}
Box::pin(async {})
},
));
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
println!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
println!("Peer Connection has gone to failed exiting: Done forwarding");
to_quit_4.store(true, Ordering::SeqCst)
}
Box::pin(async {})
}));
let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone();
let peer_connection = Arc::new(web_rtc::setup_webrtc().await);
let video_track = Arc::new(TrackLocalStaticRTP::new(
RTCRtpCodecCapability {
@ -158,6 +83,67 @@ async fn main() -> Result<(), Error> {
.await
.unwrap();
info!("VP8 track has been added to peer connection");
if let Some(local_desc) = peer_connection.local_description().await {
app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel");
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Failed {
error!("On Ice Candidate called for quit!");
to_quit_3.store(true, Ordering::SeqCst);
}
Box::pin(async {})
},
));
let pc = Arc::downgrade(&peer_connection);
let to_wc_2 = app_sender.clone();
peer_connection.on_ice_candidate(Box::new(move |c: Option<RTCIceCandidate>| {
//println!("on_ice_candidate {:?}", c);
let to_wc_3 = to_wc_2.clone();
Box::pin(async move {
if c.is_some() {
info!("trickle ICE notification");
if let Err(e) = to_wc_3.send_blocking(ApplicationMessage::WebRTCIceCandidate(c.unwrap())) {
error!("Could not send ice candidate to other end! {:?}", e);
}
}
})
}));
let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone();
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Connected {
notify_tx.notify_one();
}
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
error!("Peer Connection has gone to failed exiting: Done forwarding");
to_quit_4.store(true, Ordering::SeqCst)
}
Box::pin(async {})
}));
tokio::spawn(async move {
// handle incoming packets that have been processed by the
// interceptors
@ -185,9 +171,9 @@ async fn main() -> Result<(), Error> {
if let Ok(buf) = map.map_readable() {
if let Err(err) = video_track.write(&buf).await {
if webrtc::Error::ErrClosedPipe == err {
println!("The peerConnection has been closed.");
error!("The peerConnection has been closed.");
} else {
println!("video_track write err: {}", err);
error!("video_track write err: {}", err);
}
}
};
@ -219,18 +205,85 @@ async fn main() -> Result<(), Error> {
.build(),
);
info!("Starting WebRTC handshake");
let offer = peer_connection.create_offer(None).await.unwrap();
peer_connection.set_local_description(offer.clone()).await.expect("Could not set local description from offer");
if let Err(e) = app_sender.send(ApplicationMessage::WebRTCPacket(offer)).await {
error!("Could not send offer to app_sender!");
panic!("{}", e);
}
info!("Offer sent!");
loop {
match app_receiver.recv().await {
Ok(ApplicationMessage::WebRTCPacket(pkt)) => {
info!("Recieved response package! {:?}", pkt);
peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}");
error!("Remote Description has been registered!");
}
Ok(ApplicationMessage::WebRTCIceCandidateInit(pkt)) => {
peer_connection.add_ice_candidate(pkt).await.expect("Could not assign new ice candidate");
info!("added ice candidate");
break;
}
Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => {
error!("Got a non init ice candidate. Now what?");
}
Err(e) => {
panic!("Channel was closed?!");
}
}
}
info!("Get a response assumedly");
tokio::time::sleep(Duration::from_millis(5000)).await;
// let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection");
// peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection");
if let Err(e) = pipeline.pipeline.set_state(State::Playing) {
panic!("Could not start pipeline! {e}");
}
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => to_quit.store(true, std::sync::atomic::Ordering::SeqCst),
Ok(()) => {
to_quit.store(true, std::sync::atomic::Ordering::SeqCst);
}
Err(e) => {
error!("Could not watch ctrl_c signal! {e}");
}
}
});
while let Ok(msg) = app_receiver.recv().await {
match msg {
ApplicationMessage::WebRTCPacket(pkt) => {
error!("don't know what to do with this packet!");
}
ApplicationMessage::WebRTCIceCandidateInit(pkt) => {
error!("Unhandled ice init candidate!");
}
ApplicationMessage::WebRTCIceCandidate(pkg) => {
error!("Unhandled ice candidate!");
}
}
}
Ok(())
}