134 lines
3.1 KiB
Rust
134 lines
3.1 KiB
Rust
use async_std::sync::Arc;
|
|
use async_std::sync::Mutex;
|
|
use std::collections::HashMap;
|
|
|
|
use async_std::net::{SocketAddr, TcpListener, TcpStream};
|
|
use async_std::sync::{Receiver, Sender};
|
|
use async_std::task;
|
|
use async_tungstenite::{accept_async, WebSocketStream};
|
|
use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt};
|
|
|
|
#[derive(Debug)]
|
|
pub struct NetworkMessage {
|
|
pub conn_id: usize,
|
|
pub data: NetworkPayload,
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
pub enum NetworkPayload {
|
|
Connected,
|
|
Message(tungstenite::Message),
|
|
}
|
|
|
|
struct Connection {
|
|
stream: SplitSink<WebSocketStream<TcpStream>, tungstenite::Message>,
|
|
address: SocketAddr,
|
|
conn_id: usize,
|
|
}
|
|
|
|
pub struct NetworkManager {
|
|
connections: HashMap<usize, Connection>,
|
|
}
|
|
|
|
impl NetworkManager {
|
|
pub fn new() -> Self {
|
|
NetworkManager {
|
|
connections: HashMap::new(),
|
|
}
|
|
}
|
|
|
|
fn register(&mut self, conn: Connection) {
|
|
self.connections.insert(conn.conn_id, conn);
|
|
}
|
|
|
|
async fn send(&mut self, message: NetworkMessage) {
|
|
let conn = self
|
|
.connections
|
|
.get_mut(&message.conn_id)
|
|
.expect("cant send message to an unregistered connection");
|
|
match message.data {
|
|
NetworkPayload::Message(msg) => conn
|
|
.stream
|
|
.send(msg)
|
|
.await
|
|
.expect("could not send message to connection"),
|
|
_ => (),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn listen(
|
|
bind: String,
|
|
incoming: Sender<NetworkMessage>,
|
|
outgoing: Receiver<NetworkMessage>,
|
|
) {
|
|
let network = Arc::new(Mutex::new(NetworkManager::new()));
|
|
let listener = TcpListener::bind(&bind).await.expect("Can't listen");
|
|
log::info!("Listening on: {}", bind);
|
|
task::spawn(write_loop(network.clone(), outgoing));
|
|
while let Ok((stream, _)) = listener.accept().await {
|
|
let peer = stream
|
|
.peer_addr()
|
|
.expect("connected streams should have a peer address");
|
|
log::info!("Peer address: {}", peer);
|
|
let conn = accept_connection(peer, stream, incoming.clone()).await;
|
|
network.lock().await.register(conn);
|
|
}
|
|
}
|
|
|
|
async fn accept_connection(
|
|
peer: SocketAddr,
|
|
stream: TcpStream,
|
|
incoming: Sender<NetworkMessage>,
|
|
) -> Connection {
|
|
// Generate random connection id
|
|
let conn_id = rand::random();
|
|
|
|
// Get streams
|
|
let ws_stream = accept_async(stream).await.expect("could not accept");
|
|
let (ws_sender, ws_receiver) = ws_stream.split();
|
|
|
|
// Send event to game instance
|
|
incoming
|
|
.send(NetworkMessage {
|
|
conn_id,
|
|
data: NetworkPayload::Connected,
|
|
})
|
|
.await;
|
|
// Start read loop
|
|
task::spawn(read_loop(ws_receiver, incoming, conn_id));
|
|
|
|
// Return connection instance for network manager
|
|
Connection {
|
|
stream: ws_sender,
|
|
address: peer,
|
|
conn_id,
|
|
}
|
|
}
|
|
|
|
async fn read_loop(
|
|
mut stream: SplitStream<WebSocketStream<TcpStream>>,
|
|
ch: Sender<NetworkMessage>,
|
|
conn_id: usize,
|
|
) {
|
|
loop {
|
|
let message = stream
|
|
.next()
|
|
.await
|
|
.expect("failed getting the next message")
|
|
.expect("received error while reading");
|
|
ch.send(NetworkMessage {
|
|
conn_id,
|
|
data: NetworkPayload::Message(message),
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
|
|
async fn write_loop(net: Arc<Mutex<NetworkManager>>, ch: Receiver<NetworkMessage>) {
|
|
loop {
|
|
let msg = ch.recv().await.expect("failed to receive outgoing message");
|
|
net.lock().await.send(msg).await;
|
|
}
|
|
}
|