improved code modularity and comments

This commit is contained in:
Nickiel12 2024-08-26 00:17:40 +00:00
parent f2f6e5a817
commit 8993750feb
6 changed files with 273 additions and 194 deletions

10
Cargo.lock generated
View file

@ -61,12 +61,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "anyhow"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"
[[package]]
name = "arc-swap"
version = "1.7.1"
@ -2479,7 +2473,6 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
name = "vcs-camera-satellite"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"config",
"gstreamer",
@ -2493,6 +2486,7 @@ dependencies = [
"toml",
"tracing",
"tracing-subscriber",
"uuid",
"vcs-common",
"webrtc",
]
@ -2500,7 +2494,7 @@ dependencies = [
[[package]]
name = "vcs-common"
version = "0.1.0"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#59b0b88c53032514bf5ba68efe17b6f089734bfe"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#a89e970be35e01eb8a59c368e7b817e93984c137"
dependencies = [
"async-channel",
"bincode",

View file

@ -6,7 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.86"
config = "0.14.0"
gstreamer = { version = "0.23.0", features = ["v1_22"] }
@ -24,3 +23,4 @@ webrtc = "0.11.0"
vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" }
async-channel = "2.3.1"
uuid = "1.10.0"

View file

@ -9,6 +9,7 @@ use tracing::{info, instrument};
pub struct AppConfig {
pub destination_ip: String,
pub destination_port: u32,
pub aspect_ratio: f64,
}
impl Default for AppConfig {
@ -16,6 +17,8 @@ impl Default for AppConfig {
AppConfig {
destination_ip: "localhost".to_string(),
destination_port: 7891,
aspect_ratio: 16.0 / 9.0,
}
}
}

View file

@ -1,17 +1,21 @@
use gstreamer::{
self as gst,
prelude::{Cast, GstBinExtManual},
prelude::{Cast, ElementExtManual, GstBinExtManual},
ElementFactory,
};
use gstreamer_app as gst_app;
use crate::config::AppConfig;
pub struct Pipeline {
pub pipeline: gst::Pipeline,
pub sink: gst_app::AppSink,
}
pub fn new_pipeline() -> Pipeline {
const HEIGHT: usize = 480;
pub fn new_pipeline(config: &AppConfig) -> Pipeline {
let pipeline = gst::Pipeline::builder()
.name("camera_to_rtp_pipeine")
.build();
@ -20,6 +24,18 @@ pub fn new_pipeline() -> Pipeline {
let video_convert = ElementFactory::make("videoconvert").build().unwrap();
let video_scale = ElementFactory::make("videoscale")
.property("add-borders", true)
.build()
.unwrap();
let video_scale_caps = gstreamer::Caps::builder("video/x-raw")
.field("height", HEIGHT as i32)
.field("width", (HEIGHT as f64 * config.aspect_ratio) as i32)
.build();
// We are using VP8 because VP9 resulted in much worse video quality
// when testing -NY 8/25/2024
let vp8enc = ElementFactory::make("vp8enc").build().unwrap();
let rtp = ElementFactory::make("rtpvp8pay").build().unwrap();
@ -30,13 +46,20 @@ pub fn new_pipeline() -> Pipeline {
.add_many([
&source,
&video_convert,
&video_scale,
&vp8enc,
&rtp,
app_sink.upcast_ref(),
])
.expect("Could not add all the stuff to the pipeline");
gst::Element::link_many(&[&source, &video_convert, &vp8enc, &rtp, app_sink.upcast_ref()]).unwrap();
gst::Element::link_many(&[&source, &video_convert, &video_scale]).unwrap();
video_scale
.link_filtered(&vp8enc, &video_scale_caps)
.expect("Could not link videoscale to vp8enc with caps!");
gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()]).unwrap();
Pipeline {
pipeline,

View file

@ -1,15 +1,17 @@
use std::{sync::{atomic::{AtomicBool, Ordering}, Arc}, time::Duration};
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use anyhow::Error;
use gstreamer::{prelude::ElementExt, State};
use gstreamer::Buffer;
use gstreamer::{prelude::ElementExt, State};
use gstreamer_app as gst_app;
use tokio::sync::Notify;
use tracing::{error, info};
use web_rtc::{kickoff_connection, setup_callbacks};
use webrtc::{
api::media_engine::{MIME_TYPE_VP8, MIME_TYPE_VP9}, ice_transport::{ice_candidate::RTCIceCandidate, ice_connection_state::RTCIceConnectionState}, media::audio::buffer::info, peer_connection::peer_connection_state::RTCPeerConnectionState, rtp_transceiver::rtp_codec::RTCRtpCodecCapability, track::track_local::{
track_local_static_rtp::TrackLocalStaticRTP, TrackLocal, TrackLocalWriter,
}
peer_connection::RTCPeerConnection,
track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter},
};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
@ -19,7 +21,9 @@ mod gst;
mod web_rtc;
#[tokio::main]
async fn main() -> Result<(), Error> {
async fn main() {
let uuid = uuid::Uuid::new_v4().to_string();
// TRACING SETUP
let _sub = tracing_subscriber::fmt()
.with_max_level(tracing_subscriber::filter::LevelFilter::DEBUG)
@ -31,138 +35,73 @@ async fn main() -> Result<(), Error> {
let to_quit = Arc::new(AtomicBool::new(false));
let to_quit_2 = to_quit.clone();
let to_quit_3 = to_quit.clone();
let to_quit_4 = to_quit.clone();
let to_quit_5 = to_quit.clone();
// GSTREAMER SETUP
gstreamer::init()?;
if let Err(e) = gstreamer::init() {
panic!("Could not start gstreamer! {e}");
}
let pipeline = gst::new_pipeline();
let pipeline = gst::new_pipeline(&config);
// frame buffers from the gstreamer pipeline to send over the webrtc connection
let (to_stream, mut stream) = tokio::sync::mpsc::channel::<Buffer>(10);
let (send_oneshot, recv_oneshot) = tokio::sync::oneshot::channel::<
Result<(AppSender, AppReceiver), tokio_tungstenite::tungstenite::Error>,
>();
let rt = tokio::runtime::Handle::current();
// connect to remote server
tokio::spawn(vcs_common::connect_to_server(format!("ws://{}:{}", config.destination_ip, config.destination_port), send_oneshot, rt));
let app_sender: AppSender;
let app_receiver: AppReceiver;
match recv_oneshot.await {
Err(e) => {
panic!("Could not get connection status from oneshot!! {e}");
}
Ok(Err(e)) => {
panic!("Could not connect to vcs websocket! {e}");
}
Ok(Ok((sender, recvr))) => {
// connect to remote server
match vcs_common::connect_to_server(
format!("ws://{}:{}", config.destination_ip, config.destination_port),
rt,
)
.await
{
Err(e) => panic!("There was an error connecting to the remote: {e}"),
Ok((sender, recvr)) => {
app_sender = sender;
app_receiver = recvr;
}
}
// Set up the webrtc peer connection
let peer_connection: Arc<RTCPeerConnection>;
let video_track: Arc<TrackLocalStaticRTP>;
match web_rtc::setup_webrtc(uuid.clone()).await {
Err(e) => {
panic!("Could not set up WebRTC connection! {e}");
}
Ok((pc, vt)) => {
peer_connection = pc;
video_track = vt;
}
}
let peer_connection = Arc::new(web_rtc::setup_webrtc().await);
let video_track = Arc::new(TrackLocalStaticRTP::new(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
"video_test".to_owned(),
"webrtc_test".to_owned(),
));
let rtp_sender = peer_connection
.add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
.await
.unwrap();
info!("VP8 track has been added to peer connection");
// Send the local description to the remote
if let Some(local_desc) = peer_connection.local_description().await {
app_sender.send(ApplicationMessage::WebRTCPacket(local_desc)).await.expect("Could not send message to the socket message channel");
app_sender
.send(ApplicationMessage::WebRTCPacket(local_desc))
.await
.expect("Could not send message to the socket message channel");
}
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_ice_connection_state_change(Box::new(
move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Failed {
error!("On Ice Candidate called for quit!");
to_quit_3.store(true, Ordering::SeqCst);
}
Box::pin(async {})
},
));
let pc = Arc::downgrade(&peer_connection);
let to_wc_2 = app_sender.clone();
peer_connection.on_ice_candidate(Box::new(move |c: Option<RTCIceCandidate>| {
//println!("on_ice_candidate {:?}", c);
let to_wc_3 = to_wc_2.clone();
Box::pin(async move {
if c.is_some() {
info!("trickle ICE notification");
if let Err(e) = to_wc_3.send_blocking(ApplicationMessage::WebRTCIceCandidate(c.unwrap())) {
error!("Could not send ice candidate to other end! {:?}", e);
}
}
})
}));
let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone();
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
peer_connection.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Connected {
notify_tx.notify_one();
}
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
error!("Peer Connection has gone to failed exiting: Done forwarding");
to_quit_4.store(true, Ordering::SeqCst)
}
Box::pin(async {})
}));
tokio::spawn(async move {
// handle incoming packets that have been processed by the
// interceptors
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
webrtc::util::Result::<()>::Ok(())
});
setup_callbacks(
peer_connection.clone(),
to_quit_3,
app_sender.clone(),
notify_tx,
)
.await;
tokio::spawn(async move {
notify_video.notified().await;
info!("Starting video stream!");
// It is important to use a time.Ticker instead of time.Sleep because
// * avoids accumulating skew, just calling time.Sleep didn't compensate for the time spent parsing the data
// * works around latency issues with Sleep
// Send our video file frame at a time. Pace our sending so we send it at the same speed it should be played back as.
// This isn't required since the video is timestamped, but we will such much higher loss if we send all at once.
while let Some(map) = stream.recv().await {
if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) {
stream.close();
@ -205,61 +144,18 @@ async fn main() -> Result<(), Error> {
.build(),
);
info!("Starting WebRTC handshake");
let offer = peer_connection.create_offer(None).await.unwrap();
peer_connection.set_local_description(offer.clone()).await.expect("Could not set local description from offer");
if let Err(e) = app_sender.send(ApplicationMessage::WebRTCPacket(offer)).await {
error!("Could not send offer to app_sender!");
panic!("{}", e);
// send the offer and trickle ice candidates to the remote, and accept their description
if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await {
to_quit.store(true, Ordering::SeqCst);
panic!("There was an issue with WebRTC setup! {e}");
}
info!("Offer sent!");
loop {
match app_receiver.recv().await {
Ok(ApplicationMessage::WebRTCPacket(pkt)) => {
info!("Recieved response package! {:?}", pkt);
peer_connection.set_remote_description(pkt).await.expect("The remote description caused an error! {e}");
error!("Remote Description has been registered!");
}
Ok(ApplicationMessage::WebRTCIceCandidateInit(pkt)) => {
peer_connection.add_ice_candidate(pkt).await.expect("Could not assign new ice candidate");
info!("added ice candidate");
break;
}
Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => {
error!("Got a non init ice candidate. Now what?");
}
Err(e) => {
panic!("Channel was closed?!");
}
}
}
info!("Get a response assumedly");
tokio::time::sleep(Duration::from_millis(5000)).await;
// let answer = peer_connection.create_answer(None).await.expect("Couldn't create an answer from the peer connection");
// peer_connection.set_local_description(answer).await.expect("Could not set local description of peer connection");
// Start the gstreamer pipeline
if let Err(e) = pipeline.pipeline.set_state(State::Playing) {
panic!("Could not start pipeline! {e}");
}
// set up handler for ctrl_c to quit the application
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
@ -271,19 +167,20 @@ async fn main() -> Result<(), Error> {
}
});
// loop through messages coming from the remote.
while let Ok(msg) = app_receiver.recv().await {
match msg {
ApplicationMessage::WebRTCPacket(pkt) => {
error!("don't know what to do with this packet!");
}
ApplicationMessage::WebRTCIceCandidateInit(pkt) => {
error!("Unhandled ice init candidate!");
if let Err(e) = peer_connection.add_ice_candidate(pkt).await {
error!("There was an error adding the trickle ICE candidate! {e}");
}
}
ApplicationMessage::WebRTCIceCandidate(pkg) => {
error!("Unhandled ice candidate!");
}
}
}
Ok(())
}

View file

@ -1,17 +1,38 @@
use std::sync::Arc;
use tracing::info;
use webrtc::{
api::{
interceptor_registry::register_default_interceptors, media_engine::MediaEngine, APIBuilder,
},
ice_transport::ice_server::RTCIceServer,
interceptor::registry::Registry,
peer_connection::{configuration::RTCConfiguration, RTCPeerConnection},
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
pub async fn setup_webrtc() -> Arc<RTCPeerConnection> {
// WebRTC stuff
use tokio::sync::Notify;
use tracing::{error, info, instrument};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
use webrtc::{
api::{
interceptor_registry::register_default_interceptors,
media_engine::{MediaEngine, MIME_TYPE_VP8},
APIBuilder,
},
ice_transport::{
ice_candidate::RTCIceCandidate, ice_connection_state::RTCIceConnectionState,
ice_server::RTCIceServer,
},
interceptor::registry::Registry,
peer_connection::{
configuration::RTCConfiguration, peer_connection_state::RTCPeerConnectionState,
RTCPeerConnection,
},
rtp_transceiver::rtp_codec::RTCRtpCodecCapability,
track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal},
Error,
};
/// Boilerplate for creating the initial Peer Connection
/// Takes a unique ID representing this instance to name the tracks and streams
#[instrument]
pub async fn setup_webrtc(
machine_uuid: String,
) -> Result<(Arc<RTCPeerConnection>, Arc<TrackLocalStaticRTP>), Error> {
info!("Setting up webrtc engine and peer connection");
let mut m = MediaEngine::default();
if let Err(e) = m.register_default_codecs() {
@ -19,21 +40,20 @@ pub async fn setup_webrtc() -> Arc<RTCPeerConnection> {
panic!("Could not register default codecs for webrtc! {e}");
}
info!("Default codecs registered");
// Create a InterceptorRegistry. This is the user configurable RTP/RTCP Pipeline.
// This provides NACKs, RTCP Reports and other features. If you use `webrtc.NewPeerConnection`
// this is enabled by default. If you are manually managing You MUST create a InterceptorRegistry
// for each PeerConnection.
let mut registry = Registry::new(); // may be able to prune for
registry = register_default_interceptors(registry, &mut m).unwrap();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
// TODO figure out a STUN server or lack of one
let config = RTCConfiguration {
ice_servers: vec![RTCIceServer {
urls: vec!["stun:stun.l.google.com:19302".to_owned()], // you can set username and
@ -43,7 +63,149 @@ pub async fn setup_webrtc() -> Arc<RTCPeerConnection> {
..Default::default()
};
// TODO: remove this unwrap
// this is the offering for the remove device
return Arc::new(api.new_peer_connection(config).await.unwrap());
// the connection object that stores all information and callbacks
// for a connection
let peer_connection = Arc::new(api.new_peer_connection(config).await?);
info!("Peer Connection initialized");
let video_track = Arc::new(TrackLocalStaticRTP::new(
RTCRtpCodecCapability {
mime_type: MIME_TYPE_VP8.to_owned(),
..Default::default()
},
machine_uuid.clone() + "_track",
machine_uuid.clone() + "_video_stream",
));
let rtp_sender = peer_connection
.add_track(Arc::clone(&video_track) as Arc<dyn TrackLocal + Send + Sync>)
.await?;
info!("Video track has been added to peer connection");
tokio::spawn(async move {
// handle incoming packets that have been processed by the
// interceptors
let mut rtcp_buf = vec![0u8; 1500];
while let Ok((_, _)) = rtp_sender.read(&mut rtcp_buf).await {}
webrtc::util::Result::<()>::Ok(())
});
return Ok((peer_connection, video_track));
}
/// Register callbacks for the Peer Connection
/// sets up on_ice_connection_state_change (live), on_ice_candidate (local)
/// and on_peer_connection_state_change (live)
/// takes the peer connection, the flag to indicate if the program should quit
/// a reference to the Sender Channel to the remote, and a notifier for when
/// the video should begin being sent
#[instrument(skip_all)]
pub async fn setup_callbacks(
pc: Arc<RTCPeerConnection>,
to_quit: Arc<AtomicBool>,
app_sender: AppSender,
notify_video: Arc<Notify>,
) {
let to_quit_3 = to_quit.clone();
// Set the handler for ICE connection state
// This will notify you when the peer has connected/disconnected
pc.on_ice_connection_state_change(Box::new(move |connection_state: RTCIceConnectionState| {
info!("Connection State has changed {connection_state}");
if connection_state == RTCIceConnectionState::Failed {
error!("On Ice Candidate called for quit!");
to_quit_3.store(true, Ordering::SeqCst);
}
Box::pin(async {})
}));
let to_wc_2 = app_sender.clone();
pc.on_ice_candidate(Box::new(move |c: Option<RTCIceCandidate>| {
//println!("on_ice_candidate {:?}", c);
let to_wc_3 = to_wc_2.clone();
Box::pin(async move {
if let Some(c) = c {
if let Err(e) = to_wc_3.send_blocking(ApplicationMessage::WebRTCIceCandidate(c)) {
error!("Could not send ice candidate to other end! {:?}", e);
}
info!("New trickle ICE packet sent");
}
})
}));
let to_quit_4 = to_quit.clone();
// Set the handler for Peer connection state
// This will notify you when the peer has connected/disconnected
pc.on_peer_connection_state_change(Box::new(move |s: RTCPeerConnectionState| {
info!("Peer Connection State has changed: {s}");
if s == RTCPeerConnectionState::Connected {
notify_video.notify_one();
}
if s == RTCPeerConnectionState::Failed {
// Wait until PeerConnection has had no network activity for 30 seconds or another failure. It may be reconnected using an ICE Restart.
// Use webrtc.PeerConnectionStateDisconnected if you are interested in detecting faster timeout.
// Note that the PeerConnection may come back from PeerConnectionStateDisconnected.
error!("Peer Connection has gone to failed exiting: Done forwarding");
to_quit_4.store(true, Ordering::SeqCst)
}
Box::pin(async {})
}));
}
/// WebRTC steps for offering a connection, trickle ICE, and
/// finish negotiion
pub async fn kickoff_connection(
pc: &Arc<RTCPeerConnection>,
app_sender: AppSender,
app_receiver: &AppReceiver,
) -> Result<(), Error> {
info!("Starting WebRTC handshake");
// Create and store the offer
let offer = pc.create_offer(None).await?;
pc.set_local_description(offer.clone()).await?;
// Send the offer to the remote
if let Err(e) = app_sender
.send(ApplicationMessage::WebRTCPacket(offer))
.await
{
error!("Could not send offer to app_sender!");
panic!("{}", e);
}
info!("WebRTC offer sent!");
// Wait for the response. The RemoteDescription will come first,followed by several
// trickle ICE candidates. Waiting for only one trickle ICE candidate is probably bad
// practice, but it's working right now
loop {
match app_receiver.recv().await {
Ok(ApplicationMessage::WebRTCPacket(pkt)) => {
pc.set_remote_description(pkt).await?;
info!("Remote Description has been registered!");
}
Ok(ApplicationMessage::WebRTCIceCandidateInit(pkt)) => {
pc.add_ice_candidate(pkt).await?;
info!("added ice candidate");
break;
}
Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => {
error!("Got a non init ice candidate. Now what?");
}
Err(e) => {
panic!("WC channel was closed?! {e}");
}
}
}
info!("WebRTC Handshake complete; remote description and ice candidates selected");
Ok(())
}