From 6ad9ae8404e36fab1668e86de08073be7b273e67 Mon Sep 17 00:00:00 2001 From: Nickiel12 <35903114+Nickiel12@users.noreply.github.com> Date: Wed, 22 Dec 2021 16:24:21 -0800 Subject: [PATCH] changed so impl send is socket.send --- src/modules/socket_handler.rs | 38 +++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/modules/socket_handler.rs b/src/modules/socket_handler.rs index dfe0674..6994630 100644 --- a/src/modules/socket_handler.rs +++ b/src/modules/socket_handler.rs @@ -1,12 +1,13 @@ use workctl::sync_flag; use std::net::{TcpListener, TcpStream, Shutdown}; use std::io::{Read, Write}; -use crossbeam_channel::{Sender, Receiver}; +use std::sync::{Mutex, Arc}; +use crossbeam_channel::{Sender}; use std::thread::{self, JoinHandle}; use std::time::Duration; pub struct Socket{ - socket_txs: Vec>, + socket_txs: Arc>>>, stop_listener_flag: sync_flag::SyncFlagTx, handle_connections_join_handle: Option>, } @@ -17,27 +18,32 @@ impl Socket { TcpListener::bind(address).unwrap() } - pub fn handle_connections(listener: TcpListener, messenger_tx: Sender, messenger_rx: Receiver) -> Self { + pub fn handle_connections(listener: TcpListener, messenger_tx: Sender) -> Self { let (tx, thread_stop_flag) = sync_flag::new_syncflag(true); - + let socket_streams = Arc::new(Mutex::new(vec![])); + + let thread_owned_streams = Arc::clone(&socket_streams); let handle = thread::spawn(move || { listener.set_nonblocking(true).unwrap(); while thread_stop_flag.get() { - for (stream, _addr) in listener.accept() { - Socket::handle_client(stream, messenger_tx.clone(), messenger_rx.clone(), thread_stop_flag.clone()); + for (strm, _addr) in listener.accept() { + let stream = Arc::new(strm); + let mut streams = thread_owned_streams.lock().unwrap(); + streams.push(Arc::clone(&stream)); + Socket::handle_client(stream.as_ref(), messenger_tx.clone(), thread_stop_flag.clone()); } thread::sleep(Duration::from_millis(100)); } drop(listener); }); Socket { - socket_txs: Vec::>::new(), + socket_txs: socket_streams, stop_listener_flag: tx, handle_connections_join_handle: Some(handle), } } - pub fn handle_client(mut stream: TcpStream, update_tx: Sender, message_rx: Receiver, program_shutdown_flag: sync_flag::SyncFlagRx) { + pub fn handle_client(mut stream: &TcpStream, update_tx: Sender, program_shutdown_flag: sync_flag::SyncFlagRx) { let mut buffer = [0; 1024]; stream.set_read_timeout(Some(Duration::from_millis(100))).expect("Could not set a read timeout"); while program_shutdown_flag.get() { @@ -54,13 +60,6 @@ impl Socket { } } } - match message_rx.try_recv() { - Err(_) => {}, - Ok(message) => { - stream.write(message.as_bytes()).unwrap(); - stream.flush().unwrap(); - } - } } stream.shutdown(Shutdown::Both).unwrap(); } @@ -71,4 +70,13 @@ impl Socket { .take().expect("Called on not running thread") .join().expect("Could not join thread"); } + + pub fn send(&self, message: String) { + let streams = self.socket_txs.lock().unwrap(); + for socket_tx in streams.iter(){ + let mut tx = socket_tx.as_ref(); + tx.write(message.clone().as_bytes()).unwrap(); + tx.flush().unwrap(); + } + } } \ No newline at end of file