diff --git a/src/camera_socket.rs b/src/camera_socket.rs index d60caf2..5b89663 100644 --- a/src/camera_socket.rs +++ b/src/camera_socket.rs @@ -23,7 +23,7 @@ async fn socket_listen( while let Some(msg) = reader.next().await { match msg { Ok(val) => { - if let Err(e) = mec.send(ApplicationEvent::SocketMessage(val, 0)).await { + if let Err(e) = mec.send(ApplicationEvent::SocketMessage(val)).await { error!("There was an error sending to Main Event Channel, closing socket recv thread: {e}"); break; } diff --git a/src/coordinator.rs b/src/coordinator.rs index 6d482d5..d7d56fd 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -10,7 +10,8 @@ use log::{error, info}; use tokio::runtime::Handle; use tokio_tungstenite::tungstenite::Message; -use crate::{joystick_loop::joystick_loop, camera_socket, ui_code::GuiUpdate}; +use crate::socket_server; +use crate::{camera_socket, joystick_loop::joystick_loop, ui_code::GuiUpdate}; #[derive(Clone)] pub struct MoveEvent { @@ -20,15 +21,16 @@ pub struct MoveEvent { pub enum ApplicationEvent { StartSocket(String), - SocketMessage(Message, u32), - MoveEvent(MoveEvent), + SocketMessage(Message), + MoveEvent(MoveEvent, u32), } struct CoordState<'a> { pub to_socket: Option>, - pub sck_send_alive: Arc, - pub sck_recv_alive: Arc, + pub sck_alive_sender: Arc, + pub sck_alive_server: Arc, + pub sck_alive_recvr: Arc, pub joystick_loop_alive: Arc, pub mec: Pin<&'a mut Receiver>, @@ -47,8 +49,9 @@ impl<'a> CoordState<'a> { CoordState { to_socket: None, - sck_send_alive: Arc::new(AtomicBool::new(false)), - sck_recv_alive: Arc::new(AtomicBool::new(false)), + sck_alive_sender: Arc::new(AtomicBool::new(false)), + sck_alive_recvr: Arc::new(AtomicBool::new(false)), + sck_alive_server: Arc::new(AtomicBool::new(false)), joystick_loop_alive: Arc::new(AtomicBool::new(false)), mec, @@ -59,7 +62,7 @@ impl<'a> CoordState<'a> { } pub async fn socket_send(&mut self, message: Message) { - if self.sck_send_alive.load(Ordering::SeqCst) { + if self.sck_alive_sender.load(Ordering::SeqCst) { if let Some(tx) = self.to_socket.take() { if let Err(e) = tx.send(message).await { error!("There was an error sending to the socket send channel: {e}"); @@ -71,8 +74,8 @@ impl<'a> CoordState<'a> { } pub async fn socket_start(&mut self, conn: String) { - if !(self.sck_recv_alive.load(Ordering::SeqCst) - && self.sck_send_alive.load(Ordering::SeqCst)) + if !(self.sck_alive_recvr.load(Ordering::SeqCst) + && self.sck_alive_sender.load(Ordering::SeqCst)) { info!("Starting socket"); let (to_socket, socket_sender_rx) = async_channel::bounded::(10); @@ -83,8 +86,8 @@ impl<'a> CoordState<'a> { conn, self.to_mec.clone(), socket_sender_rx, - self.sck_send_alive.clone(), - self.sck_recv_alive.clone(), + self.sck_alive_sender.clone(), + self.sck_alive_recvr.clone(), self.rt.clone(), ) .await; @@ -93,14 +96,22 @@ impl<'a> CoordState<'a> { pub async fn check_states(&mut self) { if !self.joystick_loop_alive.load(Ordering::SeqCst) { + info!("Restarting joystick loop"); self.rt.spawn(joystick_loop( self.to_mec.clone(), self.joystick_loop_alive.clone(), )); } - if !self.sck_recv_alive.load(Ordering::SeqCst) - || !self.sck_send_alive.load(Ordering::SeqCst) + if !self.sck_alive_server.load(Ordering::SeqCst) { + info!("Restarting socket server"); + self.sck_alive_server.store(true, Ordering::SeqCst); + self.rt.spawn(socket_server::start_socketserver(self.rt.clone(), self.to_mec.clone(), self.sck_alive_server.clone())); + } + + + if !self.sck_alive_recvr.load(Ordering::SeqCst) + || !self.sck_alive_sender.load(Ordering::SeqCst) { self.socket_close().await; @@ -118,13 +129,14 @@ impl<'a> CoordState<'a> { self.socket_close().await; self.joystick_loop_alive.store(false, Ordering::SeqCst); + self.sck_alive_server.store(false, Ordering::SeqCst); self.to_gui.close(); self.mec.close(); } pub async fn socket_close(&mut self) { - self.sck_send_alive.store(false, Ordering::SeqCst); - self.sck_recv_alive.store(false, Ordering::SeqCst); + self.sck_alive_sender.store(false, Ordering::SeqCst); + self.sck_alive_recvr.store(false, Ordering::SeqCst); if let Some(tx) = self.to_socket.take() { tx.close(); } @@ -151,7 +163,7 @@ pub async fn start_coordinator( ApplicationEvent::StartSocket(conn) => { state.socket_start(conn).await; } - ApplicationEvent::SocketMessage(socket_message, priority) => { + ApplicationEvent::SocketMessage(socket_message) => { if let Err(e) = state.to_gui.send(GuiUpdate::SocketState(true)).await { error!("Could not send to gui thread! Closing coordinator: {e}"); state.close().await; @@ -160,7 +172,7 @@ pub async fn start_coordinator( state.socket_send(socket_message).await; } - ApplicationEvent::MoveEvent(coord) => { + ApplicationEvent::MoveEvent(coord, priority) => { if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(coord.clone())).await { panic!("Could not set message to gui channel; Unrecoverable: {e}"); } diff --git a/src/joystick_loop.rs b/src/joystick_loop.rs index c8f3249..5871140 100644 --- a/src/joystick_loop.rs +++ b/src/joystick_loop.rs @@ -85,10 +85,13 @@ pub async fn joystick_loop(tx: Sender, is_alive: Arc {} Err(async_channel::TrySendError::Closed(_)) => { info!("MEC is closed, stopping Joystick loop"); diff --git a/src/main.rs b/src/main.rs index c34439c..00fc806 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,10 +6,11 @@ use simplelog::SimpleLogger; use std::env; use tokio::runtime; +mod camera_socket; mod config; mod coordinator; mod joystick_loop; -mod camera_socket; +mod socket_server; mod ui_code; const APP_ID: &str = "net.nickiel.joystick-controller-client"; diff --git a/src/socket_server.rs b/src/socket_server.rs new file mode 100644 index 0000000..9367db8 --- /dev/null +++ b/src/socket_server.rs @@ -0,0 +1,83 @@ +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use async_channel::Sender; +use futures_util::StreamExt; +use log::{error, info}; +use tokio::{ + net::{TcpListener, TcpStream}, + runtime::Handle, +}; +use tokio_tungstenite::{ + accept_async, + tungstenite::{Error, Result}, +}; + +use crate::coordinator::{ApplicationEvent, MoveEvent}; + +async fn accept_connection(peer: SocketAddr, stream: TcpStream, mec: Sender) { + if let Err(e) = handle_connection(peer, stream, mec.clone()).await { + match e { + Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (), + err => error!("Error processing connection: {}", err), + } + } +} + +async fn handle_connection( + peer: SocketAddr, + stream: TcpStream, + mec: Sender, +) -> Result<()> { + let mut ws_stream = accept_async(stream).await.expect("Failed to accept"); + + info!("New WebSocket connection: {}", peer); + + while let Some(msg) = ws_stream.next().await { + let msg = msg?; + if msg.is_text() { + if let Err(e) = mec + .send(ApplicationEvent::MoveEvent( + process_incoming_string(msg.to_string()), + 5, + )) + .await + { + error!("MEC Unavailable, closing the connection on the socket-server: {e}"); + break; + } + } + } + + Ok(()) +} + +fn process_incoming_string(message: String) -> MoveEvent { + return MoveEvent { x: 10, y: 30 }; +} + +pub async fn start_socketserver( + rt: Handle, + mec: Sender, + stay_alive: Arc, +) { + let addr = "127.0.0.1:9002"; + let listener = TcpListener::bind(&addr).await.expect("Can't listen"); + info!("Listening on: {}", addr); + + while let Ok((stream, _)) = listener.accept().await { + let peer = stream + .peer_addr() + .expect("connected streams should have a peer address"); + info!("Peer address: {}", peer); + + rt.spawn(accept_connection(peer, stream, mec.clone())); + } + + stay_alive.store(false, Ordering::SeqCst); +} diff --git a/src/ui_code.rs b/src/ui_code.rs index 4b5c276..dcdc74b 100644 --- a/src/ui_code.rs +++ b/src/ui_code.rs @@ -114,7 +114,7 @@ pub fn build_ui(app: &Application, runtime: Handle) { })); button2.connect_clicked(glib::clone!(@strong to_mec => move |_button| { - if let Err(e) = to_mec.try_send(ApplicationEvent::SocketMessage(Message::text("U45:L10"), 1)) { + if let Err(e) = to_mec.try_send(ApplicationEvent::SocketMessage(Message::text("U45:L10"))) { panic!("There was an error in connect clicked: {e}"); } }));