some webrtc testing
This commit is contained in:
parent
400b33adb2
commit
98c1a66f95
4 changed files with 170 additions and 123 deletions
28
src/gst.rs
28
src/gst.rs
|
@ -20,9 +20,17 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
||||||
.name("camera_to_rtp_pipeine")
|
.name("camera_to_rtp_pipeine")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
let source = ElementFactory::make("mfvideosrc").build().unwrap();
|
let source = ElementFactory::make("mfvideosrc")
|
||||||
|
.build()
|
||||||
|
.expect("Could not make mfvideosrc element!");
|
||||||
|
|
||||||
let video_convert = ElementFactory::make("videoconvert").build().unwrap();
|
let video_convert = ElementFactory::make("videoconvert")
|
||||||
|
.build()
|
||||||
|
.expect("Could not make videoconvert gst element!");
|
||||||
|
|
||||||
|
let video_rate = ElementFactory::make("videorate")
|
||||||
|
.build()
|
||||||
|
.expect("Could not make videoscale gst element!");
|
||||||
|
|
||||||
let video_scale = ElementFactory::make("videoscale")
|
let video_scale = ElementFactory::make("videoscale")
|
||||||
.property("add-borders", true)
|
.property("add-borders", true)
|
||||||
|
@ -32,13 +40,18 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
||||||
let video_scale_caps = gstreamer::Caps::builder("video/x-raw")
|
let video_scale_caps = gstreamer::Caps::builder("video/x-raw")
|
||||||
.field("height", HEIGHT as i32)
|
.field("height", HEIGHT as i32)
|
||||||
.field("width", (HEIGHT as f64 * config.aspect_ratio) as i32)
|
.field("width", (HEIGHT as f64 * config.aspect_ratio) as i32)
|
||||||
|
// .field("framerate", gstreamer::Fraction::new(30, 1))
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
// We are using VP8 because VP9 resulted in much worse video quality
|
// We are using VP8 because VP9 resulted in much worse video quality
|
||||||
// when testing -NY 8/25/2024
|
// when testing -NY 8/25/2024
|
||||||
let vp8enc = ElementFactory::make("vp8enc").build().unwrap();
|
let vp8enc = ElementFactory::make("vp8enc")
|
||||||
|
.build()
|
||||||
|
.expect("Could not make vp8enc gst element!");
|
||||||
|
|
||||||
let rtp = ElementFactory::make("rtpvp8pay").build().unwrap();
|
let rtp = ElementFactory::make("rtpvp8pay")
|
||||||
|
.build()
|
||||||
|
.expect("Could not make rtpvp8pay gst element!");
|
||||||
|
|
||||||
let app_sink = gst_app::AppSink::builder().build();
|
let app_sink = gst_app::AppSink::builder().build();
|
||||||
|
|
||||||
|
@ -46,6 +59,7 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
||||||
.add_many([
|
.add_many([
|
||||||
&source,
|
&source,
|
||||||
&video_convert,
|
&video_convert,
|
||||||
|
&video_rate,
|
||||||
&video_scale,
|
&video_scale,
|
||||||
&vp8enc,
|
&vp8enc,
|
||||||
&rtp,
|
&rtp,
|
||||||
|
@ -53,13 +67,15 @@ pub fn new_pipeline(config: &AppConfig) -> Pipeline {
|
||||||
])
|
])
|
||||||
.expect("Could not add all the stuff to the pipeline");
|
.expect("Could not add all the stuff to the pipeline");
|
||||||
|
|
||||||
gst::Element::link_many(&[&source, &video_convert, &video_scale]).unwrap();
|
gst::Element::link_many(&[&source, &video_convert, &video_rate, &video_scale])
|
||||||
|
.expect("Could not link source through video scale!");
|
||||||
|
|
||||||
video_scale
|
video_scale
|
||||||
.link_filtered(&vp8enc, &video_scale_caps)
|
.link_filtered(&vp8enc, &video_scale_caps)
|
||||||
.expect("Could not link videoscale to vp8enc with caps!");
|
.expect("Could not link videoscale to vp8enc with caps!");
|
||||||
|
|
||||||
gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()]).unwrap();
|
gst::Element::link_many(&[&vp8enc, &rtp, app_sink.upcast_ref()])
|
||||||
|
.expect("Could not gst link vp8enc through appsink!");
|
||||||
|
|
||||||
Pipeline {
|
Pipeline {
|
||||||
pipeline,
|
pipeline,
|
||||||
|
|
151
src/main.rs
151
src/main.rs
|
@ -3,12 +3,12 @@ use std::sync::{
|
||||||
Arc,
|
Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use gstreamer::Buffer;
|
use gstreamer::{Buffer, FlowError, FlowSuccess};
|
||||||
use gstreamer::{prelude::ElementExt, State};
|
use gstreamer::{prelude::ElementExt, State};
|
||||||
use gstreamer_app as gst_app;
|
use gstreamer_app as gst_app;
|
||||||
use tokio::{net::TcpListener, sync::Notify};
|
use tokio::{net::TcpListener, sync::Notify};
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
use tracing::{error, info, debug};
|
use tracing::{debug, error, info};
|
||||||
use webrtc::{
|
use webrtc::{
|
||||||
peer_connection::RTCPeerConnection,
|
peer_connection::RTCPeerConnection,
|
||||||
track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter},
|
track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocalWriter},
|
||||||
|
@ -22,9 +22,9 @@ mod rig;
|
||||||
mod tracker;
|
mod tracker;
|
||||||
mod web_rtc;
|
mod web_rtc;
|
||||||
|
|
||||||
use web_rtc::{kickoff_connection, setup_callbacks, listen_for_socket};
|
|
||||||
use rig::{start_event_loop, ApplicationEvent};
|
use rig::{start_event_loop, ApplicationEvent};
|
||||||
use tracker::{tracker_loop, TrackerEvent};
|
use tracker::{tracker_loop, TrackerEvent};
|
||||||
|
use web_rtc::{kickoff_connection, listen_for_socket, setup_callbacks};
|
||||||
|
|
||||||
const SATELLITE_NAME: &str = "CameraSatellite_1";
|
const SATELLITE_NAME: &str = "CameraSatellite_1";
|
||||||
|
|
||||||
|
@ -57,12 +57,13 @@ async fn main() {
|
||||||
|
|
||||||
let rt = tokio::runtime::Handle::current();
|
let rt = tokio::runtime::Handle::current();
|
||||||
|
|
||||||
let stream: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Buffer>>> = Arc::new(tokio::sync::Mutex::new(stream));
|
let stream: Arc<tokio::sync::Mutex<tokio::sync::mpsc::Receiver<Buffer>>> =
|
||||||
|
Arc::new(tokio::sync::Mutex::new(stream));
|
||||||
|
|
||||||
let cs1 = cancel_tasks.clone();
|
let cs1 = cancel_tasks.clone();
|
||||||
pipeline.sink.set_callbacks(
|
pipeline.sink.set_callbacks(
|
||||||
gst_app::AppSinkCallbacks::builder()
|
gst_app::AppSinkCallbacks::builder()
|
||||||
.new_sample(move |app_sink| {
|
.new_sample(move |app_sink| -> Result<FlowSuccess, FlowError> {
|
||||||
let sample = app_sink
|
let sample = app_sink
|
||||||
.pull_sample()
|
.pull_sample()
|
||||||
.map_err(|_| gstreamer::FlowError::Eos)?;
|
.map_err(|_| gstreamer::FlowError::Eos)?;
|
||||||
|
@ -79,6 +80,7 @@ async fn main() {
|
||||||
error!("Error sending to stream of buffers, it was closed: {e}");
|
error!("Error sending to stream of buffers, it was closed: {e}");
|
||||||
cs1.cancel();
|
cs1.cancel();
|
||||||
to_quit_5.store(true, Ordering::SeqCst);
|
to_quit_5.store(true, Ordering::SeqCst);
|
||||||
|
return Err(gstreamer::FlowError::Error);
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(gstreamer::FlowSuccess::Ok)
|
Ok(gstreamer::FlowSuccess::Ok)
|
||||||
|
@ -111,12 +113,7 @@ async fn main() {
|
||||||
tokio::spawn(start_event_loop(mec, to_mec.clone()));
|
tokio::spawn(start_event_loop(mec, to_mec.clone()));
|
||||||
|
|
||||||
let (to_tec, tec) = async_channel::bounded::<TrackerEvent>(20);
|
let (to_tec, tec) = async_channel::bounded::<TrackerEvent>(20);
|
||||||
tokio::spawn(tracker_loop(
|
tokio::spawn(tracker_loop(to_tec.clone(), tec, to_mec.clone()));
|
||||||
to_tec.clone(),
|
|
||||||
tec,
|
|
||||||
to_mec.clone(),
|
|
||||||
));
|
|
||||||
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let to_quit_2 = to_quit.clone();
|
let to_quit_2 = to_quit.clone();
|
||||||
|
@ -127,13 +124,14 @@ async fn main() {
|
||||||
let reset_connection_3 = reset_connection.clone();
|
let reset_connection_3 = reset_connection.clone();
|
||||||
|
|
||||||
let video_receiver_stream = stream.clone();
|
let video_receiver_stream = stream.clone();
|
||||||
let listener = TcpListener::bind("127.0.0.1:8765").await.expect("Could not bind tcp listener!");
|
let listener = TcpListener::bind("127.0.0.1:8765")
|
||||||
|
.await
|
||||||
|
.expect("Could not bind tcp listener!");
|
||||||
info!("Started listening on 127.0.0.1:8765");
|
info!("Started listening on 127.0.0.1:8765");
|
||||||
|
|
||||||
let (app_sender, to_core_reciever) = async_channel::bounded::<ApplicationMessage>(20);
|
let (app_sender, to_core_reciever) = async_channel::bounded::<ApplicationMessage>(20);
|
||||||
let (to_app_events, app_receiver) = async_channel::bounded::<ApplicationMessage>(20);
|
let (to_app_events, app_receiver) = async_channel::bounded::<ApplicationMessage>(20);
|
||||||
|
|
||||||
|
|
||||||
if to_quit_3.load(Ordering::SeqCst) {
|
if to_quit_3.load(Ordering::SeqCst) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -151,7 +149,10 @@ async fn main() {
|
||||||
|
|
||||||
drop(listener);
|
drop(listener);
|
||||||
|
|
||||||
if let Err(e) = to_tec.send(TrackerEvent::ChangeMEC(app_sender.clone())).await {
|
if let Err(e) = to_tec
|
||||||
|
.send(TrackerEvent::ChangeMEC(app_sender.clone()))
|
||||||
|
.await
|
||||||
|
{
|
||||||
error!("There was an error sending a message to the TEC! {e}");
|
error!("There was an error sending a message to the TEC! {e}");
|
||||||
to_quit_2.store(true, Ordering::SeqCst);
|
to_quit_2.store(true, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
|
@ -170,7 +171,6 @@ async fn main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Send the local description to the remote
|
// Send the local description to the remote
|
||||||
if let Some(local_desc) = peer_connection.local_description().await {
|
if let Some(local_desc) = peer_connection.local_description().await {
|
||||||
app_sender
|
app_sender
|
||||||
|
@ -222,65 +222,90 @@ async fn main() {
|
||||||
|
|
||||||
info!("starting webrtc connection kickoff");
|
info!("starting webrtc connection kickoff");
|
||||||
// send the offer and trickle ice candidates to the remote, and accept their description
|
// send the offer and trickle ice candidates to the remote, and accept their description
|
||||||
if let Err(e) = kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await {
|
if let Err(e) =
|
||||||
|
kickoff_connection(&peer_connection, app_sender.clone(), &app_receiver).await
|
||||||
|
{
|
||||||
error!("There was an issue with WebRTC setup! Resetting connection: {e}");
|
error!("There was an issue with WebRTC setup! Resetting connection: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("webrtc kickoff complete, entering watch mode");
|
info!("webrtc kickoff complete, entering watch mode");
|
||||||
|
|
||||||
// loop through messages coming from the remote.
|
// loop through messages coming from the remote.
|
||||||
while let Ok(msg) = app_receiver.recv().await {
|
loop {
|
||||||
if to_quit.load(Ordering::SeqCst) { break; }
|
tokio::select! {
|
||||||
match msg {
|
_ = cancel_tasks.cancelled() => {
|
||||||
ApplicationMessage::WebRTCPacket(_pkt) => {
|
info!("Event Loop cancelled flag caught");
|
||||||
error!("don't know what to do with this packet!");
|
|
||||||
}
|
|
||||||
ApplicationMessage::WebRTCIceCandidateInit(pkt) => {
|
|
||||||
if let Err(e) = peer_connection.add_ice_candidate(pkt).await {
|
|
||||||
error!("There was an error adding the trickle ICE candidate! {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ApplicationMessage::WebRTCIceCandidate(_pkg) => {
|
|
||||||
error!("Unhandled ice candidate!");
|
|
||||||
}
|
|
||||||
ApplicationMessage::NameRequest(Some(name)) => info!("Got a message about '{}'", name),
|
|
||||||
ApplicationMessage::NameRequest(None) => {
|
|
||||||
if let Err(e) = app_sender
|
|
||||||
.send(ApplicationMessage::NameRequest(Some(
|
|
||||||
SATELLITE_NAME.to_owned(),
|
|
||||||
)))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
error!("Could not let the remote know my name! {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ApplicationMessage::ChangeTrackingID(id) => {
|
|
||||||
if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await {
|
|
||||||
error!("Could not send message to tracker state! Closing down. {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ApplicationMessage::TrackingBoxes(_) => {
|
|
||||||
error!("I got a tracking boxes message?");
|
|
||||||
}
|
|
||||||
ApplicationMessage::ManualMovementOverride((x, y)) => {
|
|
||||||
if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await {
|
|
||||||
error!("Could not send manual override to state machine! {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ApplicationMessage::CloseConnection => {
|
|
||||||
info!("Received connection closing, breaking the loop");
|
|
||||||
to_quit.store(true, Ordering::SeqCst);
|
|
||||||
cancel_tasks.cancel();
|
|
||||||
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Ok(msg) = app_receiver.recv() => {
|
||||||
|
if to_quit.load(Ordering::SeqCst) {
|
||||||
|
info!("ToQuit set, breaking app_receiver set");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
match msg {
|
||||||
|
ApplicationMessage::WebRTCPacket(_pkt) => {
|
||||||
|
error!("don't know what to do with this packet!");
|
||||||
|
}
|
||||||
|
ApplicationMessage::WebRTCIceCandidateInit(pkt) => {
|
||||||
|
if let Err(e) = peer_connection.add_ice_candidate(pkt).await {
|
||||||
|
error!("There was an error adding the trickle ICE candidate! {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ApplicationMessage::WebRTCIceCandidate(_pkg) => {
|
||||||
|
error!("Unhandled ice candidate!");
|
||||||
|
}
|
||||||
|
ApplicationMessage::NameRequest(Some(name)) => {
|
||||||
|
info!("Got a message about '{}'", name)
|
||||||
|
}
|
||||||
|
ApplicationMessage::NameRequest(None) => {
|
||||||
|
if let Err(e) = app_sender
|
||||||
|
.send(ApplicationMessage::NameRequest(Some(
|
||||||
|
SATELLITE_NAME.to_owned(),
|
||||||
|
)))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
error!("Could not let the remote know my name! {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ApplicationMessage::ChangeTrackingID(id) => {
|
||||||
|
if let Err(e) = to_tec.send(TrackerEvent::ChangeID(id)).await {
|
||||||
|
error!("Could not send message to tracker state! Closing down. {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ApplicationMessage::TrackingBoxes(_) => {
|
||||||
|
error!("I got a tracking boxes message?");
|
||||||
|
}
|
||||||
|
ApplicationMessage::ManualMovementOverride((x, y)) => {
|
||||||
|
if let Err(e) = to_mec.send(ApplicationEvent::ManualMoveEvent((x, y))).await {
|
||||||
|
error!("Could not send manual override to state machine! {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ApplicationMessage::CloseConnection => {
|
||||||
|
info!("Received connection closing, breaking the loop");
|
||||||
|
to_quit.store(true, Ordering::SeqCst);
|
||||||
|
cancel_tasks.cancel();
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Err(e) = peer_connection.close().await {
|
||||||
|
error!("Got an error while closing the webrtc connection! {}", e);
|
||||||
|
}
|
||||||
|
app_sender.close();
|
||||||
|
app_receiver.close();
|
||||||
|
info!("Loop is exiting!");
|
||||||
|
|
||||||
}
|
}
|
||||||
info!("WebRTC loop exited, should be closing down\nTo_Quit: {}", to_quit.load(Ordering::SeqCst));
|
info!(
|
||||||
|
"WebRTC loop exited, should be closing down\nTo_Quit: {}",
|
||||||
|
to_quit.load(Ordering::SeqCst)
|
||||||
|
);
|
||||||
if let Err(e) = pipeline.pipeline.set_state(State::Null) {
|
if let Err(e) = pipeline.pipeline.set_state(State::Null) {
|
||||||
panic!("Could not start pipeline! {e}");
|
panic!("Could not start pipeline! {e}");
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,8 @@ pub async fn tracker_loop(
|
||||||
if state.app_sender.is_some() {
|
if state.app_sender.is_some() {
|
||||||
if let Err(e) = state
|
if let Err(e) = state
|
||||||
.app_sender
|
.app_sender
|
||||||
.as_ref().unwrap()
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
.send(ApplicationMessage::TrackingBoxes(
|
.send(ApplicationMessage::TrackingBoxes(
|
||||||
vcs_common::types::TrackingUpdate {
|
vcs_common::types::TrackingUpdate {
|
||||||
target_id: None,
|
target_id: None,
|
||||||
|
|
109
src/web_rtc.rs
109
src/web_rtc.rs
|
@ -3,12 +3,12 @@ use std::sync::{
|
||||||
Arc,
|
Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use tokio::sync::Notify;
|
|
||||||
use tokio::net::TcpListener;
|
|
||||||
use tokio_tungstenite::tungstenite::Message;
|
|
||||||
use tokio_tungstenite::accept_async;
|
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use tracing::{error, info, debug, warn, instrument};
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
use tokio_tungstenite::accept_async;
|
||||||
|
use tokio_tungstenite::tungstenite::Message;
|
||||||
|
use tracing::{debug, error, info, instrument, warn};
|
||||||
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
|
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
|
||||||
use webrtc::{
|
use webrtc::{
|
||||||
api::{
|
api::{
|
||||||
|
@ -239,7 +239,12 @@ pub async fn kickoff_connection(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to_ui: AppSender, reset_connection: Arc<AtomicBool>) {
|
pub async fn listen_for_socket(
|
||||||
|
listener: &TcpListener,
|
||||||
|
from_app: AppReceiver,
|
||||||
|
to_ui: AppSender,
|
||||||
|
reset_connection: Arc<AtomicBool>,
|
||||||
|
) {
|
||||||
if let Ok((stream, _)) = listener.accept().await {
|
if let Ok((stream, _)) = listener.accept().await {
|
||||||
match accept_async(stream).await {
|
match accept_async(stream).await {
|
||||||
Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
|
Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
|
||||||
|
@ -287,62 +292,63 @@ pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("There was an error getting a message from the remote! {e}");
|
error!("There was an error getting a message from the remote! {e}");
|
||||||
reset_connection.store(true, Ordering::SeqCst);
|
reset_connection.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
}
|
}
|
||||||
Ok(msg) => match msg {
|
Ok(msg) => {
|
||||||
Message::Ping(_) | Message::Pong(_) => {}
|
match msg {
|
||||||
Message::Close(_) => {
|
Message::Ping(_) | Message::Pong(_) => {}
|
||||||
info!("Received WebSocket close message! Closing the websocket");
|
Message::Close(_) => {
|
||||||
break;
|
info!("Received WebSocket close message! Closing the websocket");
|
||||||
}
|
break;
|
||||||
Message::Frame(_) => {
|
}
|
||||||
info!("Received a Frame websocket message?");
|
Message::Frame(_) => {
|
||||||
}
|
info!("Received a Frame websocket message?");
|
||||||
Message::Text(text) => {
|
}
|
||||||
debug!("Recieved text from websocket: {text}");
|
Message::Text(text) => {
|
||||||
#[cfg(debug_assertions)]
|
debug!("Recieved text from websocket: {text}");
|
||||||
{
|
#[cfg(debug_assertions)]
|
||||||
match serde_json::from_str(&text) {
|
{
|
||||||
Ok(msg) => {
|
match serde_json::from_str(&text) {
|
||||||
if let Err(e) = to_ui.send(msg).await {
|
Ok(msg) => {
|
||||||
error!("Could not send message from ws to application! Closing and exiting\n{e}");
|
if let Err(e) = to_ui.send(msg).await {
|
||||||
break;
|
error!("Could not send message from ws to application! Closing and exiting\n{e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
|
||||||
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
#[cfg(not(debug_assertions))]
|
||||||
#[cfg(not(debug_assertions))]
|
{
|
||||||
{
|
warn!("Recieved a `Text` message from the remote while running in release mode! " +
|
||||||
warn!("Recieved a `Text` message from the remote while running in release mode! " +
|
|
||||||
"Was the other endpoint running release mode?\n msg: {text}");
|
"Was the other endpoint running release mode?\n msg: {text}");
|
||||||
}
|
|
||||||
}
|
|
||||||
Message::Binary(msg) => {
|
|
||||||
#[cfg(debug_assertions)]
|
|
||||||
{
|
|
||||||
match bincode::deserialize::<ApplicationMessage>(&msg) {
|
|
||||||
Ok(m) => {
|
|
||||||
if let Err(e) = to_ui.send(m).await {
|
|
||||||
error!("Could not send message to application! Closing and exiting\n{e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Received a malformed binary message from the websocket!\n{e}");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Message::Binary(msg) => {
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
{
|
||||||
|
match bincode::deserialize::<ApplicationMessage>(&msg) {
|
||||||
|
Ok(m) => {
|
||||||
|
if let Err(e) = to_ui.send(m).await {
|
||||||
|
error!("Could not send message to application! Closing and exiting\n{e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Received a malformed binary message from the websocket!\n{e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(not(debug_assertions))]
|
#[cfg(not(debug_assertions))]
|
||||||
{
|
{
|
||||||
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
|
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
|
||||||
"Was the other endpoing running debug mode?");
|
"Was the other endpoing running debug mode?");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
warn!("The websocket listener closed");
|
warn!("The websocket listener closed");
|
||||||
|
@ -351,4 +357,3 @@ pub async fn listen_for_socket(listener: &TcpListener, from_app: AppReceiver, to
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue