redesigned the directory structure and fixed clippy

This commit is contained in:
Nickiel12 2024-07-21 23:27:34 +00:00
parent 293a550d3b
commit 5c516f9777
18 changed files with 98 additions and 278 deletions

52
Cargo.lock generated
View file

@ -1413,32 +1413,6 @@ version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c"
[[package]]
name = "joystick-controller-client"
version = "2.0.0"
dependencies = [
"async-channel",
"async-recursion",
"config",
"console-subscriber",
"futures-core",
"futures-util",
"gilrs",
"gst-plugin-gtk4",
"gstreamer",
"gstreamer-app",
"gtk4",
"log",
"serde",
"snafu",
"tokio",
"tokio-tungstenite",
"toml",
"tracing",
"tracing-appender",
"tracing-subscriber",
]
[[package]]
name = "js-sys"
version = "0.3.69"
@ -2647,6 +2621,32 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcs-controller"
version = "2.0.0"
dependencies = [
"async-channel",
"async-recursion",
"config",
"console-subscriber",
"futures-core",
"futures-util",
"gilrs",
"gst-plugin-gtk4",
"gstreamer",
"gstreamer-app",
"gtk4",
"log",
"serde",
"snafu",
"tokio",
"tokio-tungstenite",
"toml",
"tracing",
"tracing-appender",
"tracing-subscriber",
]
[[package]]
name = "vec_map"
version = "0.8.2"

View file

@ -1,5 +1,5 @@
[package]
name = "joystick-controller-client"
name = "vcs-controller"
version = "2.0.0"
edition = "2021"

View file

