diff --git a/Cargo.toml b/Cargo.toml index dc5c672..a9195ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,6 @@ gtk = { version = "0.8.1", package = "gtk4", features = ["v4_12"] } log = "0.4.21" serde = { version = "1.0.197", features = ["derive"] } simplelog = "0.12.2" -tokio = { version = "1.37.0", features = ["rt-multi-thread"] } +tokio = { version = "1.37.0", features = ["rt-multi-thread", "time"] } tokio-tungstenite = "0.21.0" toml = "0.8.12" diff --git a/src/config.rs b/src/config.rs index 04910e0..a06a1a7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,7 @@ use config::{Config, FileFormat}; use std::fs::File; use std::io::Write; +use log::{error, info}; use crate::ui_code::AppState; @@ -21,16 +22,26 @@ pub fn load_config() -> AppState { } pub fn save_config(config: &AppState) { - println!("{}", { - if let Ok(toml_str) = toml::to_string(&config) { - if let Ok(mut file) = File::create("./settings.toml") { - file.write_all(toml_str.as_bytes()).unwrap(); - "" - } else { - "File could not be opened" + match toml::to_string(&config) { + Ok(toml_str) => { + match File::create("./settings.toml") { + Ok(mut file) => { + match file.write_all(toml_str.as_bytes()) { + Ok(_) => { + info!("Config file saved succesfully"); + } + Err(e) => { + error!("Couldn't write config file contents to open file: {e}"); + } + } + } + Err(e) => { + error!("Couldn't open settings file: {e}"); + } } - } else { - "Settings could not be deserialized" } - }); + Err(e) => { + error!("Could not serialize app state: {e}"); + } + } } diff --git a/src/coordinator.rs b/src/coordinator.rs index 0fee40a..cc5d8e0 100644 --- a/src/coordinator.rs +++ b/src/coordinator.rs @@ -1,5 +1,5 @@ use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; -use std::pin::pin; +use std::pin::{pin, Pin}; use async_channel::{Receiver, Sender}; use futures_util::stream::StreamExt; @@ -7,8 +7,9 @@ use log::{error, info}; use tokio::runtime::Handle; use tokio_tungstenite::tungstenite::Message; -use crate::socket_loop; +use crate::{joystick_loop::joystick_loop, socket_loop, ui_code::GuiUpdate}; +#[derive(Clone)] pub struct MoveEvent { pub x: i32, pub y: i32, @@ -16,73 +17,151 @@ pub struct MoveEvent { pub enum ApplicationEvent { StartSocket(String), - SocketState(bool), SocketMessage(Message), MoveEvent(MoveEvent), } -async fn start_socket(connection_string: String, to_mec: Sender, sck_snd_alive: Arc, sck_rcv_alive: Arc, rt: Handle) -> Sender { - info!("Starting socket"); - let (to_socket, socket_sender_rx) = async_channel::bounded::(10); - let rt2 = rt.clone(); +struct CoordState<'a> { + pub to_socket: Option>, - socket_loop::socket_loop( - connection_string, - to_mec.clone(), - socket_sender_rx, - sck_snd_alive, - sck_rcv_alive, - rt2 - ).await; + pub sck_send_alive: Arc, + pub sck_recv_alive: Arc, + pub joystick_loop_alive: Arc, - to_socket + pub mec: Pin<&'a mut Receiver>, + pub to_mec: Sender, + pub to_gui: Sender, + pub rt: Handle, +} + +impl <'a> CoordState<'a> { + pub fn new(mec: Pin<&'a mut Receiver>, to_mec: Sender, to_gui: Sender, rt: Handle) -> Self { + CoordState { + to_socket: None, + + sck_send_alive: Arc::new(AtomicBool::new(false)), + sck_recv_alive: Arc::new(AtomicBool::new(false)), + joystick_loop_alive: Arc::new(AtomicBool::new(false)), + + mec, to_mec, to_gui, rt + } + } + + pub async fn socket_send(&mut self, message: Message) { + if self.sck_send_alive.load(Ordering::SeqCst) { + if let Some(tx) = self.to_socket.take() { + if let Err(e) = tx.send(message).await { + error!("There was an error sending to the socket send channel: {e}"); + self.socket_close().await; + } + self.to_socket = Some(tx); + } + } + } + + pub async fn socket_start(&mut self, conn: String) { + if !(self.sck_recv_alive.load(Ordering::SeqCst) && !self.sck_send_alive.load(Ordering::SeqCst)) { + + info!("Starting socket"); + let (to_socket, socket_sender_rx) = async_channel::bounded::(10); + + self.to_socket = Some(to_socket); + + socket_loop::socket_loop( + conn, + self.to_mec.clone(), + socket_sender_rx, + self.sck_send_alive.clone(), + self.sck_recv_alive.clone(), + self.rt.clone() + ).await; + } + } + + pub async fn check_states(&mut self) { + if !self.joystick_loop_alive.load(Ordering::SeqCst) { + self.rt.spawn(joystick_loop(self.to_mec.clone(), self.joystick_loop_alive.clone())); + } + + if !self.sck_recv_alive.load(Ordering::SeqCst) || !self.sck_send_alive.load(Ordering::SeqCst) { + self.socket_close().await; + + if let Err(e) = self.to_gui.send(GuiUpdate::SocketState(false)).await { + error!("Cannot send message to gui thread: {e}"); + } + } + } + + pub async fn close(&mut self) { + info!("closing coord state"); + self.socket_close().await; + + self.joystick_loop_alive.store(false, Ordering::SeqCst); + self.to_gui.close(); + self.mec.close(); + } + + pub async fn socket_close(&mut self) { + self.sck_send_alive.store(false, Ordering::SeqCst); + self.sck_recv_alive.store(false, Ordering::SeqCst); + if let Some(tx) = self.to_socket.take() { + tx.close(); + } + } } // Main_Event_Channel -pub async fn start_coordinator(mec: Receiver, to_mec: Sender, to_gui: Sender, runtime: Handle) { - info!("Starting coordinator!"); +pub async fn start_coordinator(mec: Receiver, to_mec: Sender, to_gui: Sender, runtime: Handle) { + info!("Starting coordinator!"); - let mut to_socket: Option> = None; - let sck_snd_alive = Arc::new(AtomicBool::new(false)); - let sck_rcv_alive = Arc::new(AtomicBool::new(false)); + let mec = pin!(mec); - let mut mec = pin!(mec); + let mut state = CoordState::new( + mec, + to_mec, + to_gui, + runtime + ); + while let Some(msg) = state.mec.next().await { + + state.check_states().await; - while let Some(msg) = mec.next().await { match msg { ApplicationEvent::StartSocket(conn) => { - if !(sck_rcv_alive.load(Ordering::SeqCst) && sck_snd_alive.load(Ordering::SeqCst)) { - to_socket = Some(start_socket(conn, to_mec.clone(), sck_snd_alive.clone(), sck_rcv_alive.clone(), runtime.clone()).await); - } - } - ApplicationEvent::SocketState(v) => { - if !v { - sck_snd_alive.store(false, Ordering::SeqCst); - sck_rcv_alive.store(false, Ordering::SeqCst); - } + state.socket_start(conn).await; } ApplicationEvent::SocketMessage(socket_message) => { - if let Err(e) = to_gui.send(ApplicationEvent::SocketState(true)).await { + if let Err(e) = state.to_gui.send(GuiUpdate::SocketState(true)).await { error!("Could not send to gui thread! Closing coordinator: {e}"); - sck_snd_alive.store(false, Ordering::SeqCst); - sck_rcv_alive.store(false, Ordering::SeqCst); + state.close().await; break; } - if let Some(tx) = to_socket.take() { - if let Err(e) = tx.send(socket_message).await { - error!("Failed to send to socket: {e}"); - } - to_socket = Some(tx); - } + state.socket_send(socket_message).await; } ApplicationEvent::MoveEvent(coord) => { - if let Err(e) = to_gui.send(ApplicationEvent::MoveEvent(coord)).await { + if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(coord.clone())).await { panic!("Could not set message to gui channel; Unrecoverable: {e}"); } - } + if let Some(tx) = state.to_socket.take() { + let message = format!( + "{}{}:{}{}", + if coord.y > 0 { "D" } else { "U" }, + coord.y.abs(), + if coord.x > 0 { "R" } else { "L" }, + coord.x.abs() + ); + + if let Err(e) = tx.send(Message::Text(message)).await { + error!("Couldn't send message to socket, closing: {e}"); + tx.close(); + } else { + state.to_socket = Some(tx); + } + } + } } } diff --git a/src/joystick_loop.rs b/src/joystick_loop.rs index bd8509e..842ffb8 100644 --- a/src/joystick_loop.rs +++ b/src/joystick_loop.rs @@ -1,85 +1,12 @@ -use crate::config::save_config; -use crate::ui_code::{AppState, SocketConnectionUpdate}; -use crate::JoystickThreadUpdate; +use crate::coordinator::{ApplicationEvent, MoveEvent}; -use async_channel::{Receiver, Sender}; +use async_channel::Sender; use gilrs::{ev::filter::FilterFn, Axis, Button, Event, EventType, Filter, Gilrs, GilrsBuilder}; -use log::{error, info}; +use log::{warn, info}; use std::{ - net::TcpStream, - panic::{self, AssertUnwindSafe}, - sync::{atomic::AtomicBool, Arc}, - time::{Duration, Instant}, + panic::{self, AssertUnwindSafe}, sync::{atomic::AtomicBool, Arc}, time::Duration }; -static MAX_RETRY_ATTEMPTS: u32 = 10; - -struct SocketState { - pub ip: String, - pub port: i32, - pub socket: Option>, -} - -impl SocketState { - fn is_connected(&self) -> bool { - self.socket.is_some() - } - - fn close_websocket(&mut self) { - if let Some(ref mut x) = self.socket { - info!("closing websocket"); - let _ = x.send_message(&Message::close()); - let _ = x.shutdown(); - self.socket = None; - } - } - - fn reconnect_websocket(&mut self) -> bool { - if self.ip.is_empty() { - self.socket = None; - return false; - } - - if let Ok(mut val) = ClientBuilder::new(format!("ws://{}:{}", &self.ip, self.port).as_str()) - { - if let Ok(val2) = val.connect_insecure() { - if let Err(e) = val2 - .stream_ref() - .set_read_timeout(Some(Duration::from_millis(10))) - { - error!("Error setting read timeout: {}", e); - } - if let Err(e) = val2 - .stream_ref() - .set_write_timeout(Some(Duration::from_millis(10))) - { - error!("Error setting write timeout: {}", e); - } - self.socket = Some(val2); - true - } else { - error!("couldn't connect websocket! : Step 1"); - self.socket = None; - false - } - } else { - error!("couldn't connect websocket! : Step 2"); - self.socket = None; - false - } - } -} - -struct JTState { - pub socket: SocketState, - pub try_reconnect: bool, - pub retry_attempts: u32, - - pub curr_x: i32, - pub curr_y: i32, - - pub last_update_time: Instant, -} struct UnknownSlayer; impl FilterFn for UnknownSlayer { @@ -105,91 +32,20 @@ impl FilterFn for UnknownSlayer { } } -pub fn joystick_websocket_loop( - tx: Sender, - do_run: Arc, - rx: Receiver, +pub async fn joystick_loop( + tx: Sender, + is_alive: Arc ) { let mut gilrs = GilrsBuilder::new().set_update_state(false).build().unwrap(); - let mut state = JTState { - socket: SocketState { - ip: String::new(), - port: 0, - socket: None, - }, - try_reconnect: false, - retry_attempts: 0, + is_alive.store(true, std::sync::atomic::Ordering::SeqCst); - curr_x: 0, - curr_y: 0, - - last_update_time: Instant::now(), - }; + let mut curr_x: i32 = 0; + let mut curr_y: i32 = 0; + let mut past_x: i32 = 0; + let mut past_y: i32 = 0; loop { - match rx.try_recv() { - Ok(msg) => { - state.socket.ip = msg.ip; - state.socket.port = msg.port as i32; - - save_config(&AppState { - ip: state.socket.ip.clone(), - port: state.socket.port as u32, - }); - - info!( - "Connecting to: ws://{}:{}", - state.socket.ip, state.socket.port - ); - - if msg.start_websocket { - if !state.socket.is_connected() { - state.socket.reconnect_websocket(); - } - } else if state.socket.is_connected() { - state.socket.close_websocket(); - } - } - Err(async_channel::TryRecvError::Closed) => break, - Err(async_channel::TryRecvError::Empty) => {} - } - - if state.socket.is_connected() { - if let Some(ref mut websocket_tx) = state.socket.socket { - match websocket_tx.recv_message() { - Ok(data) => match data { - OwnedMessage::Ping(d) => { - info!("Recieved a Ping message"); - - if let Err(e) = websocket_tx.send_message(&Message::pong(d)) { - error!("Couldn't response the ping packet: {}", e); - } - } - _ => { - info!("successfully read a packet: {:#?}", data); - } - }, - Err(_) => {} - } - } - } - - if state.try_reconnect { - if state.retry_attempts > MAX_RETRY_ATTEMPTS { - state.try_reconnect = false; - } - - if state.socket.is_connected() { - state.try_reconnect = false; - } else if state.socket.reconnect_websocket() { - state.try_reconnect = false; - state.retry_attempts = 0; - } else { - state.retry_attempts += 1; - } - } - // catch unwind because some buttons on the joystick will panic the gilrs object match panic::catch_unwind(AssertUnwindSafe(|| { // get the next event, and if it is an axis we are interested in, update the @@ -197,15 +53,15 @@ pub fn joystick_websocket_loop( while let Some(evt) = gilrs.next_event().filter_ev(&UnknownSlayer {}, &mut gilrs) { match evt.event { gilrs::EventType::AxisChanged(gilrs::Axis::LeftStickY, val, _) => { - state.curr_y = (val * 100.0) as i32; - if state.curr_y > -10 && state.curr_y < 10 { - state.curr_y = 0; + curr_y = (val * 100.0) as i32; + if curr_y > -10 && curr_y < 10 { + curr_y = 0; } } gilrs::EventType::AxisChanged(gilrs::Axis::LeftStickX, val, _) => { - state.curr_x = (val * 100.0) as i32; - if state.curr_x > -10 && state.curr_x < 10 { - state.curr_x = 0; + curr_x = (val * 100.0) as i32; + if curr_x > -10 && curr_x < 10 { + curr_x = 0; } } _ => {} @@ -218,57 +74,24 @@ pub fn joystick_websocket_loop( } } - if state.socket.is_connected() - && Instant::now().duration_since(state.last_update_time) >= Duration::from_millis(150) - { - let mut message: String; - if state.curr_y > 0 { - message = format!("D{}:", state.curr_y); - } else { - message = format!("U{}:", state.curr_y.abs()); - } + if curr_x != past_x || curr_y != past_y { + past_x = curr_x; + past_y = curr_y; - if state.curr_x > 0 { - message.push_str(&format!("R{}", state.curr_x)); - } else { - message.push_str(&format!("L{}", state.curr_x.abs())); - } - - if let Some(ref mut websocket_tx) = state.socket.socket { - match websocket_tx.send_message(&Message::text(message)) { - Ok(_) => {} - Err(e) => { - match e { - websocket::WebSocketError::IoError(e) => { - error!("There was an IO error in websocket send: {}", e); - } - _ => { - error!("There was an error in websocket send: {}", e.to_string()); - } - } - state.socket.close_websocket(); - state.try_reconnect = true; - } + match tx.try_send(ApplicationEvent::MoveEvent(MoveEvent { + x: curr_x, + y: curr_y, + })) { + Ok(_) => {} + Err(async_channel::TrySendError::Closed(_)) => { + info!("MEC is closed, stopping Joystick loop"); + break } + Err(async_channel::TrySendError::Full(_)) => {warn!("[joystick loop] The MEC is full!")} } - state.last_update_time = Instant::now(); - continue; } - match tx.try_send(JoystickThreadUpdate { - connected: state.socket.is_connected(), - x_axis: Some(state.curr_x.to_string()), - y_axis: Some(state.curr_y.to_string()), - }) { - Ok(_) => {} - Err(async_channel::TrySendError::Closed(_)) => break, - Err(async_channel::TrySendError::Full(_)) => {} - } - - if !do_run.load(std::sync::atomic::Ordering::SeqCst) { - info!("Exiting joystick thread"); - break; - } - std::thread::sleep(Duration::from_millis(25)); + tokio::time::sleep(Duration::from_millis(50)).await; } + is_alive.store(false, std::sync::atomic::Ordering::SeqCst); } diff --git a/src/main.rs b/src/main.rs index dfdd72b..164db4b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,16 +8,11 @@ use tokio::runtime; mod config; mod coordinator; -// mod joystick_loop; +mod joystick_loop; mod socket_loop; mod ui_code; const APP_ID: &str = "net.nickiel.joystick-controller-client"; -pub struct JoystickThreadUpdate { - pub connected: bool, - pub x_axis: Option, - pub y_axis: Option, -} fn main() -> glib::ExitCode { env::set_var("gtk_csd", "0"); diff --git a/src/ui_code.rs b/src/ui_code.rs index 9c418fa..8a54313 100644 --- a/src/ui_code.rs +++ b/src/ui_code.rs @@ -1,14 +1,13 @@ use gtk::{glib, prelude::*, Box, Entry, Label, ListBox}; use gtk::{Application, ApplicationWindow, Button}; -use log::{error, info}; +use log::error; use serde::{Deserialize, Serialize}; use tokio::runtime::Handle; use tokio_tungstenite::tungstenite::Message; -use std::sync::{atomic::AtomicBool, Arc}; -use crate::config::load_config; +use crate::config::{load_config, save_config}; // use crate::{joystick_loop, JoystickThreadUpdate}; -use crate::coordinator::{ApplicationEvent, start_coordinator}; +use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent}; #[derive(Debug, Serialize, Deserialize, Clone)] @@ -26,19 +25,18 @@ impl Default for AppState { } } +pub enum GuiUpdate { + SocketState(bool), + MoveEvent(MoveEvent), +} + pub fn build_ui(app: &Application, runtime: Handle) { let initial_settings = load_config(); let main_box = ListBox::new(); - let do_run: Arc = Arc::new(AtomicBool::new(true)); - // let do_run2 = do_run.clone(); - // let (tx, rx) = async_channel::bounded::(4); - // let (tx2, rx2) = async_channel::bounded::(1); - // let _ = std::thread::spawn(move || joystick_loop::joystick_websocket_loop(tx, do_run2, rx2)); - // Main Event Channel let (to_mec, mec) = async_channel::unbounded::(); - let (to_gui, gui_recv) = async_channel::bounded::(10); + let (to_gui, gui_recv) = async_channel::bounded::(10); runtime.spawn(start_coordinator(mec, to_mec.clone(), to_gui, runtime.clone())); @@ -88,14 +86,16 @@ pub fn build_ui(app: &Application, runtime: Handle) { // Connect to "clicked" signal of `button` button.connect_clicked(glib::clone!(@weak ip_entry, @weak port_entry, @strong to_mec => move |_button| { - // Set the label to "Hello World!" after the button has been clicked on - let ip_text = ip_entry.text(); let port_text = port_entry.text(); - // &format!("ws://{}:{}", "localhost", "5000"), if ip_text.len() > 0 { if let Ok(val) = port_text.parse::() { + save_config(&AppState { + ip: ip_text.to_string(), + port: val + }); + match to_mec.try_send(ApplicationEvent::StartSocket( format!("ws://{}:{}", ip_text, val), )) { @@ -117,12 +117,12 @@ pub fn build_ui(app: &Application, runtime: Handle) { glib::clone!(@weak axis_label, @weak button, @weak conn_status_label, @weak ip_entry, @weak port_entry, @strong gui_recv => async move { while let Ok(d) = gui_recv.recv().await { match d { - ApplicationEvent::MoveEvent(msg) => { + GuiUpdate::MoveEvent(msg) => { axis_label.set_text( format!("X: {:>4} Y: {:>4}", msg.x, msg.y).as_str() ); } - ApplicationEvent::SocketState(v) => { + GuiUpdate::SocketState(v) => { let label = { if v { ip_entry.set_sensitive(false); @@ -145,9 +145,6 @@ pub fn build_ui(app: &Application, runtime: Handle) { button.set_css_classes(&["NoConnection"]); } } - _ => { - info!("Note, the gui_recv received an unhandled update"); - } } } }), @@ -161,7 +158,6 @@ pub fn build_ui(app: &Application, runtime: Handle) { .build(); window.connect_close_request(move |_| { - do_run.store(false, std::sync::atomic::Ordering::SeqCst); glib::Propagation::Proceed });