re-organized the code a bit

This commit is contained in:
Nickiel12 2024-04-15 20:18:01 -07:00
parent 2278b4cd79
commit 1bebdd1fd8
8 changed files with 398 additions and 269 deletions

View file

@ -1,99 +0,0 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use async_channel::{Receiver, Sender};
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{error, info};
use tokio::{net::TcpStream, runtime::Handle};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use crate::coordinator::ApplicationEvent;
async fn socket_listen(
mec: Sender<ApplicationEvent>,
socket_recv_is_alive: Arc<AtomicBool>,
mut reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) {
if socket_recv_is_alive.load(std::sync::atomic::Ordering::SeqCst) {
while let Some(msg) = reader.next().await {
match msg {
Ok(val) => {
if let Err(e) = mec.send(ApplicationEvent::SocketMessage(val)).await {
error!("There was an error sending to Main Event Channel, closing socket recv thread: {e}");
break;
}
}
Err(e) => {
error!("Websocket error: {:#?}", e);
}
}
}
socket_recv_is_alive.store(false, Ordering::SeqCst);
}
info!("Closed socket reading thread");
}
async fn socket_write(
to_send: Receiver<Message>,
socket_send_is_alive: Arc<AtomicBool>,
mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
while socket_send_is_alive.load(std::sync::atomic::Ordering::SeqCst) {
match to_send.recv().await {
Ok(msg) => {
if let Err(e) = sender.send(msg).await {
error!("There was an error sending to the socket: {:#?}", e);
break; // Exit loop on error
}
}
Err(e) => {
// Channel closed, exit loop
error!("To Socket channel is closed! Exiting: {e}");
break;
}
}
}
socket_send_is_alive.store(false, Ordering::SeqCst);
if let Err(e) = sender.close().await {
error!("Error closing websocket sender: {e}");
}
info!("Closed socket writing thread");
}
pub async fn socket_loop(
connection_string: String,
mec: Sender<ApplicationEvent>,
send_to_socket: Receiver<Message>,
socket_send_is_alive: Arc<AtomicBool>,
socket_recv_is_alive: Arc<AtomicBool>,
rt: Handle,
) {
info!("Starting Socket Loop");
socket_send_is_alive.store(true, Ordering::SeqCst);
socket_recv_is_alive.store(true, Ordering::SeqCst);
let socket: Option<WebSocketStream<MaybeTlsStream<TcpStream>>> =
match connect_async(connection_string).await {
Ok((val, _)) => {
info!("Socket connection made successfully");
Some(val)
}
Err(_) => {
error!("Couldn't connect to URL");
None
}
};
if let Some(sckt) = socket {
let (outbound, inbound) = sckt.split();
rt.spawn(socket_listen(mec, socket_recv_is_alive, inbound));
rt.spawn(socket_write(send_to_socket, socket_send_is_alive, outbound));
}
}

View file

