set up basic sockets

This commit is contained in:
Nickiel12 2024-07-20 22:35:04 +00:00
parent 317b960cd7
commit a20b3fff6c
6 changed files with 700 additions and 15 deletions

344
Cargo.lock generated
View file

@ -225,6 +225,26 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "config"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7328b20597b53c2454f0b1919720c25c7339051c02b72b7e05409e00b14132be"
dependencies = [
"async-trait",
"convert_case",
"json5",
"lazy_static",
"nom",
"pathdiff",
"ron",
"rust-ini",
"serde",
"serde_json",
"toml",
"yaml-rust",
]
[[package]]
name = "console-api"
version = "0.7.0"
@ -263,6 +283,35 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "convert_case"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "core-foundation-sys"
version = "0.8.6"
@ -302,6 +351,12 @@ version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80"
[[package]]
name = "crunchy"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -373,6 +428,15 @@ dependencies = [
"crypto-common",
]
[[package]]
name = "dlv-list"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
dependencies = [
"const-random",
]
[[package]]
name = "either"
version = "1.13.0"
@ -410,6 +474,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -417,6 +496,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -425,6 +505,23 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]]
name = "futures-executor"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
@ -454,10 +551,13 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
@ -515,6 +615,12 @@ version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
[[package]]
name = "hashbrown"
version = "0.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e"
[[package]]
name = "hashbrown"
version = "0.14.5"
@ -535,17 +641,10 @@ dependencies = [
]
[[package]]
name = "helloworld"
version = "0.1.0"
dependencies = [
"console-subscriber",
"obws",
"tokio",
"tokio-tungstenite 0.23.1",
"tracing",
"tracing-appender",
"tracing-subscriber",
]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
@ -731,6 +830,17 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json5"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96b0db21af676c1ce64250b5f40f3ce2cf27e4e47cb91ed91eb6fe9350b430c1"
dependencies = [
"pest",
"pest_derive",
"serde",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@ -743,6 +853,12 @@ version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "linked-hash-map"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "log"
version = "0.4.22"
@ -862,6 +978,7 @@ version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69c2e1963f53ef1ed7bceeddc884ce0c7c41d424aa7eb17bd3fe35e8c0646a3d"
dependencies = [
"async-stream",
"base64 0.22.1",
"bitflags 2.6.0",
"futures-util",
@ -886,18 +1003,79 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "ordered-multimap"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ed8acf08e98e744e5384c8bc63ceb0364e68a6854187221c18df61c4797690e"
dependencies = [
"dlv-list",
"hashbrown 0.13.2",
]
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "pathdiff"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8835116a5c179084a830efb3adc117ab007512b535bc1a21c991d3b32a6b44dd"
[[package]]
name = "percent-encoding"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "pest"
version = "2.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd53dff83f26735fdc1ca837098ccf133605d794cdae66acfc2bfac3ec809d95"
dependencies = [
"memchr",
"thiserror",
"ucd-trie",
]
[[package]]
name = "pest_derive"
version = "2.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a548d2beca6773b1c244554d36fcf8548a8a58e74156968211567250e48e49a"
dependencies = [
"pest",
"pest_generator",
]
[[package]]
name = "pest_generator"
version = "2.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c93a82e8d145725dcbaf44e5ea887c8a869efdcc28706df2d08c69e17077183"
dependencies = [
"pest",
"pest_meta",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pest_meta"
version = "2.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a941429fea7e08bedec25e4f6785b6ffaacc6b755da98df5ef3e7dcf4a124c4f"
dependencies = [
"once_cell",
"pest",
"sha2",
]
[[package]]
name = "pin-project"
version = "1.1.5"
@ -1072,6 +1250,28 @@ version = "0.8.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade4539f42266ded9e755c605bdddf546242b2c961b03b06a7375260788a0523"
[[package]]
name = "ron"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b91f7eff05f748767f183df4320a63d6936e9c6107d97c9e6bdd9784f4289c94"
dependencies = [
"base64 0.21.7",
"bitflags 2.6.0",
"serde",
"serde_derive",
]
[[package]]
name = "rust-ini"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e2a3bcec1f113553ef1c88aae6c020a369d03d55b58de9869a0908930385091"
dependencies = [
"cfg-if",
"ordered-multimap",
]
[[package]]
name = "rustc-demangle"
version = "0.1.24"
@ -1141,6 +1341,15 @@ dependencies = [
"syn",
]
[[package]]
name = "serde_spanned"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79e674e01f999af37c49f70a6ede167a8a60b2503e56c5599532a65baa5969a0"
dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "3.9.0"
@ -1217,6 +1426,27 @@ version = "1.13.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
[[package]]
name = "snafu"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b835cb902660db3415a672d862905e791e54d306c6e8189168c7f3d9ae1c79d"
dependencies = [
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "socket2"
version = "0.5.7"
@ -1311,6 +1541,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]]
name = "tinyvec"
version = "1.8.0"
@ -1413,6 +1652,40 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac2caab0bf757388c6c0ae23b3293fdb463fee59434529014f85e3263b995c28"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
]
[[package]]
name = "toml_datetime"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.22.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "278f3d518e152219c994ce877758516bca5e118eaed6996192a774fb9fbf0788"
dependencies = [
"indexmap 2.2.6",
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
]
[[package]]
name = "tonic"
version = "0.11.0"
@ -1594,6 +1867,12 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "ucd-trie"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
[[package]]
name = "unicode-bidi"
version = "0.3.15"
@ -1615,6 +1894,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202"
[[package]]
name = "url"
version = "2.5.2"
@ -1647,6 +1932,25 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcs-obs-satellite"
version = "0.1.0"
dependencies = [
"config",
"console-subscriber",
"futures",
"futures-util",
"obws",
"serde",
"snafu",
"tokio",
"tokio-tungstenite 0.23.1",
"toml",
"tracing",
"tracing-appender",
"tracing-subscriber",
]
[[package]]
name = "version_check"
version = "0.9.4"
@ -1891,3 +2195,21 @@ name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]]
name = "winnow"
version = "0.6.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1"
dependencies = [
"memchr",
]
[[package]]
name = "yaml-rust"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85"
dependencies = [
"linked-hash-map",
]

