odyssey-server/src/network.rs

175 lines
4.2 KiB
Rust
Raw Permalink Normal View History

2020-10-05 13:11:41 +00:00
use crate::messages::{ClientMessage, ServerMessage};
2020-10-04 02:14:23 +00:00
use async_std::sync::Arc;
use async_std::sync::Mutex;
use std::collections::HashMap;
2020-10-04 02:14:23 +00:00
use async_std::net::{SocketAddr, TcpListener, TcpStream};
use async_std::sync::{Receiver, Sender};
use async_std::task;
use async_tungstenite::{accept_async, WebSocketStream};
use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt};
2020-10-04 02:14:23 +00:00
#[derive(Debug)]
2020-10-05 13:11:41 +00:00
pub struct NetworkMessage<T> {
2020-10-04 02:14:23 +00:00
pub conn_id: usize,
2020-10-05 13:11:41 +00:00
pub data: NetworkPayload<T>,
2020-10-05 00:24:23 +00:00
}
#[derive(Debug)]
2020-10-05 13:11:41 +00:00
pub enum NetworkPayload<T> {
2020-10-05 00:24:23 +00:00
Connected,
2020-10-05 13:11:41 +00:00
Disconnected,
Message(T),
InvalidFormat(String),
2020-10-04 02:14:23 +00:00
}
struct Connection {
2020-10-04 02:14:23 +00:00
stream: SplitSink<WebSocketStream<TcpStream>, tungstenite::Message>,
2020-10-05 00:24:23 +00:00
address: SocketAddr,
conn_id: usize,
}
2020-10-04 02:14:23 +00:00
pub struct NetworkManager {
connections: HashMap<usize, Connection>,
}
2020-10-04 02:14:23 +00:00
impl NetworkManager {
pub fn new() -> Self {
NetworkManager {
connections: HashMap::new(),
}
}
2020-10-04 02:14:23 +00:00
fn register(&mut self, conn: Connection) {
self.connections.insert(conn.conn_id, conn);
}
2020-10-05 13:11:41 +00:00
async fn send(&mut self, message: NetworkMessage<ServerMessage>) {
2020-10-04 02:14:23 +00:00
let conn = self
.connections
.get_mut(&message.conn_id)
2020-10-04 02:14:23 +00:00
.expect("cant send message to an unregistered connection");
2020-10-05 15:17:44 +00:00
if let NetworkPayload::Message(msg) = message.data {
let res = conn.stream.send(msg.into()).await;
if let Err(err) = res {
log::error!("Error sending message over to game instance: {:?}", err)
}
2020-10-05 00:24:23 +00:00
}
}
}
2020-10-04 02:14:23 +00:00
pub async fn listen(
bind: String,
2020-10-05 13:11:41 +00:00
incoming: Sender<NetworkMessage<ClientMessage>>,
outgoing: Receiver<NetworkMessage<ServerMessage>>,
2020-10-04 02:14:23 +00:00
) {
let network = Arc::new(Mutex::new(NetworkManager::new()));
let listener = TcpListener::bind(&bind).await.expect("Can't listen");
log::info!("Listening on: {}", bind);
task::spawn(write_loop(network.clone(), outgoing));
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
log::info!("Peer address: {}", peer);
2020-10-05 15:17:44 +00:00
match accept_connection(peer, stream, incoming.clone()).await {
Ok(conn) => {
network.lock().await.register(conn);
}
Err(err) => log::warn!("failed connection from {:?}: {:?}", peer, err),
}
2020-10-04 02:14:23 +00:00
}
}
2020-10-04 02:14:23 +00:00
async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
2020-10-05 13:11:41 +00:00
incoming: Sender<NetworkMessage<ClientMessage>>,
2020-10-05 15:17:44 +00:00
) -> Result<Connection, tungstenite::Error> {
2020-10-05 00:24:23 +00:00
// Generate random connection id
2020-10-04 02:14:23 +00:00
let conn_id = rand::random();
2020-10-05 00:24:23 +00:00
// Get streams
2020-10-05 15:17:44 +00:00
let ws_stream = accept_async(stream).await?;
2020-10-04 02:14:23 +00:00
let (ws_sender, ws_receiver) = ws_stream.split();
2020-10-05 00:24:23 +00:00
// Send event to game instance
incoming
.send(NetworkMessage {
conn_id,
data: NetworkPayload::Connected,
})
.await;
// Start read loop
2020-10-04 02:14:23 +00:00
task::spawn(read_loop(ws_receiver, incoming, conn_id));
2020-10-05 00:24:23 +00:00
// Return connection instance for network manager
2020-10-05 15:17:44 +00:00
Ok(Connection {
2020-10-04 02:14:23 +00:00
stream: ws_sender,
2020-10-05 00:24:23 +00:00
address: peer,
2020-10-04 02:14:23 +00:00
conn_id,
2020-10-05 15:17:44 +00:00
})
2020-10-04 02:14:23 +00:00
}
2020-10-04 02:14:23 +00:00
async fn read_loop(
mut stream: SplitStream<WebSocketStream<TcpStream>>,
2020-10-05 13:11:41 +00:00
ch: Sender<NetworkMessage<ClientMessage>>,
2020-10-04 02:14:23 +00:00
conn_id: usize,
) {
loop {
2020-10-05 13:11:41 +00:00
let recv = stream.next().await;
match recv {
// Handle close
Some(Ok(tungstenite::Message::Close(reason))) => {
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::Disconnected,
})
.await;
log::info!("{:?} left for: {:?}", conn_id, reason);
break;
}
// Message received
Some(Ok(message)) => match ClientMessage::try_read(message) {
Ok(msg) => {
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::Message(msg),
})
.await;
}
Err(err) => {
log::warn!("[{:?}] invalid message received: {:?}", conn_id, err);
// Ask server to notify client about them fucking up
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::InvalidFormat(err.to_string()),
})
.await;
}
},
// Errors
Some(Err(err)) => {
log::warn!("error received in read loop: {:?}", err);
break;
}
None => break,
}
2020-10-04 02:14:23 +00:00
}
2020-10-05 13:11:41 +00:00
log::info!("stopped read loop for {:?}", conn_id);
2020-10-04 02:14:23 +00:00
}
2020-10-05 13:11:41 +00:00
async fn write_loop(net: Arc<Mutex<NetworkManager>>, ch: Receiver<NetworkMessage<ServerMessage>>) {
2020-10-04 02:14:23 +00:00
loop {
2020-10-05 15:17:44 +00:00
match ch.recv().await {
Ok(msg) => {
net.lock().await.send(msg).await;
}
Err(err) => {
log::warn!("write loop received error: {:?}", err);
break;
}
}
2020-10-04 02:14:23 +00:00
}
}