From b3ce6858e93df499e2b50d05c3b255d93e0df01c Mon Sep 17 00:00:00 2001 From: Nickiel12 <35903114+Nickiel12@users.noreply.github.com> Date: Sun, 2 Jan 2022 21:37:01 -0800 Subject: [PATCH] only one socket handler thread --- src/modules/socket_handler.rs | 48 +++++++++++++++++------------------ 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/src/modules/socket_handler.rs b/src/modules/socket_handler.rs index 8c5c519..a2ad0b3 100644 --- a/src/modules/socket_handler.rs +++ b/src/modules/socket_handler.rs @@ -10,7 +10,6 @@ pub struct Socket{ socket_txs: Arc>>>, stop_listener_flag: sync_flag::SyncFlagTx, handle_connections_join_handle: Option>, - pub has_closed: workctl::sync_flag::SyncFlagRx, } impl Socket { @@ -20,57 +19,56 @@ impl Socket { } pub fn handle_connections(listener: TcpListener, messenger_tx: Sender) -> Self { - let (mut has_closed_tx, has_closed_rx) = sync_flag::new_syncflag(false); let (tx, thread_stop_flag) = sync_flag::new_syncflag(true); let socket_streams = Arc::new(Mutex::new(vec![])); let thread_owned_streams = Arc::clone(&socket_streams); + let handle = thread::spawn(move || { listener.set_nonblocking(true).unwrap(); + let mut service_sockets: Vec> = Vec::new(); while thread_stop_flag.get() { - for (strm, _addr) in listener.accept() { - //create thread-passable pointer of current socket stream - let stream = Arc::new(strm); - //get lock on list of thread-passable pointers + for (s, _addr) in listener.accept() { + s.set_nonblocking(true).unwrap(); + let stream = Arc::new(s); let mut streams = thread_owned_streams.lock().unwrap(); streams.push(Arc::clone(&stream)); - //pass off a clone of the thread-passable pointer + service_sockets.push(Arc::clone(&stream)); drop(streams); - Socket::handle_client(Arc::clone(&stream), messenger_tx.clone(), thread_stop_flag.clone()); } + Socket::service_clients(&mut service_sockets, messenger_tx.clone()); thread::sleep(Duration::from_millis(100)); } println!("closed socket loop"); - has_closed_tx.set(true); drop(listener); }); + Socket { socket_txs: socket_streams, stop_listener_flag: tx, handle_connections_join_handle: Some(handle), - has_closed: has_closed_rx } } - pub fn handle_client(stream: Arc, update_tx: Sender, program_shutdown_flag: sync_flag::SyncFlagRx) { + pub fn service_clients(streams: &mut Vec>, update_tx: Sender) { let mut buffer = [0; 1024]; - stream.set_read_timeout(Some(Duration::from_millis(10))).expect("Could not set a read timeout"); - while program_shutdown_flag.get() { - match stream.as_ref().read(&mut buffer) { - Err(_) => {}, - Ok(read_size) => { - //Tcp is supposed to have a 0 byte read if closed by client - if read_size == 0 || !program_shutdown_flag.get() { - break; - - } else { - let output = String::from_utf8_lossy(&buffer[0..read_size]); - update_tx.send(output.into_owned()).unwrap(); - } + let mut remove = Vec::new(); + for i in 0..streams.len() { + let resp = streams.get(i).as_ref().unwrap().as_ref().read(&mut buffer); + if resp.is_ok() { + let msg_len = resp.unwrap(); + if msg_len == 0 { + remove.push(i); + } else { + update_tx.send(String::from_utf8_lossy(&buffer[0..msg_len]).into_owned()).unwrap(); } } } - stream.shutdown(Shutdown::Both).unwrap(); + for i in remove.iter() { + streams.get(*i).unwrap().shutdown(Shutdown::Both).unwrap(); + streams.remove(*i); + } + } pub fn close(&mut self) {