2024-04-15 20:18:01 -07:00
|
|
|
use std::{
|
|
|
|
net::SocketAddr,
|
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, Ordering},
|
|
|
|
Arc, Mutex,
|
|
|
|
},
|
|
|
|
time::Instant,
|
|
|
|
};
|
|
|
|
|
|
|
|
use async_channel::Sender;
|
|
|
|
use futures_core::FusedStream;
|
|
|
|
use futures_util::{SinkExt, StreamExt};
|
2024-04-16 17:26:03 -07:00
|
|
|
use log::{debug, error, info, warn};
|
2024-04-15 20:18:01 -07:00
|
|
|
use tokio::{
|
|
|
|
net::{TcpListener, TcpStream},
|
|
|
|
runtime::Handle,
|
|
|
|
};
|
|
|
|
use tokio_tungstenite::{
|
|
|
|
accept_async,
|
|
|
|
tungstenite::{Error, Message, Result},
|
|
|
|
};
|
|
|
|
|
|
|
|
mod automated_source;
|
|
|
|
mod remote_source;
|
|
|
|
|
|
|
|
use crate::coordinator::{ApplicationEvent, ConnectionType};
|
|
|
|
|
|
|
|
pub struct TrackerState {
|
|
|
|
pub has_active_connection: bool,
|
|
|
|
pub tracking_id: u32,
|
|
|
|
pub last_detect: Instant,
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn start_socketserver(
|
|
|
|
rt: Handle,
|
|
|
|
mec: Sender<ApplicationEvent>,
|
|
|
|
stay_alive: Arc<AtomicBool>,
|
|
|
|
) {
|
|
|
|
let addr = "127.0.0.1:9002";
|
|
|
|
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
|
|
|
|
info!("Listening on: {}", addr);
|
|
|
|
|
|
|
|
let tracker_state = Arc::new(Mutex::new(TrackerState {
|
|
|
|
tracking_id: 0,
|
|
|
|
last_detect: Instant::now(),
|
|
|
|
has_active_connection: false,
|
|
|
|
}));
|
|
|
|
|
|
|
|
while let Ok((stream, _)) = listener.accept().await {
|
|
|
|
let peer = stream
|
|
|
|
.peer_addr()
|
|
|
|
.expect("connected streams should have a peer address");
|
2024-04-16 17:26:03 -07:00
|
|
|
debug!("Peer address: {}", peer);
|
2024-04-15 20:18:01 -07:00
|
|
|
|
|
|
|
rt.spawn(accept_connection(
|
|
|
|
peer,
|
|
|
|
stream,
|
|
|
|
mec.clone(),
|
|
|
|
tracker_state.clone(),
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
stay_alive.store(false, Ordering::SeqCst);
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn accept_connection(
|
|
|
|
peer: SocketAddr,
|
|
|
|
stream: TcpStream,
|
|
|
|
mec: Sender<ApplicationEvent>,
|
|
|
|
tracker_state: Arc<Mutex<TrackerState>>,
|
|
|
|
) {
|
|
|
|
if let Err(e) = handle_connection(peer, stream, mec.clone(), tracker_state).await {
|
|
|
|
match e {
|
|
|
|
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
|
|
|
|
err => error!("Error processing connection: {}", err),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_connection(
|
|
|
|
peer: SocketAddr,
|
|
|
|
stream: TcpStream,
|
|
|
|
mec: Sender<ApplicationEvent>,
|
|
|
|
tracker_state: Arc<Mutex<TrackerState>>,
|
|
|
|
) -> Result<()> {
|
|
|
|
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
|
|
|
|
info!("New WebSocket connection: {}", peer);
|
|
|
|
|
|
|
|
if let Err(e) = ws_stream.send(Message::text("Type?")).await {
|
|
|
|
error!("Error requesting connection type from {}: {}", peer, e);
|
|
|
|
if let Err(e1) = ws_stream.close(None).await {
|
|
|
|
error!("Could not close websocket after not receiving type: {e1}");
|
|
|
|
}
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut connection_type: Option<ConnectionType> = None;
|
|
|
|
|
|
|
|
while let Some(msg) = ws_stream.next().await {
|
|
|
|
let msg = match msg {
|
|
|
|
Ok(msg) => msg,
|
|
|
|
Err(e) => {
|
|
|
|
error!("Error receiving message from {}: {}", peer, e);
|
|
|
|
if let Err(e1) = ws_stream.close(None).await {
|
|
|
|
error!("Could not close websocket after receiving error: {e1}");
|
|
|
|
}
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
if msg.is_text() && msg.to_string().starts_with("Type: ") {
|
2024-04-16 17:26:03 -07:00
|
|
|
match msg.to_string().split(' ').collect::<Vec<&str>>()[1] {
|
2024-04-15 20:18:01 -07:00
|
|
|
"Automated" => {
|
2024-04-16 17:26:03 -07:00
|
|
|
debug!("Connection type is: Automated");
|
2024-04-15 20:18:01 -07:00
|
|
|
connection_type = Some(ConnectionType::Automated);
|
|
|
|
}
|
|
|
|
"Remote" => {
|
2024-04-16 17:26:03 -07:00
|
|
|
debug!("Connection type is: Remote");
|
2024-04-15 20:18:01 -07:00
|
|
|
connection_type = Some(ConnectionType::Remote);
|
|
|
|
}
|
2024-04-16 17:26:03 -07:00
|
|
|
_ => {
|
|
|
|
warn!("Unknown connection type, dropping connection");
|
|
|
|
ws_stream.close(None).await?;
|
|
|
|
}
|
2024-04-15 20:18:01 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if connection_type.is_some() {
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !ws_stream.is_terminated() {
|
|
|
|
match connection_type.unwrap() {
|
|
|
|
ConnectionType::Automated => {
|
|
|
|
automated_source::handle_connection(ws_stream, mec, tracker_state).await?;
|
|
|
|
}
|
|
|
|
ConnectionType::Remote => {
|
|
|
|
remote_source::handle_connection().await?;
|
|
|
|
}
|
|
|
|
_ => todo!(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|