got camera connections working

This commit is contained in:
Nickiel12 2024-05-21 13:36:08 -07:00
parent 30dcdfaf55
commit 4b284cc7eb
9 changed files with 120 additions and 61 deletions

View file

@ -21,7 +21,7 @@ gst-plugin-gtk4 = { version = "0.12.2", features = ["gtk_v4_12"] }
gtk = { version = "0.8.1", package = "gtk4", features = ["v4_12"] }
log = "0.4.21"
serde = { version = "1.0.197", features = ["derive"] }
tokio = { version = "1.37.0", features = ["rt-multi-thread", "time"] }
tokio = { version = "1.37.0", features = ["rt-multi-thread", "time", "sync"] }
tokio-tungstenite = "0.21.0"
toml = "0.8.12"
tracing = "0.1.40"

View file

@ -1,10 +1,9 @@
use config::{Config, FileFormat};
use gtk::cairo::IoError;
use snafu::prelude::*;
use log::{error, info};
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use std::fs::File;
use std::io::Write;
use tracing::{info, instrument};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AppConfig {
@ -40,15 +39,23 @@ pub fn load_config() -> AppConfig {
#[derive(Debug, Snafu)]
pub enum SaveConfigError {
#[snafu(display("Could not serialize app state: {source}"))]
SerdeError {source: toml::ser::Error },
SerdeError { source: toml::ser::Error },
#[snafu(display("Could not write app state to file: {path}"))]
IoError {source: std::io::Error, path: String },
IoError {
source: std::io::Error,
path: String,
},
}
#[instrument]
pub fn save_config(config: &AppConfig) -> Result<(), SaveConfigError> {
let toml_str = toml::to_string(&config).context(SerdeSnafu)?;
let mut file = File::create("./settings.toml").context(IoSnafu {path: "./settings.toml" })?;
file.write_all(toml_str.as_bytes()).context(IoSnafu {path: "./settings.toml" })?;
let mut file = File::create("./settings.toml").context(IoSnafu {
path: "./settings.toml",
})?;
file.write_all(toml_str.as_bytes()).context(IoSnafu {
path: "./settings.toml",
})?;
info!("Config file saved successfully");
Ok(())
}

View file

@ -13,14 +13,15 @@ use futures_util::{
};
use gstreamer::prelude::ElementExt;
use gstreamer::State;
use log::{error, info};
use tokio::net::TcpStream;
use tokio::runtime::Handle;
use tokio::sync::RwLock;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, info, instrument};
mod process_box_string;
mod remote_video_processor;
use crate::config::AppConfig;
use crate::remote_sources::TrackerState;
use crate::{gstreamer_pipeline, remote_sources};
@ -45,22 +46,23 @@ pub enum ConnectionType {
pub enum ApplicationEvent {
StartCameraSocket,
StartTrackerSocket,
SocketMessage(Message),
MoveEvent(MoveEvent, ConnectionType),
EnableAutomatic(bool),
}
#[derive(Debug)]
struct SocketState {
pub is_connected: Arc<AtomicBool>,
pub stay_connected: Arc<AtomicBool>,
pub is_connected: AtomicBool,
pub stay_connected: AtomicBool,
}
#[derive(Debug)]
struct CoordState<'a> {
pub settings: Arc<RwLock<AppConfig>>,
pub sck_outbound: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
pub sck_alive_server: Arc<AtomicBool>,
pub keep_remote_sources_alive: Arc<AtomicBool>,
pub sck_alive_recvr: Arc<AtomicBool>,
pub joystick_loop_alive: Arc<AtomicBool>,
@ -73,10 +75,9 @@ struct CoordState<'a> {
pub rt: Handle,
pub pipeline: gstreamer_pipeline::WebcamPipeline,
pub tracker_keep_alive: Arc<AtomicBool>,
pub tracker_is_alive: Arc<AtomicBool>,
pub tracker_state: Arc<Mutex<TrackerState>>,
pub tracker_connection_state: Arc<SocketState>,
}
impl<'a> CoordState<'a> {
@ -93,7 +94,7 @@ impl<'a> CoordState<'a> {
sck_outbound: None,
sck_alive_recvr: Arc::new(AtomicBool::new(false)),
sck_alive_server: Arc::new(AtomicBool::new(false)),
keep_remote_sources_alive: Arc::new(AtomicBool::new(false)),
joystick_loop_alive: Arc::new(AtomicBool::new(false)),
current_priority: ConnectionType::Local,
@ -105,14 +106,17 @@ impl<'a> CoordState<'a> {
rt,
pipeline: gstreamer_pipeline::WebcamPipeline::new().unwrap(),
tracker_keep_alive: Arc::new(AtomicBool::new(false)),
tracker_is_alive: Arc::new(AtomicBool::new(false)),
tracker_state,
tracker_connection_state: Arc::new(SocketState {
stay_connected: AtomicBool::new(false),
is_connected: AtomicBool::new(false),
}),
};
this
}
#[instrument]
pub async fn socket_send(&mut self, message: Message) {
if let Some(mut socket) = self.sck_outbound.take() {
if let Err(e) = socket.send(message).await {
@ -128,7 +132,8 @@ impl<'a> CoordState<'a> {
}
async fn socket_start(&mut self) {
info!("Starting socket");
debug!("Starting socket");
self.sck_alive_recvr.store(true, Ordering::SeqCst);
let conn_string: String = {
let read_settings = self.settings.read().await;
@ -182,8 +187,7 @@ impl<'a> CoordState<'a> {
conn_string,
self.pipeline.sink_frame.clone(),
self.to_mec.clone(),
self.tracker_keep_alive.clone(),
self.tracker_is_alive.clone(),
self.tracker_connection_state.clone(),
self.tracker_state.clone(),
self.rt.clone(),
));
@ -198,19 +202,27 @@ impl<'a> CoordState<'a> {
));
}
if !self.tracker_is_alive.load(Ordering::SeqCst) {
if self.tracker_keep_alive.load(Ordering::SeqCst) {
if !self
.tracker_connection_state
.is_connected
.load(Ordering::SeqCst)
{
if self
.tracker_connection_state
.stay_connected
.load(Ordering::SeqCst)
{
self.start_video_loop().await;
}
}
if !self.sck_alive_server.load(Ordering::SeqCst) {
if !self.keep_remote_sources_alive.load(Ordering::SeqCst) {
info!("Restarting socket server");
self.sck_alive_server.store(true, Ordering::SeqCst);
self.keep_remote_sources_alive.store(true, Ordering::SeqCst);
self.rt.spawn(remote_sources::start_socketserver(
self.rt.clone(),
self.to_mec.clone(),
self.sck_alive_server.clone(),
self.keep_remote_sources_alive.clone(),
self.tracker_state.clone(),
));
}
@ -229,16 +241,20 @@ impl<'a> CoordState<'a> {
pub async fn close(&mut self) {
info!("closing coord state");
self.tracker_keep_alive.store(false, Ordering::SeqCst);
self.tracker_connection_state
.stay_connected
.store(false, Ordering::SeqCst);
self.socket_close().await;
self.joystick_loop_alive.store(false, Ordering::SeqCst);
self.sck_alive_server.store(false, Ordering::SeqCst);
self.keep_remote_sources_alive
.store(false, Ordering::SeqCst);
self.to_gui.close();
self.mec.close();
}
}
#[instrument]
pub async fn start_coordinator(
// Main_Event_Channel
mec: Receiver<ApplicationEvent>,
@ -279,9 +295,6 @@ pub async fn start_coordinator(
ApplicationEvent::StartCameraSocket => {
state.socket_start().await;
}
ApplicationEvent::StartTrackerSocket => {
state.start_video_loop().await;
}
ApplicationEvent::SocketMessage(socket_message) => {
if let Err(e) = state.to_gui.send(GuiUpdate::SocketState(true)).await {
error!("Could not send to gui thread! Closing coordinator: {e}");
@ -296,7 +309,10 @@ pub async fn start_coordinator(
debug!("Trying to get lock on tracker_state for enable automatic");
if let Ok(mut ts) = state.tracker_state.lock() {
ts.enabled = do_enable;
state.tracker_keep_alive.store(do_enable, Ordering::SeqCst);
state
.tracker_connection_state
.stay_connected
.store(do_enable, Ordering::SeqCst);
}
state.check_states().await;
}
@ -337,6 +353,7 @@ pub async fn start_coordinator(
info!("Stopping Coordinator");
}
#[instrument]
async fn socket_listen(
mec: Sender<ApplicationEvent>,
socket_recv_is_alive: Arc<AtomicBool>,

View file

@ -1,9 +1,6 @@
use std::{
cmp::{max, min},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
sync::{atomic::Ordering, Arc, Mutex},
time::Duration,
};
@ -11,24 +8,24 @@ use async_channel::Sender;
use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryStreamExt};
use gstreamer_app::AppSink;
use gstreamer_video::{video_frame::Readable, VideoFrame, VideoInfo};
use log::{error, info};
use tokio::{
net::TcpStream,
runtime::Handle,
time::{sleep_until, Instant},
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{error, info, instrument};
use crate::remote_sources::TrackerState;
use super::{process_box_string::process_incoming_string, ApplicationEvent};
use super::{process_box_string::process_incoming_string, ApplicationEvent, SocketState};
#[instrument]
pub async fn remote_video_loop(
conn_string: String,
appsink: Arc<Mutex<AppSink>>,
to_mec: Sender<ApplicationEvent>,
keep_alive: Arc<AtomicBool>,
is_alive: Arc<AtomicBool>,
socket_state: Arc<SocketState>,
tracker_state: Arc<Mutex<TrackerState>>,
runtime: Handle,
) {
@ -42,7 +39,7 @@ pub async fn remote_video_loop(
.expect("Could not build video info!");
loop {
is_alive.store(true, Ordering::SeqCst);
socket_state.is_connected.store(true, Ordering::SeqCst);
match connect_async(&conn_string).await {
Err(e) => {
error!("Could not connect to remote video loop! Trying again in 1 seconds: {e}");
@ -55,7 +52,7 @@ pub async fn remote_video_loop(
recvr,
to_mec.clone(),
tracker_state.clone(),
keep_alive.clone(),
socket_state.clone(),
));
loop {
@ -77,7 +74,7 @@ pub async fn remote_video_loop(
if let Err(e) = sender.close().await {
error!("Could not close socket to remote computer: {e}")
}
keep_alive.store(false, Ordering::SeqCst);
socket_state.is_connected.store(false, Ordering::SeqCst);
return;
}
};
@ -94,11 +91,12 @@ pub async fn remote_video_loop(
if let Err(e) = sender.close().await {
error!("Could not close socket to remote computer: {e}")
}
keep_alive.store(false, Ordering::SeqCst);
socket_state.is_connected.store(false, Ordering::SeqCst);
socket_state.stay_connected.store(false, Ordering::SeqCst);
return;
}
if !keep_alive.load(Ordering::SeqCst) {
if !socket_state.stay_connected.load(Ordering::SeqCst) {
info!("Shutting down remote video loop");
break;
}
@ -107,22 +105,23 @@ pub async fn remote_video_loop(
}
}
}
if !keep_alive.load(Ordering::SeqCst) {
if !socket_state.stay_connected.load(Ordering::SeqCst) {
info!("Shutting down remote video loop");
break;
}
}
is_alive.store(false, Ordering::SeqCst);
socket_state.is_connected.store(false, Ordering::SeqCst);
}
#[instrument]
async fn listen_to_messages(
mut recvr: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
to_mec: Sender<ApplicationEvent>,
tracker_state: Arc<Mutex<TrackerState>>,
keep_alive: Arc<AtomicBool>,
socket_state: Arc<SocketState>,
) {
info!("Starting tracker connection listen");
while keep_alive.load(Ordering::SeqCst) {
while socket_state.stay_connected.load(Ordering::SeqCst) {
match recvr.try_next().await {
Ok(Some(message)) => {
let (x_off, y_off, _do_send) =
@ -143,7 +142,8 @@ async fn listen_to_messages(
.await
{
error!("Could not send message to MEC, assuming critical failure: {e}");
keep_alive.store(false, Ordering::SeqCst);
socket_state.is_connected.store(false, Ordering::SeqCst);
socket_state.stay_connected.store(false, Ordering::SeqCst);
return;
}
}
@ -156,7 +156,7 @@ async fn listen_to_messages(
}
info!(
"Stopping tracker connection listen with keep alive: {}",
keep_alive.load(Ordering::SeqCst)
socket_state.stay_connected.load(Ordering::SeqCst)
);
}
@ -164,10 +164,11 @@ fn get_video_frame(
appsink: &AppSink,
video_info: &VideoInfo,
) -> Result<VideoFrame<Readable>, String> {
let sample = appsink
let buffer = appsink
.pull_sample()
.map_err(|e| format!("Could not pull appsink sample: {e}"))?;
let buffer = sample.buffer_owned().unwrap();
.map_err(|e| format!("Could not pull appsink sample: {e}"))?
.buffer_owned()
.ok_or(format!("Could not get owned buffer from appsink"))?;
gstreamer_video::VideoFrame::from_buffer_readable(buffer, video_info)
.map_err(|_| format!("Unable to make video frame from buffer!"))
}

View file

@ -6,6 +6,7 @@ use snafu::prelude::*;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub struct WebcamPipeline {
pub pipeline: Pipeline,

View file

@ -6,7 +6,7 @@ use tracing::{self, info, Level};
use tracing_appender;
use tracing_subscriber;
use crate::config::load_config;
use crate::config::{load_config, AppConfig};
mod config;
mod coordinator;
@ -20,16 +20,16 @@ fn main() -> glib::ExitCode {
// set the environment var to make gtk use window's default action bar
env::set_var("gtk_csd", "0");
let file_appender = tracing_appender::rolling::daily(".", "joystick-log.log");
let file_appender = tracing_appender::rolling::daily(".", "joystick-log");
let (non_blocking, _gaurd) = tracing_appender::non_blocking(file_appender);
tracing_subscriber::fmt().with_writer(non_blocking).init();
let span = tracing::span!(Level::TRACE, "main");
let _enter = span.enter();
info!("tracing intialized");
info!("Logging intialized");
let config = Arc::new(RwLock::new(load_config()));
let config: Arc<RwLock<AppConfig>> = Arc::new(RwLock::new(load_config()));
gstreamer::init().expect("Unable to start gstreamer");
gstgtk4::plugin_register_static().expect("Unable to register gtk4 plugin");

View file

@ -19,6 +19,7 @@ use tokio_tungstenite::{
accept_async,
tungstenite::{Error, Message, Result},
};
use tracing::instrument;
mod remote_source;
@ -27,6 +28,7 @@ use crate::{
ui::NormalizedBoxCoords,
};
#[derive(Debug)]
pub struct TrackerState {
pub tracking_id: u32,
pub last_detect: Instant,
@ -37,6 +39,7 @@ pub struct TrackerState {
pub identity_boxes: Vec<NormalizedBoxCoords>,
}
#[instrument]
pub async fn start_socketserver(
rt: Handle,
mec: Sender<ApplicationEvent>,
@ -64,6 +67,7 @@ pub async fn start_socketserver(
stay_alive.store(false, Ordering::SeqCst);
}
#[instrument]
async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
@ -78,6 +82,7 @@ async fn accept_connection(
}
}
#[instrument]
async fn handle_connection(
peer: SocketAddr,
stream: TcpStream,

View file

@ -1,8 +1,9 @@
use std::fmt::Display;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use gtk::cairo::Context;
use gtk::gdk::{Display, Paintable};
use gtk::gdk::Paintable;
use gtk::glib::clone;
use gtk::{gio, glib, prelude::*, AspectFrame, CssProvider, Label, ListBox};
use gtk::{Application, ApplicationWindow};
@ -27,6 +28,7 @@ pub enum GuiUpdate {
UpdatePaintable(gstreamer::Element),
}
#[derive(Debug)]
pub struct BoxCoords {
pub id: u32,
pub x1: u32,
@ -35,6 +37,17 @@ pub struct BoxCoords {
pub y2: u32,
}
impl Display for BoxCoords {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Absolute Box {}, x1: {}, y1: {}, x2: {}, y2: {}",
self.id, self.x1, self.y1, self.x2, self.y2
)
}
}
#[derive(Debug)]
pub struct NormalizedBoxCoords {
pub id: u32,
pub x1: f32,
@ -55,12 +68,22 @@ impl NormalizedBoxCoords {
}
}
impl Display for NormalizedBoxCoords {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Normalized Box {}, x1: {}, y1: {}, x2: {}, y2: {}",
self.id, self.x1, self.y1, self.x2, self.y2
)
}
}
pub fn on_activate(app: &Application) {
let provider = CssProvider::new();
provider.load_from_string(include_str!("../../style.css"));
gtk::style_context_add_provider_for_display(
&Display::default().expect("Could not connect to a display"),
&gtk::gdk::Display::default().expect("Could not connect to a display"),
&provider,
gtk::STYLE_PROVIDER_PRIORITY_APPLICATION,
);

View file

@ -10,10 +10,14 @@ use gtk::{
Box, Label, ListItem, ListView, ScrolledWindow, SignalListItemFactory, SingleSelection,
StringList, StringObject, ToggleButton, Widget,
};
use log::{debug, error};
use tracing::{error, instrument};
#[cfg(feature = "tracker-state-debug")]
use tracing::debug;
use crate::{coordinator::ApplicationEvent, remote_sources::TrackerState};
#[derive(Debug)]
pub struct TrackerPanel {
top_level: Box,
@ -117,6 +121,7 @@ impl TrackerPanel {
&self.top_level
}
#[instrument]
pub fn connect_button_callback(&self, to_mec: Sender<ApplicationEvent>) {
self.enable_disable.connect_clicked(move |button| {
if let Err(e) =