changed so socket is an object with internal states
This commit is contained in:
parent
91d267b93d
commit
35bc3df300
3 changed files with 26 additions and 18 deletions
|
@ -1,7 +1,7 @@
|
||||||
use std::{time::Duration};
|
use std::{time::Duration};
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::unbounded;
|
||||||
|
|
||||||
use modules::{socket_handler::Socket, stream_states::stream_states_class::StreamState, message_handler::{MessageHandler, StateMessage}};
|
use modules::{socket_handler::Socket, stream_states::stream_states_class::StreamState, message_handler::{MessageHandler}};
|
||||||
use workctl::sync_flag;
|
use workctl::sync_flag;
|
||||||
|
|
||||||
use crate::modules::stream_states::state_update::StateUpdate;
|
use crate::modules::stream_states::state_update::StateUpdate;
|
||||||
|
@ -19,7 +19,7 @@ fn main() {
|
||||||
let socket_listener = Socket::make_listener(SERVER_ADDRESS);
|
let socket_listener = Socket::make_listener(SERVER_ADDRESS);
|
||||||
let (from_socket_tx, from_socket_rx) = unbounded::<String>();
|
let (from_socket_tx, from_socket_rx) = unbounded::<String>();
|
||||||
let (to_socket_tx, to_socket_rx) = unbounded::<String>();
|
let (to_socket_tx, to_socket_rx) = unbounded::<String>();
|
||||||
let (mut listener_can_run_flag, listener_join_handle) = Socket::handle_connections(socket_listener, from_socket_tx, to_socket_rx);
|
let mut socket = Socket::handle_connections(socket_listener, from_socket_tx, to_socket_rx);
|
||||||
|
|
||||||
let (control_c_flag_tx, control_c_called_flag_rx) = sync_flag::new_syncflag(false);
|
let (control_c_flag_tx, control_c_called_flag_rx) = sync_flag::new_syncflag(false);
|
||||||
|
|
||||||
|
@ -40,9 +40,7 @@ fn main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Close the listener thread
|
socket.close();
|
||||||
listener_can_run_flag.set(false);
|
|
||||||
listener_join_handle.join().unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn setup_control_c(mut control_c_flag_tx: sync_flag::SyncFlagTx) {
|
fn setup_control_c(mut control_c_flag_tx: sync_flag::SyncFlagTx) {
|
||||||
|
|
|
@ -6,7 +6,9 @@ use std::thread::{self, JoinHandle};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub struct Socket{
|
pub struct Socket{
|
||||||
|
socket_txs: Vec<Sender<String>>,
|
||||||
|
stop_listener_flag: sync_flag::SyncFlagTx,
|
||||||
|
handle_connections_join_handle: Option<JoinHandle<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Socket {
|
impl Socket {
|
||||||
|
@ -15,7 +17,7 @@ impl Socket {
|
||||||
TcpListener::bind(address).unwrap()
|
TcpListener::bind(address).unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_connections(listener: TcpListener, messenger_tx: Sender<String>, messenger_rx: Receiver<String>) -> (sync_flag::SyncFlagTx, JoinHandle<()>){
|
pub fn handle_connections(listener: TcpListener, messenger_tx: Sender<String>, messenger_rx: Receiver<String>) -> Self {
|
||||||
let (tx, thread_stop_flag) = sync_flag::new_syncflag(true);
|
let (tx, thread_stop_flag) = sync_flag::new_syncflag(true);
|
||||||
|
|
||||||
let handle = thread::spawn(move || {
|
let handle = thread::spawn(move || {
|
||||||
|
@ -28,7 +30,11 @@ impl Socket {
|
||||||
}
|
}
|
||||||
drop(listener);
|
drop(listener);
|
||||||
});
|
});
|
||||||
(tx, handle)
|
Socket {
|
||||||
|
socket_txs: Vec::<Sender<String>>::new(),
|
||||||
|
stop_listener_flag: tx,
|
||||||
|
handle_connections_join_handle: Some(handle),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn handle_client(mut stream: TcpStream, update_tx: Sender<String>, message_rx: Receiver<String>, program_shutdown_flag: sync_flag::SyncFlagRx) {
|
pub fn handle_client(mut stream: TcpStream, update_tx: Sender<String>, message_rx: Receiver<String>, program_shutdown_flag: sync_flag::SyncFlagRx) {
|
||||||
|
@ -58,4 +64,11 @@ impl Socket {
|
||||||
}
|
}
|
||||||
stream.shutdown(Shutdown::Both).unwrap();
|
stream.shutdown(Shutdown::Both).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn close(&mut self) {
|
||||||
|
self.stop_listener_flag.set(false);
|
||||||
|
self.handle_connections_join_handle
|
||||||
|
.take().expect("Called on not running thread")
|
||||||
|
.join().expect("Could not join thread");
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -1,4 +1,3 @@
|
||||||
use std::sync::mpsc;
|
|
||||||
use crossbeam_channel::unbounded;
|
use crossbeam_channel::unbounded;
|
||||||
use std::io::{Write, Read};
|
use std::io::{Write, Read};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
@ -35,7 +34,7 @@ fn can_handle_messages() {
|
||||||
let (tx_1, rx_1) = unbounded::<String>();
|
let (tx_1, rx_1) = unbounded::<String>();
|
||||||
let (_stream_tx, stream_rx) = unbounded::<String>();
|
let (_stream_tx, stream_rx) = unbounded::<String>();
|
||||||
|
|
||||||
let (mut flag, connection_handle) = Socket::handle_connections(listener, tx_1, stream_rx);
|
let mut socket = Socket::handle_connections(listener, tx_1, stream_rx);
|
||||||
|
|
||||||
let join_handle = std::thread::spawn(move || {
|
let join_handle = std::thread::spawn(move || {
|
||||||
let mut outgoing = std::net::TcpStream::connect("localhost:5004").unwrap();
|
let mut outgoing = std::net::TcpStream::connect("localhost:5004").unwrap();
|
||||||
|
@ -45,10 +44,10 @@ fn can_handle_messages() {
|
||||||
join_handle.join().unwrap();
|
join_handle.join().unwrap();
|
||||||
thread::sleep(Duration::from_millis(1000));
|
thread::sleep(Duration::from_millis(1000));
|
||||||
|
|
||||||
flag.set(false);
|
|
||||||
connection_handle.join().unwrap();
|
|
||||||
let message = rx_1.recv().unwrap();
|
let message = rx_1.recv().unwrap();
|
||||||
assert_eq!(message, String::from("this is a test"));
|
assert_eq!(message, String::from("this is a test"));
|
||||||
|
|
||||||
|
socket.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -57,7 +56,7 @@ fn can_handle_delayed_messages() {
|
||||||
let (tx_1, rx_1) = unbounded::<String>();
|
let (tx_1, rx_1) = unbounded::<String>();
|
||||||
let (_stream_tx, stream_rx) = unbounded::<String>();
|
let (_stream_tx, stream_rx) = unbounded::<String>();
|
||||||
|
|
||||||
let (mut flag, connection_handle) = Socket::handle_connections(listener, tx_1, stream_rx);
|
let mut socket = Socket::handle_connections(listener, tx_1, stream_rx);
|
||||||
|
|
||||||
let mut outgoing = std::net::TcpStream::connect("localhost:5005").unwrap();
|
let mut outgoing = std::net::TcpStream::connect("localhost:5005").unwrap();
|
||||||
outgoing.write("this is a test1\n".as_bytes()).unwrap();
|
outgoing.write("this is a test1\n".as_bytes()).unwrap();
|
||||||
|
@ -74,8 +73,7 @@ fn can_handle_delayed_messages() {
|
||||||
println!("{}", message);
|
println!("{}", message);
|
||||||
assert_eq!(message, String::from("this is a test3\n"));
|
assert_eq!(message, String::from("this is a test3\n"));
|
||||||
|
|
||||||
flag.set(false);
|
socket.close();
|
||||||
connection_handle.join().unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -84,7 +82,7 @@ fn can_send_and_receive_on_stream() {
|
||||||
let (tx_1, rx_1) = unbounded::<String>();
|
let (tx_1, rx_1) = unbounded::<String>();
|
||||||
let (stream_tx, stream_rx) = unbounded::<String>();
|
let (stream_tx, stream_rx) = unbounded::<String>();
|
||||||
|
|
||||||
let (mut close_socket_flag, connection_handle) = Socket::handle_connections(listener, tx_1, stream_rx);
|
let mut socket = Socket::handle_connections(listener, tx_1, stream_rx);
|
||||||
|
|
||||||
let mut outgoing = std::net::TcpStream::connect("localhost:5006").unwrap();
|
let mut outgoing = std::net::TcpStream::connect("localhost:5006").unwrap();
|
||||||
outgoing.set_read_timeout(Some(Duration::from_millis(1000))).expect("couln't set timout");
|
outgoing.set_read_timeout(Some(Duration::from_millis(1000))).expect("couln't set timout");
|
||||||
|
@ -105,6 +103,5 @@ fn can_send_and_receive_on_stream() {
|
||||||
assert_eq!("this is another test!", message.into_owned());
|
assert_eq!("this is another test!", message.into_owned());
|
||||||
|
|
||||||
drop(outgoing);
|
drop(outgoing);
|
||||||
close_socket_flag.set(false);
|
socket.close();
|
||||||
connection_handle.join().unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue