moved to recving websocket model

This commit is contained in:
Nickiel12 2024-09-11 03:58:46 +00:00
parent 571d784347
commit 996e991ee4
5 changed files with 294 additions and 140 deletions

2
Cargo.lock generated
View file

@ -2463,7 +2463,9 @@ name = "vcs-camera-satellite"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-channel", "async-channel",
"bincode",
"config", "config",
"futures-util",
"gstreamer", "gstreamer",
"gstreamer-app", "gstreamer-app",
"log", "log",

View file

@ -24,3 +24,5 @@ webrtc = "0.11.0"
vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" } vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" }
async-channel = "2.3.1" async-channel = "2.3.1"
uuid = "1.10.0" uuid = "1.10.0"
futures-util = "0.3.30"
bincode = "1.3.3"

View file

@ -6,9 +6,8 @@ use std::sync::{
use gstreamer::Buffer; use gstreamer::Buffer;
use gstreamer::{prelude::ElementExt, State}; use gstreamer::{prelude::ElementExt, State};
use gstreamer_app as gst_app; use gstreamer_app as gst_app;
use tokio::sync::Notify; use tokio::{net::TcpListener, sync::Notify};
use tracing::{error, info}; use tracing::{error, info, debug};
use web_rtc::{kickoff_connection, setup_callbacks};
use webrtc::{ use webrtc::{
peer_connection::RTCPeerConnection, peer_connection::RTCPeerConnection,
track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter}, track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter},
@ -22,6 +21,7 @@ mod rig;
mod tracker; mod tracker;
mod web_rtc; mod web_rtc;
use web_rtc::{kickoff_connection, setup_callbacks, listen_for_socket};
use rig::{start_event_loop, ApplicationEvent}; use rig::{start_event_loop, ApplicationEvent};
use tracker::{tracker_loop, TrackerEvent}; use tracker::{tracker_loop, TrackerEvent};
@ -40,9 +40,8 @@ async fn main() {
// EXIT HANDLER // EXIT HANDLER
let to_quit = Arc::new(AtomicBool::new(false)); let to_quit = Arc::new(AtomicBool::new(false));
let to_quit_2 = to_quit.clone();
let to_quit_3 = to_quit.clone();
let to_quit_5 = to_quit.clone(); let to_quit_5 = to_quit.clone();
let to_quit_6 = to_quit.clone();
// GSTREAMER SETUP // GSTREAMER SETUP
if let Err(e) = gstreamer::init() { if let Err(e) = gstreamer::init() {
@ -52,26 +51,96 @@ async fn main() {
let pipeline = gst::new_pipeline(&config); let pipeline = gst::new_pipeline(&config);
// frame buffers from the gstreamer pipeline to send over the webrtc connection // frame buffers from the gstreamer pipeline to send over the webrtc connection
let (to_stream, mut stream) = tokio::sync::mpsc::channel::<Buffer>(10); let (to_stream, stream) = tokio::sync::mpsc::channel::<Buffer>(10);
let rt = tokio::runtime::Handle::current(); let rt = tokio::runtime::Handle::current();
let app_sender: AppSender;
let app_receiver: AppReceiver;
// connect to remote server let stream: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Buffer>>> = Arc::new(tokio::sync::Mutex::new(stream));
match vcs_common::connect_to_server(
format!("ws://{}:{}", config.destination_ip, config.destination_port), pipeline.sink.set_callbacks(
rt, gst_app::AppSinkCallbacks::builder()
) .new_sample(move |app_sink| {
.await let sample = app_sink
{ .pull_sample()
Err(e) => panic!("There was an error connecting to the remote: {e}"), .map_err(|_| gstreamer::FlowError::Eos)?;
Ok((sender, recvr, _ws_open)) => { let buffer = sample.buffer_owned().ok_or_else(|| {
app_sender = sender; gstreamer::element_error!(
app_receiver = recvr; app_sink,
} gstreamer::ResourceError::Failed,
("Failed to get buffer from appsink")
);
gstreamer::FlowError::Error
})?;
if let Err(e) = to_stream.blocking_send(buffer) {
error!("Error sending to stream of buffers, it was closed: {e}");
to_quit_5.store(true, Ordering::SeqCst);
} }
Ok(gstreamer::FlowSuccess::Ok)
})
.build(),
);
// Start the gstreamer pipeline
if let Err(e) = pipeline.pipeline.set_state(State::Playing) {
panic!("Could not start pipeline! {e}");
}
// set up handler for ctrl_c to quit the application
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
to_quit_6.store(true, std::sync::atomic::Ordering::SeqCst);
// Start the gstreamer pipeline
if let Err(e) = pipeline.pipeline.set_state(State::Null) {
panic!("Could not start pipeline! {e}");
}
}
Err(e) => {
error!("Could not watch ctrl_c signal! {e}");
}
}
});
let (to_mec, mec) = async_channel::bounded::<ApplicationEvent>(20);
tokio::spawn(start_event_loop(mec, to_mec.clone()));
let (to_tec, tec) = async_channel::bounded::<TrackerEvent>(20);
tokio::spawn(tracker_loop(
to_tec.clone(),
tec,
to_mec.clone(),
));
loop {
let to_quit_2 = to_quit.clone();
let to_quit_3 = to_quit.clone();
let video_receiver_stream = stream.clone();
let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!");
info!("Started listening on 127.0.0.1:8765");
let (app_sender, to_core_reciever) = async_channel::bounded::<ApplicationMessage>(20);
let (to_app_events, app_receiver) = async_channel::bounded::<ApplicationMessage>(20);
todo!("Need to find a way to cancel this future if to_quit is set manually");
listen_for_socket(&listener, to_core_reciever as AppReceiver, to_app_events as AppSender).await;
if to_quit_3.load(Ordering::SeqCst) {
break;
}
drop(listener);
if let Err(e) = to_tec.send(TrackerEvent::ChangeMEC(app_sender.clone())).await {
error!("There was an error sending a message to the TEC! {e}");
to_quit_2.store(true, Ordering::SeqCst);
}
info!("Setting up web rtc pc status");
// Set up the webrtc peer connection // Set up the webrtc peer connection
let peer_connection: Arc<RTCPeerConnection>; let peer_connection: Arc<RTCPeerConnection>;
let video_track: Arc<TrackLocalStaticRTP>; let video_track: Arc<TrackLocalStaticRTP>;
@ -92,6 +161,7 @@ async fn main() {
.await .await
.expect("Could not send message to the socket message channel"); .expect("Could not send message to the socket message channel");
} }
info!("local information sent to the remote");
let notify_tx = Arc::new(Notify::new()); let notify_tx = Arc::new(Notify::new());
let notify_video = notify_tx.clone(); let notify_video = notify_tx.clone();
@ -103,12 +173,15 @@ async fn main() {
notify_tx, notify_tx,
) )
.await; .await;
info!("webrtc callbacks registered");
tokio::spawn(async move { tokio::spawn(async move {
notify_video.notified().await; notify_video.notified().await;
info!("Starting video stream!"); info!("Starting video stream!");
let mut stream = video_receiver_stream.lock().await;
while let Some(map) = stream.recv().await { while let Some(map) = stream.recv().await {
if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) { if to_quit_2.load(std::sync::atomic::Ordering::SeqCst) {
stream.close(); stream.close();
@ -126,67 +199,18 @@ async fn main() {
} }
}); });
pipeline.sink.set_callbacks( info!("starting webrtc connection kickoff");
gst_app::AppSinkCallbacks::builder()
.new_sample(move |app_sink| {
let sample = app_sink
.pull_sample()
.map_err(|_| gstreamer::FlowError::Eos)?;
let buffer = sample.buffer_owned().ok_or_else(|| {
gstreamer::element_error!(
app_sink,
gstreamer::ResourceError::Failed,
("Failed to get buffer from appsink")
);
gstreamer::FlowError::Error
})?;
if let Err(e) = to_stream.blocking_send(buffer) {
error!("Error sending to stream of buffers: {e}");
to_quit_5.store(true, Ordering::SeqCst);
}
Ok(gstreamer::FlowSuccess::Ok)
})
.build(),
);
// send the offer and trickle ice candidates to the remote, and accept their description // send the offer and trickle ice candidates to the remote, and accept their description
if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await { if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await {
to_quit.store(true, Ordering::SeqCst); to_quit.store(true, Ordering::SeqCst);
panic!("There was an issue with WebRTC setup! {e}"); panic!("There was an issue with WebRTC setup! {e}");
} }
// Start the gstreamer pipeline info!("webrtc kickoff complete, entering watch mode");
if let Err(e) = pipeline.pipeline.set_state(State::Playing) {
panic!("Could not start pipeline! {e}");
}
// set up handler for ctrl_c to quit the application
tokio::spawn(async move {
match tokio::signal::ctrl_c().await {
Ok(()) => {
to_quit.store(true, std::sync::atomic::Ordering::SeqCst);
}
Err(e) => {
error!("Could not watch ctrl_c signal! {e}");
}
}
});
let (to_mec, mec) = async_channel::bounded::<ApplicationEvent>(20);
tokio::spawn(start_event_loop(mec, to_mec.clone()));
let (to_tec, tec) = async_channel::bounded::<TrackerEvent>(20);
tokio::spawn(tracker_loop(
to_tec.clone(),
tec,
to_mec.clone(),
app_sender.clone(),
));
// loop through messages coming from the remote. // loop through messages coming from the remote.
while let Ok(msg) = app_receiver.recv().await { while let Ok(msg) = app_receiver.recv().await {
if to_quit.load(Ordering::SeqCst) { break; }
match msg { match msg {
ApplicationMessage::WebRTCPacket(_pkt) => { ApplicationMessage::WebRTCPacket(_pkt) => {
error!("don't know what to do with this packet!"); error!("don't know what to do with this packet!");
@ -227,4 +251,5 @@ async fn main() {
} }
} }
} }
}
} }

