From a20b3fff6cdf825a256c5b6dceadd1dfb13b459d Mon Sep 17 00:00:00 2001 From: Nickiel12 Date: Sat, 20 Jul 2024 22:35:04 +0000 Subject: [PATCH] set up basic sockets --- Cargo.lock | 344 +++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 10 +- src/config.rs | 61 ++++++++ src/core/mod.rs | 114 ++++++++++++++ src/main.rs | 84 +++++++++- src/satellite_state.rs | 102 ++++++++++++ 6 files changed, 700 insertions(+), 15 deletions(-) create mode 100644 src/config.rs create mode 100644 src/core/mod.rs create mode 100644 src/satellite_state.rs diff --git a/Cargo.lock b/Cargo.lock index bc29bc6..d20f3e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,26 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "config" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be" +dependencies = [ + "async-trait", + "convert_case", + "json5", + "lazy_static", + "nom", + "pathdiff", + "ron", + "rust-ini", + "serde", + "serde_json", + "toml", + "yaml-rust", +] + [[package]] name = "console-api" version = "0.7.0" @@ -263,6 +283,35 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "core-foundation-sys" version = "0.8.6" @@ -302,6 +351,12 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + [[package]] name = "crypto-common" version = "0.1.6" @@ -373,6 +428,15 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "either" version = "1.13.0" @@ -410,6 +474,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -417,6 +496,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -425,6 +505,23 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -454,10 +551,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -515,6 +615,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" + [[package]] name = "hashbrown" version = "0.14.5" @@ -535,17 +641,10 @@ dependencies = [ ] [[package]] -name = "helloworld" -version = "0.1.0" -dependencies = [ - "console-subscriber", - "obws", - "tokio", - "tokio-tungstenite 0.23.1", - "tracing", - "tracing-appender", - "tracing-subscriber", -] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermit-abi" @@ -731,6 +830,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json5" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1" +dependencies = [ + "pest", + "pest_derive", + "serde", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -743,6 +853,12 @@ version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "log" version = "0.4.22" @@ -862,6 +978,7 @@ version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69c2e1963f53ef1ed7bceeddc884ce0c7c41d424aa7eb17bd3fe35e8c0646a3d" dependencies = [ + "async-stream", "base64 0.22.1", "bitflags 2.6.0", "futures-util", @@ -886,18 +1003,79 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "ordered-multimap" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e" +dependencies = [ + "dlv-list", + "hashbrown 0.13.2", +] + [[package]] name = "overload" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "pathdiff" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd" + [[package]] name = "percent-encoding" version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95" +dependencies = [ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pest_meta" +version = "2.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -1072,6 +1250,28 @@ version = "0.8.45" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ade4539f42266ded9e755c605bdddf546242b2c961b03b06a7375260788a0523" +[[package]] +name = "ron" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94" +dependencies = [ + "base64 0.21.7", + "bitflags 2.6.0", + "serde", + "serde_derive", +] + +[[package]] +name = "rust-ini" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -1141,6 +1341,15 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_spanned" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0" +dependencies = [ + "serde", +] + [[package]] name = "serde_with" version = "3.9.0" @@ -1217,6 +1426,27 @@ version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +[[package]] +name = "snafu" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "socket2" version = "0.5.7" @@ -1311,6 +1541,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -1413,6 +1652,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac2caab0bf757388c6c0ae23b3293fdb463fee59434529014f85e3263b995c28" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "278f3d518e152219c994ce877758516bca5e118eaed6996192a774fb9fbf0788" +dependencies = [ + "indexmap 2.2.6", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.11.0" @@ -1594,6 +1867,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -1615,6 +1894,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" + [[package]] name = "url" version = "2.5.2" @@ -1647,6 +1932,25 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcs-obs-satellite" +version = "0.1.0" +dependencies = [ + "config", + "console-subscriber", + "futures", + "futures-util", + "obws", + "serde", + "snafu", + "tokio", + "tokio-tungstenite 0.23.1", + "toml", + "tracing", + "tracing-appender", + "tracing-subscriber", +] + [[package]] name = "version_check" version = "0.9.4" @@ -1891,3 +2195,21 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "winnow" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" +dependencies = [ + "memchr", +] + +[[package]] +name = "yaml-rust" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" +dependencies = [ + "linked-hash-map", +] diff --git a/Cargo.toml b/Cargo.toml index a074e95..ff60bcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,15 +1,21 @@ [package] -name = "helloworld" +name = "vcs-obs-satellite" version = "0.1.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +config = "0.14.0" console-subscriber = "0.3.0" -obws = "0.12.0" +futures = "0.3.30" +futures-util = { version = "0.3.30", features = ["sink"] } +obws = { version = "0.12.0", features = ["events"] } +serde = { version = "1.0.204", features = ["serde_derive"] } +snafu = "0.8.4" tokio = { version = "1.38.1", features = ["rt-multi-thread"] } tokio-tungstenite = "0.23.1" +toml = "0.8.15" tracing = "0.1.40" tracing-appender = "0.2.3" tracing-subscriber = { version = "0.3.18", features = ["tracing-log"] } diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..1614267 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,61 @@ +use config::{Config, FileFormat}; +use serde::{Deserialize, Serialize}; +use snafu::prelude::*; +use std::fs::File; +use std::io::Write; +use tracing::{info, instrument}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct AppConfig { + pub core_ip: String, + pub core_port: u32, + + pub obs_ip: String, + pub obs_port: u16, + pub obs_password: String, +} + +impl Default for AppConfig { + fn default() -> Self { + AppConfig { + core_ip: "localhost".to_string(), + core_port: 4856, + + obs_ip: "localhost".to_string(), + obs_port: 4455, + obs_password: "EbEIsHn7lxnN73Hy".to_string(), + } + } +} + +pub fn load_config() -> AppConfig { + Config::builder() + .add_source(config::File::new("./settings.toml", FileFormat::Toml)) + .build() + .and_then(|val| val.try_deserialize()) + .unwrap_or_default() +} + +#[derive(Debug, Snafu)] +pub enum SaveConfigError { + #[snafu(display("Could not serialize app state: {source}"))] + SerdeError { source: toml::ser::Error }, + #[snafu(display("Could not write app state to file: {path}"))] + IoError { + source: std::io::Error, + path: String, + }, +} + +#[instrument] +pub fn save_config(config: &AppConfig) -> Result<(), SaveConfigError> { + let toml_str = toml::to_string(&config).context(SerdeSnafu)?; + let mut file = File::create("./settings.toml").context(IoSnafu { + path: "./settings.toml", + })?; + file.write_all(toml_str.as_bytes()).context(IoSnafu { + path: "./settings.toml", + })?; + info!("Config file saved successfully"); + Ok(()) +} diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 0000000..7a942d3 --- /dev/null +++ b/src/core/mod.rs @@ -0,0 +1,114 @@ + +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use tokio::sync::{ + mpsc, oneshot, + watch::{self, Receiver}, +}; +use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tracing::{error, info, instrument, warn}; + +use crate::AppEvent; + +#[derive(Debug)] +pub struct ConnectionCoreParam { + pub connection_string: String, + pub event_loop_tx: mpsc::Sender, + // Main Event Channel + pub shutdown_rx: watch::Receiver, +} + +#[instrument] +pub async fn connect_to_core( + params: ConnectionCoreParam, + on_connection_oneshot: oneshot::Sender>, +) { + info!("Starting core communication process"); + + match connect_async(¶ms.connection_string).await { + Err(e) => warn!("Could not connect to core: {e}"), + Ok((websocket, _)) => { + let (to_core_tx, to_core_rx) = mpsc::channel::(5); + + info!("Connection to core made successfully"); + let (mut sender, recvr) = websocket.split(); + + if on_connection_oneshot.send(to_core_tx).is_err() { + error!("Could not give to_core_tx to oneshot!"); + return; + } + + tokio::spawn(handle_websocket_messages( + params.event_loop_tx.clone(), + recvr, + )); + handle_to_socket(&mut sender, to_core_rx, params.shutdown_rx.clone()).await; + } + } + + info!("Closing down core communication process"); +} + +pub async fn handle_to_socket( + websocket: &mut SplitSink>, Message>, + mut from_eventloop: mpsc::Receiver, + shutdown_rx: Receiver, +) { + loop { + if *shutdown_rx.borrow() { + break; + } + match from_eventloop.recv().await { + None => { + info!("event loop -> core channel closed"); + break; + } + Some(mess) => match websocket.send(mess).await { + Ok(_) => {} + Err(e) => { + error!("Error sending message to Core: {e}"); + if let Err(e) = websocket.close().await { + info!("Could not send close state to websocket: {e}"); + } + break; + } + }, + } + } + info!("Event Loop -> Core channel closed"); +} + +#[instrument(skip(event_tx, recvr))] +pub async fn handle_websocket_messages( + event_tx: mpsc::Sender, + mut recvr: SplitStream>>, +) { + while let Some(mess) = recvr.next().await { + match mess { + Err(e) => { + error!("There was an error receiving from Core: {e}"); + break; + } + Ok(mess) => match mess { + Message::Ping(_) | Message::Pong(_) | Message::Binary(_) | Message::Frame(_) => { + // Do nothing with these frames, yet + } + Message::Text(mess) => { + info!("Received text from core: {}", mess); + if let Err(e) = event_tx.send(AppEvent::BoringLog(mess)).await { + error!("Could not send event to event channel! {e}"); + break; + } + } + Message::Close(_) => { + info!("Core websocket close frame received"); + break; + } + }, + } + } + // we cannot close the stream, only the sink + info!("Core websocket and Core -> Event Loop closing"); +} diff --git a/src/main.rs b/src/main.rs index 6e4fa05..2b66c5e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,83 @@ -fn main() { - println!("Hello, world!"); + +use std::time::Duration; + +use tokio::sync::{mpsc::{self, error::TryRecvError}, watch}; +use tracing::{error, info, warn}; + +mod config; +mod core; +mod satellite_state; + +use satellite_state::SatelliteState; + +pub enum AppEvent { + BoringLog(String), + Close, +} + +#[tokio::main] +async fn main() { + // #[cfg(all(not(feature = "tokio-debug"), debug_assertions))] + { + let sub = tracing_subscriber::FmtSubscriber::new(); + if let Err(e) = tracing::subscriber::set_global_default(sub) { + panic!("Could not set tracing global: {e}"); + } + } + #[cfg(feature = "tokio-debug")] + { + console_subscriber::init(); + } + + info!("Starting program"); + + let config = config::load_config(); + + let (shutdown_tx, shutdown_rx) = watch::channel::(false); + + let (tx, rx) = mpsc::channel::(10); + + let mut state = SatelliteState { + config, + shutdown_tx, + shutdown_rx, + events_tx: tx, + events_rx: rx, + + to_core: None, + obs_client: None, + to_core_oneshot: None, + }; + + loop { + state.check_sockets().await; + + match state.events_rx.try_recv() { + Err(e) => { + match e { + TryRecvError::Empty => { + tokio::time::sleep(Duration::from_millis(500)).await; + }, + TryRecvError::Disconnected => { + error!("events mpsc was closed! Exiting main thread"); + break; + } + } + } + Ok(res) => { + + } + } + + } + + // let version = client.general().version().await.unwrap(); + // info!("Connected to OBS version: {:?}", version); + + // let scene_list = client.scenes().list().await.unwrap(); + // info!("Available scenes are: {:?}", scene_list); + + if let Err(_) = state.shutdown_tx.send(true) { + warn!("Could not send shutdown signal on shutdown_tx! Oneshot already used"); + } } diff --git a/src/satellite_state.rs b/src/satellite_state.rs new file mode 100644 index 0000000..d1f54fc --- /dev/null +++ b/src/satellite_state.rs @@ -0,0 +1,102 @@ +use futures_util::StreamExt; +use obws::Client; +use tokio::sync::{mpsc, oneshot, watch}; +use tokio_tungstenite::tungstenite::Message; +use tracing::{info, instrument, warn}; + +use crate::{config::AppConfig, core::{connect_to_core, ConnectionCoreParam}, AppEvent}; + +pub struct SatelliteState { + pub config: AppConfig, + pub obs_client: Option, + + pub shutdown_tx: watch::Sender, + pub shutdown_rx: watch::Receiver, + + pub events_tx: mpsc::Sender, + pub events_rx: mpsc::Receiver, + + pub to_core: Option>, + pub to_core_oneshot: Option>>, +} + +impl SatelliteState { + pub async fn check_sockets(&mut self) { + if self.obs_client.is_none() { + self.obs_client = match Client::connect( + self.config.obs_ip.clone(), + self.config.obs_port, + Some(self.config.obs_password.clone()), + ) + .await + { + Err(e) => { + info!("Failed to connect to obs: {e}"); + None + } + Ok(client) => Some(client), + } + } + + if self.to_core.is_none() { + self.start_core_socket().await; + } + + if let Some(mut os) = self.to_core_oneshot.take() { + if let Ok(res) = os.try_recv() { + self.to_core = Some(res); + } else { + self.to_core_oneshot = Some(os); + } + } + } + + pub async fn start_core_socket(&mut self) { + let param = ConnectionCoreParam { + shutdown_rx: self.shutdown_rx.clone(), + event_loop_tx: self.events_tx.clone(), + connection_string: format!("ws://{}:{}", self.config.core_ip, self.config.core_port.to_string()), + }; + + let (tx, rx) = oneshot::channel::>(); + + self.to_core_oneshot = Some(rx); + + tokio::spawn(connect_to_core(param, tx)); + + } + + #[instrument(skip(obs_client, event_tx))] + pub async fn start_obs_events(obs_client: &Client, event_tx: mpsc::Sender) { + let event_stream = obs_client.events(); + match event_stream { + Err(e) => { + warn!("Could not open event_stream from obs: {e}"); + } + Ok(es) => { + info!("Started obs even stream"); + futures_util::pin_mut!(es); + + while let Some(event) = es.next().await { + match event { + _ => { + match event_tx + .send(AppEvent::BoringLog(format!( + "Get some kind of obs event! {:#?}", + event + ))) + .await + { + Err(e) => { + warn!("event_tx closed, exiting: {e}"); + break; + } + Ok(_) => {} + } + } + } + } + } + } + } +}