use crate::messages::{ClientMessage, ServerMessage}; use async_std::sync::Arc; use async_std::sync::Mutex; use std::collections::HashMap; use async_std::net::{SocketAddr, TcpListener, TcpStream}; use async_std::sync::{Receiver, Sender}; use async_std::task; use async_tungstenite::{accept_async, WebSocketStream}; use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt}; #[derive(Debug)] pub struct NetworkMessage { pub conn_id: usize, pub data: NetworkPayload, } #[derive(Debug)] pub enum NetworkPayload { Connected, Disconnected, Message(T), InvalidFormat(String), } struct Connection { stream: SplitSink, tungstenite::Message>, address: SocketAddr, conn_id: usize, } pub struct NetworkManager { connections: HashMap, } impl NetworkManager { pub fn new() -> Self { NetworkManager { connections: HashMap::new(), } } fn register(&mut self, conn: Connection) { self.connections.insert(conn.conn_id, conn); } async fn send(&mut self, message: NetworkMessage) { let conn = self .connections .get_mut(&message.conn_id) .expect("cant send message to an unregistered connection"); if let NetworkPayload::Message(msg) = message.data { let res = conn.stream.send(msg.into()).await; if let Err(err) = res { log::error!("Error sending message over to game instance: {:?}", err) } } } } pub async fn listen( bind: String, incoming: Sender>, outgoing: Receiver>, ) { let network = Arc::new(Mutex::new(NetworkManager::new())); let listener = TcpListener::bind(&bind).await.expect("Can't listen"); log::info!("Listening on: {}", bind); task::spawn(write_loop(network.clone(), outgoing)); while let Ok((stream, _)) = listener.accept().await { let peer = stream .peer_addr() .expect("connected streams should have a peer address"); log::info!("Peer address: {}", peer); match accept_connection(peer, stream, incoming.clone()).await { Ok(conn) => { network.lock().await.register(conn); } Err(err) => log::warn!("failed connection from {:?}: {:?}", peer, err), } } } async fn accept_connection( peer: SocketAddr, stream: TcpStream, incoming: Sender>, ) -> Result { // Generate random connection id let conn_id = rand::random(); // Get streams let ws_stream = accept_async(stream).await?; let (ws_sender, ws_receiver) = ws_stream.split(); // Send event to game instance incoming .send(NetworkMessage { conn_id, data: NetworkPayload::Connected, }) .await; // Start read loop task::spawn(read_loop(ws_receiver, incoming, conn_id)); // Return connection instance for network manager Ok(Connection { stream: ws_sender, address: peer, conn_id, }) } async fn read_loop( mut stream: SplitStream>, ch: Sender>, conn_id: usize, ) { loop { let recv = stream.next().await; match recv { // Handle close Some(Ok(tungstenite::Message::Close(reason))) => { ch.send(NetworkMessage { conn_id, data: NetworkPayload::Disconnected, }) .await; log::info!("{:?} left for: {:?}", conn_id, reason); break; } // Message received Some(Ok(message)) => match ClientMessage::try_read(message) { Ok(msg) => { ch.send(NetworkMessage { conn_id, data: NetworkPayload::Message(msg), }) .await; } Err(err) => { log::warn!("[{:?}] invalid message received: {:?}", conn_id, err); // Ask server to notify client about them fucking up ch.send(NetworkMessage { conn_id, data: NetworkPayload::InvalidFormat(err.to_string()), }) .await; } }, // Errors Some(Err(err)) => { log::warn!("error received in read loop: {:?}", err); break; } None => break, } } log::info!("stopped read loop for {:?}", conn_id); } async fn write_loop(net: Arc>, ch: Receiver>) { loop { match ch.recv().await { Ok(msg) => { net.lock().await.send(msg).await; } Err(err) => { log::warn!("write loop received error: {:?}", err); break; } } } }