diff --git a/src/modules/io_handler.rs b/src/modules/io_handler.rs index e69de29..a2db02f 100644 --- a/src/modules/io_handler.rs +++ b/src/modules/io_handler.rs @@ -0,0 +1,82 @@ +use std::{sync::mpsc::{Sender, self, Receiver}, thread::{JoinHandle, self}}; + +use super::{message_handler::{MessageHandler, StateMessage}, stream_states::{enums::StateUpdate, stream_states_class::StreamState}}; + + +pub struct IOHandler { + pub listener_join_handler: Option>, + pub message_thread_tx: Option>, + data_rx: Option>, +} + +impl IOHandler { + pub fn new() -> Self { + IOHandler{ + ..Default::default() + } + } + + pub fn start_socket_listener(&mut self, mut message_handler: impl MessageHandler + Send + 'static, socket_addr: &str) { + let (tx, rx) = mpsc::channel::(); + let (tx_1, rx_1) = mpsc::channel::(); + self.message_thread_tx = Some(tx); + self.data_rx = Some(rx_1); + + let handle = thread::spawn(move || { + for message in rx { + match message { + StateMessage::StateUpdate(message) => { + message_handler.handle_update(message); + }, + StateMessage::GetStates => { + tx_1.send(message_handler.get_states()).unwrap(); + } + StateMessage::CloseListener => { + tx_1.send(message_handler.get_states()).unwrap(); + break;} + } + } + + }); + self.listener_join_handler = Some(handle); + } + + pub fn get_states(&self) -> StreamState { + let tx = self.message_thread_tx.clone(); + match tx { + Some(tx) => { + tx.send(StateMessage::GetStates).unwrap(); + let rx = self.data_rx.as_ref(); + match rx { + Some(rx) => { + let message = rx.recv().unwrap(); + message + }, + None => {panic!("Trying to get data_rx before IOHandler has it!");} + } + }, + None => {panic!("trying to access states before IOHandler has a handle for transmitting");} + } + } + + pub fn close(self) -> StreamState { + let state = self.get_states(); + assert_eq!(self.listener_join_handler.and_then(|f| { + let tx = self.message_thread_tx.clone().unwrap(); + tx.send(StateMessage::CloseListener).unwrap(); + f.join().unwrap(); + Some(true) + }).unwrap(), true); + state + } +} + +impl Default for IOHandler { + fn default() -> Self { + IOHandler { + message_thread_tx: None, + listener_join_handler: None, + data_rx: None, + } + } +} \ No newline at end of file diff --git a/src/tests/io_handler_tests.rs b/src/tests/io_handler_tests.rs index e69de29..00db4e7 100644 --- a/src/tests/io_handler_tests.rs +++ b/src/tests/io_handler_tests.rs @@ -0,0 +1,21 @@ +use std::thread; + +use crate::modules::{io_handler::{IOHandler}, message_handler::{MessageHandler, StateMessage}, stream_states::{stream_states_class::StreamState, enums::StateUpdate}}; + + +#[test] +fn test_make_socket() { + let state = StreamState::new(); + let mut io_handler = IOHandler::new(); + + io_handler.start_socket_listener(state, "no-one cares"); + + let tx = io_handler.message_thread_tx.clone().unwrap(); + tx.send(StateMessage::StateUpdate(StateUpdate::SceneIsAugmented(true))).unwrap(); + tx.send(StateMessage::StateUpdate(StateUpdate::StreamIsMuted(true))).unwrap(); + thread::sleep(std::time::Duration::from_millis(1000)); + + let final_state = io_handler.close(); + assert_eq!(final_state.scene_is_augmented, true); + assert_eq!(final_state.stream_is_muted, true); +} \ No newline at end of file