only one socket handler thread
This commit is contained in:
parent
6c8f5c44f7
commit
b3ce6858e9
1 changed files with 23 additions and 25 deletions
|
@ -10,7 +10,6 @@ pub struct Socket{
|
||||||
socket_txs: Arc<Mutex<Vec<Arc<TcpStream>>>>,
|
socket_txs: Arc<Mutex<Vec<Arc<TcpStream>>>>,
|
||||||
stop_listener_flag: sync_flag::SyncFlagTx,
|
stop_listener_flag: sync_flag::SyncFlagTx,
|
||||||
handle_connections_join_handle: Option<JoinHandle<()>>,
|
handle_connections_join_handle: Option<JoinHandle<()>>,
|
||||||
pub has_closed: workctl::sync_flag::SyncFlagRx,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Socket {
|
impl Socket {
|
||||||
|
@ -20,57 +19,56 @@ impl Socket {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_connections(listener: TcpListener, messenger_tx: Sender<String>) -> Self {
|
pub fn handle_connections(listener: TcpListener, messenger_tx: Sender<String>) -> Self {
|
||||||
let (mut has_closed_tx, has_closed_rx) = sync_flag::new_syncflag(false);
|
|
||||||
let (tx, thread_stop_flag) = sync_flag::new_syncflag(true);
|
let (tx, thread_stop_flag) = sync_flag::new_syncflag(true);
|
||||||
let socket_streams = Arc::new(Mutex::new(vec![]));
|
let socket_streams = Arc::new(Mutex::new(vec![]));
|
||||||
|
|
||||||
let thread_owned_streams = Arc::clone(&socket_streams);
|
let thread_owned_streams = Arc::clone(&socket_streams);
|
||||||
|
|
||||||
let handle = thread::spawn(move || {
|
let handle = thread::spawn(move || {
|
||||||
listener.set_nonblocking(true).unwrap();
|
listener.set_nonblocking(true).unwrap();
|
||||||
|
let mut service_sockets: Vec<Arc<TcpStream>> = Vec::new();
|
||||||
while thread_stop_flag.get() {
|
while thread_stop_flag.get() {
|
||||||
for (strm, _addr) in listener.accept() {
|
for (s, _addr) in listener.accept() {
|
||||||
//create thread-passable pointer of current socket stream
|
s.set_nonblocking(true).unwrap();
|
||||||
let stream = Arc::new(strm);
|
let stream = Arc::new(s);
|
||||||
//get lock on list of thread-passable pointers
|
|
||||||
let mut streams = thread_owned_streams.lock().unwrap();
|
let mut streams = thread_owned_streams.lock().unwrap();
|
||||||
streams.push(Arc::clone(&stream));
|
streams.push(Arc::clone(&stream));
|
||||||
//pass off a clone of the thread-passable pointer
|
service_sockets.push(Arc::clone(&stream));
|
||||||
drop(streams);
|
drop(streams);
|
||||||
Socket::handle_client(Arc::clone(&stream), messenger_tx.clone(), thread_stop_flag.clone());
|
|
||||||
}
|
}
|
||||||
|
Socket::service_clients(&mut service_sockets, messenger_tx.clone());
|
||||||
thread::sleep(Duration::from_millis(100));
|
thread::sleep(Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
println!("closed socket loop");
|
println!("closed socket loop");
|
||||||
has_closed_tx.set(true);
|
|
||||||
drop(listener);
|
drop(listener);
|
||||||
});
|
});
|
||||||
|
|
||||||
Socket {
|
Socket {
|
||||||
socket_txs: socket_streams,
|
socket_txs: socket_streams,
|
||||||
stop_listener_flag: tx,
|
stop_listener_flag: tx,
|
||||||
handle_connections_join_handle: Some(handle),
|
handle_connections_join_handle: Some(handle),
|
||||||
has_closed: has_closed_rx
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_client(stream: Arc<TcpStream>, update_tx: Sender<String>, program_shutdown_flag: sync_flag::SyncFlagRx) {
|
pub fn service_clients(streams: &mut Vec<Arc<TcpStream>>, update_tx: Sender<String>) {
|
||||||
let mut buffer = [0; 1024];
|
let mut buffer = [0; 1024];
|
||||||
stream.set_read_timeout(Some(Duration::from_millis(10))).expect("Could not set a read timeout");
|
let mut remove = Vec::new();
|
||||||
while program_shutdown_flag.get() {
|
for i in 0..streams.len() {
|
||||||
match stream.as_ref().read(&mut buffer) {
|
let resp = streams.get(i).as_ref().unwrap().as_ref().read(&mut buffer);
|
||||||
Err(_) => {},
|
if resp.is_ok() {
|
||||||
Ok(read_size) => {
|
let msg_len = resp.unwrap();
|
||||||
//Tcp is supposed to have a 0 byte read if closed by client
|
if msg_len == 0 {
|
||||||
if read_size == 0 || !program_shutdown_flag.get() {
|
remove.push(i);
|
||||||
break;
|
} else {
|
||||||
|
update_tx.send(String::from_utf8_lossy(&buffer[0..msg_len]).into_owned()).unwrap();
|
||||||
} else {
|
|
||||||
let output = String::from_utf8_lossy(&buffer[0..read_size]);
|
|
||||||
update_tx.send(output.into_owned()).unwrap();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stream.shutdown(Shutdown::Both).unwrap();
|
for i in remove.iter() {
|
||||||
|
streams.get(*i).unwrap().shutdown(Shutdown::Both).unwrap();
|
||||||
|
streams.remove(*i);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn close(&mut self) {
|
pub fn close(&mut self) {
|
||||||
|
|
Loading…
Reference in a new issue