The big fuck you refactor part one: Connections

This commit is contained in:
Hamcha 2020-10-03 15:41:11 +02:00
parent f3e200e91d
commit ac5c4b98a0
Signed by: hamcha
GPG key ID: 41467804B19A3315
5 changed files with 143 additions and 110 deletions

3
Cargo.lock generated
View file

@ -898,13 +898,16 @@ name = "odyssey-server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"actix", "actix",
"actix-rt",
"config", "config",
"env_logger", "env_logger",
"futures-channel",
"futures-util", "futures-util",
"log", "log",
"rand", "rand",
"serde 1.0.116", "serde 1.0.116",
"serde_derive", "serde_derive",
"serde_json",
"shipyard", "shipyard",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",

View file

@ -6,6 +6,7 @@ edition = "2018"
[dependencies] [dependencies]
actix = "0.10" actix = "0.10"
actix-rt = "1.1"
config = "0.10" config = "0.10"
env_logger = "0.7" env_logger = "0.7"
futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] } futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] }
@ -13,8 +14,10 @@ log = { version = "0.4", features = ["std", "serde"] }
rand = "0.7" rand = "0.7"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_derive = "1" serde_derive = "1"
serde_json = "1"
shipyard = { version = "0.4", features = ["parallel", "serde"] } shipyard = { version = "0.4", features = ["parallel", "serde"] }
tokio = { version = "0.2", features = ["full"] } tokio = { version = "0.2", features = ["full"] }
tokio-tungstenite = "0.11" tokio-tungstenite = "0.11"
tungstenite = "0.11" tungstenite = "0.11"
ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] } ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] }
futures-channel = "0.3"

53
src/game.rs Normal file
View file

@ -0,0 +1,53 @@
use actix::prelude::*;
use shipyard::World;
use std::{collections::HashMap, time::Duration};
// Minimum delay between updates, in milliseconds
const MIN_UPDATE_MS: u64 = 10;
pub struct Player {}
#[derive(Message)]
#[rtype(result = "()")]
pub struct PlayerConnected {
pub conn_id: usize,
}
pub struct Game {
world: World,
players: HashMap<usize, Player>,
}
impl Game {
pub fn new() -> Self {
// Create world
let world = World::default();
// Create workload
world.add_workload("update").build();
Game {
world,
players: HashMap::new(),
}
}
}
impl Actor for Game {
type Context = Context<Game>;
fn started(&mut self, ctx: &mut Self::Context) {
// Start update loop
ctx.run_interval(Duration::from_millis(MIN_UPDATE_MS), |game, _ctx| {
game.world.run_workload("update");
});
}
}
impl Handler<PlayerConnected> for Game {
type Result = ();
fn handle(&mut self, msg: PlayerConnected, _ctx: &mut Self::Context) -> Self::Result {
log::info!("Added player {:?} to lobby", msg.conn_id);
self.players.insert(msg.conn_id, Player {});
}
}

View file

@ -1,52 +1,27 @@
mod components; mod components;
mod config; mod config;
mod game;
mod network; mod network;
mod systems; mod systems;
use crate::config::Settings; use crate::config::Settings;
use crate::network::{listen_websocket, NetworkManager}; use crate::game::Game;
use crate::network::listen_websocket;
use actix::prelude::*; use actix::prelude::*;
use env_logger::Env; use env_logger::Env;
use shipyard::World;
use std::time::Duration;
// Minimum delay between updates, in milliseconds #[actix_rt::main]
const MIN_UPDATE_MS: u64 = 10; async fn main() {
struct Game {
world: World,
}
impl Actor for Game {
type Context = Context<Game>;
fn started(&mut self, ctx: &mut Self::Context) {
// Start update loop
ctx.run_interval(Duration::from_millis(MIN_UPDATE_MS), |game, _ctx| {
game.world.run_workload("update");
});
}
}
fn main() {
let env = Env::default().filter_or("LOG_LEVEL", "info"); let env = Env::default().filter_or("LOG_LEVEL", "info");
env_logger::init_from_env(env); env_logger::init_from_env(env);
let settings = Settings::new().unwrap(); let settings = Settings::new().unwrap();
let system = System::new("main"); let game = Game::new().start();
Game::create(|_ctx| { listen_websocket(settings.bind, game).await;
// Create world
let world = World::default();
// Create workload tokio::signal::ctrl_c().await.unwrap();
world.add_workload("update").build(); log::info!("Ctrl-C received, shutting down");
System::current().stop();
Game { world }
});
NetworkManager::new(settings.bind).start();
system.run().unwrap();
} }

