moved state into it's own struct

This commit is contained in:
Nickiel12 2024-04-06 10:49:19 -07:00
parent 6d0b2d652a
commit 4506d7784b
6 changed files with 193 additions and 289 deletions

View file

@ -15,6 +15,6 @@ gtk = { version = "0.8.1", package = "gtk4", features = ["v4_12"] }
log = "0.4.21" log = "0.4.21"
serde = { version = "1.0.197", features = ["derive"] } serde = { version = "1.0.197", features = ["derive"] }
simplelog = "0.12.2" simplelog = "0.12.2"
tokio = { version = "1.37.0", features = ["rt-multi-thread"] } tokio = { version = "1.37.0", features = ["rt-multi-thread", "time"] }
tokio-tungstenite = "0.21.0" tokio-tungstenite = "0.21.0"
toml = "0.8.12" toml = "0.8.12"

View file

@ -1,6 +1,7 @@
use config::{Config, FileFormat}; use config::{Config, FileFormat};
use std::fs::File; use std::fs::File;
use std::io::Write; use std::io::Write;
use log::{error, info};
use crate::ui_code::AppState; use crate::ui_code::AppState;
@ -21,16 +22,26 @@ pub fn load_config() -> AppState {
} }
pub fn save_config(config: &AppState) { pub fn save_config(config: &AppState) {
println!("{}", { match toml::to_string(&config) {
if let Ok(toml_str) = toml::to_string(&config) { Ok(toml_str) => {
if let Ok(mut file) = File::create("./settings.toml") { match File::create("./settings.toml") {
file.write_all(toml_str.as_bytes()).unwrap(); Ok(mut file) => {
"" match file.write_all(toml_str.as_bytes()) {
} else { Ok(_) => {
"File could not be opened" info!("Config file saved succesfully");
}
Err(e) => {
error!("Couldn't write config file contents to open file: {e}");
}
}
}
Err(e) => {
error!("Couldn't open settings file: {e}");
}
} }
} else {
"Settings could not be deserialized"
} }
}); Err(e) => {
error!("Could not serialize app state: {e}");
}
}
} }

View file

