got working webrtc websocket (but not video)

This commit is contained in:
Nickiel12 2024-08-20 20:58:38 -07:00
parent f00e981a3c
commit 95f628ce44
8 changed files with 1828 additions and 399 deletions

2006
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -21,7 +21,7 @@ async-channel = "2.2.0"
async-recursion = "1.1.1"
config = "0.14.0"
futures-core = "0.3.30"
futures-util = { version = "0.3.30", features = ["tokio-io"] }
futures-util = { version = "0.3.30" }
gilrs = "0.10.6"
gstreamer = { version = "0.22.4", features = ["v1_22"] }
gstreamer-app = { version = "0.22.0", features = ["v1_22"] }
@ -39,4 +39,7 @@ console-subscriber = "0.3.0"
tauri = { version = "1.6.1", features = [] }
lazy_static = "1.5.0"
vcs-common = { git = "https://git.nickiel.net/VCC/vcs-common.git", branch = "main" }
bincode = "1.3.3"

View file

@ -56,7 +56,7 @@ impl WebcamPipeline {
})?;
let webrtc_sink = ElementFactory::make("webrtcsink")
.property("meta", "meta,name=gst-stream")
// .property("meta", "meta,name=gst-stream")
.name("web rtc sink")
.build()
.context(BuildSnafu {

View file

@ -1,15 +1,17 @@
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use std::sync::{Arc, Mutex};
use std::{sync::{Arc, Mutex}, time::Duration};
use async_channel::Sender;
use tokio::{runtime, sync:: RwLock};
use tracing::{self, info, error};
use tauri::Manager;
use tauri::{AppHandle, Manager, Window};
use lazy_static::lazy_static;
#[cfg(not(debug_assertions))]
use tracing_subscriber;
use vcs_common::ApplicationMessage;
use webrtc_remote::start_listener;
use crate::config::{load_config, AppConfig};
@ -19,11 +21,14 @@ mod gstreamer_pipeline;
mod sources;
mod states;
mod tauri_functions;
mod webrtc_remote;
use coordinator::{start_coordinator, ApplicationEvent};
lazy_static! {
static ref TO_MEC_REF: Mutex<Option<Sender<ApplicationEvent>>> = Mutex::new(None);
static ref TO_WEBRTC: Mutex<Option<Sender<ApplicationMessage>>> = Mutex::new(None);
static ref APP_HANDLE: Mutex<Option<AppHandle>> = Mutex::new(None);
}
fn main() {
@ -70,10 +75,69 @@ fn main() {
*TO_MEC_REF.lock().unwrap() = Some(to_mec.clone());
let (to_webrtc_send, to_webrtc_recv) = async_channel::bounded::<vcs_common::ApplicationMessage>(10);
let (from_webrtc_send, from_webrtc_recv) = async_channel::bounded::<vcs_common::ApplicationMessage>(10);
rt.handle().spawn(start_listener(to_webrtc_recv, from_webrtc_send));
*TO_WEBRTC.lock().unwrap() = Some(to_webrtc_send.clone());
rt.handle().spawn(async move {
while let Ok(msg) = from_webrtc_recv.recv().await {
let mut do_sleep = false;
{
if let Ok(mut e) = APP_HANDLE.lock() {
if e.is_none() {
do_sleep = true;
} else {
let handle = e.take().unwrap();
match msg {
vcs_common::ApplicationMessage::WebRTCPacket(msg) => {
handle.emit_all("frontend_message", serde_json::to_string(&msg).unwrap()).unwrap();
}
}
*e = Some(handle);
}
}
}
if do_sleep {
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
});
tauri::Builder::default()
.manage(tauri_functions::TauriState { to_mec: to_mec.clone() })
.manage(tauri_functions::TauriState { to_mec: to_mec.clone(), to_webrtc: to_webrtc_send })
.invoke_handler(tauri::generate_handler![tauri_functions::connect_to_camera])
.setup(|app| {
*APP_HANDLE.lock().unwrap() = Some(app.handle());
let _id2 = app.listen_global("webrtc-message", |event| {
match event.payload() {
Some(payload) => {
if let Ok(e) = TO_WEBRTC.lock() {
match e.as_ref() {
Some(to_webrtc) => {
if let Err(e) = to_webrtc.send_blocking(ApplicationMessage::WebRTCPacket(serde_json::from_str(payload).expect("Could not decode the browser's sdp"))) {
error!("Could not send to mec! {e}");
}
},
None => {
error!("TO_MEC_REF was none!");
}
}
}
}
None => {
info!("There was an empty payload!");
}
}
});
let _id = app.listen_global("webrtc-event", |event| {
match event.payload() {
Some(payload) => {

View file

@ -8,6 +8,7 @@ use crate::coordinator::ApplicationEvent;
pub struct TauriState {
pub to_mec: Sender<ApplicationEvent>,
pub to_webrtc: Sender<vcs_common::ApplicationMessage>,
}
#[tauri::command]

132
src/webrtc_remote.rs Normal file
View file

@ -0,0 +1,132 @@
use std::{cmp::Ordering, sync::{atomic::AtomicBool, Arc}};
use futures_util::{StreamExt, SinkExt};
use tokio::{net::TcpListener, sync::Mutex};
use tokio_tungstenite::{accept_async, tungstenite::Message};
use tracing::{error, info, instrument, debug};
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
#[instrument(skip_all)]
pub async fn start_listener(
from_app: AppReceiver,
to_ui: AppSender,
) {
info!("starting tcplistener");
let listener = TcpListener::bind("localhost:7891").await.unwrap();
info!("Listening for webrtc connections on localhost:7891");
if let Ok((stream, _)) = listener.accept().await {
match accept_async(stream).await {
Err(e) => error!("Could not convert incoming stream to websocket: {e}"),
Ok(ws_stream) => {
let (mut ws_sender, mut ws_receiver) = ws_stream.split();
tokio::spawn(async move {
while let Ok(msg) = from_app.recv().await {
#[cfg(debug_assertions)]
{
// serialized message
match serde_json::to_string(&msg) {
Err(e) => error!("Could not serialize ApplicationMessage to JSON! {e}"),
Ok(msg) => {
if let Err(e) = ws_sender.send(Message::text(msg)).await {
error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
#[cfg(not(debug_assertions))]
{
match bincode::serialize(&msg) {
Err(e) => error!("Could not serialize ApplicationMessage into binary! {e}"),
Ok(e) => {
if let Err(e) = sender.send(Message::binary(msg)).await {
error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
}
});
while let Some(msg) = ws_receiver.next().await {
match msg {
Err(e) => {
error!("There was an error getting a message from the remote! {e}");
}
Ok(msg) => match msg {
Message::Ping(_) | Message::Pong(_) => {}
Message::Close(_) => {
info!("Received WebSocket close message! Closing the websocket");
break;
}
Message::Frame(_) => {
info!("Received a Frame websocket message?");
}
Message::Text(text) => {
debug!("Recieved text from websocket: {text}");
#[cfg(debug_assertions)]
{
match serde_json::from_str(&text) {
Ok(msg) => {
if let Err(e) = to_ui.send(msg).await {
error!("Could not send message from ws to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Text` message from the remote while running in release mode! " +
"Was the other endpoint running release mode?\n msg: {text}");
}
}
Message::Binary(msg) => {
#[cfg(debug_assertions)]
{
match bincode::deserialize::<ApplicationMessage>(&msg) {
Ok(m) => {
if let Err(e) = to_ui.send(m).await {
error!("Could not send message to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed binary message from the websocket!\n{e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
"Was the other endpoing running debug mode?");
}
}
}
}
}
}
}
}
}

View file

@ -46,7 +46,6 @@
<script>
feather.replace();
</script>
<script src="/static/gstreamer-rtc.js"></script>
</body>
</html>

View file

@ -25,7 +25,7 @@ pc.onnegotionationneeded = async () => {
try {
makingOffer = true;
await pc.setLocalDescription();
emit("webrtc-event", { description: pc.localDescription });
emit("webrtc-message", { description: pc.localDescription });
} catch (err) {
console.error(err);
} finally {
@ -33,7 +33,7 @@ pc.onnegotionationneeded = async () => {
}
};
pc.onicecandidate = ({ candidate }) => emit("webrtc-event", { candidate });
pc.onicecandidate = ({ candidate }) => emit("webrtc-message", { candidate });
pc.oniceconnectionstatechange = () => {
if (pc.iceConnectionState === "failed") {
pc.restartIce();
@ -63,7 +63,7 @@ const application_message = await listen('frontend_message', async (event) => {
await pc.setRemoteDescription(description);
if (description.type === "offer") {
await pc.setLocalDescription();
emit( "webrtc-event", { description: pc.localDescription });
emit( "webrtc-message", { description: pc.localDescription });
}
} else if (candidate) {
try {
@ -82,4 +82,8 @@ const application_message = await listen('frontend_message', async (event) => {
async function start_rtc_connection() {
try {
const stream = await navigator.mediaDevices.getUserMedia({ audio: false, video: false });
}
}