@ -5,13 +5,17 @@ use std::sync::{
};
use async_channel::{Receiver, Sender};
use futures_util::stream::StreamExt;
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use log::{error, info};
use tokio::net::TcpStream;
use tokio::runtime::Handle;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use crate::socket_server;
use crate::{camera_socket, joystick_loop::joystick_loop, ui_code::GuiUpdate};
use crate::remote_sources;
use crate::{joystick_source::joystick_loop, ui_code::GuiUpdate};
#[derive(Clone)]
pub struct MoveEvent {
@ -19,16 +23,20 @@ pub struct MoveEvent {
pub y: i32,
}
pub enum ConnectionType {
Local,
Remote,
Automated,
}
pub enum ApplicationEvent {
StartSocket(String),
SocketMessage(Message),
MoveEvent(MoveEvent, u32),
MoveEvent(MoveEvent, ConnectionType),
}
struct CoordState<'a> {
pub to_socket: Option<Sender<Message>>,
pub sck_alive_sender: Arc<AtomicBool>,
pub sck_outbound: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
pub sck_alive_server: Arc<AtomicBool>,
pub sck_alive_recvr: Arc<AtomicBool>,
pub joystick_loop_alive: Arc<AtomicBool>,
@ -47,9 +55,7 @@ impl<'a> CoordState<'a> {
rt: Handle,
) -> Self {
CoordState {
to_socket: None,
sck_alive_sender: Arc::new(AtomicBool::new(false)),
sck_outbound: None,
sck_alive_recvr: Arc::new(AtomicBool::new(false)),
sck_alive_server: Arc::new(AtomicBool::new(false)),
joystick_loop_alive: Arc::new(AtomicBool::new(false)),
@ -62,38 +68,51 @@ impl<'a> CoordState<'a> {
}
pub async fn socket_send(&mut self, message: Message) {
if self.sck_alive_sender.load(Ordering::SeqCst) {
if let Some(tx) = self.to_socket.take() {
if let Err(e) = tx.send(message).await {
error!("There was an error sending to the socket send channel: {e}");
self.socket_close().await;
}
self.to_socket = Some(tx);
if let Some(mut socket) = self.sck_outbound.take() {
if let Err(e) = socket.send(message).await {
error!("There was an error sending to the socket: {:#?}", e);
} else {
self.sck_outbound = Some(socket);
}
}
}
pub fn socket_connected(&self) -> bool {
self.sck_outbound.is_some()
}
pub async fn socket_start(&mut self, conn: String) {
if !(self.sck_alive_recvr.load(Ordering::SeqCst)
&& self.sck_alive_sender.load(Ordering::SeqCst))
{
if !(self.sck_alive_recvr.load(Ordering::SeqCst)) {
info!("Starting socket");
let (to_socket, socket_sender_rx) = async_channel::bounded::<Message>(10);
self.to_socket = Some(to_socket);
match connect_async(conn).await {
Ok((val, _)) => {
info!("Socket connection to camera made successfully");
camera_socket::socket_loop(
conn,
self.to_mec.clone(),
socket_sender_rx,
self.sck_alive_sender.clone(),
self.sck_alive_recvr.clone(),
self.rt.clone(),
)
.await;
let (outbound, inbound) = val.split();
self.rt.spawn(socket_listen(
self.to_mec.clone(),
self.sck_alive_recvr.clone(),
inbound,
));
self.sck_outbound = Some(outbound);
}
Err(_) => {
error!("Couldn't connect to URL!");
}
}
}
}
pub async fn socket_close(&mut self) {
if let Some(mut socket) = self.sck_outbound.take() {
if let Err(e) = socket.close().await {
error!("Couldnt' close socket during shutdown: {e}");
}
}
self.sck_alive_recvr.store(false, Ordering::SeqCst);
}
pub async fn check_states(&mut self) {
if !self.joystick_loop_alive.load(Ordering::SeqCst) {
info!("Restarting joystick loop");
@ -103,16 +122,17 @@ impl<'a> CoordState<'a> {
));
}
if !self.sck_alive_server.load(Ordering::SeqCst) {
if !self.sck_alive_server.load(Ordering::SeqCst) || self.sck_outbound.is_none() {
info!("Restarting socket server");
self.sck_alive_server.store(true, Ordering::SeqCst);
self.rt.spawn(socket_server::start_socketserver(self.rt.clone(), self.to_mec.clone(), self.sck_alive_server.clone()));
self.rt.spawn(remote_sources::start_socketserver(
self.rt.clone(),
self.to_mec.clone(),
self.sck_alive_server.clone(),
));
}
if !self.sck_alive_recvr.load(Ordering::SeqCst)
|| !self.sck_alive_sender.load(Ordering::SeqCst)
{
if !self.sck_alive_recvr.load(Ordering::SeqCst) || self.sck_outbound.is_none() {
self.socket_close().await;
if let Err(e) = self.to_gui.send(GuiUpdate::SocketState(false)).await {
@ -133,18 +153,10 @@ impl<'a> CoordState<'a> {
self.to_gui.close();
self.mec.close();
}
pub async fn socket_close(&mut self) {
self.sck_alive_sender.store(false, Ordering::SeqCst);
self.sck_alive_recvr.store(false, Ordering::SeqCst);
if let Some(tx) = self.to_socket.take() {
tx.close();
}
}
}
// Main_Event_Channel
pub async fn start_coordinator(
// Main_Event_Channel
mec: Receiver<ApplicationEvent>,
to_mec: Sender<ApplicationEvent>,
to_gui: Sender<GuiUpdate>,
@ -156,6 +168,8 @@ pub async fn start_coordinator(
let mut state = CoordState::new(mec, to_mec, to_gui, runtime);
state.check_states().await;
while let Some(msg) = state.mec.next().await {
state.check_states().await;
@ -177,7 +191,7 @@ pub async fn start_coordinator(
panic!("Could not set message to gui channel; Unrecoverable: {e}");
}
if let Some(tx) = state.to_socket.take() {
if state.socket_connected() {
let message = format!(
"{}{}:{}{}",
if coord.y > 0 { "D" } else { "U" },
@ -186,12 +200,7 @@ pub async fn start_coordinator(
coord.x.abs()
);
if let Err(e) = tx.send(Message::Text(message)).await {
error!("Couldn't send message to socket, closing: {e}");
tx.close();
} else {
state.to_socket = Some(tx);
}
state.socket_send(Message::Text(message)).await;
}
}
}
@ -199,3 +208,28 @@ pub async fn start_coordinator(
info!("Stopping Coordinator");
}
async fn socket_listen(
mec: Sender<ApplicationEvent>,
socket_recv_is_alive: Arc<AtomicBool>,
mut reader: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
) {
if socket_recv_is_alive.load(std::sync::atomic::Ordering::SeqCst) {
while let Some(msg) = reader.next().await {
match msg {
Ok(val) => {
if let Err(e) = mec.send(ApplicationEvent::SocketMessage(val)).await {
error!("There was an error sending to Main Event Channel, closing socket recv thread: {e}");
break;
}
}
Err(e) => {
error!("Websocket error: {:#?}", e);
}
}
}
socket_recv_is_alive.store(false, Ordering::SeqCst);
}
info!("Closed socket reading thread");
}

View file

@ -90,7 +90,7 @@ pub async fn joystick_loop(tx: Sender<ApplicationEvent>, is_alive: Arc<AtomicBoo
x: curr_x,
y: curr_y,
},
0,
crate::coordinator::ConnectionType::Local,
)) {
Ok(_) => {}
Err(async_channel::TrySendError::Closed(_)) => {

View file

@ -6,11 +6,10 @@ use simplelog::SimpleLogger;
use std::env;
use tokio::runtime;
mod camera_socket;
mod config;
mod coordinator;
mod joystick_loop;
mod socket_server;
mod joystick_source;
mod remote_sources;
mod ui_code;
const APP_ID: &str = "net.nickiel.joystick-controller-client";

View file

@ -0,0 +1,139 @@
use std::{
cmp::{max, min},
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use async_channel::Sender;
use futures_util::StreamExt;
use log::{error, info};
use tokio::net::TcpStream;
use tokio_tungstenite::{tungstenite::Result, WebSocketStream};
use super::TrackerState;
use crate::coordinator::{ApplicationEvent, MoveEvent};
const HALF_FRAME_WIDTH: f64 = 320.0;
pub async fn handle_connection(
mut ws_stream: WebSocketStream<TcpStream>,
mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
) -> Result<()> {
if let Ok(mut ts) = tracker_state.lock() {
ts.has_active_connection = true;
}
while let Some(msg) = ws_stream.next().await {
let msg = msg?;
if msg.is_text() {
let (x_off, y_off) =
match process_incoming_string(msg.to_string(), tracker_state.clone()) {
Ok(val) => val,
Err(e) => {
error!("{e}");
(0, 0)
}
};
if let Err(e) = mec
.send(ApplicationEvent::MoveEvent(
MoveEvent { x: x_off, y: y_off },
crate::coordinator::ConnectionType::Automated,
))
.await
{
error!("MEC Unavailable, closing the connection on the socket-server: {e}");
break;
}
}
}
if let Ok(mut ts) = tracker_state.lock() {
ts.has_active_connection = false;
}
Ok(())
}
struct BoxInfo {
id: u32,
x1: u32,
y1: u32,
x2: u32,
}
fn process_incoming_string(
message: String,
tracker_state: Arc<Mutex<TrackerState>>,
) -> core::result::Result<(i32, i32), String> {
let mut boxes: Vec<BoxInfo> = Vec::new();
for line in message.lines() {
let parts: Vec<&str> = line.split(' ').collect();
let id = parts[0]
.replace(['[', ']'], "")
.parse()
.map_err(|_| "Invalid ID")?;
if parts.len() != 3 {
return Err("Invalid socket input format: number of parts".to_string());
}
let coords: Vec<&str> = parts[1].split(':').collect();
if coords.len() != 2 {
return Err("Invalid socket input format: coords 1".to_string());
}
let x1: u32 = coords[0].parse().map_err(|_| "Invalid x coordinate")?;
let y1: u32 = coords[1].parse().map_err(|_| "Invalid y coordinate")?;
let coords2: Vec<&str> = parts[2].split(':').collect();
if coords2.len() != 2 {
return Err("Invalid socket input format: coords 2".to_string());
}
let x2: u32 = coords2[0].parse().map_err(|_| "Invalid width")?;
boxes.push(BoxInfo { id, x1, y1, x2 });
}
if let Ok(mut ts) = tracker_state.lock() {
if ts.last_detect + Duration::from_secs(2) < Instant::now() && !boxes.is_empty() {
info!("Setting new target: {}", boxes[0].id);
ts.tracking_id = boxes[0].id;
}
info!("boxes len: {}", boxes.len());
if let Some(target_box) = boxes.into_iter().find(|e| e.id == ts.tracking_id) {
let x_adjust = calc_x_adjust(target_box.x1, target_box.x2);
let y_adjust = calc_y_adjust(target_box.y1);
ts.last_detect = Instant::now();
Ok((x_adjust, y_adjust))
} else {
info!("Tracking ID: {}", ts.tracking_id);
Err("Couldn't find target in results".to_string())
}
} else {
Err("Couldn't lock tracker state".to_string())
}
}
fn calc_x_adjust(x1: u32, x2: u32) -> i32 {
let dist_from_center = ((x1 + x2) as f64 / 2.0) - HALF_FRAME_WIDTH;
let mut x_adjust = (dist_from_center / HALF_FRAME_WIDTH * 200.0) as i32;
if x_adjust < 15 && x_adjust > -15 {
x_adjust = 0;
}
min(max(x_adjust, -100), 100)
}
fn calc_y_adjust(y1: u32) -> i32 {
let mut y_adjust = y1 as i32 - 100;
if y_adjust < 0 {
y_adjust -= 20;
} else if y_adjust < 30 {
y_adjust = 0;
} else {
y_adjust = (y_adjust as f32 * 0.75) as i32;
}
min(max(y_adjust, -100), 100)
}

140
src/remote_sources/mod.rs Normal file
View file

@ -0,0 +1,140 @@
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
time::Instant,
};
use async_channel::Sender;
use futures_core::FusedStream;
use futures_util::{SinkExt, StreamExt};
use log::{error, info};
use tokio::{
net::{TcpListener, TcpStream},
runtime::Handle,
};
use tokio_tungstenite::{
accept_async,
tungstenite::{Error, Message, Result},
};
mod automated_source;
mod remote_source;
use crate::coordinator::{ApplicationEvent, ConnectionType};
pub struct TrackerState {
pub has_active_connection: bool,
pub tracking_id: u32,
pub last_detect: Instant,
}
pub async fn start_socketserver(
rt: Handle,
mec: Sender<ApplicationEvent>,
stay_alive: Arc<AtomicBool>,
) {
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
info!("Listening on: {}", addr);
let tracker_state = Arc::new(Mutex::new(TrackerState {
tracking_id: 0,
last_detect: Instant::now(),
has_active_connection: false,
}));
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", peer);
rt.spawn(accept_connection(
peer,
stream,
mec.clone(),
tracker_state.clone(),
));
}
stay_alive.store(false, Ordering::SeqCst);
}
async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
) {
if let Err(e) = handle_connection(peer, stream, mec.clone(), tracker_state).await {
match e {
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
err => error!("Error processing connection: {}", err),
}
}
}
async fn handle_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
) -> Result<()> {
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);
if let Err(e) = ws_stream.send(Message::text("Type?")).await {
error!("Error requesting connection type from {}: {}", peer, e);
if let Err(e1) = ws_stream.close(None).await {
error!("Could not close websocket after not receiving type: {e1}");
}
return Ok(());
}
let mut connection_type: Option<ConnectionType> = None;
while let Some(msg) = ws_stream.next().await {
let msg = match msg {
Ok(msg) => msg,
Err(e) => {
error!("Error receiving message from {}: {}", peer, e);
if let Err(e1) = ws_stream.close(None).await {
error!("Could not close websocket after receiving error: {e1}");
}
return Ok(());
}
};
if msg.is_text() && msg.to_string().starts_with("Type: ") {
match msg.to_string().split(' ').collect::<Vec<&str>>()[0] {
"Automated" => {
connection_type = Some(ConnectionType::Automated);
}
"Remote" => {
connection_type = Some(ConnectionType::Remote);
}
_ => {}
}
}
if connection_type.is_some() {
break;
}
}
if !ws_stream.is_terminated() {
match connection_type.unwrap() {
ConnectionType::Automated => {
automated_source::handle_connection(ws_stream, mec, tracker_state).await?;
}
ConnectionType::Remote => {
remote_source::handle_connection().await?;
}
_ => todo!(),
}
}
Ok(())
}

View file

@ -0,0 +1,27 @@
/*
use std::{
cmp::{min, max}, sync::{
Arc, Mutex,
}, time::{Duration, Instant}
};
use async_channel::Sender;
use futures_util::{SinkExt, StreamExt};
use log::{error, info};
use tokio::net::TcpStream;
use tokio_tungstenite::{
tungstenite::Result, WebSocketStream,
};
use crate::coordinator::ApplicationEvent;
use super::TrackerState;
*/
use tokio_tungstenite::tungstenite::Result;
pub async fn handle_connection(// mut ws_stream: WebSocketStream<TcpStream>,
// mec: Sender<ApplicationEvent>,
// tracker_state: Arc<Mutex<TrackerState>>
) -> Result<()> {
todo!("Remote connections not implemented yet");
}

View file

@ -1,111 +0,0 @@
use std::{
net::SocketAddr, sync::{
atomic::{AtomicBool, Ordering},
Arc,
}
};
use async_channel::Sender;
use futures_util::StreamExt;
use log::{error, info};
use tokio::{
net::{TcpListener, TcpStream},
runtime::Handle,
};
use tokio_tungstenite::{
accept_async,
tungstenite::{Error, Result},
};
use crate::coordinator::{ApplicationEvent, MoveEvent};
const FRAME_WIDTH: u32 = 640;
const FRAME_HEIGHT: u32 = 480;
async fn accept_connection(peer: SocketAddr, stream: TcpStream, mec: Sender<ApplicationEvent>) {
if let Err(e) = handle_connection(peer, stream, mec.clone()).await {
match e {
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
err => error!("Error processing connection: {}", err),
}
}
}
async fn handle_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
) -> Result<()> {
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);
while let Some(msg) = ws_stream.next().await {
let msg = msg?;
if msg.is_text() {
let (x_off, y_off) = match process_incoming_string(msg.to_string()) {
Ok(val) => val,
Err(e) => {
error!("{e}");
(0, 0)
}
};
if let Err(e) = mec
.send(ApplicationEvent::MoveEvent(
MoveEvent {x: x_off, y: y_off},
5,
))
.await
{
error!("MEC Unavailable, closing the connection on the socket-server: {e}");
break;
}
}
}
Ok(())
}
fn process_incoming_string(message: String) -> core::result::Result<(i32, i32), String> {
let coords: Vec<&str> = message.split(',').collect();
if coords.len() != 4 {
return Err("Invalid socket input format".to_string());
}
let x1: u32 = coords[0].parse().map_err(|_| "Invalid x coordinate")?;
let y1: u32 = coords[1].parse().map_err(|_| "Invalid y coordinate")?;
let coords2: Vec<&str> = coords[2].split(':').collect();
if coords2.len() != 2 {
return Err("Invalid socket input format".to_string());
}
let x2: u32 = coords2[0].parse().map_err(|_| "Invalid width")?;
let y2: u32= coords2[1].parse().map_err(|_| "Invalid height")?;
let x_adjust = (FRAME_WIDTH - x2) as i32 - x1 as i32;
let y_adjust = (FRAME_HEIGHT - y2) as i32 - y1 as i32;
return Ok((x_adjust, y_adjust));
}
pub async fn start_socketserver(
rt: Handle,
mec: Sender<ApplicationEvent>,
stay_alive: Arc<AtomicBool>,
) {
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
info!("Listening on: {}", addr);
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", peer);
rt.spawn(accept_connection(peer, stream, mec.clone()));
}
stay_alive.store(false, Ordering::SeqCst);
}