View file

@ -1,94 +1,93 @@
use crate::game::PlayerConnected;
use crate::Game;
use actix::prelude::*; use actix::prelude::*;
use log::info; use futures_util::StreamExt;
use rand::prelude::*;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
pub struct NetworkConnection { #[derive(Message)]
conn_id: u64, #[rtype(result = "()")]
connected: bool, struct TcpConnect(pub TcpStream, pub SocketAddr);
stream: WebSocketStream<TcpStream>,
manager: Addr<NetworkManager>,
}
impl Actor for NetworkConnection {
type Context = Context<NetworkConnection>;
fn started(&mut self, ctx: &mut Self::Context) {
// Send manager my info and get a connection ID assigned
let addr = ctx.address();
self.manager
.send(ClientConnected { addr })
.into_actor(self)
.then(move |res, act, _| {
act.conn_id = res.unwrap();
act.connected = true;
async {}.into_actor(act)
})
.wait(ctx);
//TODO Start recv/send loop
}
}
#[derive(Message)] #[derive(Message)]
#[rtype(result = "u64")] #[rtype(result = "()")]
struct ClientConnected { struct WSConnect(pub WebSocketStream<TcpStream>, pub SocketAddr);
addr: Addr<NetworkConnection>,
struct Connection {
stream: WebSocketStream<TcpStream>,
addr: SocketAddr,
conn_id: usize,
} }
pub struct NetworkManager { struct Server {
bind: String, game: Addr<Game>,
clients: HashMap<u64, Addr<NetworkConnection>>, connections: HashMap<usize, Connection>,
} }
impl Actor for NetworkManager { impl Actor for Server {
type Context = Context<NetworkManager>; type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
Arbiter::spawn(listen_websocket(self.bind.clone(), ctx.address()));
}
} }
impl Handler<ClientConnected> for NetworkManager { impl Server {
type Result = u64; pub fn new(game: Addr<Game>) -> Self {
fn handle(&mut self, msg: ClientConnected, _ctx: &mut Self::Context) -> Self::Result { Server {
let client_id = rand::thread_rng().gen(); game,
self.clients.insert(client_id, msg.addr); connections: HashMap::new(),
client_id
}
}
impl NetworkManager {
pub fn new(bind: String) -> Self {
NetworkManager {
bind,
clients: HashMap::new(),
} }
} }
} }
pub async fn listen_websocket(bind: String, manager: Addr<NetworkManager>) { /// Handle stream of TcpStream's
let try_socket = TcpListener::bind(&bind).await; impl Handler<TcpConnect> for Server {
let mut listener = try_socket.expect("Failed to bind"); /// this is response for message, which is defined by `ResponseType` trait
info!("Listening on: {}", bind); /// in this case we just return unit.
type Result = ();
while let Ok((stream, _)) = listener.accept().await { fn handle(&mut self, msg: TcpConnect, ctx: &mut Context<Self>) {
let addr = stream accept_websocket(msg)
.peer_addr() .into_actor(self)
.expect("connected streams should have a peer address"); .then(move |res, act, _| {
info!("Peer address: {}", addr); // Assign random id to player
let conn_id = rand::random();
act.connections.insert(
conn_id,
Connection {
stream: res.0,
addr: res.1,
conn_id,
},
);
act.game
.send(PlayerConnected { conn_id })
.into_actor(act)
.then(move |_, act, _| async {}.into_actor(act))
})
.wait(ctx);
}
}
let stream = tokio_tungstenite::accept_async(stream) async fn accept_websocket(msg: TcpConnect) -> WSConnect {
let stream = tokio_tungstenite::accept_async(msg.0)
.await .await
.expect("Error during the websocket handshake occurred"); .expect("Could not accept as websocket");
info!("New WebSocket connection: {}", addr); WSConnect(stream, msg.1)
}
NetworkConnection::create(|_ctx| NetworkConnection { pub async fn listen_websocket(bind: String, game: Addr<Game>) {
conn_id: 0, let try_socket = TcpListener::bind(&bind).await;
connected: false, let listener = Box::new(try_socket.expect("Failed to bind"));
stream, log::info!("Listening on: {}", bind);
manager: manager.clone(),
Server::create(move |ctx| {
ctx.add_message_stream(Box::leak(listener).incoming().map(|st| {
let st = st.expect("could not accept socket");
let addr = st
.peer_addr()
.expect("could not retrieve connection address");
TcpConnect(st, addr)
}));
Server::new(game)
}); });
} }
}