Compare commits
No commits in common. "4530cf45a81cbd605d3fae4a371847239b47cb6e" and "0a99879449becce06b778c8d999bd785fd8ac49f" have entirely different histories.
4530cf45a8
...
0a99879449
4 changed files with 21 additions and 120 deletions
|
@ -10,8 +10,7 @@ use log::{error, info};
|
||||||
use tokio::runtime::Handle;
|
use tokio::runtime::Handle;
|
||||||
use tokio_tungstenite::tungstenite::Message;
|
use tokio_tungstenite::tungstenite::Message;
|
||||||
|
|
||||||
use crate::socket_server;
|
use crate::{joystick_loop::joystick_loop, camera_socket, ui_code::GuiUpdate};
|
||||||
use crate::{camera_socket, joystick_loop::joystick_loop, ui_code::GuiUpdate};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct MoveEvent {
|
pub struct MoveEvent {
|
||||||
|
@ -22,15 +21,14 @@ pub struct MoveEvent {
|
||||||
pub enum ApplicationEvent {
|
pub enum ApplicationEvent {
|
||||||
StartSocket(String),
|
StartSocket(String),
|
||||||
SocketMessage(Message),
|
SocketMessage(Message),
|
||||||
MoveEvent(MoveEvent, u32),
|
MoveEvent(MoveEvent),
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CoordState<'a> {
|
struct CoordState<'a> {
|
||||||
pub to_socket: Option<Sender<Message>>,
|
pub to_socket: Option<Sender<Message>>,
|
||||||
|
|
||||||
pub sck_alive_sender: Arc<AtomicBool>,
|
pub sck_send_alive: Arc<AtomicBool>,
|
||||||
pub sck_alive_server: Arc<AtomicBool>,
|
pub sck_recv_alive: Arc<AtomicBool>,
|
||||||
pub sck_alive_recvr: Arc<AtomicBool>,
|
|
||||||
pub joystick_loop_alive: Arc<AtomicBool>,
|
pub joystick_loop_alive: Arc<AtomicBool>,
|
||||||
|
|
||||||
pub mec: Pin<&'a mut Receiver<ApplicationEvent>>,
|
pub mec: Pin<&'a mut Receiver<ApplicationEvent>>,
|
||||||
|
@ -49,9 +47,8 @@ impl<'a> CoordState<'a> {
|
||||||
CoordState {
|
CoordState {
|
||||||
to_socket: None,
|
to_socket: None,
|
||||||
|
|
||||||
sck_alive_sender: Arc::new(AtomicBool::new(false)),
|
sck_send_alive: Arc::new(AtomicBool::new(false)),
|
||||||
sck_alive_recvr: Arc::new(AtomicBool::new(false)),
|
sck_recv_alive: Arc::new(AtomicBool::new(false)),
|
||||||
sck_alive_server: Arc::new(AtomicBool::new(false)),
|
|
||||||
joystick_loop_alive: Arc::new(AtomicBool::new(false)),
|
joystick_loop_alive: Arc::new(AtomicBool::new(false)),
|
||||||
|
|
||||||
mec,
|
mec,
|
||||||
|
@ -62,7 +59,7 @@ impl<'a> CoordState<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn socket_send(&mut self, message: Message) {
|
pub async fn socket_send(&mut self, message: Message) {
|
||||||
if self.sck_alive_sender.load(Ordering::SeqCst) {
|
if self.sck_send_alive.load(Ordering::SeqCst) {
|
||||||
if let Some(tx) = self.to_socket.take() {
|
if let Some(tx) = self.to_socket.take() {
|
||||||
if let Err(e) = tx.send(message).await {
|
if let Err(e) = tx.send(message).await {
|
||||||
error!("There was an error sending to the socket send channel: {e}");
|
error!("There was an error sending to the socket send channel: {e}");
|
||||||
|
@ -74,8 +71,8 @@ impl<'a> CoordState<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn socket_start(&mut self, conn: String) {
|
pub async fn socket_start(&mut self, conn: String) {
|
||||||
if !(self.sck_alive_recvr.load(Ordering::SeqCst)
|
if !(self.sck_recv_alive.load(Ordering::SeqCst)
|
||||||
&& self.sck_alive_sender.load(Ordering::SeqCst))
|
&& self.sck_send_alive.load(Ordering::SeqCst))
|
||||||
{
|
{
|
||||||
info!("Starting socket");
|
info!("Starting socket");
|
||||||
let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10);
|
let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10);
|
||||||
|
@ -86,8 +83,8 @@ impl<'a> CoordState<'a> {
|
||||||
conn,
|
conn,
|
||||||
self.to_mec.clone(),
|
self.to_mec.clone(),
|
||||||
socket_sender_rx,
|
socket_sender_rx,
|
||||||
self.sck_alive_sender.clone(),
|
self.sck_send_alive.clone(),
|
||||||
self.sck_alive_recvr.clone(),
|
self.sck_recv_alive.clone(),
|
||||||
self.rt.clone(),
|
self.rt.clone(),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
@ -96,22 +93,14 @@ impl<'a> CoordState<'a> {
|
||||||
|
|
||||||
pub async fn check_states(&mut self) {
|
pub async fn check_states(&mut self) {
|
||||||
if !self.joystick_loop_alive.load(Ordering::SeqCst) {
|
if !self.joystick_loop_alive.load(Ordering::SeqCst) {
|
||||||
info!("Restarting joystick loop");
|
|
||||||
self.rt.spawn(joystick_loop(
|
self.rt.spawn(joystick_loop(
|
||||||
self.to_mec.clone(),
|
self.to_mec.clone(),
|
||||||
self.joystick_loop_alive.clone(),
|
self.joystick_loop_alive.clone(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if !self.sck_alive_server.load(Ordering::SeqCst) {
|
if !self.sck_recv_alive.load(Ordering::SeqCst)
|
||||||
info!("Restarting socket server");
|
|| !self.sck_send_alive.load(Ordering::SeqCst)
|
||||||
self.sck_alive_server.store(true, Ordering::SeqCst);
|
|
||||||
self.rt.spawn(socket_server::start_socketserver(self.rt.clone(), self.to_mec.clone(), self.sck_alive_server.clone()));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if !self.sck_alive_recvr.load(Ordering::SeqCst)
|
|
||||||
|| !self.sck_alive_sender.load(Ordering::SeqCst)
|
|
||||||
{
|
{
|
||||||
self.socket_close().await;
|
self.socket_close().await;
|
||||||
|
|
||||||
|
@ -129,14 +118,13 @@ impl<'a> CoordState<'a> {
|
||||||
self.socket_close().await;
|
self.socket_close().await;
|
||||||
|
|
||||||
self.joystick_loop_alive.store(false, Ordering::SeqCst);
|
self.joystick_loop_alive.store(false, Ordering::SeqCst);
|
||||||
self.sck_alive_server.store(false, Ordering::SeqCst);
|
|
||||||
self.to_gui.close();
|
self.to_gui.close();
|
||||||
self.mec.close();
|
self.mec.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn socket_close(&mut self) {
|
pub async fn socket_close(&mut self) {
|
||||||
self.sck_alive_sender.store(false, Ordering::SeqCst);
|
self.sck_send_alive.store(false, Ordering::SeqCst);
|
||||||
self.sck_alive_recvr.store(false, Ordering::SeqCst);
|
self.sck_recv_alive.store(false, Ordering::SeqCst);
|
||||||
if let Some(tx) = self.to_socket.take() {
|
if let Some(tx) = self.to_socket.take() {
|
||||||
tx.close();
|
tx.close();
|
||||||
}
|
}
|
||||||
|
@ -172,7 +160,7 @@ pub async fn start_coordinator(
|
||||||
|
|
||||||
state.socket_send(socket_message).await;
|
state.socket_send(socket_message).await;
|
||||||
}
|
}
|
||||||
ApplicationEvent::MoveEvent(coord, priority) => {
|
ApplicationEvent::MoveEvent(coord) => {
|
||||||
if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(coord.clone())).await {
|
if let Err(e) = state.to_gui.send(GuiUpdate::MoveEvent(coord.clone())).await {
|
||||||
panic!("Could not set message to gui channel; Unrecoverable: {e}");
|
panic!("Could not set message to gui channel; Unrecoverable: {e}");
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,13 +85,10 @@ pub async fn joystick_loop(tx: Sender<ApplicationEvent>, is_alive: Arc<AtomicBoo
|
||||||
count_zeros = 0;
|
count_zeros = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
match tx.try_send(ApplicationEvent::MoveEvent(
|
match tx.try_send(ApplicationEvent::MoveEvent(MoveEvent {
|
||||||
MoveEvent {
|
|
||||||
x: curr_x,
|
x: curr_x,
|
||||||
y: curr_y,
|
y: curr_y,
|
||||||
},
|
})) {
|
||||||
0,
|
|
||||||
)) {
|
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(async_channel::TrySendError::Closed(_)) => {
|
Err(async_channel::TrySendError::Closed(_)) => {
|
||||||
info!("MEC is closed, stopping Joystick loop");
|
info!("MEC is closed, stopping Joystick loop");
|
||||||
|
|
|
@ -6,11 +6,10 @@ use simplelog::SimpleLogger;
|
||||||
use std::env;
|
use std::env;
|
||||||
use tokio::runtime;
|
use tokio::runtime;
|
||||||
|
|
||||||
mod camera_socket;
|
|
||||||
mod config;
|
mod config;
|
||||||
mod coordinator;
|
mod coordinator;
|
||||||
mod joystick_loop;
|
mod joystick_loop;
|
||||||
mod socket_server;
|
mod camera_socket;
|
||||||
mod ui_code;
|
mod ui_code;
|
||||||
const APP_ID: &str = "net.nickiel.joystick-controller-client";
|
const APP_ID: &str = "net.nickiel.joystick-controller-client";
|
||||||
|
|
||||||
|
|
|
@ -1,83 +0,0 @@
|
||||||
use std::{
|
|
||||||
net::SocketAddr,
|
|
||||||
sync::{
|
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_channel::Sender;
|
|
||||||
use futures_util::StreamExt;
|
|
||||||
use log::{error, info};
|
|
||||||
use tokio::{
|
|
||||||
net::{TcpListener, TcpStream},
|
|
||||||
runtime::Handle,
|
|
||||||
};
|
|
||||||
use tokio_tungstenite::{
|
|
||||||
accept_async,
|
|
||||||
tungstenite::{Error, Result},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::coordinator::{ApplicationEvent, MoveEvent};
|
|
||||||
|
|
||||||
async fn accept_connection(peer: SocketAddr, stream: TcpStream, mec: Sender<ApplicationEvent>) {
|
|
||||||
if let Err(e) = handle_connection(peer, stream, mec.clone()).await {
|
|
||||||
match e {
|
|
||||||
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
|
|
||||||
err => error!("Error processing connection: {}", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_connection(
|
|
||||||
peer: SocketAddr,
|
|
||||||
stream: TcpStream,
|
|
||||||
mec: Sender<ApplicationEvent>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
|
|
||||||
|
|
||||||
info!("New WebSocket connection: {}", peer);
|
|
||||||
|
|
||||||
while let Some(msg) = ws_stream.next().await {
|
|
||||||
let msg = msg?;
|
|
||||||
if msg.is_text() {
|
|
||||||
if let Err(e) = mec
|
|
||||||
.send(ApplicationEvent::MoveEvent(
|
|
||||||
process_incoming_string(msg.to_string()),
|
|
||||||
5,
|
|
||||||
))
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
error!("MEC Unavailable, closing the connection on the socket-server: {e}");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn process_incoming_string(message: String) -> MoveEvent {
|
|
||||||
return MoveEvent { x: 10, y: 30 };
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start_socketserver(
|
|
||||||
rt: Handle,
|
|
||||||
mec: Sender<ApplicationEvent>,
|
|
||||||
stay_alive: Arc<AtomicBool>,
|
|
||||||
) {
|
|
||||||
let addr = "127.0.0.1:9002";
|
|
||||||
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
|
|
||||||
info!("Listening on: {}", addr);
|
|
||||||
|
|
||||||
while let Ok((stream, _)) = listener.accept().await {
|
|
||||||
let peer = stream
|
|
||||||
.peer_addr()
|
|
||||||
.expect("connected streams should have a peer address");
|
|
||||||
info!("Peer address: {}", peer);
|
|
||||||
|
|
||||||
rt.spawn(accept_connection(peer, stream, mec.clone()));
|
|
||||||
}
|
|
||||||
|
|
||||||
stay_alive.store(false, Ordering::SeqCst);
|
|
||||||
}
|
|
Loading…
Reference in a new issue