diff --git a/src/modules/socket_handler.rs b/src/modules/socket_handler.rs index 6994630..68f31a8 100644 --- a/src/modules/socket_handler.rs +++ b/src/modules/socket_handler.rs @@ -10,6 +10,7 @@ pub struct Socket{ socket_txs: Arc>>>, stop_listener_flag: sync_flag::SyncFlagTx, handle_connections_join_handle: Option>, + pub has_closed: workctl::sync_flag::SyncFlagRx, } impl Socket { @@ -19,6 +20,7 @@ impl Socket { } pub fn handle_connections(listener: TcpListener, messenger_tx: Sender) -> Self { + let (mut has_closed_tx, has_closed_rx) = sync_flag::new_syncflag(false); let (tx, thread_stop_flag) = sync_flag::new_syncflag(true); let socket_streams = Arc::new(Mutex::new(vec![])); @@ -27,23 +29,28 @@ impl Socket { listener.set_nonblocking(true).unwrap(); while thread_stop_flag.get() { for (strm, _addr) in listener.accept() { + //create thread-passable pointer of current socket stream let stream = Arc::new(strm); + //get lock on list of thread-passable pointers let mut streams = thread_owned_streams.lock().unwrap(); streams.push(Arc::clone(&stream)); - Socket::handle_client(stream.as_ref(), messenger_tx.clone(), thread_stop_flag.clone()); + //pass off a clone of the thread-passable pointer + Socket::handle_client(stream.as_ref(), messenger_tx.clone(), thread_stop_flag.clone(), Arc::clone(&thread_owned_streams)); } thread::sleep(Duration::from_millis(100)); } + has_closed_tx.set(true); drop(listener); }); Socket { socket_txs: socket_streams, stop_listener_flag: tx, handle_connections_join_handle: Some(handle), + has_closed: has_closed_rx } } - pub fn handle_client(mut stream: &TcpStream, update_tx: Sender, program_shutdown_flag: sync_flag::SyncFlagRx) { + pub fn handle_client(mut stream: &TcpStream, update_tx: Sender, program_shutdown_flag: sync_flag::SyncFlagRx, thread_owned_streams: Arc>>>) { let mut buffer = [0; 1024]; stream.set_read_timeout(Some(Duration::from_millis(100))).expect("Could not set a read timeout"); while program_shutdown_flag.get() {