diff --git a/Cargo.toml b/Cargo.toml index 7b593d9..a0a3f51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,5 +2,14 @@ name = "vcs-common" version = "0.1.0" edition = "2021" +authors = [ "Nickiel nickiel@nickiel.net" ] [dependencies] +async-channel = "2.3.1" +bincode = "1.3.3" +futures-util = "0.3.30" +serde = { version = "1.0.208", features = ["derive"] } +serde_json = "1.0.125" +tokio = { version = "1.39.2", features = ["net", "rt", "sync"] } +tokio-tungstenite = "0.23.1" +tracing = "0.1.40" diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..1ca3a78 --- /dev/null +++ b/flake.lock @@ -0,0 +1,62 @@ +{ + "nodes": { + "nixpkgs": { + "locked": { + "lastModified": 1723637854, + "narHash": "sha256-med8+5DSWa2UnOqtdICndjDAEjxr5D7zaIiK4pn0Q7c=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "c3aa7b8938b17aebd2deecf7be0636000d62a2b9", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "nixpkgs_2": { + "locked": { + "lastModified": 1718428119, + "narHash": "sha256-WdWDpNaq6u1IPtxtYHHWpl5BmabtpmLnMAx0RdJ/vo8=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "e6cea36f83499eb4e9cd184c8a8e823296b50ad5", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixpkgs-unstable", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "nixpkgs": "nixpkgs", + "rust-overlay": "rust-overlay" + } + }, + "rust-overlay": { + "inputs": { + "nixpkgs": "nixpkgs_2" + }, + "locked": { + "lastModified": 1723774846, + "narHash": "sha256-m53hVat6XXiKooV1oUDEMnPcdNKqSn/kAW+g8juSq84=", + "owner": "oxalica", + "repo": "rust-overlay", + "rev": "519f4892641bc04a6ac7c2d260cc68356f9ae90f", + "type": "github" + }, + "original": { + "owner": "oxalica", + "repo": "rust-overlay", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/src/lib.rs b/src/lib.rs index 7d12d9a..cd99056 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,192 @@ + +use std::sync::atomic::AtomicBool; +use std::sync::Arc; + +use async_channel::{Receiver, Sender}; +use futures_util::stream::{SplitStream, StreamExt}; +use futures_util::SinkExt; +use serde::{Serialize, Deserialize}; +use tokio::{ + net::TcpStream, + sync::oneshot +}; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::tungstenite::error::Error as tungstenite_error; +use tracing::field::debug; +use tracing::{error, info, instrument}; + +static MAX_MESSAGE: usize = 50; + +#[derive(Serialize, Deserialize)] +pub enum ApplicationMessage { + WebRTCPacket(String), + +} + +type AppSender = Sender; +type AppReceiver = Receiver; + + +/// Takes a ws connection string and a result oneshot +/// It will attempt to connect to the ws string, and will either +/// return an `AppSender` or the websocket error. +/// If the oneshot is unable to be sent to, it will trace and error and close down +#[instrument(skip(result_oneshot))] +pub async fn connect_to_server( + connection_string: String, + result_oneshot: oneshot::Sender> + ) { + + match connect_async(connection_string).await { + Err(e) => { + if let Err(e) = result_oneshot.send(Err(e)) { + error!("WS connection failed, and could not send error back to the main program! \n{:?}", e); + return; + } + } + Ok((mut ws, _)) => { + let (to_core_sender, to_core_reciever) = async_channel::bounded::(MAX_MESSAGE); + let (to_app_events, from_app_events) = async_channel::bounded::(MAX_MESSAGE); + + let res: AppSender = to_core_sender; // to core: endpoint -> remote + let rec: AppReceiver = from_app_events; // from app: remote -> endpoint + + if let Err(e) = result_oneshot.send(Ok((res.clone(), rec))) { + if let Err(e2) = ws.close(None).await { + error!("Could not close connection to websocket! {e2}"); + } + error!("WS connection succeeded, and could not send error back to the main program! \n{:?}", e); + return; + } + + let (mut ws_sender, ws_recv) = ws.split(); + + let to_app: AppSender = to_app_events; + let receiver_is_closed = Arc::new(AtomicBool::new(false)); + let moved_copy = receiver_is_closed.clone(); + + tokio::spawn(async move { + listen_to_ws(to_app, moved_copy, ws_recv) + }); + + while let Ok(msg) = to_core_reciever.recv().await { + #[cfg(debug_assertions)] + { + // serialized message + match serde_json::to_string(&msg) { + Err(e) => error!("Could not serialize ApplicationMessage to JSON! {e}"), + Ok(msg) => { + if let Err(e) = ws_sender.send(Message::text(msg)).await { + error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}"); + break; + } + } + } + + } + + #[cfg(not(debug_assertions))] + { + match bincode::serialize(&msg) { + Err(e) => error!("Could not serialize ApplicationMessage into binary! {e}"), + Ok(e) => { + if let Err(e) = ws_sender.send(Message::binary(msg)).await { + error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}"); + break; + } + } + } + + } + + } + + let _ = ws_sender.close().await; + + info!("Websocket connect successfully"); + } + } +} + + +#[instrument(skip_all)] +async fn listen_to_ws( + to_app: AppSender, + is_closed: Arc, + mut reader: SplitStream>> + ){ + + while let Some(msg) = reader.next().await { + match msg { + Err(e) => { + error!("There was an error getting a message from the remote! {e}"); + } + Ok(msg) => match msg { + Message::Ping(_) | Message::Pong(_) => {} + Message::Close(_) => { + info!("Received WebSocket close message! Closing the websocket"); + break; + } + Message::Frame(_) => { + info!("Received a Frame websocket message?"); + } + Message::Text(text) => { + debug("Recieved text from websocket: {text}"); + #[cfg(debug_assertions)] + { + match serde_json::from_str(&text) { + Ok(msg) => { + if let Err(e) = to_app.send(msg).await { + error!("Could not send message from ws to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}"); + } + } + + } + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Text` message from the remote while running in release mode! " + + "Was the other endpoint running release mode?\n msg: {text}"); + } + } + Message::Binary(msg) => { + #[cfg(debug_assertions)] + { + match bincode::deserialize::(&msg) { + Ok(m) => { + if let Err(e) = to_app.send(m).await { + error!("Could not send message to application! Closing and exiting\n{e}"); + break; + } + } + Err(e) => { + error!("Received a malformed binary message from the websocket!\n{e}"); + } + + } + } + + #[cfg(not(debug_assertions))] + { + warn!("Recieved a `Binary` message from the remote while running in debug mode! " + + "Was the other endpoing running debug mode?"); + } + + } + } + } + } + + is_closed.store(true, std::sync::atomic::Ordering::SeqCst); + +} + + + pub fn add(left: usize, right: usize) -> usize { left + right }