stuff and pain

This commit is contained in:
Nickiel12 2024-08-19 20:40:52 -07:00
parent 7b41a78fc5
commit 39f26c19c6
6 changed files with 373 additions and 216 deletions

172
Cargo.lock generated
View file

@ -124,7 +124,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
"synstructure 0.13.1",
]
@ -147,7 +147,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -170,7 +170,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -295,9 +295,12 @@ dependencies = [
[[package]]
name = "cc"
version = "1.1.7"
version = "1.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc"
checksum = "72db2f7947ecee9b03b510377e8bb9077afa27176fdbff55c51027e976fdcc48"
dependencies = [
"shlex",
]
[[package]]
name = "ccm"
@ -313,9 +316,9 @@ dependencies = [
[[package]]
name = "cfg-expr"
version = "0.15.8"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02"
checksum = "345c78335be0624ed29012dc10c49102196c6882c12dde65d9f35b02da2aada8"
dependencies = [
"smallvec",
"target-lexicon",
@ -327,12 +330,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cfg_aliases"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e"
[[package]]
name = "cipher"
version = "0.4.4"
@ -409,9 +406,9 @@ dependencies = [
[[package]]
name = "cpufeatures"
version = "0.2.12"
version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504"
checksum = "51e852e6dc9a5bed1fae92dd2375037bf2b768725bf3be87811edee3249d09ad"
dependencies = [
"libc",
]
@ -475,16 +472,6 @@ dependencies = [
"cipher",
]
[[package]]
name = "ctrlc"
version = "3.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "672465ae37dc1bc6380a6547a8883d5dd397b0f1faaad4f265726cc7042a5345"
dependencies = [
"nix 0.28.0",
"windows-sys",
]
[[package]]
name = "curve25519-dalek"
version = "4.1.3"
@ -508,7 +495,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -584,7 +571,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -751,7 +738,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -824,9 +811,9 @@ checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd"
[[package]]
name = "gio-sys"
version = "0.20.0"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4feb96b31c32730ea3e1e89aecd2e4e37ecb1c473ad8f685e3430a159419f63"
checksum = "5237611e97e9b86ab5768adc3eef853ae713ea797aa3835404acdfacffc9fb38"
dependencies = [
"glib-sys",
"gobject-sys",
@ -837,9 +824,9 @@ dependencies = [
[[package]]
name = "glib"
version = "0.20.0"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fee90a615ce05be7a32932cfb8adf2c4bbb4700e80d37713c981fb24c0c56238"
checksum = "b19429cb83fcbf1f00b31ae3a123fab5cd3761bdd15b0cc07905804742f0d0e4"
dependencies = [
"bitflags 2.6.0",
"futures-channel",
@ -854,27 +841,26 @@ dependencies = [
"libc",
"memchr",
"smallvec",
"thiserror",
]
[[package]]
name = "glib-macros"
version = "0.20.0"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4da558d8177c0c8c54368818b508a4244e1286fce2858cef4e547023f0cfa5ef"
checksum = "960349f56469b75794157e93bf04f0bc8a622d0a6612d6a8f8d7eac41e0e1ee1"
dependencies = [
"heck",
"proc-macro-crate",
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
name = "glib-sys"
version = "0.20.0"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4958c26e5a01c9af00dea669a97369eccbec29a8e6d125c24ea2d85ee7467b60"
checksum = "44edae63bea922f18f7e63977ee60a257ec27c4613aff1a6a9bb572ad0d88269"
dependencies = [
"libc",
"system-deps",
@ -882,9 +868,9 @@ dependencies = [
[[package]]
name = "gobject-sys"
version = "0.20.0"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6908864f5ffff15b56df7e90346863904f49b949337ed0456b9287af61903b8"
checksum = "fa3d1dcd8a1eb2e7c22be3d5e792b14b186f3524f79b25631730f9a8c169d49a"
dependencies = [
"glib-sys",
"libc",
@ -1071,9 +1057,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.3.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3fc2e30ba82dd1b3911c8de1ffc143c74a914a14e99514d7637e3099df5ea0"
checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c"
dependencies = [
"equivalent",
"hashbrown 0.14.5",
@ -1149,9 +1135,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe"
[[package]]
name = "libc"
version = "0.2.155"
version = "0.2.158"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439"
[[package]]
name = "linked-hash-map"
@ -1217,9 +1203,9 @@ dependencies = [
[[package]]
name = "mio"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4"
checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec"
dependencies = [
"hermit-abi",
"libc",
@ -1246,18 +1232,6 @@ dependencies = [
"pin-utils",
]
[[package]]
name = "nix"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4"
dependencies = [
"bitflags 2.6.0",
"cfg-if",
"cfg_aliases",
"libc",
]
[[package]]
name = "nom"
version = "7.1.3"
@ -1324,9 +1298,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.36.2"
version = "0.36.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e"
checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9"
dependencies = [
"memchr",
]
@ -1498,7 +1472,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -1864,29 +1838,29 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b"
[[package]]
name = "serde"
version = "1.0.204"
version = "1.0.208"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12"
checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.204"
version = "1.0.208"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222"
checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
name = "serde_json"
version = "1.0.122"
version = "1.0.125"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da"
checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed"
dependencies = [
"itoa",
"memchr",
@ -1934,6 +1908,21 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
name = "signal-hook-registry"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
dependencies = [
"libc",
]
[[package]]
name = "signature"
version = "2.2.0"
@ -1986,7 +1975,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -2062,9 +2051,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.72"
version = "2.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af"
checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9"
dependencies = [
"proc-macro2",
"quote",
@ -2091,14 +2080,14 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
name = "system-deps"
version = "7.0.1"
version = "7.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c81f13d9a334a6c242465140bd262fae382b752ff2011c4f7419919a9c97922"
checksum = "070a0a5e7da2d24be457809c4b3baa57a835fd2829ad8b86f9a049052fe71031"
dependencies = [
"cfg-expr",
"heck",
@ -2130,7 +2119,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -2200,9 +2189,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.39.2"
version = "1.39.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1"
checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5"
dependencies = [
"backtrace",
"bytes",
@ -2210,6 +2199,7 @@ dependencies = [
"mio",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys",
@ -2223,7 +2213,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -2315,7 +2305,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -2433,9 +2423,9 @@ checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
[[package]]
name = "unicode-xid"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
checksum = "229730647fbc343e3a80e463c1db7f78f3855d3f3739bee0dda773c9a037c90a"
[[package]]
name = "universal-hash"
@ -2492,7 +2482,6 @@ dependencies = [
"anyhow",
"async-channel",
"config",
"ctrlc",
"gstreamer",
"gstreamer-app",
"log",
@ -2511,7 +2500,18 @@ dependencies = [
[[package]]
name = "vcs-common"
version = "0.1.0"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#898d5181e08ed7cbfef745bf9036eae71a08be52"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#1e37189866720e8d99b22492a78f47d220742831"
dependencies = [
"async-channel",
"bincode",
"futures-util",
"serde",
"serde_json",
"tokio",
"tokio-tungstenite",
"tracing",
"webrtc",
]
[[package]]
name = "version-compare"
@ -2741,7 +2741,7 @@ dependencies = [
"lazy_static",
"libc",
"log",
"nix 0.26.4",
"nix",
"portable-atomic",
"rand",
"thiserror",
@ -2928,7 +2928,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]
[[package]]
@ -2948,5 +2948,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.72",
"syn 2.0.75",
]

View file

@ -8,7 +8,6 @@ edition = "2021"
[dependencies]
anyhow = "1.0.86"
config = "0.14.0"
ctrlc = "3.4.4"
gstreamer = { version = "0.23.0", features = ["v1_22"] }
gstreamer-app = { version = "0.23.0", features = ["v1_22"] }
@ -16,7 +15,7 @@ log = "0.4.22"
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.122"
snafu = "0.8.4"
tokio = "1.39.2"
tokio = { version = "1.39.2", features = ["signal"] }
tokio-tungstenite = "0.23.1"
toml = "0.8.19"
tracing = "0.1.40"

45
src/gst.rs Normal file
View file

@ -0,0 +1,45 @@
use gstreamer::{
self as gst,
prelude::{Cast, GstBinExtManual},
ElementFactory,
};
use gstreamer_app as gst_app;
pub struct Pipeline {
pub pipeline: gst::Pipeline,
pub sink: gst_app::AppSink,
}
pub fn new_pipeline() -> Pipeline {
let pipeline = gst::Pipeline::builder()
.name("camera_to_rtp_pipeine")
.build();
let source = ElementFactory::make("v4l2src").build().unwrap();
let video_convert = ElementFactory::make("videoconvert").build().unwrap();
let vp8enc = ElementFactory::make("vp8enc").build().unwrap();
let rtp = ElementFactory::make("rtpvp8pay").build().unwrap();
let app_sink = gst_app::AppSink::builder().build();
pipeline
.add_many([
&source,
&video_convert,
&vp8enc,
&rtp,
app_sink.upcast_ref(),
])
.expect("Could not add all the stuff to the pipeline");
gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp]).unwrap();
Pipeline {
pipeline,
sink: app_sink,
}
}

View file

@ -1,12 +1,29 @@
use std::sync::{atomic::AtomicBool, Arc};
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
use anyhow::Error;
use gstreamer::{prelude::*, Element, ElementFactory, Pipeline, State};
use gstreamer::{prelude::ElementExt, State};
use gstreamer::Buffer;
use gstreamer_app as gst_app;
use tokio::sync::Notify;
use tracing::{error, info};
use webrtc::{
api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9},
ice_transport::ice_connection_state::RTCIceConnectionState,
peer_connection::{peer_connection_state::RTCPeerConnectionState, sdp::session_description::RTCSessionDescription},
rtp_transceiver::rtp_codec::RTCRtpCodecCapability,
track::track_local::{
track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter,
},
};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
mod config;
mod webrtc;
mod gst;
mod web_rtc;
fn main() -> Result<(), Error> {
#[tokio::main]
async fn main() -> Result<(), Error> {
// TRACING SETUP
let sub = tracing_subscriber::FmtSubscriber::new();
if let Err(e) = tracing::subscriber::set_global_default(sub) {
@ -18,24 +35,185 @@ fn main() -> Result<(), Error> {
// EXIT HANDLER
let to_quit = Arc::new(AtomicBool::new(false));
let to_quit_2 = to_quit.clone();
ctrlc::set_handler(move || {
to_quit_2.store(true, std::sync::atomic::Ordering::SeqCst);
})?;
let to_quit_3 = to_quit.clone();
let to_quit_4 = to_quit.clone();
let to_quit_5 = to_quit.clone();
// GSTREAMER SETUP
gstreamer::init()?;
loop {
if to_quit.load(std::sync::atomic::Ordering::SeqCst) {
println!("Recieved Ctrl+C, stopping");
let pipeline = gst::new_pipeline();
break;
let (to_stream, mut stream) = tokio::sync::mpsc::channel::<Buffer>(10);
let (send_oneshot, recv_oneshot) = tokio::sync::oneshot::channel::<
Result<(AppSender, AppReceiver), tokio_tungstenite::tungstenite::Error>,
>();
// connect to remote server
tokio::spawn(vcs_common::connect_to_server(format!("ws://{}:{}", config.destination_ip, config.destination_port), send_oneshot));
let app_sender: AppSender;
let app_receiver: AppReceiver;
match recv_oneshot.await {
Err(e) => {
panic!("Could not get connection status from oneshot!! {e}");
}
Ok(Err(e)) => {
panic!("Could not connect to vcs websocket! {e}");
}
Ok(Ok((sender, recvr))) => {
app_sender = sender;
app_receiver = recvr;
}
std::thread::sleep(std::time::Duration::from_millis(200));
}
println!("Success!");
let peer_connection = web_rtc::setup_webrtc().await;
while let Ok(msg) = app_receiver.recv().await {
match msg {
ApplicationMessage::WebRTCPacket(pkt) => {
peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}");
break;
}
}
}
let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection");
let mut gather_complete = peer_connection.gathering_complete_promise().await;
peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection");
let _ = gather_complete.recv().await;
if let Some(local_desc) = peer_connection.local_description().await {
app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel");
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
println!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Failed {
to_quit_3.store(true, Ordering::SeqCst);
}
Box::pin(async {})
},
));
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
println!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
println!("Peer Connection has gone to failed exiting: Done forwarding");
to_quit_4.store(true, Ordering::SeqCst)
}
Box::pin(async {})
}));
let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone();
let video_track = Arc::new(TrackLocalStaticRTP::new(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
"video_test".to_owned(),
"webrtc_test".to_owned(),
));
let rtp_sender = peer_connection
.add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
.await
.unwrap();
tokio::spawn(async move {
// handle incoming packets that have been processed by the
// interceptors
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
webrtc::util::Result::<()>::Ok(())
});
tokio::spawn(async move {
notify_video.notified().await;
info!("Starting video stream!");
// It is important to use a time.Ticker instead of time.Sleep because
// * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data
// * works around latency issues with Sleep
// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
// This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
while let Some(map) = stream.recv().await {
if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) {
stream.close();
break;
}
if let Ok(buf) = map.map_readable() {
if let Err(err) = video_track.write(&buf).await {
if webrtc::Error::ErrClosedPipe == err {
println!("The peerConnection has been closed.");
} else {
println!("video_track write err: {}", err);
}
}
};
}
});
pipeline.sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |app_sink| {
let sample = app_sink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
let buffer = sample.buffer_owned().ok_or_else(|| {
gstreamer::element_error!(
app_sink,
gstreamer::ResourceError::Failed,
("Failed to get buffer from appsink")
);
gstreamer::FlowError::Error
})?;
if let Err(e) = to_stream.blocking_send(buffer) {
error!("Error sending to stream of buffers: {e}");
to_quit_5.store(true, Ordering::SeqCst);
}
Ok(gstreamer::FlowSuccess::Ok)
})
.build(),
);
if let Err(e) = pipeline.pipeline.set_state(State::Playing) {
panic!("Could not start pipeline! {e}");
}
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => to_quit.store(true, std::sync::atomic::Ordering::SeqCst),
Err(e) => {
error!("Could not watch ctrl_c signal! {e}");
}
}
});
Ok(())
}

49
src/web_rtc.rs Normal file
View file

@ -0,0 +1,49 @@
use std::sync::Arc;
use tracing::info;
use webrtc::{
api::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
},
ice_transport::ice_server::RTCIceServer,
interceptor::registry::Registry,
peer_connection::{configuration::RTCConfiguration, RTCPeerConnection},
};
pub async fn setup_webrtc() -> Arc<RTCPeerConnection> {
// WebRTC stuff
let mut m = MediaEngine::default();
if let Err(e) = m.register_default_codecs() {
// replace this with VP8 specific?
panic!("Could not register default codecs for webrtc! {e}");
}
info!("Default codecs registered");
// Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
// This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
// this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
// for each PeerConnection.
let mut registry = Registry::new(); // may be able to prune for
registry = register_default_interceptors(registry, &mut m).unwrap();
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()], // you can set username and
// password info here too
..Default::default()
}],
..Default::default()
};
// TODO: remove this unwrap
// this is the offering for the remove device
return Arc::new(api.new_peer_connection(config).await.unwrap());
}

View file

@ -1,114 +0,0 @@
use std::{sync::Arc, time::Duration};
use std::io::Write;
use gstreamer::Buffer;
use tokio::sync::Notify;
use tracing::{error, info};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors,
media_engine::{MediaEngine, MIME_TYPE_VP9},
APIBuilder,
},
ice_transport::ice_server::RTCIceServer,
interceptor::registry::Registry,
peer_connection::configuration::RTCConfiguration,
rtp_transceiver::rtp_codec::RTCRtpCodecCapability,
track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter},
};
pub async fn setup_webrtc() {
let mut m = MediaEngine::default();
if let Err(e) = m.register_default_codecs() {
// replace this with VP8 specific?
panic!("Could not register default codecs for webrtc!");
}
info!("Default codecs registered");
// Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
// This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
// this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
// for each PeerConnection.
let mut registry = Registry::new(); // may be able to prune for
registry = register_default_interceptors(registry, &mut m).unwrap();
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()], // you can set username and
// password info here too
..Default::default()
}],
..Default::default()
};
// TODO: remove this unwrap
// this is the offering for the remove device
let peer_connection = Arc::new(api.new_peer_connection(config).await.unwrap());
let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone();
let notify_audio = notify_tx.clone();
let (done_tx, mut done_rx) = tokio::sync::mpsc::channel::<()>(1);
let video_done_tx = done_tx.clone();
let audio_done_tx = done_tx.clone();
let video_track = Arc::new(TrackLocalStaticRTP::new(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP9.to_owned(),
..Default::default()
},
"video_test".to_owned(),
"webrtc_test".to_owned(),
));
let rtp_sender = peer_connection
.add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
.await
.unwrap();
tokio::spawn(async move {
// handle incoming packets that have been processed by the
// interceptors
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
webrtc::util::Result::<()>::Ok(())
});
let (to_stream, mut stream) = tokio::sync::mpsc::channel::<Buffer>(10);
tokio::spawn(async move {
notify_video.notified().await;
info!("Starting video stream!");
// It is important to use a time.Ticker instead of time.Sleep because
// * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data
// * works around latency issues with Sleep
// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
// This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
while let Some(map) = stream.recv().await {
if let Ok(buf) = map.map_readable() {
if let Err(err) = video_track.write(&buf).await {
if webrtc::Error::ErrClosedPipe == err {
println!("The peerConnection has been closed.");
} else {
println!("video_track write err: {}", err);
}
}
};
}
});
// TODO: Initialize the gstreamer app, and start sending frames to the to_stream mpsc
}