Compare commits

...

3 commits

5 changed files with 534 additions and 692 deletions

974
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -5,15 +5,16 @@ authors = ["Hamcha <hamcha@crunchy.rocks>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
tungstenite = "0.11" async-std = { version = "1.6", features = ["unstable"] }
shipyard = { version = "0.4", features = ["parallel", "serde"] } async-tungstenite = "0.8"
config = "0.10" config = "0.10"
env_logger = "0.7"
futures-util = "0.3"
log = { version = "0.4", features = ["std", "serde"] }
rand = "0.7"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
serde_derive = "1" serde_derive = "1"
tokio = { version = "0.2", features = ["full"] } serde_json = "1"
tokio-tungstenite = "0.11" shipyard = { version = "0.4", features = ["parallel", "serde"] }
futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] } tungstenite = "0.11"
log = { version = "0.4", features = ["std", "serde"] }
ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] } ultraviolet = { version = "0.7", features = [ "f64", "int", "serde" ] }
env_logger = "0.7"
actix = "0.10"

44
src/game.rs Normal file
View file

@ -0,0 +1,44 @@
use crate::network::NetworkMessage;
use async_std::sync::{Receiver, Sender};
use shipyard::World;
use std::collections::HashMap;
// Minimum delay between updates, in milliseconds
const MIN_UPDATE_MS: u64 = 10;
pub struct Player {}
pub struct Game {
world: World,
players: HashMap<usize, Player>,
net_out: Sender<NetworkMessage>,
}
impl Game {
pub fn new(net_out: Sender<NetworkMessage>) -> Self {
// Create world
let world = World::default();
// Create workload
world.add_workload("update").build();
Game {
world,
players: HashMap::new(),
net_out,
}
}
pub async fn read_loop(&self, net_in: Receiver<NetworkMessage>) {
loop {
let message = net_in.recv().await.expect("could not read from channel");
log::info!("Message received: {:?}", &message);
self.net_out
.send(NetworkMessage {
conn_id: message.conn_id,
data: tungstenite::Message::text("hello"),
})
.await;
}
}
}

View file

@ -1,90 +1,33 @@
mod components; mod components;
mod config; mod config;
mod game;
mod network;
mod systems; mod systems;
use crate::config::Settings; use crate::config::Settings;
use actix::prelude::*; use crate::game::Game;
use crate::network::{listen, NetworkManager};
use async_std::sync::Arc;
use async_std::{sync::channel, task};
use env_logger::Env; use env_logger::Env;
use log::info;
use shipyard::World;
use std::time::Duration;
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::WebSocketStream;
// Minimum delay between updates, in milliseconds async fn run() {
const MIN_UPDATE_MS: u64 = 10;
struct Game {
world: World,
}
impl Actor for Game {
type Context = Context<Game>;
fn started(&mut self, ctx: &mut Self::Context) {
// Start update loop
ctx.run_interval(Duration::from_millis(MIN_UPDATE_MS), |game, _ctx| {
game.world.run_workload("update");
});
}
}
struct NetworkConnection {
stream: WebSocketStream<TcpStream>,
}
impl Actor for NetworkConnection {
type Context = Context<NetworkConnection>;
fn started(&mut self, _ctx: &mut Self::Context) {
//TODO Read
}
}
struct NetworkManager {}
impl Actor for NetworkManager {
type Context = Context<NetworkManager>;
}
fn main() {
let env = Env::default().filter_or("LOG_LEVEL", "info"); let env = Env::default().filter_or("LOG_LEVEL", "info");
env_logger::init_from_env(env); env_logger::init_from_env(env);
let settings = Settings::new().unwrap(); let settings = Settings::new().unwrap();
let system = System::new("main"); let (in_s, in_r) = channel(10);
Game::create(|_ctx| { let (out_s, out_r) = channel(10);
// Create world
let world = World::default();
// Create workload let game = Arc::new(Game::new(out_s));
world.add_workload("update").build(); task::spawn(async move {
game.clone().read_loop(in_r).await;
Game { world }
}); });
Arbiter::spawn(listen_websocket(settings.bind)); task::block_on(listen(settings.bind, in_s, out_r));
system.run().unwrap();
} }
async fn listen_websocket(bind: String) { fn main() {
let try_socket = TcpListener::bind(&bind).await; task::block_on(run());
let mut listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", bind);
while let Ok((stream, _)) = listener.accept().await {
let addr = stream
.peer_addr()
.expect("connected streams should have a peer address");
info!("Peer address: {}", addr);
let stream = tokio_tungstenite::accept_async(stream)
.await
.expect("Error during the websocket handshake occurred");
info!("New WebSocket connection: {}", addr);
NetworkConnection::create(|_ctx| NetworkConnection { stream });
}
} }

102
src/network.rs Normal file
View file

@ -0,0 +1,102 @@
use async_std::sync::Arc;
use async_std::sync::Mutex;
use std::collections::HashMap;
use async_std::net::{SocketAddr, TcpListener, TcpStream};
use async_std::sync::{Receiver, Sender};
use async_std::task;
use async_tungstenite::{accept_async, WebSocketStream};
use futures_util::{stream::SplitSink, stream::SplitStream, SinkExt, StreamExt};
#[derive(Debug)]
pub struct NetworkMessage {
pub conn_id: usize,
pub data: tungstenite::Message,
}
struct Connection {
stream: SplitSink<WebSocketStream<TcpStream>, tungstenite::Message>,
conn_id: usize,
}
pub struct NetworkManager {
connections: HashMap<usize, Connection>,
}
impl NetworkManager {
pub fn new() -> Self {
NetworkManager {
connections: HashMap::new(),
}
}
fn register(&mut self, conn: Connection) {
self.connections.insert(conn.conn_id, conn);
}
async fn send(&self, message: NetworkMessage) {
let conn = self
.connections
.get(&message.conn_id)
.expect("cant send message to an unregistered connection");
conn.stream.send(message.data).await;
}
}
pub async fn listen(
net: Arc<NetworkManager>,
bind: String,
incoming: Sender<NetworkMessage>,
outgoing: Receiver<NetworkMessage>,
) {
let network = Arc::new(Mutex::new(NetworkManager::new()));
let listener = TcpListener::bind(&bind).await.expect("Can't listen");
log::info!("Listening on: {}", bind);
task::spawn(write_loop(network.clone(), outgoing));
while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
log::info!("Peer address: {}", peer);
let conn = accept_connection(peer, stream, incoming.clone()).await;
network.lock().await.register(conn);
}
}
async fn accept_connection(
peer: SocketAddr,
stream: TcpStream,
incoming: Sender<NetworkMessage>,
) -> Connection {
let conn_id = rand::random();
let ws_stream = accept_async(stream).await.expect("could not accept");
log::info!("New WebSocket connection: {}", peer);
let (ws_sender, ws_receiver) = ws_stream.split();
task::spawn(read_loop(ws_receiver, incoming, conn_id));
Connection {
stream: ws_sender,
conn_id,
}
}
async fn read_loop(
mut stream: SplitStream<WebSocketStream<TcpStream>>,
ch: Sender<NetworkMessage>,
conn_id: usize,
) {
loop {
let data = stream
.next()
.await
.expect("failed getting the next message")
.expect("received error while reading");
ch.send(NetworkMessage { conn_id, data }).await;
}
}
async fn write_loop(net: Arc<Mutex<NetworkManager>>, ch: Receiver<NetworkMessage>) {
loop {
let msg = ch.recv().await.expect("failed to receive outgoing message");
net.lock().await.send(msg).await;
}
}