BYE BYE ACTIX AND TOKIO

This commit is contained in:
Hamcha 2020-10-04 04:14:23 +02:00
parent ac5c4b98a0
commit b03514b8bd
Signed by: hamcha
GPG key ID: 41467804B19A3315
5 changed files with 480 additions and 729 deletions

974
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -5,19 +5,16 @@ authors = ["Hamcha <hamcha@crunchy.rocks>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
actix = "0.10" async-std = { version = "1.6", features = ["unstable"] }
actix-rt = "1.1" async-tungstenite = "0.8"
config = "0.10" config = "0.10"
env_logger = "0.7" env_logger = "0.7"
futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] } futures-util = "0.3"
log = { version = "0.4", features = ["std", "serde"] } log = { version = "0.4", features = ["std", "serde"] }
rand = "0.7" rand = "0.7"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_derive = "1" serde_derive = "1"
serde_json = "1" serde_json = "1"
shipyard = { version = "0.4", features = ["parallel", "serde"] } shipyard = { version = "0.4", features = ["parallel", "serde"] }
tokio = { version = "0.2", features = ["full"] }
tokio-tungstenite = "0.11"
tungstenite = "0.11" tungstenite = "0.11"
ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] } ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] }
futures-channel = "0.3"

View file

@ -1,25 +1,21 @@
use actix::prelude::*; use crate::network::NetworkMessage;
use async_std::sync::{Receiver, Sender};
use shipyard::World; use shipyard::World;
use std::{collections::HashMap, time::Duration}; use std::collections::HashMap;
// Minimum delay between updates, in milliseconds // Minimum delay between updates, in milliseconds
const MIN_UPDATE_MS: u64 = 10; const MIN_UPDATE_MS: u64 = 10;
pub struct Player {} pub struct Player {}
#[derive(Message)]
#[rtype(result = "()")]
pub struct PlayerConnected {
pub conn_id: usize,
}
pub struct Game { pub struct Game {
world: World, world: World,
players: HashMap<usize, Player>, players: HashMap<usize, Player>,
net_out: Sender<NetworkMessage>,
} }
impl Game { impl Game {
pub fn new() -> Self { pub fn new(net_out: Sender<NetworkMessage>) -> Self {
// Create world // Create world
let world = World::default(); let world = World::default();
@ -29,25 +25,20 @@ impl Game {
Game { Game {
world, world,
players: HashMap::new(), players: HashMap::new(),
net_out,
}
}
pub async fn read_loop(&self, net_in: Receiver<NetworkMessage>) {
loop {
let message = net_in.recv().await.expect("could not read from channel");
log::info!("Message received: {:?}", &message);
self.net_out
.send(NetworkMessage {
conn_id: message.conn_id,
data: tungstenite::Message::text("hello"),
})
.await;
} }
} }
} }
impl Actor for Game {
type Context = Context<Game>;
fn started(&mut self, ctx: &mut Self::Context) {
// Start update loop
ctx.run_interval(Duration::from_millis(MIN_UPDATE_MS), |game, _ctx| {
game.world.run_workload("update");
});
}
}
impl Handler<PlayerConnected> for Game {
type Result = ();
fn handle(&mut self, msg: PlayerConnected, _ctx: &mut Self::Context) -> Self::Result {
log::info!("Added player {:?} to lobby", msg.conn_id);
self.players.insert(msg.conn_id, Player {});
}
}

View file

@ -6,22 +6,28 @@ mod systems;
use crate::config::Settings; use crate::config::Settings;
use crate::game::Game; use crate::game::Game;
use crate::network::listen_websocket; use crate::network::{listen, NetworkManager};
use actix::prelude::*; use async_std::sync::Arc;
use async_std::{sync::channel, task};
use env_logger::Env; use env_logger::Env;
#[actix_rt::main] async fn run() {
async fn main() {
let env = Env::default().filter_or("LOG_LEVEL", "info"); let env = Env::default().filter_or("LOG_LEVEL", "info");
env_logger::init_from_env(env); env_logger::init_from_env(env);
let settings = Settings::new().unwrap(); let settings = Settings::new().unwrap();
let game = Game::new().start(); let (in_s, in_r) = channel(10);
let (out_s, out_r) = channel(10);
listen_websocket(settings.bind, game).await; let game = Arc::new(Game::new(out_s));
task::spawn(async move {
game.clone().read_loop(in_r).await;
});
tokio::signal::ctrl_c().await.unwrap(); task::block_on(listen(settings.bind, in_s, out_r));
log::info!("Ctrl-C received, shutting down"); }
System::current().stop();
fn main() {
task::block_on(run());
} }

View file

