got a working streams version
This commit is contained in:
parent
f809c1dd28
commit
6d0b2d652a
9 changed files with 606 additions and 846 deletions
1072
Cargo.lock
generated
1072
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -8,10 +8,13 @@ edition = "2021"
|
|||
[dependencies]
|
||||
async-channel = "2.2.0"
|
||||
config = "0.14.0"
|
||||
futures-core = "0.3.30"
|
||||
futures-util = { version = "0.3.30", features = ["tokio-io"] }
|
||||
gilrs = "0.10.6"
|
||||
gtk = { version = "0.8.1", package = "gtk4", features = ["v4_12"] }
|
||||
log = "0.4.21"
|
||||
serde = { version = "1.0.197", features = ["derive"] }
|
||||
simplelog = "0.12.2"
|
||||
tokio = { version = "1.37.0", features = ["rt-multi-thread"] }
|
||||
tokio-tungstenite = "0.21.0"
|
||||
toml = "0.8.12"
|
||||
websocket = "0.27.0"
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
ip = "localhost"
|
||||
ip = "10.0.0.30"
|
||||
port = 8765
|
||||
|
|
91
src/coordinator.rs
Normal file
91
src/coordinator.rs
Normal file
|
@ -0,0 +1,91 @@
|
|||
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
|
||||
use std::pin::pin;
|
||||
|
||||
use async_channel::{Receiver, Sender};
|
||||
use futures_util::stream::StreamExt;
|
||||
use log::{error, info};
|
||||
use tokio::runtime::Handle;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
use crate::socket_loop;
|
||||
|
||||
pub struct MoveEvent {
|
||||
pub x: i32,
|
||||
pub y: i32,
|
||||
}
|
||||
|
||||
pub enum ApplicationEvent {
|
||||
StartSocket(String),
|
||||
SocketState(bool),
|
||||
SocketMessage(Message),
|
||||
MoveEvent(MoveEvent),
|
||||
}
|
||||
|
||||
async fn start_socket(connection_string: String, to_mec: Sender<ApplicationEvent>, sck_snd_alive: Arc<AtomicBool>, sck_rcv_alive: Arc<AtomicBool>, rt: Handle) -> Sender<Message> {
|
||||
info!("Starting socket");
|
||||
let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10);
|
||||
let rt2 = rt.clone();
|
||||
|
||||
socket_loop::socket_loop(
|
||||
connection_string,
|
||||
to_mec.clone(),
|
||||
socket_sender_rx,
|
||||
sck_snd_alive,
|
||||
sck_rcv_alive,
|
||||
rt2
|
||||
).await;
|
||||
|
||||
to_socket
|
||||
}
|
||||
|
||||
// Main_Event_Channel
|
||||
pub async fn start_coordinator(mec: Receiver<ApplicationEvent>, to_mec: Sender<ApplicationEvent>, to_gui: Sender<ApplicationEvent>, runtime: Handle) {
|
||||
info!("Starting coordinator!");
|
||||
|
||||
let mut to_socket: Option<Sender<Message>> = None;
|
||||
let sck_snd_alive = Arc::new(AtomicBool::new(false));
|
||||
let sck_rcv_alive = Arc::new(AtomicBool::new(false));
|
||||
|
||||
let mut mec = pin!(mec);
|
||||
|
||||
|
||||
while let Some(msg) = mec.next().await {
|
||||
match msg {
|
||||
ApplicationEvent::StartSocket(conn) => {
|
||||
if !(sck_rcv_alive.load(Ordering::SeqCst) && sck_snd_alive.load(Ordering::SeqCst)) {
|
||||
to_socket = Some(start_socket(conn, to_mec.clone(), sck_snd_alive.clone(), sck_rcv_alive.clone(), runtime.clone()).await);
|
||||
}
|
||||
}
|
||||
ApplicationEvent::SocketState(v) => {
|
||||
if !v {
|
||||
sck_snd_alive.store(false, Ordering::SeqCst);
|
||||
sck_rcv_alive.store(false, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
ApplicationEvent::SocketMessage(socket_message) => {
|
||||
if let Err(e) = to_gui.send(ApplicationEvent::SocketState(true)).await {
|
||||
error!("Could not send to gui thread! Closing coordinator: {e}");
|
||||
sck_snd_alive.store(false, Ordering::SeqCst);
|
||||
sck_rcv_alive.store(false, Ordering::SeqCst);
|
||||
break;
|
||||
}
|
||||
|
||||
if let Some(tx) = to_socket.take() {
|
||||
if let Err(e) = tx.send(socket_message).await {
|
||||
error!("Failed to send to socket: {e}");
|
||||
}
|
||||
to_socket = Some(tx);
|
||||
}
|
||||
}
|
||||
ApplicationEvent::MoveEvent(coord) => {
|
||||
if let Err(e) = to_gui.send(ApplicationEvent::MoveEvent(coord)).await {
|
||||
panic!("Could not set message to gui channel; Unrecoverable: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
info!("Stopping Coordinator");
|
||||
|
||||
}
|
|
@ -4,15 +4,13 @@ use crate::JoystickThreadUpdate;
|
|||
|
||||
use async_channel::{Receiver, Sender};
|
||||
use gilrs::{ev::filter::FilterFn, Axis, Button, Event, EventType, Filter, Gilrs, GilrsBuilder};
|
||||
use log::{error, info};
|
||||
use std::{
|
||||
net::TcpStream,
|
||||
panic::{self, AssertUnwindSafe},
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
time::{Duration, Instant},
|
||||
panic::{self, AssertUnwindSafe},
|
||||
net::TcpStream,
|
||||
};
|
||||
use websocket::client::{sync::Client, ClientBuilder};
|
||||
use websocket::Message;
|
||||
use log::{info, error};
|
||||
|
||||
static MAX_RETRY_ATTEMPTS: u32 = 10;
|
||||
|
||||
|
@ -45,6 +43,18 @@ impl SocketState {
|
|||
if let Ok(mut val) = ClientBuilder::new(format!("ws://{}:{}", &self.ip, self.port).as_str())
|
||||
{
|
||||
if let Ok(val2) = val.connect_insecure() {
|
||||
if let Err(e) = val2
|
||||
.stream_ref()
|
||||
.set_read_timeout(Some(Duration::from_millis(10)))
|
||||
{
|
||||
error!("Error setting read timeout: {}", e);
|
||||
}
|
||||
if let Err(e) = val2
|
||||
.stream_ref()
|
||||
.set_write_timeout(Some(Duration::from_millis(10)))
|
||||
{
|
||||
error!("Error setting write timeout: {}", e);
|
||||
}
|
||||
self.socket = Some(val2);
|
||||
true
|
||||
} else {
|
||||
|
@ -128,7 +138,10 @@ pub fn joystick_websocket_loop(
|
|||
port: state.socket.port as u32,
|
||||
});
|
||||
|
||||
info!("Connecting to: ws://{}:{}", state.socket.ip, state.socket.port);
|
||||
info!(
|
||||
"Connecting to: ws://{}:{}",
|
||||
state.socket.ip, state.socket.port
|
||||
);
|
||||
|
||||
if msg.start_websocket {
|
||||
if !state.socket.is_connected() {
|
||||
|
@ -142,6 +155,26 @@ pub fn joystick_websocket_loop(
|
|||
Err(async_channel::TryRecvError::Empty) => {}
|
||||
}
|
||||
|
||||
if state.socket.is_connected() {
|
||||
if let Some(ref mut websocket_tx) = state.socket.socket {
|
||||
match websocket_tx.recv_message() {
|
||||
Ok(data) => match data {
|
||||
OwnedMessage::Ping(d) => {
|
||||
info!("Recieved a Ping message");
|
||||
|
||||
if let Err(e) = websocket_tx.send_message(&Message::pong(d)) {
|
||||
error!("Couldn't response the ping packet: {}", e);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
info!("successfully read a packet: {:#?}", data);
|
||||
}
|
||||
},
|
||||
Err(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if state.try_reconnect {
|
||||
if state.retry_attempts > MAX_RETRY_ATTEMPTS {
|
||||
state.try_reconnect = false;
|
||||
|
@ -202,9 +235,20 @@ pub fn joystick_websocket_loop(
|
|||
}
|
||||
|
||||
if let Some(ref mut websocket_tx) = state.socket.socket {
|
||||
if websocket_tx.send_message(&Message::text(message)).is_ok() {
|
||||
} else {
|
||||
match websocket_tx.send_message(&Message::text(message)) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
match e {
|
||||
websocket::WebSocketError::IoError(e) => {
|
||||
error!("There was an IO error in websocket send: {}", e);
|
||||
}
|
||||
_ => {
|
||||
error!("There was an error in websocket send: {}", e.to_string());
|
||||
}
|
||||
}
|
||||
state.socket.close_websocket();
|
||||
state.try_reconnect = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
state.last_update_time = Instant::now();
|
||||
|
|
16
src/main.rs
16
src/main.rs
|
@ -4,9 +4,12 @@ use gtk::{prelude::*, CssProvider};
|
|||
use log::info;
|
||||
use simplelog::SimpleLogger;
|
||||
use std::env;
|
||||
use tokio::runtime;
|
||||
|
||||
mod config;
|
||||
mod joystick_loop;
|
||||
mod coordinator;
|
||||
// mod joystick_loop;
|
||||
mod socket_loop;
|
||||
mod ui_code;
|
||||
const APP_ID: &str = "net.nickiel.joystick-controller-client";
|
||||
|
||||
|
@ -26,18 +29,27 @@ fn main() -> glib::ExitCode {
|
|||
}
|
||||
}
|
||||
|
||||
let rt = runtime::Runtime::new().expect("Could not start tokio runtime");
|
||||
let handle = rt.handle().clone();
|
||||
|
||||
// Create a new application
|
||||
let app = Application::builder().application_id(APP_ID).build();
|
||||
|
||||
// Connect to "activate" signal of `app`
|
||||
app.connect_startup(|_| load_css());
|
||||
app.connect_activate(ui_code::build_ui);
|
||||
app.connect_activate(move |app| {
|
||||
ui_code::build_ui(app, handle.clone());
|
||||
});
|
||||
|
||||
// Run the application
|
||||
let exit_code = app.run();
|
||||
|
||||
info!("Closing down");
|
||||
|
||||
rt.block_on(async {});
|
||||
|
||||
info!("Tokio runtime has shut down");
|
||||
|
||||
exit_code
|
||||
}
|
||||
|
||||
|
|
96
src/socket_loop.rs
Normal file
96
src/socket_loop.rs
Normal file
|
@ -0,0 +1,96 @@
|
|||
use std::sync::{atomic::{AtomicBool, Ordering}, Arc};
|
||||
|
||||
use async_channel::{Receiver, Sender};
|
||||
use futures_util::{
|
||||
stream::{SplitSink, SplitStream},
|
||||
SinkExt, StreamExt,
|
||||
};
|
||||
use log::{error, info};
|
||||
use tokio::{net::TcpStream, runtime::Handle};
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
|
||||
|
||||
use crate::coordinator::ApplicationEvent;
|
||||
|
||||
async fn socket_listen(
|
||||
mec: Sender<ApplicationEvent>,
|
||||
socket_recv_is_alive: Arc<AtomicBool>,
|
||||
mut reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
|
||||
) {
|
||||
if socket_recv_is_alive.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
while let Some(msg) = reader.next().await {
|
||||
match msg {
|
||||
Ok(val) => {
|
||||
if let Err(e) = mec.send(ApplicationEvent::SocketMessage(val)).await {
|
||||
error!("There was an error sending to Main Event Channel, closing socket recv thread: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Websocket error: {:#?}", e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
socket_recv_is_alive.store(false, Ordering::SeqCst);
|
||||
}
|
||||
info!("Closed socket reading thread");
|
||||
}
|
||||
|
||||
async fn socket_write(
|
||||
to_send: Receiver<Message>,
|
||||
socket_send_is_alive: Arc<AtomicBool>,
|
||||
mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
|
||||
) {
|
||||
while socket_send_is_alive.load(std::sync::atomic::Ordering::SeqCst) {
|
||||
match to_send.recv().await {
|
||||
Ok(msg) => {
|
||||
if let Err(e) = sender.send(msg).await {
|
||||
error!("There was an error sending to the socket: {:#?}", e);
|
||||
break; // Exit loop on error
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
// Channel closed, exit loop
|
||||
error!("To Socket channel is closed! Exiting: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
socket_send_is_alive.store(false, Ordering::SeqCst);
|
||||
if let Err(e) = sender.close().await {
|
||||
error!("Error closing websocket sender: {}", e);
|
||||
}
|
||||
info!("Closed socket writing thread");
|
||||
}
|
||||
|
||||
pub async fn socket_loop(
|
||||
connection_string: String,
|
||||
mec: Sender<ApplicationEvent>,
|
||||
send_to_socket: Receiver<Message>,
|
||||
socket_send_is_alive: Arc<AtomicBool>,
|
||||
socket_recv_is_alive: Arc<AtomicBool>,
|
||||
rt: Handle
|
||||
) {
|
||||
info!("Starting Socket Loop");
|
||||
|
||||
|
||||
socket_send_is_alive.store(true, Ordering::SeqCst);
|
||||
socket_recv_is_alive.store(true, Ordering::SeqCst);
|
||||
|
||||
let socket: Option<WebSocketStream<MaybeTlsStream<TcpStream>>> = match connect_async(connection_string).await {
|
||||
Ok((val, _)) => {
|
||||
info!("Socket connection made successfully");
|
||||
Some(val)
|
||||
}
|
||||
Err(_) => {
|
||||
error!("Couldn't connect to URL");
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(sckt) = socket {
|
||||
let (outbound, inbound) = sckt.split();
|
||||
rt.spawn(socket_listen(mec, socket_recv_is_alive, inbound));
|
||||
rt.spawn(socket_write(send_to_socket, socket_send_is_alive, outbound));
|
||||
}
|
||||
}
|
|
@ -1,17 +1,15 @@
|
|||
use gtk::{glib, prelude::*, Box, Entry, Label, ListBox};
|
||||
use gtk::{Application, ApplicationWindow, Button};
|
||||
use log::{error, info};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::{Arc, atomic::AtomicBool};
|
||||
use log::error;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
use std::sync::{atomic::AtomicBool, Arc};
|
||||
|
||||
use crate::config::load_config;
|
||||
use crate::{joystick_loop, JoystickThreadUpdate};
|
||||
// use crate::{joystick_loop, JoystickThreadUpdate};
|
||||
use crate::coordinator::{ApplicationEvent, start_coordinator};
|
||||
|
||||
pub struct SocketConnectionUpdate {
|
||||
pub ip: String,
|
||||
pub port: u32,
|
||||
pub start_websocket: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
pub struct AppState {
|
||||
|
@ -28,17 +26,22 @@ impl Default for AppState {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn build_ui(app: &Application) {
|
||||
pub fn build_ui(app: &Application, runtime: Handle) {
|
||||
let initial_settings = load_config();
|
||||
let main_box = ListBox::new();
|
||||
|
||||
let do_run: Arc<AtomicBool> = Arc::new(AtomicBool::new(true));
|
||||
let do_run2 = do_run.clone();
|
||||
let (tx, rx) = async_channel::bounded::<JoystickThreadUpdate>(4);
|
||||
// let do_run2 = do_run.clone();
|
||||
// let (tx, rx) = async_channel::bounded::<JoystickThreadUpdate>(4);
|
||||
// let (tx2, rx2) = async_channel::bounded::<SocketConnectionUpdate>(1);
|
||||
// let _ = std::thread::spawn(move || joystick_loop::joystick_websocket_loop(tx, do_run2, rx2));
|
||||
|
||||
let (tx2, rx2) = async_channel::bounded::<SocketConnectionUpdate>(1);
|
||||
// Main Event Channel
|
||||
let (to_mec, mec) = async_channel::unbounded::<ApplicationEvent>();
|
||||
let (to_gui, gui_recv) = async_channel::bounded::<ApplicationEvent>(10);
|
||||
|
||||
runtime.spawn(start_coordinator(mec, to_mec.clone(), to_gui, runtime.clone()));
|
||||
|
||||
let _ = std::thread::spawn(move || joystick_loop::joystick_websocket_loop(tx, do_run2, rx2));
|
||||
|
||||
// let conn_status_label = Label::new(Some(&"No Connection".to_string()));
|
||||
let conn_status_label = Label::builder()
|
||||
|
@ -66,6 +69,7 @@ pub fn build_ui(app: &Application) {
|
|||
.can_focus(true)
|
||||
.build();
|
||||
let button = Button::builder().margin_top(12).build();
|
||||
let button2 = Button::builder().margin_top(12).build();
|
||||
|
||||
content_box.append(&ip_entry);
|
||||
content_box.append(&port_entry);
|
||||
|
@ -80,37 +84,47 @@ pub fn build_ui(app: &Application) {
|
|||
main_box.append(&conn_status_label);
|
||||
main_box.append(&content_box);
|
||||
main_box.append(&axis_label);
|
||||
main_box.append(&button2);
|
||||
|
||||
// Connect to "clicked" signal of `button`
|
||||
button.connect_clicked(glib::clone!(@weak ip_entry, @weak port_entry, @strong tx2 => move |_button| {
|
||||
button.connect_clicked(glib::clone!(@weak ip_entry, @weak port_entry, @strong to_mec => move |_button| {
|
||||
// Set the label to "Hello World!" after the button has been clicked on
|
||||
|
||||
let ip_text = ip_entry.text();
|
||||
let port_text = port_entry.text();
|
||||
// &format!("ws://{}:{}", "localhost", "5000"),
|
||||
|
||||
if ip_text.len() > 0 {
|
||||
if let Ok(val) = port_text.parse::<u32>() {
|
||||
match tx2.try_send(SocketConnectionUpdate {
|
||||
ip: ip_text.to_string(),
|
||||
port: val,
|
||||
start_websocket: ip_entry.get_sensitive(),
|
||||
}) {
|
||||
match to_mec.try_send(ApplicationEvent::StartSocket(
|
||||
format!("ws://{}:{}", ip_text, val),
|
||||
)) {
|
||||
Ok(_) => { }
|
||||
Err(async_channel::TrySendError::Closed(_)) => {panic!("Joystick thread was closed. Unrecoverable")}
|
||||
Err(e) => {error!("There was an error: {e}")}
|
||||
Err(async_channel::TrySendError::Closed(_)) => {panic!("Coordinator MEC is closed. Unrecoverable error.")}
|
||||
Err(e) => {error!("There was an error sending to the MEC: {}", e)}
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
button2.connect_clicked(glib::clone!(@strong to_mec => move |_button| {
|
||||
if let Err(e) = to_mec.try_send(ApplicationEvent::SocketMessage(Message::text("U45:L10"))) {
|
||||
panic!("There was an error in connect clicked: {e}");
|
||||
}
|
||||
}));
|
||||
|
||||
glib::spawn_future_local(
|
||||
glib::clone!(@weak axis_label, @weak button, @weak conn_status_label, @weak ip_entry, @weak port_entry, @strong rx => async move {
|
||||
while let Ok(msg) = rx.recv().await {
|
||||
glib::clone!(@weak axis_label, @weak button, @weak conn_status_label, @weak ip_entry, @weak port_entry, @strong gui_recv => async move {
|
||||
while let Ok(d) = gui_recv.recv().await {
|
||||
match d {
|
||||
ApplicationEvent::MoveEvent(msg) => {
|
||||
axis_label.set_text(
|
||||
format!("X: {:>4} Y: {:>4}", msg.x_axis.unwrap_or("0".to_string()), msg.y_axis.unwrap_or("0".to_string())).as_str()
|
||||
format!("X: {:>4} Y: {:>4}", msg.x, msg.y).as_str()
|
||||
);
|
||||
button.set_label({
|
||||
if msg.connected {
|
||||
}
|
||||
ApplicationEvent::SocketState(v) => {
|
||||
let label = {
|
||||
if v {
|
||||
ip_entry.set_sensitive(false);
|
||||
port_entry.set_sensitive(false);
|
||||
"Currently Connected"
|
||||
|
@ -119,8 +133,11 @@ pub fn build_ui(app: &Application) {
|
|||
port_entry.set_sensitive(true);
|
||||
"Currently Disconnected"
|
||||
}
|
||||
});
|
||||
if msg.connected {
|
||||
};
|
||||
|
||||
button.set_label(label);
|
||||
conn_status_label.set_label(label);
|
||||
if v {
|
||||
conn_status_label.set_css_classes(&["YesConnection"]);
|
||||
button.set_css_classes(&["YesConnection"]);
|
||||
} else {
|
||||
|
@ -128,6 +145,11 @@ pub fn build_ui(app: &Application) {
|
|||
button.set_css_classes(&["NoConnection"]);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
info!("Note, the gui_recv received an unhandled update");
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
|
|
|
@ -26,5 +26,5 @@ label.JoystickCurrent {
|
|||
|
||||
button {
|
||||
color: black;
|
||||
font-size: 16pt
|
||||
font-size: 16pt;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue