moved socket connection handling to vcs_common

This commit is contained in:
Nickiel12 2024-09-01 02:45:09 +00:00
parent 1507bc6bac
commit 3ab0b3f35e
3 changed files with 206 additions and 126 deletions

132
Cargo.lock generated
View file

@ -160,7 +160,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
"synstructure 0.13.1",
]
@ -183,7 +183,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -206,7 +206,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -228,7 +228,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -239,7 +239,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -455,9 +455,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "bytemuck"
version = "1.17.0"
version = "1.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fd4c6dcc3b0aea2f5c0b4b82c2b15fe39ddbc76041a310848f4706edf76bb31"
checksum = "773d90827bc3feecfb67fab12e24de0749aad83c74b9504ecde46237b5cd24e2"
[[package]]
name = "byteorder"
@ -516,9 +516,9 @@ dependencies = [
[[package]]
name = "cc"
version = "1.1.14"
version = "1.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50d2eb3cd3d1bf4529e31c215ee6f93ec5a3d536d9f578f93d9d33ee19562932"
checksum = "57b6a275aa2903740dc87da01c62040406b8812552e97129a63ea8850a17c6e6"
dependencies = [
"shlex",
]
@ -920,7 +920,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13b588ba4ac1a99f7f2964d24b3d896ddc6bf847ee3855dbd4366f058cfcd331"
dependencies = [
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -930,7 +930,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f"
dependencies = [
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -965,7 +965,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -989,7 +989,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -1000,7 +1000,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -1067,7 +1067,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustc_version",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -1117,7 +1117,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -1300,9 +1300,9 @@ dependencies = [
[[package]]
name = "filetime"
version = "0.2.24"
version = "0.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf401df4a4e3872c4fe8151134cf483738e74b67fc934d6532c882b3d24a4550"
checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586"
dependencies = [
"cfg-if",
"libc",
@ -1425,7 +1425,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -1624,9 +1624,9 @@ dependencies = [
[[package]]
name = "gilrs-core"
version = "0.5.13"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb5e8d912059b33b463831c16b838d15c4772d584ce332e4a80f6dffdae2bc1"
checksum = "732dadc05170599ddec9a89653f10d7a2af54da9181b3fa6e2bd49907ec8f7e4"
dependencies = [
"core-foundation",
"inotify",
@ -1832,7 +1832,7 @@ dependencies = [
"futures-sink",
"futures-util",
"http 0.2.12",
"indexmap 2.4.0",
"indexmap 2.5.0",
"slab",
"tokio",
"tokio-util",
@ -2118,9 +2118,9 @@ dependencies = [
[[package]]
name = "indexmap"
version = "2.4.0"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93ead53efc7ea8ed3cfb0c79fc8023fbb782a5432b52830b6518941cebe6505c"
checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
dependencies = [
"equivalent",
"hashbrown 0.14.5",
@ -2713,18 +2713,18 @@ dependencies = [
[[package]]
name = "object"
version = "0.36.3"
version = "0.36.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27b64972346851a39438c60b341ebc01bba47464ae329e55cf343eb93964efd9"
checksum = "084f1a5821ac4c651660a94a7153d27ac9d8a53736203f58b31945ded098070a"
dependencies = [
"memchr",
]
[[package]]
name = "oid-registry"
version = "0.7.0"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c958dd45046245b9c3c2547369bb634eb461670b2e7e0de552905801a648d1d"
checksum = "a8d8034d9489cdaf79228eb9f6a3b8d7bb32ba00d6645ebd48eef4077ceb5bd9"
dependencies = [
"asn1-rs 0.6.2",
]
@ -2897,7 +2897,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3015,7 +3015,7 @@ dependencies = [
"phf_shared 0.11.2",
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3062,7 +3062,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3100,7 +3100,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42cf17e9a1800f5f396bc67d193dc9411b59012a5876445ef450d449881e1016"
dependencies = [
"base64 0.22.1",
"indexmap 2.4.0",
"indexmap 2.5.0",
"quick-xml",
"serde",
"time",
@ -3236,7 +3236,7 @@ dependencies = [
"itertools",
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3511,9 +3511,9 @@ checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f"
[[package]]
name = "rustc_version"
version = "0.4.0"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92"
dependencies = [
"semver",
]
@ -3529,9 +3529,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.34"
version = "0.38.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f"
checksum = "a85d50532239da68e9addb745ba38ff4612a242c1c7ceea689c4bc7c2f43c36f"
dependencies = [
"bitflags 2.6.0",
"errno",
@ -3562,9 +3562,9 @@ checksum = "fc0a2ce646f8655401bb81e7927b812614bd5d91dbc968696be50603510fcaf0"
[[package]]
name = "rustls-webpki"
version = "0.102.6"
version = "0.102.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e"
checksum = "84678086bd54edf2b415183ed7a94d0efb049f1b646a33e22a36f3794be6ae56"
dependencies = [
"ring",
"rustls-pki-types",
@ -3676,7 +3676,7 @@ checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3685,7 +3685,7 @@ version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8043c06d9f82bd7271361ed64f415fe5e12a77fdb52e573e7f06a516dea329ad"
dependencies = [
"indexmap 2.4.0",
"indexmap 2.5.0",
"itoa 1.0.11",
"memchr",
"ryu",
@ -3700,7 +3700,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3722,7 +3722,7 @@ dependencies = [
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.4.0",
"indexmap 2.5.0",
"serde",
"serde_derive",
"serde_json",
@ -3739,7 +3739,7 @@ dependencies = [
"darling",
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3761,7 +3761,7 @@ checksum = "772ee033c0916d670af7860b6e1ef7d658a4629a6d0b4c8c3e67f09b3765b75d"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -3875,7 +3875,7 @@ dependencies = [
"heck 0.5.0",
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -4026,9 +4026,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.76"
version = "2.0.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "578e081a14e0cefc3279b0472138c513f37b41a08d5a3cca9b6e4e8ceb6cd525"
checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed"
dependencies = [
"proc-macro2",
"quote",
@ -4061,7 +4061,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -4145,7 +4145,7 @@ checksum = "f4e16beb8b2ac17db28eab8bca40e62dbfbb34c0fcdc6d9826b11b7b5d047dfd"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -4398,7 +4398,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -4468,9 +4468,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.39.3"
version = "1.40.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9babc99b9923bfa4804bd74722ff02c0381021eafa4db9949217e3be8e84fff5"
checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998"
dependencies = [
"backtrace",
"bytes",
@ -4502,7 +4502,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -4601,7 +4601,7 @@ version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421"
dependencies = [
"indexmap 2.4.0",
"indexmap 2.5.0",
"serde",
"serde_spanned",
"toml_datetime",
@ -4614,7 +4614,7 @@ version = "0.22.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "583c44c02ad26b0c3f3066fe629275e50627026c51ac2e595cca4c230ce1ce1d"
dependencies = [
"indexmap 2.4.0",
"indexmap 2.5.0",
"serde",
"serde_spanned",
"toml_datetime",
@ -4711,7 +4711,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -4914,7 +4914,7 @@ checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcs-common"
version = "0.1.0"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#59b0b88c53032514bf5ba68efe17b6f089734bfe"
source = "git+https://git.nickiel.net/VCC/vcs-common.git?branch=main#a70172133856dc5e776bcfc0ab16405e534e198a"
dependencies = [
"async-channel",
"bincode",
@ -5062,7 +5062,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
"wasm-bindgen-shared",
]
@ -5084,7 +5084,7 @@ checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -5513,7 +5513,7 @@ checksum = "2bbd5b46c938e506ecbce286b6628a02171d56153ba733b6c741fc627ec9579b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -5524,7 +5524,7 @@ checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -5775,9 +5775,9 @@ dependencies = [
[[package]]
name = "wry"
version = "0.24.10"
version = "0.24.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00711278ed357350d44c749c286786ecac644e044e4da410d466212152383b45"
checksum = "c55c80b12287eb1ff7c365fc2f7a5037cb6181bd44c9fce81c8d1cf7605ffad6"
dependencies = [
"base64 0.13.1",
"block",
@ -5909,7 +5909,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]
[[package]]
@ -5929,5 +5929,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.76",
"syn 2.0.77",
]

View file

@ -2,15 +2,16 @@ use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use async_channel::{Receiver, Sender, TryRecvError};
use tokio::net::TcpStream;
use tokio::runtime::Handle;
use tokio::sync::oneshot;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::error;
use tracing::info;
use crate::config::{AppConfig, ConnectionString};
use crate::config::AppConfig;
use crate::sources::joystick_source::joystick_loop;
mod satellite_connection;
use satellite_connection::SatelliteConnection;
pub enum ApplicationEvent {
WebRTCMessage,
Move(Point),
@ -22,18 +23,6 @@ pub struct Point {
pub y: i32,
}
pub struct SatelliteConnection {
connection: ConnectionString,
socket: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
socket_in_progress: Option<
oneshot::Receiver<
Result<
WebSocketStream<MaybeTlsStream<TcpStream>>,
tokio_tungstenite::tungstenite::Error,
>,
>,
>,
}
pub struct AppState {
to_mec: Sender<ApplicationEvent>,
@ -60,11 +49,7 @@ impl AppState {
.await
.cameras
.iter()
.map(|x| SatelliteConnection {
connection: x.clone(),
socket: None,
socket_in_progress: None,
})
.map(|x| SatelliteConnection::new(x.clone()))
.collect::<Vec<SatelliteConnection>>();
AppState {
@ -92,36 +77,6 @@ impl AppState {
));
}
for connection in self.camera_satellites.iter_mut() {
if connection.socket_in_progress.is_some() {
if connection.socket_in_progress.map(|x| x.try_recv()) {
}
}
if connection.socket.is_none() && connection.socket_in_progress.is_none() {
let (send, recv) = oneshot::channel::<
Result<
WebSocketStream<MaybeTlsStream<TcpStream>>,
tokio_tungstenite::tungstenite::Error,
>,
>();
connection.socket_in_progress = Some(recv);
let conn_string = connection.connection.build_conn_string();
tokio::spawn(async move {
let res = connect_async(conn_string).await;
match res {
Ok((res, _)) => {
let _ = send.send(Ok(res)); // can't even close the socket if send
// returns an error
}
Err(e) => {
let _ = send.send(Err(e));
}
}
});
}
}
}
}
@ -138,23 +93,41 @@ pub async fn run_main_event_loop(
match state.mec.try_recv() {
Err(TryRecvError::Empty) => tokio::time::sleep(Duration::from_millis(50)).await,
Err(TryRecvError::Closed) => {
state.joystick_task_is_alive.store(false, std::sync::atomic::Ordering::SeqCst);
let close_handles: Vec<_> = state
.camera_satellites
.iter_mut()
.filter(|x| x.socket.is_some())
.filter(|x| x.is_connected())
.map(|x| {
let skt = x.socket.take().unwrap();
close_socket(skt)
x.close()
})
.collect();
futures::future::join_all(close_handles).await;
break;
}
Ok(msg) => {}
Ok(msg) => match msg {
ApplicationEvent::Close => {
state.mec.close(); // cleanup is handled on reading from a closed mec
}
ApplicationEvent::WebRTCMessage => {
}
ApplicationEvent::Move(_coord) => {
}
}
}
for connection in state.camera_satellites.iter_mut() {
match connection.try_next().await {
Some(_msg) => {
info!("You have Mail!");
}
None => {}
}
}
}
}
async fn close_socket(mut skt: WebSocketStream<MaybeTlsStream<TcpStream>>) {
let _ = skt.close(None).await;
}

View file

@ -0,0 +1,107 @@
use std::sync::{atomic::AtomicBool, Arc};
use async_channel::TryRecvError;
use tokio::runtime::Handle;
use tracing::{error, instrument, warn};
use crate::config::ConnectionString;
use vcs_common::{AppReceiver, AppSender, ApplicationMessage};
enum SatelliteConnectionError {
SocketIsClosed,
}
pub struct SatelliteConnection {
name: String,
connection: ConnectionString,
to_socket: Option<AppSender>,
from_socket: Option<AppReceiver>,
socket_is_alive: Arc<AtomicBool>,
}
impl SatelliteConnection {
#[instrument]
pub fn new(conn_string: ConnectionString) -> Self {
SatelliteConnection {
name: String::new(),
connection: conn_string.clone(),
to_socket: None,
from_socket: None,
socket_is_alive: Arc::new(AtomicBool::new(false)),
}
}
#[instrument(skip(self))]
pub async fn close(&mut self) {
self.socket_is_alive.store(false, std::sync::atomic::Ordering::SeqCst);
self.to_socket.take(); // closing all senders will dispose of the
self.from_socket.take(); // channel
}
#[instrument(skip(self))]
pub fn is_connected(&self) -> bool {
return self.socket_is_alive.load(std::sync::atomic::Ordering::SeqCst);
}
#[instrument(skip(self, rt))]
pub async fn connect(&mut self, rt: Handle) {
match vcs_common::connect_to_server(self.connection.build_conn_string(), rt).await {
Ok((sender, recvr, is_alive)) => {
if let Err(e) = sender.send(ApplicationMessage::NameRequest(None)).await {
error!("Couldn't send message to fresh socket sender! '{}' \n {}", self.connection.build_conn_string(), e);
}
self.to_socket = Some(sender);
self.from_socket = Some(recvr);
self.socket_is_alive = is_alive;
}
Err(e) => {
error!("Could not connect to socket remote: '{}' \n {}", self.connection.build_conn_string(), e);
}
}
}
#[instrument(skip(self))]
pub async fn send(&mut self, msg: ApplicationMessage) -> Result<(), SatelliteConnectionError> {
if self.to_socket.is_some() {
if let Err(_) = self.to_socket.as_ref().unwrap().send(msg).await {
self.close().await;
return Err(SatelliteConnectionError::SocketIsClosed);
}
return Ok(());
}
Err(SatelliteConnectionError::SocketIsClosed)
}
#[instrument(skip(self))]
pub async fn try_next(&mut self) -> Option<ApplicationMessage> {
if self.from_socket.is_some() {
match self.from_socket.as_ref().unwrap().try_recv() {
Ok(msg) => {
match msg {
ApplicationMessage::NameRequest(Some(name)) => {
self.name = name;
None
}
ApplicationMessage::NameRequest(None) => {
warn!("Got a request for a name, ignoring");
None
},
_ => Some(msg)
}
},
Err(TryRecvError::Empty) => None,
Err(TryRecvError::Closed) => {
self.close().await;
return None;
}
}
} else {
None
}
}
}