use msgpack for messages

This commit is contained in:
Hamcha 2020-10-05 15:11:41 +02:00
parent b615c86820
commit a7e227b5f0
Signed by: hamcha
GPG key ID: 41467804B19A3315
6 changed files with 143 additions and 33 deletions

22
Cargo.lock generated
View file

@ -766,6 +766,7 @@ dependencies = [
"futures-util", "futures-util",
"log", "log",
"rand", "rand",
"rmp-serde",
"serde 1.0.116", "serde 1.0.116",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
@ -1047,6 +1048,27 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "rmp"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f"
dependencies = [
"byteorder",
"num-traits 0.2.12",
]
[[package]]
name = "rmp-serde"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ce7d70c926fe472aed493b902010bccc17fa9f7284145cb8772fd22fdb052d8"
dependencies = [
"byteorder",
"rmp",
"serde 1.0.116",
]
[[package]] [[package]]
name = "rust-ini" name = "rust-ini"
version = "0.13.0" version = "0.13.0"

View file

@ -12,6 +12,7 @@ env_logger = "0.7"
futures-util = "0.3" futures-util = "0.3"
log = { version = "0.4", features = ["std", "serde"] } log = { version = "0.4", features = ["std", "serde"] }
rand = "0.7" rand = "0.7"
rmp-serde = "0.14"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_derive = "1" serde_derive = "1"
serde_json = "1" serde_json = "1"

View file

@ -1,20 +1,36 @@
use crate::network::{NetworkMessage, NetworkPayload}; use crate::{
messages::{ClientMessage, ServerMessage},
network::{NetworkMessage, NetworkPayload},
};
use async_std::sync::Arc; use async_std::sync::Arc;
use async_std::sync::RwLock; use async_std::sync::RwLock;
use async_std::sync::{Receiver, Sender}; use async_std::sync::{Receiver, Sender};
use shipyard::{error, World}; use shipyard::{error, World};
use std::collections::HashMap; use std::collections::HashMap;
macro_rules! send {
($g:ident, $id:expr, $e:expr) => {
$g.write()
.await
.net_out
.send(NetworkMessage {
conn_id: $id,
data: $e,
})
.await
};
}
pub struct Player {} pub struct Player {}
pub struct Game { pub struct Game {
world: World, world: World,
players: HashMap<usize, Player>, players: HashMap<usize, Player>,
net_out: Sender<NetworkMessage>, net_out: Sender<NetworkMessage<ServerMessage>>,
} }
impl Game { impl Game {
pub fn new(net_out: Sender<NetworkMessage>) -> Self { pub fn new(net_out: Sender<NetworkMessage<ServerMessage>>) -> Self {
// Create world // Create world
let world = World::default(); let world = World::default();
@ -37,7 +53,10 @@ impl Game {
self.world.try_run_workload("update") self.world.try_run_workload("update")
} }
pub async fn read_loop(game: Arc<RwLock<Self>>, net_in: Receiver<NetworkMessage>) { pub async fn read_loop(
game: Arc<RwLock<Self>>,
net_in: Receiver<NetworkMessage<ClientMessage>>,
) {
loop { loop {
let message = net_in.recv().await.expect("could not read from channel"); let message = net_in.recv().await.expect("could not read from channel");
match message.data { match message.data {
@ -47,18 +66,27 @@ impl Game {
.await .await
.players .players
.insert(message.conn_id, Player {}); .insert(message.conn_id, Player {});
// Say hellO!
send!(
game,
message.conn_id,
NetworkPayload::Message(ServerMessage::Hello(
"unnamed-server".to_string(),
"odynet-a1".to_string()
))
);
}
NetworkPayload::Disconnected => {
log::info!("[{:?}] disconnected", &message.conn_id);
} }
NetworkPayload::Message(msg) => { NetworkPayload::Message(msg) => {
log::info!("[{:?}] said {:?}", &message.conn_id, msg); log::info!("[{:?}] said {:?}", &message.conn_id, msg);
game.write()
.await
.net_out
.send(NetworkMessage {
conn_id: message.conn_id,
data: NetworkPayload::Message(tungstenite::Message::text("hello")),
})
.await;
} }
NetworkPayload::InvalidFormat(err) => send!(
game,
message.conn_id,
NetworkPayload::Message(ServerMessage::Error(err))
),
} }
} }
} }

View file

@ -1,6 +1,7 @@
mod components; mod components;
mod config; mod config;
mod game; mod game;
mod messages;
mod network; mod network;
mod systems; mod systems;

26
src/messages.rs Normal file
View file

@ -0,0 +1,26 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub enum ClientMessage {
Hello(),
}
impl ClientMessage {
pub fn try_read(msg: tungstenite::Message) -> Result<ClientMessage, rmp_serde::decode::Error> {
rmp_serde::from_slice(msg.into_data().as_slice())
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ServerMessage {
Hello(String, String), // Server name - Protocol version
Error(String),
}
impl Into<tungstenite::Message> for ServerMessage {
fn into(self) -> tungstenite::Message {
tungstenite::Message::binary(
rmp_serde::to_vec_named(&self).expect("could not serialize server response"),
)
}
}

View file

@ -1,3 +1,4 @@
use crate::messages::{ClientMessage, ServerMessage};
use async_std::sync::Arc; use async_std::sync::Arc;
use async_std::sync::Mutex; use async_std::sync::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
@ -9,15 +10,17 @@ use async_tungstenite::{accept_async, WebSocketStream};
use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt}; use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt};
#[derive(Debug)] #[derive(Debug)]
pub struct NetworkMessage { pub struct NetworkMessage<T> {
pub conn_id: usize, pub conn_id: usize,
pub data: NetworkPayload, pub data: NetworkPayload<T>,
} }
#[derive(Debug)] #[derive(Debug)]
pub enum NetworkPayload { pub enum NetworkPayload<T> {
Connected, Connected,
Message(tungstenite::Message), Disconnected,
Message(T),
InvalidFormat(String),
} }
struct Connection { struct Connection {
@ -41,7 +44,7 @@ impl NetworkManager {
self.connections.insert(conn.conn_id, conn); self.connections.insert(conn.conn_id, conn);
} }
async fn send(&mut self, message: NetworkMessage) { async fn send(&mut self, message: NetworkMessage<ServerMessage>) {
let conn = self let conn = self
.connections .connections
.get_mut(&message.conn_id) .get_mut(&message.conn_id)
@ -49,7 +52,7 @@ impl NetworkManager {
match message.data { match message.data {
NetworkPayload::Message(msg) => conn NetworkPayload::Message(msg) => conn
.stream .stream
.send(msg) .send(msg.into())
.await .await
.expect("could not send message to connection"), .expect("could not send message to connection"),
_ => (), _ => (),
@ -59,8 +62,8 @@ impl NetworkManager {
pub async fn listen( pub async fn listen(
bind: String, bind: String,
incoming: Sender<NetworkMessage>, incoming: Sender<NetworkMessage<ClientMessage>>,
outgoing: Receiver<NetworkMessage>, outgoing: Receiver<NetworkMessage<ServerMessage>>,
) { ) {
let network = Arc::new(Mutex::new(NetworkManager::new())); let network = Arc::new(Mutex::new(NetworkManager::new()));
let listener = TcpListener::bind(&bind).await.expect("Can't listen"); let listener = TcpListener::bind(&bind).await.expect("Can't listen");
@ -79,7 +82,7 @@ pub async fn listen(
async fn accept_connection( async fn accept_connection(
peer: SocketAddr, peer: SocketAddr,
stream: TcpStream, stream: TcpStream,
incoming: Sender<NetworkMessage>, incoming: Sender<NetworkMessage<ClientMessage>>,
) -> Connection { ) -> Connection {
// Generate random connection id // Generate random connection id
let conn_id = rand::random(); let conn_id = rand::random();
@ -108,24 +111,53 @@ async fn accept_connection(
async fn read_loop( async fn read_loop(
mut stream: SplitStream<WebSocketStream<TcpStream>>, mut stream: SplitStream<WebSocketStream<TcpStream>>,
ch: Sender<NetworkMessage>, ch: Sender<NetworkMessage<ClientMessage>>,
conn_id: usize, conn_id: usize,
) { ) {
loop { loop {
let message = stream let recv = stream.next().await;
.next() match recv {
.await // Handle close
.expect("failed getting the next message") Some(Ok(tungstenite::Message::Close(reason))) => {
.expect("received error while reading"); ch.send(NetworkMessage {
ch.send(NetworkMessage { conn_id,
conn_id, data: NetworkPayload::Disconnected,
data: NetworkPayload::Message(message), })
}) .await;
.await; log::info!("{:?} left for: {:?}", conn_id, reason);
break;
}
// Message received
Some(Ok(message)) => match ClientMessage::try_read(message) {
Ok(msg) => {
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::Message(msg),
})
.await;
}
Err(err) => {
log::warn!("[{:?}] invalid message received: {:?}", conn_id, err);
// Ask server to notify client about them fucking up
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::InvalidFormat(err.to_string()),
})
.await;
}
},
// Errors
Some(Err(err)) => {
log::warn!("error received in read loop: {:?}", err);
break;
}
None => break,
}
} }
log::info!("stopped read loop for {:?}", conn_id);
} }
async fn write_loop(net: Arc<Mutex<NetworkManager>>, ch: Receiver<NetworkMessage>) { async fn write_loop(net: Arc<Mutex<NetworkManager>>, ch: Receiver<NetworkMessage<ServerMessage>>) {
loop { loop {
let msg = ch.recv().await.expect("failed to receive outgoing message"); let msg = ch.recv().await.expect("failed to receive outgoing message");
net.lock().await.send(msg).await; net.lock().await.send(msg).await;