@ -1,93 +1,102 @@
use crate::game::PlayerConnected; use async_std::sync::Arc;
use crate::Game; use async_std::sync::Mutex;
use actix::prelude::*;
use futures_util::StreamExt;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::WebSocketStream;
#[derive(Message)] use async_std::net::{SocketAddr, TcpListener, TcpStream};
#[rtype(result = "()")] use async_std::sync::{Receiver, Sender};
struct TcpConnect(pub TcpStream, pub SocketAddr); use async_std::task;
use async_tungstenite::{accept_async, WebSocketStream};
use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt};
#[derive(Message)] #[derive(Debug)]
#[rtype(result = "()")] pub struct NetworkMessage {
struct WSConnect(pub WebSocketStream<TcpStream>, pub SocketAddr); pub conn_id: usize,
pub data: tungstenite::Message,
}
struct Connection { struct Connection {
stream: WebSocketStream<TcpStream>, stream: SplitSink<WebSocketStream<TcpStream>, tungstenite::Message>,
addr: SocketAddr,
conn_id: usize, conn_id: usize,
} }
struct Server { pub struct NetworkManager {
game: Addr<Game>,
connections: HashMap<usize, Connection>, connections: HashMap<usize, Connection>,
} }
impl Actor for Server { impl NetworkManager {
type Context = Context<Self>; pub fn new() -> Self {
} NetworkManager {
impl Server {
pub fn new(game: Addr<Game>) -> Self {
Server {
game,
connections: HashMap::new(), connections: HashMap::new(),
} }
} }
}
/// Handle stream of TcpStream's fn register(&mut self, conn: Connection) {
impl Handler<TcpConnect> for Server { self.connections.insert(conn.conn_id, conn);
/// this is response for message, which is defined by `ResponseType` trait }
/// in this case we just return unit.
type Result = ();
fn handle(&mut self, msg: TcpConnect, ctx: &mut Context<Self>) { async fn send(&self, message: NetworkMessage) {
accept_websocket(msg) let conn = self
.into_actor(self) .connections
.then(move |res, act, _| { .get(&message.conn_id)
// Assign random id to player .expect("cant send message to an unregistered connection");
let conn_id = rand::random(); conn.stream.send(message.data).await;
act.connections.insert(
conn_id,
Connection {
stream: res.0,
addr: res.1,
conn_id,
},
);
act.game
.send(PlayerConnected { conn_id })
.into_actor(act)
.then(move |_, act, _| async {}.into_actor(act))
})
.wait(ctx);
} }
} }
async fn accept_websocket(msg: TcpConnect) -> WSConnect { pub async fn listen(
let stream = tokio_tungstenite::accept_async(msg.0) net: Arc<NetworkManager>,
.await bind: String,
.expect("Could not accept as websocket"); incoming: Sender<NetworkMessage>,
WSConnect(stream, msg.1) outgoing: Receiver<NetworkMessage>,
} ) {
let network = Arc::new(Mutex::new(NetworkManager::new()));
pub async fn listen_websocket(bind: String, game: Addr<Game>) { let listener = TcpListener::bind(&bind).await.expect("Can't listen");
let try_socket = TcpListener::bind(&bind).await;
let listener = Box::new(try_socket.expect("Failed to bind"));
log::info!("Listening on: {}", bind); log::info!("Listening on: {}", bind);
task::spawn(write_loop(network.clone(), outgoing));
Server::create(move |ctx| { while let Ok((stream, _)) = listener.accept().await {
ctx.add_message_stream(Box::leak(listener).incoming().map(|st| { let peer = stream
let st = st.expect("could not accept socket"); .peer_addr()
let addr = st .expect("connected streams should have a peer address");
.peer_addr() log::info!("Peer address: {}", peer);
.expect("could not retrieve connection address"); let conn = accept_connection(peer, stream, incoming.clone()).await;
TcpConnect(st, addr) network.lock().await.register(conn);
})); }
Server::new(game) }
});
async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
incoming: Sender<NetworkMessage>,
) -> Connection {
let conn_id = rand::random();
let ws_stream = accept_async(stream).await.expect("could not accept");
log::info!("New WebSocket connection: {}", peer);
let (ws_sender, ws_receiver) = ws_stream.split();
task::spawn(read_loop(ws_receiver, incoming, conn_id));
Connection {
stream: ws_sender,
conn_id,
}
}
async fn read_loop(
mut stream: SplitStream<WebSocketStream<TcpStream>>,
ch: Sender<NetworkMessage>,
conn_id: usize,
) {
loop {
let data = stream
.next()
.await
.expect("failed getting the next message")
.expect("received error while reading");
ch.send(NetworkMessage { conn_id, data }).await;
}
}
async fn write_loop(net: Arc<Mutex<NetworkManager>>, ch: Receiver<NetworkMessage>) {
loop {
let msg = ch.recv().await.expect("failed to receive outgoing message");
net.lock().await.send(msg).await;
}
} }