odyssey-server/src/network.rs

175 lines
4.2 KiB
Rust

use crate::messages::{ClientMessage, ServerMessage};
use async_std::sync::Arc;
use async_std::sync::Mutex;
use std::collections::HashMap;
use async_std::net::{SocketAddr, TcpListener, TcpStream};
use async_std::sync::{Receiver, Sender};
use async_std::task;
use async_tungstenite::{accept_async, WebSocketStream};
use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt};
#[derive(Debug)]
pub struct NetworkMessage<T> {
pub conn_id: usize,
pub data: NetworkPayload<T>,
}
#[derive(Debug)]
pub enum NetworkPayload<T> {
Connected,
Disconnected,
Message(T),
InvalidFormat(String),
}
struct Connection {
stream: SplitSink<WebSocketStream<TcpStream>, tungstenite::Message>,
address: SocketAddr,
conn_id: usize,
}
pub struct NetworkManager {
connections: HashMap<usize, Connection>,
}
impl NetworkManager {
pub fn new() -> Self {
NetworkManager {
connections: HashMap::new(),
}
}
fn register(&mut self, conn: Connection) {
self.connections.insert(conn.conn_id, conn);
}
async fn send(&mut self, message: NetworkMessage<ServerMessage>) {
let conn = self
.connections
.get_mut(&message.conn_id)
.expect("cant send message to an unregistered connection");
if let NetworkPayload::Message(msg) = message.data {
let res = conn.stream.send(msg.into()).await;
if let Err(err) = res {
log::error!("Error sending message over to game instance: {:?}", err)
}
}
}
}
pub async fn listen(
bind: String,
incoming: Sender<NetworkMessage<ClientMessage>>,
outgoing: Receiver<NetworkMessage<ServerMessage>>,
) {
let network = Arc::new(Mutex::new(NetworkManager::new()));
let listener = TcpListener::bind(&bind).await.expect("Can't listen");
log::info!("Listening on: {}", bind);
task::spawn(write_loop(network.clone(), outgoing));
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
log::info!("Peer address: {}", peer);
match accept_connection(peer, stream, incoming.clone()).await {
Ok(conn) => {
network.lock().await.register(conn);
}
Err(err) => log::warn!("failed connection from {:?}: {:?}", peer, err),
}
}
}
async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
incoming: Sender<NetworkMessage<ClientMessage>>,
) -> Result<Connection, tungstenite::Error> {
// Generate random connection id
let conn_id = rand::random();
// Get streams
let ws_stream = accept_async(stream).await?;
let (ws_sender, ws_receiver) = ws_stream.split();
// Send event to game instance
incoming
.send(NetworkMessage {
conn_id,
data: NetworkPayload::Connected,
})
.await;
// Start read loop
task::spawn(read_loop(ws_receiver, incoming, conn_id));
// Return connection instance for network manager
Ok(Connection {
stream: ws_sender,
address: peer,
conn_id,
})
}
async fn read_loop(
mut stream: SplitStream<WebSocketStream<TcpStream>>,
ch: Sender<NetworkMessage<ClientMessage>>,
conn_id: usize,
) {
loop {
let recv = stream.next().await;
match recv {
// Handle close
Some(Ok(tungstenite::Message::Close(reason))) => {
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::Disconnected,
})
.await;
log::info!("{:?} left for: {:?}", conn_id, reason);
break;
}
// Message received
Some(Ok(message)) => match ClientMessage::try_read(message) {
Ok(msg) => {
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::Message(msg),
})
.await;
}
Err(err) => {
log::warn!("[{:?}] invalid message received: {:?}", conn_id, err);
// Ask server to notify client about them fucking up
ch.send(NetworkMessage {
conn_id,
data: NetworkPayload::InvalidFormat(err.to_string()),
})
.await;
}
},
// Errors
Some(Err(err)) => {
log::warn!("error received in read loop: {:?}", err);
break;
}
None => break,
}
}
log::info!("stopped read loop for {:?}", conn_id);
}
async fn write_loop(net: Arc<Mutex<NetworkManager>>, ch: Receiver<NetworkMessage<ServerMessage>>) {
loop {
match ch.recv().await {
Ok(msg) => {
net.lock().await.send(msg).await;
}
Err(err) => {
log::warn!("write loop received error: {:?}", err);
break;
}
}
}
}