View file

@ -6,11 +6,12 @@ use crate::rig::ApplicationEvent;
pub enum TrackerEvent { pub enum TrackerEvent {
ChangeID(u16), ChangeID(u16),
ChangeMEC(AppSender),
} }
pub struct TrackerState { pub struct TrackerState {
pub target_id: Option<u16>, pub target_id: Option<u16>,
pub app_sender: AppSender, pub app_sender: Option<AppSender>,
} }
impl TrackerState {} impl TrackerState {}
@ -19,11 +20,10 @@ pub async fn tracker_loop(
to_tec: Sender<TrackerEvent>, to_tec: Sender<TrackerEvent>,
tec: Receiver<TrackerEvent>, tec: Receiver<TrackerEvent>,
to_mec: Sender<ApplicationEvent>, to_mec: Sender<ApplicationEvent>,
app_sender: AppSender,
) { ) {
let mut state = TrackerState { let mut state = TrackerState {
target_id: None, target_id: None,
app_sender, app_sender: None,
}; };
while let Ok(msg) = tec.recv().await { while let Ok(msg) = tec.recv().await {
@ -31,6 +31,9 @@ pub async fn tracker_loop(
TrackerEvent::ChangeID(new_id) => { TrackerEvent::ChangeID(new_id) => {
state.target_id = Some(new_id); state.target_id = Some(new_id);
} }
TrackerEvent::ChangeMEC(new_mec) => {
state.app_sender = Some(new_mec);
}
} }
// Pretend we processed a video frame! // Pretend we processed a video frame!
@ -46,8 +49,10 @@ pub async fn tracker_loop(
error!("Could not send tracking update to MEC! {e}"); error!("Could not send tracking update to MEC! {e}");
} }
if state.app_sender.is_some() {
if let Err(e) = state if let Err(e) = state
.app_sender .app_sender
.as_ref().unwrap()
.send(ApplicationMessage::TrackingBoxes( .send(ApplicationMessage::TrackingBoxes(
vcs_common::types::TrackingUpdate { vcs_common::types::TrackingUpdate {
target_id: None, target_id: None,
@ -66,5 +71,6 @@ pub async fn tracker_loop(
break; break;
} }
} }
}
info!("Tracking loop is shutting down"); info!("Tracking loop is shutting down");
} }

View file

@ -4,7 +4,11 @@ use std::sync::{
}; };
use tokio::sync::Notify; use tokio::sync::Notify;
use tracing::{error, info, instrument}; use tokio::net::TcpListener;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::accept_async;
use futures_util::{SinkExt, StreamExt};
use tracing::{error, info, debug, warn, instrument};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage}; use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
use webrtc::{ use webrtc::{
api::{ api::{
@ -168,6 +172,9 @@ pub async fn kickoff_connection(
) -> Result<(), Error> { ) -> Result<(), Error> {
info!("Starting WebRTC handshake"); info!("Starting WebRTC handshake");
warn!("Still using ice gathering complete");
pc.gathering_complete_promise().await;
// Create and store the offer // Create and store the offer
let offer = pc.create_offer(None).await?; let offer = pc.create_offer(None).await?;
@ -198,7 +205,7 @@ pub async fn kickoff_connection(
info!("added ice candidate"); info!("added ice candidate");
break; break;
} }
Ok(ApplicationMessage::WebRTCIceCandidate(_pkt)) => { Ok(ApplicationMessage::WebRTCIceCandidate(pkt)) => {
error!("Got a non init ice candidate. Now what?"); error!("Got a non init ice candidate. Now what?");
} }
Ok(ApplicationMessage::NameRequest(Some(name))) => { Ok(ApplicationMessage::NameRequest(Some(name))) => {
@ -227,3 +234,115 @@ pub async fn kickoff_connection(
Ok(()) Ok(())
} }
pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender) {
if let Ok((stream, _)) = listener.accept().await {
match accept_async(stream).await {
Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
Ok(ws_stream) => {
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
tokio::spawn(async move {
while let Ok(msg) = from_app.recv().await {
#[cfg(debug_assertions)]
{
// serialized message
match serde_json::to_string(&msg) {
Err(e) => {
error!("Could not serialize ApplicationMessage to JSON! {e}")
}
Ok(msg) => {
if let Err(e) = ws_sender.send(Message::text(msg)).await {
error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
#[cfg(not(debug_assertions))]
{
match bincode::serialize(&msg) {
Err(e) => error!(
"Could not serialize ApplicationMessage into binary! {e}"
),
Ok(e) => {
if let Err(e) = sender.send(Message::binary(msg)).await {
error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
}
});
tokio::spawn(async move {
while let Some(msg) = ws_receiver.next().await {
match msg {
Err(e) => {
error!("There was an error getting a message from the remote! {e}");
}
Ok(msg) => match msg {
Message::Ping(_) | Message::Pong(_) => {}
Message::Close(_) => {
info!("Received WebSocket close message! Closing the websocket");
break;
}
Message::Frame(_) => {
info!("Received a Frame websocket message?");
}
Message::Text(text) => {
debug!("Recieved text from websocket: {text}");
#[cfg(debug_assertions)]
{
match serde_json::from_str(&text) {
Ok(msg) => {
if let Err(e) = to_ui.send(msg).await {
error!("Could not send message from ws to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Text` message from the remote while running in release mode! " +
"Was the other endpoint running release mode?\n msg: {text}");
}
}
Message::Binary(msg) => {
#[cfg(debug_assertions)]
{
match bincode::deserialize::<ApplicationMessage>(&msg) {
Ok(m) => {
if let Err(e) = to_ui.send(m).await {
error!("Could not send message to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed binary message from the websocket!\n{e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
"Was the other endpoing running debug mode?");
}
}
},
}
}
warn!("The websocket listener closed");
});
}
}
}
}