diff --git a/Cargo.toml b/Cargo.toml index 1e3a714..c6f7d37 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,7 @@ gst-plugin-gtk4 = { version = "0.12.2", features = ["gtk_v4_12"] } gtk = { version = "0.8.1", package = "gtk4", features = ["v4_12"] } log = "0.4.21" serde = { version = "1.0.197", features = ["derive"] } -tokio = { version = "1.37.0", features = ["rt-multi-thread", "time"] } +tokio = { version = "1.37.0", features = ["rt-multi-thread", "time", "sync"] } tokio-tungstenite = "0.21.0" toml = "0.8.12" tracing = "0.1.40" diff --git a/src/config.rs b/src/config.rs index 991ce9f..6da8b41 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,10 +1,9 @@ use config::{Config, FileFormat}; -use gtk::cairo::IoError; -use snafu::prelude::*; -use log::{error, info}; use serde::{Deserialize, Serialize}; +use snafu::prelude::*; use std::fs::File; use std::io::Write; +use tracing::{info, instrument}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct AppConfig { @@ -40,15 +39,23 @@ pub fn load_config() -> AppConfig { #[derive(Debug, Snafu)] pub enum SaveConfigError { #[snafu(display("Could not serialize app state: {source}"))] - SerdeError {source: toml::ser::Error }, + SerdeError { source: toml::ser::Error }, #[snafu(display("Could not write app state to file: {path}"))] - IoError {source: std::io::Error, path: String }, + IoError { + source: std::io::Error, + path: String, + }, } +#[instrument] pub fn save_config(config: &AppConfig) -> Result<(), SaveConfigError> { let toml_str = toml::to_string(&config).context(SerdeSnafu)?; - let mut file = File::create("./settings.toml").context(IoSnafu {path: "./settings.toml" })?; - file.write_all(toml_str.as_bytes()).context(IoSnafu {path: "./settings.toml" })?; + let mut file = File::create("./settings.toml").context(IoSnafu { + path: "./settings.toml", + })?; + file.write_all(toml_str.as_bytes()).context(IoSnafu { + path: "./settings.toml", + })?; info!("Config file saved successfully"); Ok(()) } diff --git a/src/coordinator/mod.rs b/src/coordinator/mod.rs index 9bb05f3..0f09358 100644 --- a/src/coordinator/mod.rs +++ b/src/coordinator/mod.rs @@ -13,14 +13,15 @@ use futures_util::{ }; use gstreamer::prelude::ElementExt; use gstreamer::State; -use log::{error, info}; use tokio::net::TcpStream; use tokio::runtime::Handle; use tokio::sync::RwLock; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tracing::{debug, error, info, instrument}; mod process_box_string; mod remote_video_processor; + use crate::config::AppConfig; use crate::remote_sources::TrackerState; use crate::{gstreamer_pipeline, remote_sources}; @@ -45,22 +46,23 @@ pub enum ConnectionType { pub enum ApplicationEvent { StartCameraSocket, - StartTrackerSocket, SocketMessage(Message), MoveEvent(MoveEvent, ConnectionType), EnableAutomatic(bool), } +#[derive(Debug)] struct SocketState { - pub is_connected: Arc, - pub stay_connected: Arc, + pub is_connected: AtomicBool, + pub stay_connected: AtomicBool, } +#[derive(Debug)] struct CoordState<'a> { pub settings: Arc>, pub sck_outbound: Option>, Message>>, - pub sck_alive_server: Arc, + pub keep_remote_sources_alive: Arc, pub sck_alive_recvr: Arc, pub joystick_loop_alive: Arc, @@ -73,10 +75,9 @@ struct CoordState<'a> { pub rt: Handle, pub pipeline: gstreamer_pipeline::WebcamPipeline, - pub tracker_keep_alive: Arc, - pub tracker_is_alive: Arc, pub tracker_state: Arc>, + pub tracker_connection_state: Arc, } impl<'a> CoordState<'a> { @@ -93,7 +94,7 @@ impl<'a> CoordState<'a> { sck_outbound: None, sck_alive_recvr: Arc::new(AtomicBool::new(false)), - sck_alive_server: Arc::new(AtomicBool::new(false)), + keep_remote_sources_alive: Arc::new(AtomicBool::new(false)), joystick_loop_alive: Arc::new(AtomicBool::new(false)), current_priority: ConnectionType::Local, @@ -105,14 +106,17 @@ impl<'a> CoordState<'a> { rt, pipeline: gstreamer_pipeline::WebcamPipeline::new().unwrap(), - tracker_keep_alive: Arc::new(AtomicBool::new(false)), - tracker_is_alive: Arc::new(AtomicBool::new(false)), tracker_state, + tracker_connection_state: Arc::new(SocketState { + stay_connected: AtomicBool::new(false), + is_connected: AtomicBool::new(false), + }), }; this } + #[instrument] pub async fn socket_send(&mut self, message: Message) { if let Some(mut socket) = self.sck_outbound.take() { if let Err(e) = socket.send(message).await { @@ -128,7 +132,8 @@ impl<'a> CoordState<'a> { } async fn socket_start(&mut self) { - info!("Starting socket"); + debug!("Starting socket"); + self.sck_alive_recvr.store(true, Ordering::SeqCst); let conn_string: String = { let read_settings = self.settings.read().await; @@ -182,8 +187,7 @@ impl<'a> CoordState<'a> { conn_string, self.pipeline.sink_frame.clone(), self.to_mec.clone(), - self.tracker_keep_alive.clone(), - self.tracker_is_alive.clone(), + self.tracker_connection_state.clone(), self.tracker_state.clone(), self.rt.clone(), )); @@ -198,19 +202,27 @@ impl<'a> CoordState<'a> { )); } - if !self.tracker_is_alive.load(Ordering::SeqCst) { - if self.tracker_keep_alive.load(Ordering::SeqCst) { + if !self + .tracker_connection_state + .is_connected + .load(Ordering::SeqCst) + { + if self + .tracker_connection_state + .stay_connected + .load(Ordering::SeqCst) + { self.start_video_loop().await; } } - if !self.sck_alive_server.load(Ordering::SeqCst) { + if !self.keep_remote_sources_alive.load(Ordering::SeqCst) { info!("Restarting socket server"); - self.sck_alive_server.store(true, Ordering::SeqCst); + self.keep_remote_sources_alive.store(true, Ordering::SeqCst); self.rt.spawn(remote_sources::start_socketserver( self.rt.clone(), self.to_mec.clone(), - self.sck_alive_server.clone(), + self.keep_remote_sources_alive.clone(), self.tracker_state.clone(), )); } @@ -229,16 +241,20 @@ impl<'a> CoordState<'a> { pub async fn close(&mut self) { info!("closing coord state"); - self.tracker_keep_alive.store(false, Ordering::SeqCst); + self.tracker_connection_state + .stay_connected + .store(false, Ordering::SeqCst); self.socket_close().await; self.joystick_loop_alive.store(false, Ordering::SeqCst); - self.sck_alive_server.store(false, Ordering::SeqCst); + self.keep_remote_sources_alive + .store(false, Ordering::SeqCst); self.to_gui.close(); self.mec.close(); } } +#[instrument] pub async fn start_coordinator( // Main_Event_Channel mec: Receiver, @@ -279,9 +295,6 @@ pub async fn start_coordinator( ApplicationEvent::StartCameraSocket => { state.socket_start().await; } - ApplicationEvent::StartTrackerSocket => { - state.start_video_loop().await; - } ApplicationEvent::SocketMessage(socket_message) => { if let Err(e) = state.to_gui.send(GuiUpdate::SocketState(true)).await { error!("Could not send to gui thread! Closing coordinator: {e}"); @@ -296,7 +309,10 @@ pub async fn start_coordinator( debug!("Trying to get lock on tracker_state for enable automatic"); if let Ok(mut ts) = state.tracker_state.lock() { ts.enabled = do_enable; - state.tracker_keep_alive.store(do_enable, Ordering::SeqCst); + state + .tracker_connection_state + .stay_connected + .store(do_enable, Ordering::SeqCst); } state.check_states().await; } @@ -337,6 +353,7 @@ pub async fn start_coordinator( info!("Stopping Coordinator"); } +#[instrument] async fn socket_listen( mec: Sender, socket_recv_is_alive: Arc, diff --git a/src/coordinator/remote_video_processor.rs b/src/coordinator/remote_video_processor.rs index 423e93e..60fac12 100644 --- a/src/coordinator/remote_video_processor.rs +++ b/src/coordinator/remote_video_processor.rs @@ -1,9 +1,6 @@ use std::{ cmp::{max, min}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, Mutex, - }, + sync::{atomic::Ordering, Arc, Mutex}, time::Duration, }; @@ -11,24 +8,24 @@ use async_channel::Sender; use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt}; use gstreamer_app::AppSink; use gstreamer_video::{video_frame::Readable, VideoFrame, VideoInfo}; -use log::{error, info}; use tokio::{ net::TcpStream, runtime::Handle, time::{sleep_until, Instant}, }; use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tracing::{error, info, instrument}; use crate::remote_sources::TrackerState; -use super::{process_box_string::process_incoming_string, ApplicationEvent}; +use super::{process_box_string::process_incoming_string, ApplicationEvent, SocketState}; +#[instrument] pub async fn remote_video_loop( conn_string: String, appsink: Arc>, to_mec: Sender, - keep_alive: Arc, - is_alive: Arc, + socket_state: Arc, tracker_state: Arc>, runtime: Handle, ) { @@ -42,7 +39,7 @@ pub async fn remote_video_loop( .expect("Could not build video info!"); loop { - is_alive.store(true, Ordering::SeqCst); + socket_state.is_connected.store(true, Ordering::SeqCst); match connect_async(&conn_string).await { Err(e) => { error!("Could not connect to remote video loop! Trying again in 1 seconds: {e}"); @@ -55,7 +52,7 @@ pub async fn remote_video_loop( recvr, to_mec.clone(), tracker_state.clone(), - keep_alive.clone(), + socket_state.clone(), )); loop { @@ -77,7 +74,7 @@ pub async fn remote_video_loop( if let Err(e) = sender.close().await { error!("Could not close socket to remote computer: {e}") } - keep_alive.store(false, Ordering::SeqCst); + socket_state.is_connected.store(false, Ordering::SeqCst); return; } }; @@ -94,11 +91,12 @@ pub async fn remote_video_loop( if let Err(e) = sender.close().await { error!("Could not close socket to remote computer: {e}") } - keep_alive.store(false, Ordering::SeqCst); + socket_state.is_connected.store(false, Ordering::SeqCst); + socket_state.stay_connected.store(false, Ordering::SeqCst); return; } - if !keep_alive.load(Ordering::SeqCst) { + if !socket_state.stay_connected.load(Ordering::SeqCst) { info!("Shutting down remote video loop"); break; } @@ -107,22 +105,23 @@ pub async fn remote_video_loop( } } } - if !keep_alive.load(Ordering::SeqCst) { + if !socket_state.stay_connected.load(Ordering::SeqCst) { info!("Shutting down remote video loop"); break; } } - is_alive.store(false, Ordering::SeqCst); + socket_state.is_connected.store(false, Ordering::SeqCst); } +#[instrument] async fn listen_to_messages( mut recvr: SplitStream>>, to_mec: Sender, tracker_state: Arc>, - keep_alive: Arc, + socket_state: Arc, ) { info!("Starting tracker connection listen"); - while keep_alive.load(Ordering::SeqCst) { + while socket_state.stay_connected.load(Ordering::SeqCst) { match recvr.try_next().await { Ok(Some(message)) => { let (x_off, y_off, _do_send) = @@ -143,7 +142,8 @@ async fn listen_to_messages( .await { error!("Could not send message to MEC, assuming critical failure: {e}"); - keep_alive.store(false, Ordering::SeqCst); + socket_state.is_connected.store(false, Ordering::SeqCst); + socket_state.stay_connected.store(false, Ordering::SeqCst); return; } } @@ -156,7 +156,7 @@ async fn listen_to_messages( } info!( "Stopping tracker connection listen with keep alive: {}", - keep_alive.load(Ordering::SeqCst) + socket_state.stay_connected.load(Ordering::SeqCst) ); } @@ -164,10 +164,11 @@ fn get_video_frame( appsink: &AppSink, video_info: &VideoInfo, ) -> Result, String> { - let sample = appsink + let buffer = appsink .pull_sample() - .map_err(|e| format!("Could not pull appsink sample: {e}"))?; - let buffer = sample.buffer_owned().unwrap(); + .map_err(|e| format!("Could not pull appsink sample: {e}"))? + .buffer_owned() + .ok_or(format!("Could not get owned buffer from appsink"))?; gstreamer_video::VideoFrame::from_buffer_readable(buffer, video_info) .map_err(|_| format!("Unable to make video frame from buffer!")) } diff --git a/src/gstreamer_pipeline.rs b/src/gstreamer_pipeline.rs index 90d0ba4..ab37ee5 100644 --- a/src/gstreamer_pipeline.rs +++ b/src/gstreamer_pipeline.rs @@ -6,6 +6,7 @@ use snafu::prelude::*; use std::str::FromStr; use std::sync::{Arc, Mutex}; +#[derive(Debug)] pub struct WebcamPipeline { pub pipeline: Pipeline, diff --git a/src/main.rs b/src/main.rs index d6682d2..2880514 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,7 @@ use tracing::{self, info, Level}; use tracing_appender; use tracing_subscriber; -use crate::config::load_config; +use crate::config::{load_config, AppConfig}; mod config; mod coordinator; @@ -20,16 +20,16 @@ fn main() -> glib::ExitCode { // set the environment var to make gtk use window's default action bar env::set_var("gtk_csd", "0"); - let file_appender = tracing_appender::rolling::daily(".", "joystick-log.log"); + let file_appender = tracing_appender::rolling::daily(".", "joystick-log"); let (non_blocking, _gaurd) = tracing_appender::non_blocking(file_appender); tracing_subscriber::fmt().with_writer(non_blocking).init(); let span = tracing::span!(Level::TRACE, "main"); let _enter = span.enter(); - info!("tracing intialized"); + info!("Logging intialized"); - let config = Arc::new(RwLock::new(load_config())); + let config: Arc> = Arc::new(RwLock::new(load_config())); gstreamer::init().expect("Unable to start gstreamer"); gstgtk4::plugin_register_static().expect("Unable to register gtk4 plugin"); diff --git a/src/remote_sources/mod.rs b/src/remote_sources/mod.rs index b858d0c..1ae3e80 100644 --- a/src/remote_sources/mod.rs +++ b/src/remote_sources/mod.rs @@ -19,6 +19,7 @@ use tokio_tungstenite::{ accept_async, tungstenite::{Error, Message, Result}, }; +use tracing::instrument; mod remote_source; @@ -27,6 +28,7 @@ use crate::{ ui::NormalizedBoxCoords, }; +#[derive(Debug)] pub struct TrackerState { pub tracking_id: u32, pub last_detect: Instant, @@ -37,6 +39,7 @@ pub struct TrackerState { pub identity_boxes: Vec, } +#[instrument] pub async fn start_socketserver( rt: Handle, mec: Sender, @@ -64,6 +67,7 @@ pub async fn start_socketserver( stay_alive.store(false, Ordering::SeqCst); } +#[instrument] async fn accept_connection( peer: SocketAddr, stream: TcpStream, @@ -78,6 +82,7 @@ async fn accept_connection( } } +#[instrument] async fn handle_connection( peer: SocketAddr, stream: TcpStream, diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 75d1d81..5e25521 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -1,8 +1,9 @@ +use std::fmt::Display; use std::sync::{Arc, Mutex}; use std::time::Instant; use gtk::cairo::Context; -use gtk::gdk::{Display, Paintable}; +use gtk::gdk::Paintable; use gtk::glib::clone; use gtk::{gio, glib, prelude::*, AspectFrame, CssProvider, Label, ListBox}; use gtk::{Application, ApplicationWindow}; @@ -27,6 +28,7 @@ pub enum GuiUpdate { UpdatePaintable(gstreamer::Element), } +#[derive(Debug)] pub struct BoxCoords { pub id: u32, pub x1: u32, @@ -35,6 +37,17 @@ pub struct BoxCoords { pub y2: u32, } +impl Display for BoxCoords { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Absolute Box {}, x1: {}, y1: {}, x2: {}, y2: {}", + self.id, self.x1, self.y1, self.x2, self.y2 + ) + } +} + +#[derive(Debug)] pub struct NormalizedBoxCoords { pub id: u32, pub x1: f32, @@ -55,12 +68,22 @@ impl NormalizedBoxCoords { } } +impl Display for NormalizedBoxCoords { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Normalized Box {}, x1: {}, y1: {}, x2: {}, y2: {}", + self.id, self.x1, self.y1, self.x2, self.y2 + ) + } +} + pub fn on_activate(app: &Application) { let provider = CssProvider::new(); provider.load_from_string(include_str!("../../style.css")); gtk::style_context_add_provider_for_display( - &Display::default().expect("Could not connect to a display"), + >k::gdk::Display::default().expect("Could not connect to a display"), &provider, gtk::STYLE_PROVIDER_PRIORITY_APPLICATION, ); diff --git a/src/ui/tracker_panel.rs b/src/ui/tracker_panel.rs index dfd7372..e83dde0 100644 --- a/src/ui/tracker_panel.rs +++ b/src/ui/tracker_panel.rs @@ -10,10 +10,14 @@ use gtk::{ Box, Label, ListItem, ListView, ScrolledWindow, SignalListItemFactory, SingleSelection, StringList, StringObject, ToggleButton, Widget, }; -use log::{debug, error}; +use tracing::{error, instrument}; + +#[cfg(feature = "tracker-state-debug")] +use tracing::debug; use crate::{coordinator::ApplicationEvent, remote_sources::TrackerState}; +#[derive(Debug)] pub struct TrackerPanel { top_level: Box, @@ -117,6 +121,7 @@ impl TrackerPanel { &self.top_level } + #[instrument] pub fn connect_button_callback(&self, to_mec: Sender) { self.enable_disable.connect_clicked(move |button| { if let Err(e) =