View file

@ -1,15 +1,21 @@
[package]
name = "helloworld"
name = "vcs-obs-satellite"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
config = "0.14.0"
console-subscriber = "0.3.0"
obws = "0.12.0"
futures = "0.3.30"
futures-util = { version = "0.3.30", features = ["sink"] }
obws = { version = "0.12.0", features = ["events"] }
serde = { version = "1.0.204", features = ["serde_derive"] }
snafu = "0.8.4"
tokio = { version = "1.38.1", features = ["rt-multi-thread"] }
tokio-tungstenite = "0.23.1"
toml = "0.8.15"
tracing = "0.1.40"
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.18", features = ["tracing-log"] }

61
src/config.rs Normal file
View file

@ -0,0 +1,61 @@
use config::{Config, FileFormat};
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use std::fs::File;
use std::io::Write;
use tracing::{info, instrument};
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct AppConfig {
pub core_ip: String,
pub core_port: u32,
pub obs_ip: String,
pub obs_port: u16,
pub obs_password: String,
}
impl Default for AppConfig {
fn default() -> Self {
AppConfig {
core_ip: "localhost".to_string(),
core_port: 4856,
obs_ip: "localhost".to_string(),
obs_port: 4455,
obs_password: "EbEIsHn7lxnN73Hy".to_string(),
}
}
}
pub fn load_config() -> AppConfig {
Config::builder()
.add_source(config::File::new("./settings.toml", FileFormat::Toml))
.build()
.and_then(|val| val.try_deserialize())
.unwrap_or_default()
}
#[derive(Debug, Snafu)]
pub enum SaveConfigError {
#[snafu(display("Could not serialize app state: {source}"))]
SerdeError { source: toml::ser::Error },
#[snafu(display("Could not write app state to file: {path}"))]
IoError {
source: std::io::Error,
path: String,
},
}
#[instrument]
pub fn save_config(config: &AppConfig) -> Result<(), SaveConfigError> {
let toml_str = toml::to_string(&config).context(SerdeSnafu)?;
let mut file = File::create("./settings.toml").context(IoSnafu {
path: "./settings.toml",
})?;
file.write_all(toml_str.as_bytes()).context(IoSnafu {
path: "./settings.toml",
})?;
info!("Config file saved successfully");
Ok(())
}

