set up basic functionality

This commit is contained in:
Nickiel12 2024-08-17 02:36:10 +00:00
parent 898d5181e0
commit 9f61405000
3 changed files with 260 additions and 0 deletions

View file

@ -2,5 +2,14 @@
name = "vcs-common" name = "vcs-common"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
authors = [ "Nickiel nickiel@nickiel.net" ]
[dependencies] [dependencies]
async-channel = "2.3.1"
bincode = "1.3.3"
futures-util = "0.3.30"
serde = { version = "1.0.208", features = ["derive"] }
serde_json = "1.0.125"
tokio = { version = "1.39.2", features = ["net", "rt", "sync"] }
tokio-tungstenite = "0.23.1"
tracing = "0.1.40"

62
flake.lock Normal file
View file

@ -0,0 +1,62 @@
{
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1723637854,
"narHash": "sha256-med8+5DSWa2UnOqtdICndjDAEjxr5D7zaIiK4pn0Q7c=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "c3aa7b8938b17aebd2deecf7be0636000d62a2b9",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"nixpkgs_2": {
"locked": {
"lastModified": 1718428119,
"narHash": "sha256-WdWDpNaq6u1IPtxtYHHWpl5BmabtpmLnMAx0RdJ/vo8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e6cea36f83499eb4e9cd184c8a8e823296b50ad5",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixpkgs-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"nixpkgs": "nixpkgs_2"
},
"locked": {
"lastModified": 1723774846,
"narHash": "sha256-m53hVat6XXiKooV1oUDEMnPcdNKqSn/kAW+g8juSq84=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "519f4892641bc04a6ac7c2d260cc68356f9ae90f",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

View file

@ -1,3 +1,192 @@
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use async_channel::{Receiver, Sender};
use futures_util::stream::{SplitStream, StreamExt};
use futures_util::SinkExt;
use serde::{Serialize, Deserialize};
use tokio::{
net::TcpStream,
sync::oneshot
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::error::Error as tungstenite_error;
use tracing::field::debug;
use tracing::{error, info, instrument};
static MAX_MESSAGE: usize = 50;
#[derive(Serialize, Deserialize)]
pub enum ApplicationMessage {
WebRTCPacket(String),
}
type AppSender = Sender<ApplicationMessage>;
type AppReceiver = Receiver<ApplicationMessage>;
/// Takes a ws connection string and a result oneshot
/// It will attempt to connect to the ws string, and will either
/// return an `AppSender` or the websocket error.
/// If the oneshot is unable to be sent to, it will trace and error and close down
#[instrument(skip(result_oneshot))]
pub async fn connect_to_server(
connection_string: String,
result_oneshot: oneshot::Sender<Result<(AppSender, AppReceiver), tungstenite_error>>
) {
match connect_async(connection_string).await {
Err(e) => {
if let Err(e) = result_oneshot.send(Err(e)) {
error!("WS connection failed, and could not send error back to the main program! \n{:?}", e);
return;
}
}
Ok((mut ws, _)) => {
let (to_core_sender, to_core_reciever) = async_channel::bounded::<ApplicationMessage>(MAX_MESSAGE);
let (to_app_events, from_app_events) = async_channel::bounded::<ApplicationMessage>(MAX_MESSAGE);
let res: AppSender = to_core_sender; // to core: endpoint -> remote
let rec: AppReceiver = from_app_events; // from app: remote -> endpoint
if let Err(e) = result_oneshot.send(Ok((res.clone(), rec))) {
if let Err(e2) = ws.close(None).await {
error!("Could not close connection to websocket! {e2}");
}
error!("WS connection succeeded, and could not send error back to the main program! \n{:?}", e);
return;
}
let (mut ws_sender, ws_recv) = ws.split();
let to_app: AppSender = to_app_events;
let receiver_is_closed = Arc::new(AtomicBool::new(false));
let moved_copy = receiver_is_closed.clone();
tokio::spawn(async move {
listen_to_ws(to_app, moved_copy, ws_recv)
});
while let Ok(msg) = to_core_reciever.recv().await {
#[cfg(debug_assertions)]
{
// serialized message
match serde_json::to_string(&msg) {
Err(e) => error!("Could not serialize ApplicationMessage to JSON! {e}"),
Ok(msg) => {
if let Err(e) = ws_sender.send(Message::text(msg)).await {
error!("Could not send text ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
#[cfg(not(debug_assertions))]
{
match bincode::serialize(&msg) {
Err(e) => error!("Could not serialize ApplicationMessage into binary! {e}"),
Ok(e) => {
if let Err(e) = ws_sender.send(Message::binary(msg)).await {
error!("Could not send binary ApplicationMessage to websocket! Closing websocket\n{e}");
break;
}
}
}
}
}
let _ = ws_sender.close().await;
info!("Websocket connect successfully");
}
}
}
#[instrument(skip_all)]
async fn listen_to_ws(
to_app: AppSender,
is_closed: Arc<AtomicBool>,
mut reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>
){
while let Some(msg) = reader.next().await {
match msg {
Err(e) => {
error!("There was an error getting a message from the remote! {e}");
}
Ok(msg) => match msg {
Message::Ping(_) | Message::Pong(_) => {}
Message::Close(_) => {
info!("Received WebSocket close message! Closing the websocket");
break;
}
Message::Frame(_) => {
info!("Received a Frame websocket message?");
}
Message::Text(text) => {
debug("Recieved text from websocket: {text}");
#[cfg(debug_assertions)]
{
match serde_json::from_str(&text) {
Ok(msg) => {
if let Err(e) = to_app.send(msg).await {
error!("Could not send message from ws to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed JSON message from the websocket!\n{text}\nmsg: {e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Text` message from the remote while running in release mode! " +
"Was the other endpoint running release mode?\n msg: {text}");
}
}
Message::Binary(msg) => {
#[cfg(debug_assertions)]
{
match bincode::deserialize::<ApplicationMessage>(&msg) {
Ok(m) => {
if let Err(e) = to_app.send(m).await {
error!("Could not send message to application! Closing and exiting\n{e}");
break;
}
}
Err(e) => {
error!("Received a malformed binary message from the websocket!\n{e}");
}
}
}
#[cfg(not(debug_assertions))]
{
warn!("Recieved a `Binary` message from the remote while running in debug mode! " +
"Was the other endpoing running debug mode?");
}
}
}
}
}
is_closed.store(true, std::sync::atomic::Ordering::SeqCst);
}
pub fn add(left: usize, right: usize) -> usize { pub fn add(left: usize, right: usize) -> usize {
left + right left + right
} }