@ -16,8 +16,8 @@ use tracing::{debug, error, info, instrument};
use crate::config::AppConfig;
use crate::coordinator::socket_listen;
use crate::coordinator::tracker_state::TrackerState;
use crate::{gstreamer_pipeline, remote_sources};
use crate::{joystick_source::joystick_loop, ui::GuiUpdate};
use crate::gstreamer_pipeline;
use crate::{sources::joystick_source::joystick_loop, ui::GuiUpdate};
use super::perf_state::TrackerMetrics;
use super::remote_video_processor::remote_video_loop;
@ -35,7 +35,6 @@ pub struct CoordState<'a> {
pub tracker_metrics: TrackerMetrics,
pub sck_outbound: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
pub remote_sources_state: Arc<SocketState>,
pub stay_alive_sck_recvr: Arc<AtomicBool>,
pub joystick_loop_alive: Arc<AtomicBool>,
@ -62,16 +61,12 @@ impl<'a> CoordState<'a> {
settings: Arc<RwLock<AppConfig>>,
jpeg_quality: i32,
) -> Self {
let this = CoordState {
CoordState {
settings,
tracker_metrics: TrackerMetrics::new(to_gui.clone()),
sck_outbound: None,
stay_alive_sck_recvr: Arc::new(AtomicBool::new(false)),
remote_sources_state: Arc::new(SocketState {
stay_connected: AtomicBool::new(false),
is_connected: AtomicBool::new(false),
}),
joystick_loop_alive: Arc::new(AtomicBool::new(false)),
current_priority: ConnectionType::Local,
@ -97,8 +92,7 @@ impl<'a> CoordState<'a> {
stay_connected: AtomicBool::new(false),
is_connected: AtomicBool::new(false),
}),
};
this
}
}
#[instrument(skip(self))]
@ -129,7 +123,7 @@ impl<'a> CoordState<'a> {
format!(
"ws://{}:{}",
read_settings.camera_ip,
read_settings.camera_port.to_string()
read_settings.camera_port
)
};
@ -181,7 +175,7 @@ impl<'a> CoordState<'a> {
format!(
"ws://{}:{}",
read_settings.tracker_ip,
read_settings.tracker_port.to_string()
read_settings.tracker_port
)
};
@ -216,24 +210,6 @@ impl<'a> CoordState<'a> {
self.start_video_loop().await;
}
//
if self
.remote_sources_state
.stay_connected
.load(Ordering::SeqCst)
&& !self
.remote_sources_state
.is_connected
.load(Ordering::SeqCst)
{
info!("Restarting socket server");
self.rt.spawn(remote_sources::start_socketserver(
self.rt.clone(),
self.to_mec.clone(),
self.remote_sources_state.clone(),
));
}
// if stay alive is false, and there is a connection, kill it
if !self.stay_alive_sck_recvr.load(Ordering::SeqCst) && self.sck_outbound.is_some() {
self.socket_close().await;
@ -248,9 +224,6 @@ impl<'a> CoordState<'a> {
self.socket_close().await;
self.joystick_loop_alive.store(false, Ordering::SeqCst);
self.remote_sources_state
.stay_connected
.store(false, Ordering::SeqCst);
self.to_gui.close();
self.mec.close();
}

View file

@ -16,11 +16,11 @@ use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, instrument};
mod coord_state;
mod perf_state;
mod process_box_string;
mod remote_video_processor;
pub mod tracker_state;
use crate::states::perf_state;
use crate::states::tracker_state;
use crate::config::AppConfig;
use crate::ui::{GuiUpdate, NormalizedBoxCoords};
pub use coord_state::{CoordState, SocketState};
@ -76,7 +76,7 @@ pub async fn start_coordinator(
let mec = pin!(mec);
let jpeg_quality = settings.read().await.tracker_jpeg_quality.clone();
let jpeg_quality = settings.read().await.tracker_jpeg_quality;
let mut state = CoordState::new(
mec,

View file

@ -6,7 +6,7 @@ use std::{
use async_recursion::async_recursion;
use async_channel::Sender;
use futures_util::{stream::{SplitSink, SplitStream}, SinkExt, StreamExt, TryStreamExt};
use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt};
use gstreamer_app::AppSink;
use tokio::{net::TcpStream, sync::Mutex, time::sleep_until};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
@ -49,19 +49,22 @@ pub async fn remote_video_loop(
loop {
last_iter = Instant::now();
// Do this in an encloser to not keep a lock on the appsink
let image_message = match {
let appsnk = appsink.lock().await;
let image_message = {
let res = {
let appsnk = appsink.lock().await;
get_video_frame(&appsnk)
} {
Ok(e) => e,
Err(e) => {
error!("Could not get video frame! {e}");
if let Err(e) = sender.close().await {
error!("Could not close socket to remote computer: {e}")
get_video_frame(&appsnk)
};
match res {
Ok(e) => e,
Err(e) => {
error!("Could not get video frame! {e}");
if let Err(e) = sender.close().await {
error!("Could not close socket to remote computer: {e}")
}
socket_state.is_connected.store(false, Ordering::SeqCst);
break;
}
socket_state.is_connected.store(false, Ordering::SeqCst);
break;
}
};
@ -75,7 +78,7 @@ pub async fn remote_video_loop(
break;
}
let do_not_break = handle_message(&mut recvr, &mut sender, &to_mec, last_iter).await;
let do_not_break = handle_message(&mut recvr, &to_mec, last_iter).await;
if !do_not_break { break; }
@ -131,7 +134,7 @@ fn get_video_frame(appsink: &AppSink) -> Result<Message, String> {
#[async_recursion]
async fn handle_message(
recvr: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
sender: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
// sender: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
to_mec: &Sender<ApplicationEvent>,
last_iter: Instant,
) -> bool {
@ -145,12 +148,12 @@ async fn handle_message(
Message::Pong(_) | Message::Frame(_) | Message::Text(_) => {
warn!("There was an unhandled message type from the camera: {}\n{}", message, message.to_string());
// this was not the expected response, recursion!
return handle_message(recvr, sender, to_mec, last_iter).await;
return handle_message(recvr, to_mec, last_iter).await;
}
Message::Ping(_) => {
// Ping/Pongs are handled by tokio tungstenite on reads and writes
// this was not the expected response, recursion!
return handle_message(recvr, sender, to_mec, last_iter).await;
return handle_message(recvr, to_mec, last_iter).await;
}
Message::Binary(bin) => {
let message = std::str::from_utf8(&bin);

View file

@ -97,7 +97,7 @@ impl WebcamPipeline {
sink_frame.set_property("caps", &appsrc_caps.to_value());
pipeline
.add_many(&[
.add_many([
&source,
&convert,
&rate,
@ -114,7 +114,7 @@ impl WebcamPipeline {
to: "pipeline",
})?;
Element::link_many(&[&source, &convert, &rate]).context(LinkSnafu {
Element::link_many([&source, &convert, &rate]).context(LinkSnafu {
from: "source et. al.",
to: "rate",
})?;
@ -133,13 +133,13 @@ impl WebcamPipeline {
let tee_src_1 = tee
.request_pad_simple("src_%u")
.ok_or(PipelineError::PadRequestError {
.ok_or(PipelineError::PadRequest {
element: "tee pad 1".to_string(),
})?;
let paintable_queue_sinkpad =
queue_app
.static_pad("sink")
.ok_or(PipelineError::PadRequestError {
.ok_or(PipelineError::PadRequest {
element: "gtk4 sink".to_string(),
})?;
@ -160,13 +160,13 @@ impl WebcamPipeline {
// -- BEGIN APPSINK PIPELINE
let tee_src_2 = tee
.request_pad_simple("src_%u")
.ok_or(PipelineError::PadRequestError {
.ok_or(PipelineError::PadRequest {
element: "tee pad 2".to_string(),
})?;
let appsink_queue_sinkpad =
appsink_queue
.static_pad("sink")
.ok_or(PipelineError::PadRequestError {
.ok_or(PipelineError::PadRequest {
element: "appsink queue".to_string(),
})?;
tee_src_2
@ -194,7 +194,7 @@ impl WebcamPipeline {
to: "resize_caps",
})?;
Element::link_many(&[&jpeg_enc, &sink_frame.upcast_ref()]).context(LinkSnafu {
Element::link_many([&jpeg_enc, &sink_frame.upcast_ref()]).context(LinkSnafu {
from: "jpeg_enc",
to: "appsink",
})?;
@ -210,19 +210,19 @@ impl WebcamPipeline {
#[derive(Debug, Snafu)]
pub enum PipelineError {
#[snafu(display("Error during element linking"))]
LinkError {
Link {
source: BoolError,
from: String,
to: String,
},
#[snafu(display("Error linking pads"))]
PadLinkError {
PadLink {
source: PadLinkError,
from: String,
to: String,
},
#[snafu(display("Error creating element"))]
BuildError { source: BoolError, element: String },
Build { source: BoolError, element: String },
#[snafu(display("Error getting pad from element"))]
PadRequestError { element: String },
PadRequest { element: String },
}

View file

@ -12,8 +12,8 @@ use crate::config::{load_config, AppConfig};
mod config;
mod coordinator;
mod gstreamer_pipeline;
mod joystick_source;
mod remote_sources;
mod sources;
mod states;
mod ui;
const APP_ID: &str = "net.nickiel.joystick-controller-client";

View file

@ -1,136 +0,0 @@
use std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
};
use async_channel::Sender;
use futures_core::FusedStream;
use futures_util::{SinkExt, StreamExt};
use log::{debug, error, info, warn};
use tokio::{
net::{TcpListener, TcpStream},
runtime::Handle,
};
use tokio_tungstenite::{
accept_async,
tungstenite::{Error, Message, Result},
};
use tracing::instrument;
mod remote_source;
use crate::coordinator::{
ApplicationEvent, ConnectionType, SocketState,
};
#[instrument(skip(rt, mec))]
pub async fn start_socketserver(
rt: Handle,
mec: Sender<ApplicationEvent>,
connection_state: Arc<SocketState>,
) {
let addr = "127.0.0.1:9002";
let listener = TcpListener::bind(&addr).await.expect("Can't listen");
info!("Listening on: {}", addr);
connection_state.is_connected.store(true, Ordering::SeqCst);
while let Ok((stream, _)) = listener.accept().await {
if !connection_state.stay_connected.load(Ordering::SeqCst) {
break;
}
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
debug!("Peer address: {}", peer);
rt.spawn(accept_connection(
peer,
stream,
mec.clone(),
));
}
connection_state.is_connected.store(false, Ordering::SeqCst);
}
#[instrument(skip(stream, mec))]
async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
) {
if let Err(e) = handle_connection(peer, stream, mec.clone()).await {
match e {
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
err => error!("Error processing connection: {}", err),
}
}
}
#[instrument(skip(stream, mec))]
async fn handle_connection(
peer: SocketAddr,
stream: TcpStream,
mec: Sender<ApplicationEvent>,
) -> Result<()> {
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);
if let Err(e) = ws_stream.send(Message::text("Type?")).await {
error!("Error requesting connection type from {}: {}", peer, e);
if let Err(e1) = ws_stream.close(None).await {
error!("Could not close websocket after not receiving type: {e1}");
}
return Ok(());
}
let mut connection_type: Option<ConnectionType> = None;
while let Some(msg) = ws_stream.next().await {
let msg = match msg {
Ok(msg) => msg,
Err(e) => {
error!("Error receiving message from {}: {}", peer, e);
if let Err(e1) = ws_stream.close(None).await {
error!("Could not close websocket after receiving error: {e1}");
}
return Ok(());
}
};
if msg.is_text() && msg.to_string().starts_with("Type: ") {
match msg.to_string().split(' ').collect::<Vec<&str>>()[1] {
"Automated" => {
debug!("Connection type is: Automated");
connection_type = Some(ConnectionType::Automated);
}
"Remote" => {
debug!("Connection type is: Remote");
connection_type = Some(ConnectionType::Remote);
}
_ => {
warn!("Unknown connection type, dropping connection");
ws_stream.close(None).await?;
}
}
}
if connection_type.is_some() {
break;
}
}
if !ws_stream.is_terminated() {
match connection_type.unwrap() {
ConnectionType::Automated => {
todo!();
}
ConnectionType::Remote => {
remote_source::handle_connection().await?;
}
_ => todo!(),
}
}
Ok(())
}

View file

@ -1,27 +0,0 @@
/*
use std::{
cmp::{min, max}, sync::{
Arc, Mutex,
}, time::{Duration, Instant}
};
use async_channel::Sender;
use futures_util::{SinkExt, StreamExt};
use log::{error, info};
use tokio::net::TcpStream;
use tokio_tungstenite::{
tungstenite::Result, WebSocketStream,
};
use crate::coordinator::ApplicationEvent;
use super::TrackerState;
*/
use tokio_tungstenite::tungstenite::Result;
pub async fn handle_connection(// mut ws_stream: WebSocketStream<TcpStream>,
// mec: Sender<ApplicationEvent>,
// tracker_state: Arc<Mutex<TrackerState>>
) -> Result<()> {
todo!("Remote connections not implemented yet");
}

2
src/sources/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod joystick_source;

4
src/states/mod.rs Normal file
View file

@ -0,0 +1,4 @@
pub mod perf_state;
pub mod tracker_state;

View file

@ -1,8 +1,9 @@
use std::{collections::VecDeque, time::Duration};
use async_channel::Sender;
use tracing::{error, info};
use tracing::error;
use crate::coordinator::TrackerUpdate;
use crate::ui::GuiUpdate;
const MAX_RECORDED_TIMES: usize = 10;
@ -30,7 +31,7 @@ impl TrackerMetrics {
}
fn update_gui(&mut self) {
if let Err(e) = self.to_gui.send_blocking(GuiUpdate::TrackerUpdate(super::TrackerUpdate::HeaderUpdate(self.header_text.clone()))) {
if let Err(e) = self.to_gui.send_blocking(GuiUpdate::TrackerUpdate(TrackerUpdate::HeaderUpdate(self.header_text.clone()))) {
error!("TrackerMetrics couldnt' send update to GUI: {e}");
}
}
@ -67,19 +68,19 @@ impl TrackerMetrics {
if avg_time == 0 {
self.header_text = format!(
"Status: Failed Avg Response: {} ms",
avg_time.to_string()
avg_time
);
}
if avg_time > DEGRADED_TRACKER_TIME {
self.header_text = format!(
"Status: Degraded Avg Response: {} ms",
avg_time.to_string()
avg_time
);
} else {
self.header_text = format!(
"Status: Nominal Avg Response: {} ms",
avg_time.to_string()
avg_time
);
}

View file

@ -14,7 +14,8 @@ use tracing::{error, event, span, Level};
#[cfg(feature = "tracker-state-debug")]
use tracing::debug;
use crate::coordinator::{tracker_state::TrackerState, ApplicationEvent};
use crate::coordinator::ApplicationEvent;
use crate::states::tracker_state::TrackerState;
#[derive(Debug)]
pub struct ControlPanel {

View file

@ -10,7 +10,8 @@ use gtk::{
AspectFrame, Box, DrawingArea, EventControllerMotion, GestureClick, Label, Overlay, Picture,
};
use crate::coordinator::{tracker_state::TrackerState, ApplicationEvent};
use crate::coordinator::ApplicationEvent;
use crate::states::tracker_state::TrackerState;
use super::NormalizedBoxCoords;
@ -154,19 +155,19 @@ impl LiveViewPanel {
}
fn calc_box_under_mouse(
boxes: &Vec<NormalizedBoxCoords>,
boxes: &[NormalizedBoxCoords],
x_coord: f32,
y_coord: f32,
) -> Option<u32> {
let mut mouse_over: Vec<NormalizedBoxCoords> = vec![];
for nb in boxes.iter() {
if nb.x1 < x_coord as f32
&& nb.y1 < y_coord as f32
&& nb.x2 > x_coord as f32
&& nb.y2 > y_coord as f32
if nb.x1 < x_coord
&& nb.y1 < y_coord
&& nb.x2 > x_coord
&& nb.y2 > y_coord
{
mouse_over.push(nb.clone());
mouse_over.push(*nb);
}
}
if mouse_over.len() > 1 {

View file

@ -12,7 +12,7 @@ use tokio::runtime::Handle;
use tokio::sync::RwLock;
use crate::config::AppConfig;
use crate::coordinator::tracker_state::TrackerState;
use crate::states::tracker_state::TrackerState;
use crate::coordinator::{start_coordinator, ApplicationEvent, MoveEvent, TrackerUpdate};
mod control_panel;
@ -60,7 +60,7 @@ pub struct NormalizedBoxCoords {
}
impl NormalizedBoxCoords {
fn into_relative(&self, width: i32, height: i32) -> BoxCoords {
fn absolute_coords(&self, width: i32, height: i32) -> BoxCoords {
BoxCoords {
id: self.id,
x1: (self.x1 * width as f32) as u32,
@ -136,7 +136,7 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
let connections_modal_config = config.clone();
let connections_activate = gio::ActionEntry::builder("connections")
.activate(clone!(@weak window => move |app: &gtk::Application, _, _| {
let connections_modal = settings_modal::ConnectionsModal::new(&app, &window, &rt, &connections_modal_config);
let connections_modal = settings_modal::ConnectionsModal::new(app, &window, &rt, &connections_modal_config);
connections_modal.window.present();
}))
@ -181,7 +181,7 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
let drawable_ts = tracker_state.clone();
drawable.set_draw_func(move |_, ctx, width, height| {
draw_boxes(width, height, &ctx, &drawable_ts);
draw_boxes(width, height, ctx, &drawable_ts);
});
liveview_panel.set_drawable(&drawable);
@ -230,10 +230,8 @@ pub fn build_ui(app: &Application, config: Arc<RwLock<AppConfig>>, runtime: Hand
}
if !ids.contains(&item) {
to_delete_indexes.push(i);
} else {
if let Some(pos) = ids.iter().position(|x| **x == item) {
ids.remove(pos);
}
} else if let Some(pos) = ids.iter().position(|x| **x == item) {
ids.remove(pos);
}
}
@ -386,7 +384,7 @@ fn draw_boxes(width: i32, height: i32, ctx: &Context, tracker_state: &Arc<Mutex<
ctx.set_source_rgb(1.0, 1.0, 0.0);
}
let b = nb.into_relative(width, height);
let b = nb.absolute_coords(width, height);
ctx.rectangle(
b.x1 as f64,
b.y1 as f64,

View file

@ -143,7 +143,7 @@ impl ConnectionsModal {
write_lock.tracker_port = new_tracker_port;
write_lock.tracker_refresh_rate_millis = new_tracker_millis;
// why does this feel like the borrow checker gave me a pass?
if let Err(e) = save_config(&write_lock.to_owned()) {
if let Err(e) = save_config(&write_lock) {
error!("Could not save config! {e}");
}
// FBI!!! OPEN UP!!!!