114
src/core/mod.rs Normal file
View file

@ -0,0 +1,114 @@
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use tokio::sync::{
mpsc, oneshot,
watch::{self, Receiver},
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tracing::{error, info, instrument, warn};
use crate::AppEvent;
#[derive(Debug)]
pub struct ConnectionCoreParam {
pub connection_string: String,
pub event_loop_tx: mpsc::Sender<AppEvent>,
// Main Event Channel
pub shutdown_rx: watch::Receiver<bool>,
}
#[instrument]
pub async fn connect_to_core(
params: ConnectionCoreParam,
on_connection_oneshot: oneshot::Sender<mpsc::Sender<Message>>,
) {
info!("Starting core communication process");
match connect_async(&params.connection_string).await {
Err(e) => warn!("Could not connect to core: {e}"),
Ok((websocket, _)) => {
let (to_core_tx, to_core_rx) = mpsc::channel::<Message>(5);
info!("Connection to core made successfully");
let (mut sender, recvr) = websocket.split();
if on_connection_oneshot.send(to_core_tx).is_err() {
error!("Could not give to_core_tx to oneshot!");
return;
}
tokio::spawn(handle_websocket_messages(
params.event_loop_tx.clone(),
recvr,
));
handle_to_socket(&mut sender, to_core_rx, params.shutdown_rx.clone()).await;
}
}
info!("Closing down core communication process");
}
pub async fn handle_to_socket(
websocket: &mut SplitSink<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>, Message>,
mut from_eventloop: mpsc::Receiver<Message>,
shutdown_rx: Receiver<bool>,
) {
loop {
if *shutdown_rx.borrow() {
break;
}
match from_eventloop.recv().await {
None => {
info!("event loop -> core channel closed");
break;
}
Some(mess) => match websocket.send(mess).await {
Ok(_) => {}
Err(e) => {
error!("Error sending message to Core: {e}");
if let Err(e) = websocket.close().await {
info!("Could not send close state to websocket: {e}");
}
break;
}
},
}
}
info!("Event Loop -> Core channel closed");
}
#[instrument(skip(event_tx, recvr))]
pub async fn handle_websocket_messages(
event_tx: mpsc::Sender<AppEvent>,
mut recvr: SplitStream<WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>>,
) {
while let Some(mess) = recvr.next().await {
match mess {
Err(e) => {
error!("There was an error receiving from Core: {e}");
break;
}
Ok(mess) => match mess {
Message::Ping(_) | Message::Pong(_) | Message::Binary(_) | Message::Frame(_) => {
// Do nothing with these frames, yet
}
Message::Text(mess) => {
info!("Received text from core: {}", mess);
if let Err(e) = event_tx.send(AppEvent::BoringLog(mess)).await {
error!("Could not send event to event channel! {e}");
break;
}
}
Message::Close(_) => {
info!("Core websocket close frame received");
break;
}
},
}
}
// we cannot close the stream, only the sink
info!("Core websocket and Core -> Event Loop closing");
}

View file

@ -1,3 +1,83 @@
fn main() {
println!("Hello, world!");
use std::time::Duration;
use tokio::sync::{mpsc::{self, error::TryRecvError}, watch};
use tracing::{error, info, warn};
mod config;
mod core;
mod satellite_state;
use satellite_state::SatelliteState;
pub enum AppEvent {
BoringLog(String),
Close,
}
#[tokio::main]
async fn main() {
// #[cfg(all(not(feature = "tokio-debug"), debug_assertions))]
{
let sub = tracing_subscriber::FmtSubscriber::new();
if let Err(e) = tracing::subscriber::set_global_default(sub) {
panic!("Could not set tracing global: {e}");
}
}
#[cfg(feature = "tokio-debug")]
{
console_subscriber::init();
}
info!("Starting program");
let config = config::load_config();
let (shutdown_tx, shutdown_rx) = watch::channel::<bool>(false);
let (tx, rx) = mpsc::channel::<AppEvent>(10);
let mut state = SatelliteState {
config,
shutdown_tx,
shutdown_rx,
events_tx: tx,
events_rx: rx,
to_core: None,
obs_client: None,
to_core_oneshot: None,
};
loop {
state.check_sockets().await;
match state.events_rx.try_recv() {
Err(e) => {
match e {
TryRecvError::Empty => {
tokio::time::sleep(Duration::from_millis(500)).await;
},
TryRecvError::Disconnected => {
error!("events mpsc was closed! Exiting main thread");
break;
}
}
}
Ok(res) => {
}
}
}
// let version = client.general().version().await.unwrap();
// info!("Connected to OBS version: {:?}", version);
// let scene_list = client.scenes().list().await.unwrap();
// info!("Available scenes are: {:?}", scene_list);
if let Err(_) = state.shutdown_tx.send(true) {
warn!("Could not send shutdown signal on shutdown_tx! Oneshot already used");
}
}

102
src/satellite_state.rs Normal file
View file

@ -0,0 +1,102 @@
use futures_util::StreamExt;
use obws::Client;
use tokio::sync::{mpsc, oneshot, watch};
use tokio_tungstenite::tungstenite::Message;
use tracing::{info, instrument, warn};
use crate::{config::AppConfig, core::{connect_to_core, ConnectionCoreParam}, AppEvent};
pub struct SatelliteState {
pub config: AppConfig,
pub obs_client: Option<Client>,
pub shutdown_tx: watch::Sender<bool>,
pub shutdown_rx: watch::Receiver<bool>,
pub events_tx: mpsc::Sender<AppEvent>,
pub events_rx: mpsc::Receiver<AppEvent>,
pub to_core: Option<mpsc::Sender<Message>>,
pub to_core_oneshot: Option<oneshot::Receiver<mpsc::Sender<Message>>>,
}
impl SatelliteState {
pub async fn check_sockets(&mut self) {
if self.obs_client.is_none() {
self.obs_client = match Client::connect(
self.config.obs_ip.clone(),
self.config.obs_port,
Some(self.config.obs_password.clone()),
)
.await
{
Err(e) => {
info!("Failed to connect to obs: {e}");
None
}
Ok(client) => Some(client),
}
}
if self.to_core.is_none() {
self.start_core_socket().await;
}
if let Some(mut os) = self.to_core_oneshot.take() {
if let Ok(res) = os.try_recv() {
self.to_core = Some(res);
} else {
self.to_core_oneshot = Some(os);
}
}
}
pub async fn start_core_socket(&mut self) {
let param = ConnectionCoreParam {
shutdown_rx: self.shutdown_rx.clone(),
event_loop_tx: self.events_tx.clone(),
connection_string: format!("ws://{}:{}", self.config.core_ip, self.config.core_port.to_string()),
};
let (tx, rx) = oneshot::channel::<mpsc::Sender<Message>>();
self.to_core_oneshot = Some(rx);
tokio::spawn(connect_to_core(param, tx));
}
#[instrument(skip(obs_client, event_tx))]
pub async fn start_obs_events(obs_client: &Client, event_tx: mpsc::Sender<AppEvent>) {
let event_stream = obs_client.events();
match event_stream {
Err(e) => {
warn!("Could not open event_stream from obs: {e}");
}
Ok(es) => {
info!("Started obs even stream");
futures_util::pin_mut!(es);
while let Some(event) = es.next().await {
match event {
_ => {
match event_tx
.send(AppEvent::BoringLog(format!(
"Get some kind of obs event! {:#?}",
event
)))
.await
{
Err(e) => {
warn!("event_tx closed, exiting: {e}");
break;
}
Ok(_) => {}
}
}
}
}
}
}
}
}