diff --git a/Cargo.lock b/Cargo.lock index 7c27948..9b1eb93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -766,6 +766,7 @@ dependencies = [ "futures-util", "log", "rand", + "rmp-serde", "serde 1.0.116", "serde_derive", "serde_json", @@ -1047,6 +1048,27 @@ dependencies = [ "winapi", ] +[[package]] +name = "rmp" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f" +dependencies = [ + "byteorder", + "num-traits 0.2.12", +] + +[[package]] +name = "rmp-serde" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ce7d70c926fe472aed493b902010bccc17fa9f7284145cb8772fd22fdb052d8" +dependencies = [ + "byteorder", + "rmp", + "serde 1.0.116", +] + [[package]] name = "rust-ini" version = "0.13.0" diff --git a/Cargo.toml b/Cargo.toml index 0c77155..393fa7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ env_logger = "0.7" futures-util = "0.3" log = { version = "0.4", features = ["std", "serde"] } rand = "0.7" +rmp-serde = "0.14" serde = { version = "1", features = ["derive"] } serde_derive = "1" serde_json = "1" diff --git a/src/game.rs b/src/game.rs index ea0f925..43559c4 100644 --- a/src/game.rs +++ b/src/game.rs @@ -1,20 +1,36 @@ -use crate::network::{NetworkMessage, NetworkPayload}; +use crate::{ + messages::{ClientMessage, ServerMessage}, + network::{NetworkMessage, NetworkPayload}, +}; use async_std::sync::Arc; use async_std::sync::RwLock; use async_std::sync::{Receiver, Sender}; use shipyard::{error, World}; use std::collections::HashMap; +macro_rules! send { + ($g:ident, $id:expr, $e:expr) => { + $g.write() + .await + .net_out + .send(NetworkMessage { + conn_id: $id, + data: $e, + }) + .await + }; +} + pub struct Player {} pub struct Game { world: World, players: HashMap, - net_out: Sender, + net_out: Sender>, } impl Game { - pub fn new(net_out: Sender) -> Self { + pub fn new(net_out: Sender>) -> Self { // Create world let world = World::default(); @@ -37,7 +53,10 @@ impl Game { self.world.try_run_workload("update") } - pub async fn read_loop(game: Arc>, net_in: Receiver) { + pub async fn read_loop( + game: Arc>, + net_in: Receiver>, + ) { loop { let message = net_in.recv().await.expect("could not read from channel"); match message.data { @@ -47,18 +66,27 @@ impl Game { .await .players .insert(message.conn_id, Player {}); + // Say hellO! + send!( + game, + message.conn_id, + NetworkPayload::Message(ServerMessage::Hello( + "unnamed-server".to_string(), + "odynet-a1".to_string() + )) + ); + } + NetworkPayload::Disconnected => { + log::info!("[{:?}] disconnected", &message.conn_id); } NetworkPayload::Message(msg) => { log::info!("[{:?}] said {:?}", &message.conn_id, msg); - game.write() - .await - .net_out - .send(NetworkMessage { - conn_id: message.conn_id, - data: NetworkPayload::Message(tungstenite::Message::text("hello")), - }) - .await; } + NetworkPayload::InvalidFormat(err) => send!( + game, + message.conn_id, + NetworkPayload::Message(ServerMessage::Error(err)) + ), } } } diff --git a/src/main.rs b/src/main.rs index 7cb8b13..f72b7ce 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ mod components; mod config; mod game; +mod messages; mod network; mod systems; diff --git a/src/messages.rs b/src/messages.rs new file mode 100644 index 0000000..318c1e8 --- /dev/null +++ b/src/messages.rs @@ -0,0 +1,26 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub enum ClientMessage { + Hello(), +} + +impl ClientMessage { + pub fn try_read(msg: tungstenite::Message) -> Result { + rmp_serde::from_slice(msg.into_data().as_slice()) + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ServerMessage { + Hello(String, String), // Server name - Protocol version + Error(String), +} + +impl Into for ServerMessage { + fn into(self) -> tungstenite::Message { + tungstenite::Message::binary( + rmp_serde::to_vec_named(&self).expect("could not serialize server response"), + ) + } +} diff --git a/src/network.rs b/src/network.rs index ba6022e..364a323 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,3 +1,4 @@ +use crate::messages::{ClientMessage, ServerMessage}; use async_std::sync::Arc; use async_std::sync::Mutex; use std::collections::HashMap; @@ -9,15 +10,17 @@ use async_tungstenite::{accept_async, WebSocketStream}; use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt}; #[derive(Debug)] -pub struct NetworkMessage { +pub struct NetworkMessage { pub conn_id: usize, - pub data: NetworkPayload, + pub data: NetworkPayload, } #[derive(Debug)] -pub enum NetworkPayload { +pub enum NetworkPayload { Connected, - Message(tungstenite::Message), + Disconnected, + Message(T), + InvalidFormat(String), } struct Connection { @@ -41,7 +44,7 @@ impl NetworkManager { self.connections.insert(conn.conn_id, conn); } - async fn send(&mut self, message: NetworkMessage) { + async fn send(&mut self, message: NetworkMessage) { let conn = self .connections .get_mut(&message.conn_id) @@ -49,7 +52,7 @@ impl NetworkManager { match message.data { NetworkPayload::Message(msg) => conn .stream - .send(msg) + .send(msg.into()) .await .expect("could not send message to connection"), _ => (), @@ -59,8 +62,8 @@ impl NetworkManager { pub async fn listen( bind: String, - incoming: Sender, - outgoing: Receiver, + incoming: Sender>, + outgoing: Receiver>, ) { let network = Arc::new(Mutex::new(NetworkManager::new())); let listener = TcpListener::bind(&bind).await.expect("Can't listen"); @@ -79,7 +82,7 @@ pub async fn listen( async fn accept_connection( peer: SocketAddr, stream: TcpStream, - incoming: Sender, + incoming: Sender>, ) -> Connection { // Generate random connection id let conn_id = rand::random(); @@ -108,24 +111,53 @@ async fn accept_connection( async fn read_loop( mut stream: SplitStream>, - ch: Sender, + ch: Sender>, conn_id: usize, ) { loop { - let message = stream - .next() - .await - .expect("failed getting the next message") - .expect("received error while reading"); - ch.send(NetworkMessage { - conn_id, - data: NetworkPayload::Message(message), - }) - .await; + let recv = stream.next().await; + match recv { + // Handle close + Some(Ok(tungstenite::Message::Close(reason))) => { + ch.send(NetworkMessage { + conn_id, + data: NetworkPayload::Disconnected, + }) + .await; + log::info!("{:?} left for: {:?}", conn_id, reason); + break; + } + // Message received + Some(Ok(message)) => match ClientMessage::try_read(message) { + Ok(msg) => { + ch.send(NetworkMessage { + conn_id, + data: NetworkPayload::Message(msg), + }) + .await; + } + Err(err) => { + log::warn!("[{:?}] invalid message received: {:?}", conn_id, err); + // Ask server to notify client about them fucking up + ch.send(NetworkMessage { + conn_id, + data: NetworkPayload::InvalidFormat(err.to_string()), + }) + .await; + } + }, + // Errors + Some(Err(err)) => { + log::warn!("error received in read loop: {:?}", err); + break; + } + None => break, + } } + log::info!("stopped read loop for {:?}", conn_id); } -async fn write_loop(net: Arc>, ch: Receiver) { +async fn write_loop(net: Arc>, ch: Receiver>) { loop { let msg = ch.recv().await.expect("failed to receive outgoing message"); net.lock().await.send(msg).await;