@ -1,5 +1,5 @@
use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
use std::pin::pin; use std::pin::{pin, Pin};
use async_channel::{Receiver, Sender}; use async_channel::{Receiver, Sender};
use futures_util::stream::StreamExt; use futures_util::stream::StreamExt;
@ -7,8 +7,9 @@ use log::{error, info};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use crate::socket_loop; use crate::{joystick_loop::joystick_loop, socket_loop, ui_code::GuiUpdate};
#[derive(Clone)]
pub struct MoveEvent { pub struct MoveEvent {
pub x: i32, pub x: i32,
pub y: i32, pub y: i32,
@ -16,73 +17,151 @@ pub struct MoveEvent {
pub enum ApplicationEvent { pub enum ApplicationEvent {
StartSocket(String), StartSocket(String),
SocketState(bool),
SocketMessage(Message), SocketMessage(Message),
MoveEvent(MoveEvent), MoveEvent(MoveEvent),
} }
async fn start_socket(connection_string: String, to_mec: Sender<ApplicationEvent>, sck_snd_alive: Arc<AtomicBool>, sck_rcv_alive: Arc<AtomicBool>, rt: Handle) -> Sender<Message> { struct CoordState<'a> {
info!("Starting socket"); pub to_socket: Option<Sender<Message>>,
let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10);
let rt2 = rt.clone();
socket_loop::socket_loop( pub sck_send_alive: Arc<AtomicBool>,
connection_string, pub sck_recv_alive: Arc<AtomicBool>,
to_mec.clone(), pub joystick_loop_alive: Arc<AtomicBool>,
socket_sender_rx,
sck_snd_alive,
sck_rcv_alive,
rt2
).await;
to_socket pub mec: Pin<&'a mut Receiver<ApplicationEvent>>,
pub to_mec: Sender<ApplicationEvent>,
pub to_gui: Sender<GuiUpdate>,
pub rt: Handle,
}
impl <'a> CoordState<'a> {
pub fn new(mec: Pin<&'a mut Receiver<ApplicationEvent>>, to_mec: Sender<ApplicationEvent>, to_gui: Sender<GuiUpdate>, rt: Handle) -> Self {
CoordState {
to_socket: None,
sck_send_alive: Arc::new(AtomicBool::new(false)),
sck_recv_alive: Arc::new(AtomicBool::new(false)),
joystick_loop_alive: Arc::new(AtomicBool::new(false)),
mec, to_mec, to_gui, rt
}
}
pub async fn socket_send(&mut self, message: Message) {
if self.sck_send_alive.load(Ordering::SeqCst) {
if let Some(tx) = self.to_socket.take() {
if let Err(e) = tx.send(message).await {
error!("There was an error sending to the socket send channel: {e}");
self.socket_close().await;
}
self.to_socket = Some(tx);
}
}
}
pub async fn socket_start(&mut self, conn: String) {
if !(self.sck_recv_alive.load(Ordering::SeqCst) && !self.sck_send_alive.load(Ordering::SeqCst)) {
info!("Starting socket");
let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10);
self.to_socket = Some(to_socket);
socket_loop::socket_loop(
conn,
self.to_mec.clone(),
socket_sender_rx,
self.sck_send_alive.clone(),
self.sck_recv_alive.clone(),
self.rt.clone()
).await;
}
}
pub async fn check_states(&mut self) {
if !self.joystick_loop_alive.load(Ordering::SeqCst) {
self.rt.spawn(joystick_loop(self.to_mec.clone(), self.joystick_loop_alive.clone()));
}
if !self.sck_recv_alive.load(Ordering::SeqCst) || !self.sck_send_alive.load(Ordering::SeqCst) {
self.socket_close().await;
if let Err(e) = self.to_gui.send(GuiUpdate::SocketState(false)).await {
error!("Cannot send message to gui thread: {e}");
}
}
}
pub async fn close(&mut self) {
info!("closing coord state");
self.socket_close().await;
self.joystick_loop_alive.store(false, Ordering::SeqCst);
self.to_gui.close();
self.mec.close();
}
pub async fn socket_close(&mut self) {
self.sck_send_alive.store(false, Ordering::SeqCst);
self.sck_recv_alive.store(false, Ordering::SeqCst);
if let Some(tx) = self.to_socket.take() {
tx.close();
}
}
} }
// Main_Event_Channel // Main_Event_Channel
pub async fn start_coordinator(mec: Receiver<ApplicationEvent>, to_mec: Sender<ApplicationEvent>, to_gui: Sender<ApplicationEvent>, runtime: Handle) { pub async fn start_coordinator(mec: Receiver<ApplicationEvent>, to_mec: Sender<ApplicationEvent>, to_gui: Sender<GuiUpdate>, runtime: Handle) {
info!("Starting coordinator!"); info!("Starting coordinator!");
let mut to_socket: Option<Sender<Message>> = None; let mec = pin!(mec);
let sck_snd_alive = Arc::new(AtomicBool::new(false));
let sck_rcv_alive = Arc::new(AtomicBool::new(false));
let mut mec = pin!(mec); let mut state = CoordState::new(
mec,
to_mec,
to_gui,
runtime
);
while let Some(msg) = state.mec.next().await {
state.check_states().await;
while let Some(msg) = mec.next().await {
match msg { match msg {
ApplicationEvent::StartSocket(conn) => { ApplicationEvent::StartSocket(conn) => {
if !(sck_rcv_alive.load(Ordering::SeqCst) && sck_snd_alive.load(Ordering::SeqCst)) { state.socket_start(conn).await;
to_socket = Some(start_socket(conn, to_mec.clone(), sck_snd_alive.clone(), sck_rcv_alive.clone(), runtime.clone()).await);
}
}
ApplicationEvent::SocketState(v) => {
if !v {
sck_snd_alive.store(false, Ordering::SeqCst);
sck_rcv_alive.store(false, Ordering::SeqCst);
}
} }
ApplicationEvent::SocketMessage(socket_message) => { ApplicationEvent::SocketMessage(socket_message) => {
if let Err(e) = to_gui.send(ApplicationEvent::SocketState(true)).await { if let Err(e) = state.to_gui.send(GuiUpdate::SocketState(true)).await {
error!("Could not send to gui thread! Closing coordinator: {e}"); error!("Could not send to gui thread! Closing coordinator: {e}");
sck_snd_alive.store(false, Ordering::SeqCst); state.close().await;
sck_rcv_alive.store(false, Ordering::SeqCst);
break; break;
} }
if let Some(tx) = to_socket.take() { state.socket_send(socket_message).await;
if let Err(e) = tx.send(socket_message).await {
error!("Failed to send to socket: {e}");
}
to_socket = Some(tx);
}
} }
ApplicationEvent::MoveEvent(coord) => { ApplicationEvent::MoveEvent(coord) => {
if let Err(e) = to_gui.send(ApplicationEvent::MoveEvent(coord)).await { if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(coord.clone())).await {
panic!("Could not set message to gui channel; Unrecoverable: {e}"); panic!("Could not set message to gui channel; Unrecoverable: {e}");
} }
}
if let Some(tx) = state.to_socket.take() {
let message = format!(
"{}{}:{}{}",
if coord.y > 0 { "D" } else { "U" },
coord.y.abs(),
if coord.x > 0 { "R" } else { "L" },
coord.x.abs()
);
if let Err(e) = tx.send(Message::Text(message)).await {
error!("Couldn't send message to socket, closing: {e}");
tx.close();
} else {
state.to_socket = Some(tx);
}
}
}
} }
} }

View file

@ -1,85 +1,12 @@
use crate::config::save_config; use crate::coordinator::{ApplicationEvent, MoveEvent};
use crate::ui_code::{AppState, SocketConnectionUpdate};
use crate::JoystickThreadUpdate;
use async_channel::{Receiver, Sender}; use async_channel::Sender;
use gilrs::{ev::filter::FilterFn, Axis, Button, Event, EventType, Filter, Gilrs, GilrsBuilder}; use gilrs::{ev::filter::FilterFn, Axis, Button, Event, EventType, Filter, Gilrs, GilrsBuilder};
use log::{error, info}; use log::{warn, info};
use std::{ use std::{
net::TcpStream, panic::{self, AssertUnwindSafe}, sync::{atomic::AtomicBool, Arc}, time::Duration
panic::{self, AssertUnwindSafe},
sync::{atomic::AtomicBool, Arc},
time::{Duration, Instant},
}; };
static MAX_RETRY_ATTEMPTS: u32 = 10;
struct SocketState {
pub ip: String,
pub port: i32,
pub socket: Option<Client<TcpStream>>,
}
impl SocketState {
fn is_connected(&self) -> bool {
self.socket.is_some()
}
fn close_websocket(&mut self) {
if let Some(ref mut x) = self.socket {
info!("closing websocket");
let _ = x.send_message(&Message::close());
let _ = x.shutdown();
self.socket = None;
}
}
fn reconnect_websocket(&mut self) -> bool {
if self.ip.is_empty() {
self.socket = None;
return false;
}
if let Ok(mut val) = ClientBuilder::new(format!("ws://{}:{}", &self.ip, self.port).as_str())
{
if let Ok(val2) = val.connect_insecure() {
if let Err(e) = val2
.stream_ref()
.set_read_timeout(Some(Duration::from_millis(10)))
{
error!("Error setting read timeout: {}", e);
}
if let Err(e) = val2
.stream_ref()
.set_write_timeout(Some(Duration::from_millis(10)))
{
error!("Error setting write timeout: {}", e);
}
self.socket = Some(val2);
true
} else {
error!("couldn't connect websocket! : Step 1");
self.socket = None;
false
}
} else {
error!("couldn't connect websocket! : Step 2");
self.socket = None;
false
}
}
}
struct JTState {
pub socket: SocketState,
pub try_reconnect: bool,
pub retry_attempts: u32,
pub curr_x: i32,
pub curr_y: i32,
pub last_update_time: Instant,
}
struct UnknownSlayer; struct UnknownSlayer;
impl FilterFn for UnknownSlayer { impl FilterFn for UnknownSlayer {
@ -105,91 +32,20 @@ impl FilterFn for UnknownSlayer {
} }
} }
pub fn joystick_websocket_loop( pub async fn joystick_loop(
tx: Sender<JoystickThreadUpdate>, tx: Sender<ApplicationEvent>,
do_run: Arc<AtomicBool>, is_alive: Arc<AtomicBool>
rx: Receiver<SocketConnectionUpdate>,
) { ) {
let mut gilrs = GilrsBuilder::new().set_update_state(false).build().unwrap(); let mut gilrs = GilrsBuilder::new().set_update_state(false).build().unwrap();
let mut state = JTState { is_alive.store(true, std::sync::atomic::Ordering::SeqCst);
socket: SocketState {
ip: String::new(),
port: 0,
socket: None,
},
try_reconnect: false,
retry_attempts: 0,
curr_x: 0, let mut curr_x: i32 = 0;
curr_y: 0, let mut curr_y: i32 = 0;
let mut past_x: i32 = 0;
last_update_time: Instant::now(), let mut past_y: i32 = 0;
};
loop { loop {
match rx.try_recv() {
Ok(msg) => {
state.socket.ip = msg.ip;
state.socket.port = msg.port as i32;
save_config(&AppState {
ip: state.socket.ip.clone(),
port: state.socket.port as u32,
});
info!(
"Connecting to: ws://{}:{}",
state.socket.ip, state.socket.port
);
if msg.start_websocket {
if !state.socket.is_connected() {
state.socket.reconnect_websocket();
}
} else if state.socket.is_connected() {
state.socket.close_websocket();
}
}
Err(async_channel::TryRecvError::Closed) => break,
Err(async_channel::TryRecvError::Empty) => {}
}
if state.socket.is_connected() {
if let Some(ref mut websocket_tx) = state.socket.socket {
match websocket_tx.recv_message() {
Ok(data) => match data {
OwnedMessage::Ping(d) => {
info!("Recieved a Ping message");
if let Err(e) = websocket_tx.send_message(&Message::pong(d)) {
error!("Couldn't response the ping packet: {}", e);
}
}
_ => {
info!("successfully read a packet: {:#?}", data);
}
},
Err(_) => {}
}
}
}
if state.try_reconnect {
if state.retry_attempts > MAX_RETRY_ATTEMPTS {
state.try_reconnect = false;
}
if state.socket.is_connected() {
state.try_reconnect = false;
} else if state.socket.reconnect_websocket() {
state.try_reconnect = false;
state.retry_attempts = 0;
} else {
state.retry_attempts += 1;
}
}
// catch unwind because some buttons on the joystick will panic the gilrs object // catch unwind because some buttons on the joystick will panic the gilrs object
match panic::catch_unwind(AssertUnwindSafe(|| { match panic::catch_unwind(AssertUnwindSafe(|| {
// get the next event, and if it is an axis we are interested in, update the // get the next event, and if it is an axis we are interested in, update the
@ -197,15 +53,15 @@ pub fn joystick_websocket_loop(
while let Some(evt) = gilrs.next_event().filter_ev(&UnknownSlayer {}, &mut gilrs) { while let Some(evt) = gilrs.next_event().filter_ev(&UnknownSlayer {}, &mut gilrs) {
match evt.event { match evt.event {
gilrs::EventType::AxisChanged(gilrs::Axis::LeftStickY, val, _) => { gilrs::EventType::AxisChanged(gilrs::Axis::LeftStickY, val, _) => {
state.curr_y = (val * 100.0) as i32; curr_y = (val * 100.0) as i32;
if state.curr_y > -10 && state.curr_y < 10 { if curr_y > -10 && curr_y < 10 {
state.curr_y = 0; curr_y = 0;
} }
} }
gilrs::EventType::AxisChanged(gilrs::Axis::LeftStickX, val, _) => { gilrs::EventType::AxisChanged(gilrs::Axis::LeftStickX, val, _) => {
state.curr_x = (val * 100.0) as i32; curr_x = (val * 100.0) as i32;
if state.curr_x > -10 && state.curr_x < 10 { if curr_x > -10 && curr_x < 10 {
state.curr_x = 0; curr_x = 0;
} }
} }
_ => {} _ => {}
@ -218,57 +74,24 @@ pub fn joystick_websocket_loop(
} }
} }
if state.socket.is_connected() if curr_x != past_x || curr_y != past_y {
&& Instant::now().duration_since(state.last_update_time) >= Duration::from_millis(150) past_x = curr_x;
{ past_y = curr_y;
let mut message: String;
if state.curr_y > 0 {
message = format!("D{}:", state.curr_y);
} else {
message = format!("U{}:", state.curr_y.abs());
}
if state.curr_x > 0 { match tx.try_send(ApplicationEvent::MoveEvent(MoveEvent {
message.push_str(&format!("R{}", state.curr_x)); x: curr_x,
} else { y: curr_y,
message.push_str(&format!("L{}", state.curr_x.abs())); })) {
} Ok(_) => {}
Err(async_channel::TrySendError::Closed(_)) => {
if let Some(ref mut websocket_tx) = state.socket.socket { info!("MEC is closed, stopping Joystick loop");
match websocket_tx.send_message(&Message::text(message)) { break
Ok(_) => {}
Err(e) => {
match e {
websocket::WebSocketError::IoError(e) => {
error!("There was an IO error in websocket send: {}", e);
}
_ => {
error!("There was an error in websocket send: {}", e.to_string());
}
}
state.socket.close_websocket();
state.try_reconnect = true;
}
} }
Err(async_channel::TrySendError::Full(_)) => {warn!("[joystick loop] The MEC is full!")}
} }
state.last_update_time = Instant::now();
continue;
} }
match tx.try_send(JoystickThreadUpdate { tokio::time::sleep(Duration::from_millis(50)).await;
connected: state.socket.is_connected(),
x_axis: Some(state.curr_x.to_string()),
y_axis: Some(state.curr_y.to_string()),
}) {
Ok(_) => {}
Err(async_channel::TrySendError::Closed(_)) => break,
Err(async_channel::TrySendError::Full(_)) => {}
}
if !do_run.load(std::sync::atomic::Ordering::SeqCst) {
info!("Exiting joystick thread");
break;
}
std::thread::sleep(Duration::from_millis(25));
} }
is_alive.store(false, std::sync::atomic::Ordering::SeqCst);
} }

View file

@ -8,16 +8,11 @@ use tokio::runtime;
mod config; mod config;
mod coordinator; mod coordinator;
// mod joystick_loop; mod joystick_loop;
mod socket_loop; mod socket_loop;
mod ui_code; mod ui_code;
const APP_ID: &str = "net.nickiel.joystick-controller-client"; const APP_ID: &str = "net.nickiel.joystick-controller-client";
pub struct JoystickThreadUpdate {
pub connected: bool,
pub x_axis: Option<String>,
pub y_axis: Option<String>,
}
fn main() -> glib::ExitCode { fn main() -> glib::ExitCode {
env::set_var("gtk_csd", "0"); env::set_var("gtk_csd", "0");

View file

@ -1,14 +1,13 @@
use gtk::{glib, prelude::*, Box, Entry, Label, ListBox}; use gtk::{glib, prelude::*, Box, Entry, Label, ListBox};
use gtk::{Application, ApplicationWindow, Button}; use gtk::{Application, ApplicationWindow, Button};
use log::{error, info}; use log::error;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use std::sync::{atomic::AtomicBool, Arc};
use crate::config::load_config; use crate::config::{load_config, save_config};
// use crate::{joystick_loop, JoystickThreadUpdate}; // use crate::{joystick_loop, JoystickThreadUpdate};
use crate::coordinator::{ApplicationEvent, start_coordinator}; use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -26,19 +25,18 @@ impl Default for AppState {
} }
} }
pub enum GuiUpdate {
SocketState(bool),
MoveEvent(MoveEvent),
}
pub fn build_ui(app: &Application, runtime: Handle) { pub fn build_ui(app: &Application, runtime: Handle) {
let initial_settings = load_config(); let initial_settings = load_config();
let main_box = ListBox::new(); let main_box = ListBox::new();
let do_run: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
// let do_run2 = do_run.clone();
// let (tx, rx) = async_channel::bounded::<JoystickThreadUpdate>(4);
// let (tx2, rx2) = async_channel::bounded::<SocketConnectionUpdate>(1);
// let _ = std::thread::spawn(move || joystick_loop::joystick_websocket_loop(tx, do_run2, rx2));
// Main Event Channel // Main Event Channel
let (to_mec, mec) = async_channel::unbounded::<ApplicationEvent>(); let (to_mec, mec) = async_channel::unbounded::<ApplicationEvent>();
let (to_gui, gui_recv) = async_channel::bounded::<ApplicationEvent>(10); let (to_gui, gui_recv) = async_channel::bounded::<GuiUpdate>(10);
runtime.spawn(start_coordinator(mec, to_mec.clone(), to_gui, runtime.clone())); runtime.spawn(start_coordinator(mec, to_mec.clone(), to_gui, runtime.clone()));
@ -88,14 +86,16 @@ pub fn build_ui(app: &Application, runtime: Handle) {
// Connect to "clicked" signal of `button` // Connect to "clicked" signal of `button`
button.connect_clicked(glib::clone!(@weak ip_entry, @weak port_entry, @strong to_mec => move |_button| { button.connect_clicked(glib::clone!(@weak ip_entry, @weak port_entry, @strong to_mec => move |_button| {
// Set the label to "Hello World!" after the button has been clicked on
let ip_text = ip_entry.text(); let ip_text = ip_entry.text();
let port_text = port_entry.text(); let port_text = port_entry.text();
// &format!("ws://{}:{}", "localhost", "5000"),
if ip_text.len() > 0 { if ip_text.len() > 0 {
if let Ok(val) = port_text.parse::<u32>() { if let Ok(val) = port_text.parse::<u32>() {
save_config(&AppState {
ip: ip_text.to_string(),
port: val
});
match to_mec.try_send(ApplicationEvent::StartSocket( match to_mec.try_send(ApplicationEvent::StartSocket(
format!("ws://{}:{}", ip_text, val), format!("ws://{}:{}", ip_text, val),
)) { )) {
@ -117,12 +117,12 @@ pub fn build_ui(app: &Application, runtime: Handle) {
glib::clone!(@weak axis_label, @weak button, @weak conn_status_label, @weak ip_entry, @weak port_entry, @strong gui_recv => async move { glib::clone!(@weak axis_label, @weak button, @weak conn_status_label, @weak ip_entry, @weak port_entry, @strong gui_recv => async move {
while let Ok(d) = gui_recv.recv().await { while let Ok(d) = gui_recv.recv().await {
match d { match d {
ApplicationEvent::MoveEvent(msg) => { GuiUpdate::MoveEvent(msg) => {
axis_label.set_text( axis_label.set_text(
format!("X: {:>4} Y: {:>4}", msg.x, msg.y).as_str() format!("X: {:>4} Y: {:>4}", msg.x, msg.y).as_str()
); );
} }
ApplicationEvent::SocketState(v) => { GuiUpdate::SocketState(v) => {
let label = { let label = {
if v { if v {
ip_entry.set_sensitive(false); ip_entry.set_sensitive(false);
@ -145,9 +145,6 @@ pub fn build_ui(app: &Application, runtime: Handle) {
button.set_css_classes(&["NoConnection"]); button.set_css_classes(&["NoConnection"]);
} }
} }
_ => {
info!("Note, the gui_recv received an unhandled update");
}
} }
} }
}), }),
@ -161,7 +158,6 @@ pub fn build_ui(app: &Application, runtime: Handle) {
.build(); .build();
window.connect_close_request(move |_| { window.connect_close_request(move |_| {
do_run.store(false, std::sync::atomic::Ordering::SeqCst);
glib::Propagation::Proceed glib::Propagation::Proceed
}); });