Compare commits

...

2 commits

Author SHA1 Message Date
Nickiel12
4530cf45a8 added basis of accpting socket-driven movement commands 2024-04-13 19:38:05 -07:00
Nickiel12
6f4430a725 added priority to socket messages 2024-04-13 19:13:34 -07:00
4 changed files with 120 additions and 21 deletions

View file

@ -10,7 +10,8 @@ use log::{error, info};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use crate::{joystick_loop::joystick_loop, camera_socket, ui_code::GuiUpdate}; use crate::socket_server;
use crate::{camera_socket, joystick_loop::joystick_loop, ui_code::GuiUpdate};
#[derive(Clone)] #[derive(Clone)]
pub struct MoveEvent { pub struct MoveEvent {
@ -21,14 +22,15 @@ pub struct MoveEvent {
pub enum ApplicationEvent { pub enum ApplicationEvent {
StartSocket(String), StartSocket(String),
SocketMessage(Message), SocketMessage(Message),
MoveEvent(MoveEvent), MoveEvent(MoveEvent, u32),
} }
struct CoordState<'a> { struct CoordState<'a> {
pub to_socket: Option<Sender<Message>>, pub to_socket: Option<Sender<Message>>,
pub sck_send_alive: Arc<AtomicBool>, pub sck_alive_sender: Arc<AtomicBool>,
pub sck_recv_alive: Arc<AtomicBool>, pub sck_alive_server: Arc<AtomicBool>,
pub sck_alive_recvr: Arc<AtomicBool>,
pub joystick_loop_alive: Arc<AtomicBool>, pub joystick_loop_alive: Arc<AtomicBool>,
pub mec: Pin<&'a mut Receiver<ApplicationEvent>>, pub mec: Pin<&'a mut Receiver<ApplicationEvent>>,
@ -47,8 +49,9 @@ impl<'a> CoordState<'a> {
CoordState { CoordState {
to_socket: None, to_socket: None,
sck_send_alive: Arc::new(AtomicBool::new(false)), sck_alive_sender: Arc::new(AtomicBool::new(false)),
sck_recv_alive: Arc::new(AtomicBool::new(false)), sck_alive_recvr: Arc::new(AtomicBool::new(false)),
sck_alive_server: Arc::new(AtomicBool::new(false)),
joystick_loop_alive: Arc::new(AtomicBool::new(false)), joystick_loop_alive: Arc::new(AtomicBool::new(false)),
mec, mec,
@ -59,7 +62,7 @@ impl<'a> CoordState<'a> {
} }
pub async fn socket_send(&mut self, message: Message) { pub async fn socket_send(&mut self, message: Message) {
if self.sck_send_alive.load(Ordering::SeqCst) { if self.sck_alive_sender.load(Ordering::SeqCst) {
if let Some(tx) = self.to_socket.take() { if let Some(tx) = self.to_socket.take() {
if let Err(e) = tx.send(message).await { if let Err(e) = tx.send(message).await {
error!("There was an error sending to the socket send channel: {e}"); error!("There was an error sending to the socket send channel: {e}");
@ -71,8 +74,8 @@ impl<'a> CoordState<'a> {
} }
pub async fn socket_start(&mut self, conn: String) { pub async fn socket_start(&mut self, conn: String) {
if !(self.sck_recv_alive.load(Ordering::SeqCst) if !(self.sck_alive_recvr.load(Ordering::SeqCst)
&& self.sck_send_alive.load(Ordering::SeqCst)) && self.sck_alive_sender.load(Ordering::SeqCst))
{ {
info!("Starting socket"); info!("Starting socket");
let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10); let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10);
@ -83,8 +86,8 @@ impl<'a> CoordState<'a> {
conn, conn,
self.to_mec.clone(), self.to_mec.clone(),
socket_sender_rx, socket_sender_rx,
self.sck_send_alive.clone(), self.sck_alive_sender.clone(),
self.sck_recv_alive.clone(), self.sck_alive_recvr.clone(),
self.rt.clone(), self.rt.clone(),
) )
.await; .await;
@ -93,14 +96,22 @@ impl<'a> CoordState<'a> {
pub async fn check_states(&mut self) { pub async fn check_states(&mut self) {
if !self.joystick_loop_alive.load(Ordering::SeqCst) { if !self.joystick_loop_alive.load(Ordering::SeqCst) {
info!("Restarting joystick loop");
self.rt.spawn(joystick_loop( self.rt.spawn(joystick_loop(
self.to_mec.clone(), self.to_mec.clone(),
self.joystick_loop_alive.clone(), self.joystick_loop_alive.clone(),
)); ));
} }
if !self.sck_recv_alive.load(Ordering::SeqCst) if !self.sck_alive_server.load(Ordering::SeqCst) {
|| !self.sck_send_alive.load(Ordering::SeqCst) info!("Restarting socket server");
self.sck_alive_server.store(true, Ordering::SeqCst);
self.rt.spawn(socket_server::start_socketserver(self.rt.clone(), self.to_mec.clone(), self.sck_alive_server.clone()));
}
if !self.sck_alive_recvr.load(Ordering::SeqCst)
|| !self.sck_alive_sender.load(Ordering::SeqCst)
{ {
self.socket_close().await; self.socket_close().await;
@ -118,13 +129,14 @@ impl<'a> CoordState<'a> {
self.socket_close().await; self.socket_close().await;
self.joystick_loop_alive.store(false, Ordering::SeqCst); self.joystick_loop_alive.store(false, Ordering::SeqCst);
self.sck_alive_server.store(false, Ordering::SeqCst);
self.to_gui.close(); self.to_gui.close();
self.mec.close(); self.mec.close();
} }
pub async fn socket_close(&mut self) { pub async fn socket_close(&mut self) {
self.sck_send_alive.store(false, Ordering::SeqCst); self.sck_alive_sender.store(false, Ordering::SeqCst);
self.sck_recv_alive.store(false, Ordering::SeqCst); self.sck_alive_recvr.store(false, Ordering::SeqCst);
if let Some(tx) = self.to_socket.take() { if let Some(tx) = self.to_socket.take() {
tx.close(); tx.close();
} }
@ -160,7 +172,7 @@ pub async fn start_coordinator(
state.socket_send(socket_message).await; state.socket_send(socket_message).await;
} }
ApplicationEvent::MoveEvent(coord) => { ApplicationEvent::MoveEvent(coord, priority) => {
if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(coord.clone())).await { if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(coord.clone())).await {
panic!("Could not set message to gui channel; Unrecoverable: {e}"); panic!("Could not set message to gui channel; Unrecoverable: {e}");
} }

View file

@ -85,10 +85,13 @@ pub async fn joystick_loop(tx: Sender<ApplicationEvent>, is_alive: Arc<AtomicBoo
count_zeros = 0; count_zeros = 0;
} }
match tx.try_send(ApplicationEvent::MoveEvent(MoveEvent { match tx.try_send(ApplicationEvent::MoveEvent(
x: curr_x, MoveEvent {
y: curr_y, x: curr_x,
})) { y: curr_y,
},
0,
)) {
Ok(_) => {} Ok(_) => {}
Err(async_channel::TrySendError::Closed(_)) => { Err(async_channel::TrySendError::Closed(_)) => {
info!("MEC is closed, stopping Joystick loop"); info!("MEC is closed, stopping Joystick loop");

View file

@ -6,10 +6,11 @@ use simplelog::SimpleLogger;
use std::env; use std::env;
use tokio::runtime; use tokio::runtime;
mod camera_socket;
mod config; mod config;
mod coordinator; mod coordinator;
mod joystick_loop; mod joystick_loop;
mod camera_socket; mod socket_server;
mod ui_code; mod ui_code;
const APP_ID: &str = "net.nickiel.joystick-controller-client"; const APP_ID: &str = "net.nickiel.joystick-controller-client";

83
src/socket_server.rs Normal file
View file

@ -0,0 +1,83 @@
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use async_channel::Sender;
use futures_util::StreamExt;
use log::{error, info};
use tokio::{
net::{TcpListener, TcpStream},
runtime::Handle,
};
use tokio_tungstenite::{
accept_async,
tungstenite::{Error, Result},
};
use crate::coordinator::{ApplicationEvent, MoveEvent};
async fn accept_connection(peer: SocketAddr, stream: TcpStream, mec: Sender<ApplicationEvent>) {
if let Err(e) = handle_connection(peer, stream, mec.clone()).await {
match e {
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
err => error!("Error processing connection: {}", err),
}
}
}
async fn handle_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
) -> Result<()> {
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);
while let Some(msg) = ws_stream.next().await {
let msg = msg?;
if msg.is_text() {
if let Err(e) = mec
.send(ApplicationEvent::MoveEvent(
process_incoming_string(msg.to_string()),
5,
))
.await
{
error!("MEC Unavailable, closing the connection on the socket-server: {e}");
break;
}
}
}
Ok(())
}
fn process_incoming_string(message: String) -> MoveEvent {
return MoveEvent { x: 10, y: 30 };
}
pub async fn start_socketserver(
rt: Handle,
mec: Sender<ApplicationEvent>,
stay_alive: Arc<AtomicBool>,
) {
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
info!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", peer);
rt.spawn(accept_connection(peer, stream, mec.clone()));
}
stay_alive.store(false, Ordering::SeqCst);
}