From ac5c4b98a083de16b2c87d1da7828b65d0e71894 Mon Sep 17 00:00:00 2001 From: Hamcha Date: Sat, 3 Oct 2020 15:41:11 +0200 Subject: [PATCH] The big fuck you refactor part one: Connections --- Cargo.lock | 3 + Cargo.toml | 5 +- src/game.rs | 53 ++++++++++++++++++ src/main.rs | 45 ++++----------- src/network.rs | 147 ++++++++++++++++++++++++------------------------- 5 files changed, 143 insertions(+), 110 deletions(-) create mode 100644 src/game.rs diff --git a/Cargo.lock b/Cargo.lock index 035b2eb..2f25a3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -898,13 +898,16 @@ name = "odyssey-server" version = "0.1.0" dependencies = [ "actix", + "actix-rt", "config", "env_logger", + "futures-channel", "futures-util", "log", "rand", "serde 1.0.116", "serde_derive", + "serde_json", "shipyard", "tokio", "tokio-tungstenite", diff --git a/Cargo.toml b/Cargo.toml index fcca469..69118e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2018" [dependencies] actix = "0.10" +actix-rt = "1.1" config = "0.10" env_logger = "0.7" futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] } @@ -13,8 +14,10 @@ log = { version = "0.4", features = ["std", "serde"] } rand = "0.7" serde = { version = "1", features = ["derive"] } serde_derive = "1" +serde_json = "1" shipyard = { version = "0.4", features = ["parallel", "serde"] } tokio = { version = "0.2", features = ["full"] } tokio-tungstenite = "0.11" tungstenite = "0.11" -ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] } \ No newline at end of file +ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] } +futures-channel = "0.3" \ No newline at end of file diff --git a/src/game.rs b/src/game.rs new file mode 100644 index 0000000..1656c43 --- /dev/null +++ b/src/game.rs @@ -0,0 +1,53 @@ +use actix::prelude::*; +use shipyard::World; +use std::{collections::HashMap, time::Duration}; + +// Minimum delay between updates, in milliseconds +const MIN_UPDATE_MS: u64 = 10; + +pub struct Player {} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct PlayerConnected { + pub conn_id: usize, +} + +pub struct Game { + world: World, + players: HashMap, +} + +impl Game { + pub fn new() -> Self { + // Create world + let world = World::default(); + + // Create workload + world.add_workload("update").build(); + + Game { + world, + players: HashMap::new(), + } + } +} + +impl Actor for Game { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + // Start update loop + ctx.run_interval(Duration::from_millis(MIN_UPDATE_MS), |game, _ctx| { + game.world.run_workload("update"); + }); + } +} + +impl Handler for Game { + type Result = (); + fn handle(&mut self, msg: PlayerConnected, _ctx: &mut Self::Context) -> Self::Result { + log::info!("Added player {:?} to lobby", msg.conn_id); + self.players.insert(msg.conn_id, Player {}); + } +} diff --git a/src/main.rs b/src/main.rs index e90eaf6..fb9ea0d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,52 +1,27 @@ mod components; mod config; +mod game; mod network; mod systems; use crate::config::Settings; -use crate::network::{listen_websocket, NetworkManager}; +use crate::game::Game; +use crate::network::listen_websocket; use actix::prelude::*; use env_logger::Env; -use shipyard::World; -use std::time::Duration; -// Minimum delay between updates, in milliseconds -const MIN_UPDATE_MS: u64 = 10; - -struct Game { - world: World, -} - -impl Actor for Game { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - // Start update loop - ctx.run_interval(Duration::from_millis(MIN_UPDATE_MS), |game, _ctx| { - game.world.run_workload("update"); - }); - } -} - -fn main() { +#[actix_rt::main] +async fn main() { let env = Env::default().filter_or("LOG_LEVEL", "info"); env_logger::init_from_env(env); let settings = Settings::new().unwrap(); - let system = System::new("main"); + let game = Game::new().start(); - Game::create(|_ctx| { - // Create world - let world = World::default(); + listen_websocket(settings.bind, game).await; - // Create workload - world.add_workload("update").build(); - - Game { world } - }); - - NetworkManager::new(settings.bind).start(); - - system.run().unwrap(); + tokio::signal::ctrl_c().await.unwrap(); + log::info!("Ctrl-C received, shutting down"); + System::current().stop(); } diff --git a/src/network.rs b/src/network.rs index 3064859..6f5ef05 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,94 +1,93 @@ +use crate::game::PlayerConnected; +use crate::Game; use actix::prelude::*; -use log::info; -use rand::prelude::*; +use futures_util::StreamExt; use std::collections::HashMap; +use std::net::SocketAddr; use tokio::net::{TcpListener, TcpStream}; use tokio_tungstenite::WebSocketStream; -pub struct NetworkConnection { - conn_id: u64, - connected: bool, - stream: WebSocketStream, - manager: Addr, -} - -impl Actor for NetworkConnection { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - // Send manager my info and get a connection ID assigned - let addr = ctx.address(); - self.manager - .send(ClientConnected { addr }) - .into_actor(self) - .then(move |res, act, _| { - act.conn_id = res.unwrap(); - act.connected = true; - async {}.into_actor(act) - }) - .wait(ctx); - //TODO Start recv/send loop - } -} +#[derive(Message)] +#[rtype(result = "()")] +struct TcpConnect(pub TcpStream, pub SocketAddr); #[derive(Message)] -#[rtype(result = "u64")] -struct ClientConnected { - addr: Addr, +#[rtype(result = "()")] +struct WSConnect(pub WebSocketStream, pub SocketAddr); + +struct Connection { + stream: WebSocketStream, + addr: SocketAddr, + conn_id: usize, } -pub struct NetworkManager { - bind: String, - clients: HashMap>, +struct Server { + game: Addr, + connections: HashMap, } -impl Actor for NetworkManager { - type Context = Context; - - fn started(&mut self, ctx: &mut Self::Context) { - Arbiter::spawn(listen_websocket(self.bind.clone(), ctx.address())); - } +impl Actor for Server { + type Context = Context; } -impl Handler for NetworkManager { - type Result = u64; - fn handle(&mut self, msg: ClientConnected, _ctx: &mut Self::Context) -> Self::Result { - let client_id = rand::thread_rng().gen(); - self.clients.insert(client_id, msg.addr); - client_id - } -} - -impl NetworkManager { - pub fn new(bind: String) -> Self { - NetworkManager { - bind, - clients: HashMap::new(), +impl Server { + pub fn new(game: Addr) -> Self { + Server { + game, + connections: HashMap::new(), } } } -pub async fn listen_websocket(bind: String, manager: Addr) { - let try_socket = TcpListener::bind(&bind).await; - let mut listener = try_socket.expect("Failed to bind"); - info!("Listening on: {}", bind); +/// Handle stream of TcpStream's +impl Handler for Server { + /// this is response for message, which is defined by `ResponseType` trait + /// in this case we just return unit. + type Result = (); - while let Ok((stream, _)) = listener.accept().await { - let addr = stream - .peer_addr() - .expect("connected streams should have a peer address"); - info!("Peer address: {}", addr); - - let stream = tokio_tungstenite::accept_async(stream) - .await - .expect("Error during the websocket handshake occurred"); - info!("New WebSocket connection: {}", addr); - - NetworkConnection::create(|_ctx| NetworkConnection { - conn_id: 0, - connected: false, - stream, - manager: manager.clone(), - }); + fn handle(&mut self, msg: TcpConnect, ctx: &mut Context) { + accept_websocket(msg) + .into_actor(self) + .then(move |res, act, _| { + // Assign random id to player + let conn_id = rand::random(); + act.connections.insert( + conn_id, + Connection { + stream: res.0, + addr: res.1, + conn_id, + }, + ); + act.game + .send(PlayerConnected { conn_id }) + .into_actor(act) + .then(move |_, act, _| async {}.into_actor(act)) + }) + .wait(ctx); } } + +async fn accept_websocket(msg: TcpConnect) -> WSConnect { + let stream = tokio_tungstenite::accept_async(msg.0) + .await + .expect("Could not accept as websocket"); + WSConnect(stream, msg.1) +} + +pub async fn listen_websocket(bind: String, game: Addr) { + let try_socket = TcpListener::bind(&bind).await; + let listener = Box::new(try_socket.expect("Failed to bind")); + log::info!("Listening on: {}", bind); + + Server::create(move |ctx| { + ctx.add_message_stream(Box::leak(listener).incoming().map(|st| { + let st = st.expect("could not accept socket"); + let addr = st + .peer_addr() + .expect("could not retrieve connection address"); + TcpConnect(st, addr) + })); + Server::new(game) + }); +}