From e2767990e83f8400ca0394a96b104cf069239084 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 2 Apr 2019 17:50:45 -0700 Subject: [PATCH 01/11] redis WIP --- Cargo.lock | 52 ++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 2 ++ docker-compose.yml | 12 ++++++++++- src/main.rs | 22 ++++++-------------- src/server.rs | 22 ++++++++++++++++++++ 5 files changed, 93 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b0c2ee3..9b12cd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,6 +146,11 @@ name = "ascii" version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "ascii" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "atty" version = "0.2.11" @@ -284,6 +289,18 @@ dependencies = [ "bitflags 1.0.4 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "combine" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "ascii 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "either 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "cookie" version = "0.11.0" @@ -932,6 +949,7 @@ dependencies = [ "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "redis 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.39 (registry+https://github.com/rust-lang/crates.io-index)", "set 0.1.0 (git+https://github.com/GeneralSet/Set)", @@ -1243,6 +1261,22 @@ dependencies = [ "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "redis" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)", + "combine 3.8.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "sha1 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-executor 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-tcp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "redox_syscall" version = "0.1.51" @@ -1905,6 +1939,14 @@ name = "unicode-xid" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "unreachable" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "untrusted" version = "0.6.2" @@ -1974,6 +2016,11 @@ name = "version_check" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "walrus" version = "0.4.0" @@ -2201,6 +2248,7 @@ dependencies = [ "checksum arc-swap 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1025aeae2b664ca0ea726a89d574fe8f4e77dd712d443236ad1de00379450cf6" "checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71" "checksum ascii 0.8.7 (registry+https://github.com/rust-lang/crates.io-index)" = "97be891acc47ca214468e09425d02cef3af2c94d0d82081cd02061f996802f14" +"checksum ascii 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a5fc969a8ce2c9c0c4b0429bb8431544f6658283c8326ba5ff8c762b75369335" "checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652" "checksum autocfg 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a6d640bee2da49f60a4068a7fae53acde8982514ab7bae8b8cea9e88cbcfd799" "checksum backtrace 0.3.14 (registry+https://github.com/rust-lang/crates.io-index)" = "cd5a90e2b463010cd0e0ce9a11d4a9d5d58d9f41d4a6ba3dcaf9e68b466e88b4" @@ -2219,6 +2267,7 @@ dependencies = [ "checksum chrono 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "45912881121cb26fad7c38c17ba7daa18764771836b34fab7d3fbd93ed633878" "checksum chunked_transfer 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "498d20a7aaf62625b9bf26e637cf7736417cde1d0c99f1d04d1170229a85cf87" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +"checksum combine 3.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "da3da6baa321ec19e1cc41d31bf599f00c783d0517095cdaf0332e3fe8d20680" "checksum cookie 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1465f8134efa296b4c19db34d909637cb2bf0f7aaf21299e23e18fa29ac557cf" "checksum crc 1.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d663548de7f5cca343f1e0a48d14dcfb0e9eb4e079ec58883b7251539fa10aeb" "checksum crc32fast 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e91d5240c6975ef33aeb5f148f35275c25eda8e8a5f95abe421978b05b8bf192" @@ -2327,6 +2376,7 @@ dependencies = [ "checksum rayon 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "373814f27745b2686b350dd261bfd24576a6fb0e2c5919b3a2b6005f820b0473" "checksum rayon-core 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b055d1e92aba6877574d8fe604a63c8b5df60f60e5982bf7ccbb1338ea527356" "checksum rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" +"checksum redis 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b543b95de413ac964ca609e91fd9fd58419312e69988fb197f3ff8770312a1af" "checksum redox_syscall 0.1.51 (registry+https://github.com/rust-lang/crates.io-index)" = "423e376fffca3dfa06c9e9790a9ccd282fafb3cc6e6397d01dbf64f9bacc6b85" "checksum redox_termios 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7e891cfe48e9100a70a3b6eb652fef28920c117d366339687bd5576160db0f76" "checksum regex 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "53ee8cfdddb2e0291adfb9f13d31d3bbe0a03c9a402c01b1e24188d86c35b24f" @@ -2395,6 +2445,7 @@ dependencies = [ "checksum unicode-normalization 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "141339a08b982d942be2ca06ff8b076563cbe223d1befd5450716790d44e2426" "checksum unicode-segmentation 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "aa6024fc12ddfd1c6dbc14a80fa2324d4568849869b779f6bd37e5e4c03344d1" "checksum unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" +"checksum unreachable 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "382810877fe448991dfc7f0dd6e3ae5d58088fd0ea5e35189655f84e6814fa56" "checksum untrusted 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "55cd1f4b4e96b46aeb8d4855db4a7a9bd96eeeb5c6a1ab54593328761642ce2f" "checksum url 1.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" "checksum utf8-ranges 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "796f7e48bef87609f7ade7e06495a87d5cd06c7866e6a5cbfceffc558a243737" @@ -2404,6 +2455,7 @@ dependencies = [ "checksum v_htmlescape 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "020cae817dc82693aa523f01087b291b1c7a9ac8cea5c12297963f21769fb27f" "checksum vcpkg 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "def296d3eb3b12371b2c7d0e83bfe1403e4db2d7a0bba324a12b21c4ee13143d" "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" +"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" "checksum walrus 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac68580ebc0a40571ba97f43735f91f36eecccd799bd4cd15048ca5be4f13c63" "checksum walrus-macro 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2091d3de555b62f4c2e0f564daa2cb13cb4635cd95c5eb297405a970f72a7f87" "checksum wasm-bindgen 0.2.38 (registry+https://github.com/rust-lang/crates.io-index)" = "f7148f7446b911dd8e8d490a4fd44bf7fcd6caa21ba2fed9aef7bf34b9a974d4" diff --git a/Cargo.toml b/Cargo.toml index 14d05e7..931a160 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,5 @@ serde_json = "1.0" actix = "0.7" actix-web = "0.7" + +redis = "0.10.0" diff --git a/docker-compose.yml b/docker-compose.yml index 179ed4f..5dbbe01 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,4 +5,14 @@ services: - RUST_ENV=docker build: . ports: - - '3001:3001' \ No newline at end of file + - '3001:3001' + depends_on: + - redis + redis: + image: redis + ports: + - '6379:6379' + volumes: + - redis-data:/data +volumes: + redis-data: \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 990a50e..347fe1f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ extern crate actix; extern crate actix_web; extern crate serde; extern crate serde_json; +extern crate redis; extern crate set; extern crate rand; @@ -177,30 +178,19 @@ fn main() { let sys = actix::System::new("multiplayer-server"); - // Start server actor in separate thread - let server = Arbiter::start(|_| server::Server::default()); - // Create Http server with websocket support - let env = match env::var("RUST_ENV") { - Result::Ok(env) => env, - _ => "development".to_string(), - }; - let address = match env.as_str() { - "docker" => "0.0.0.0:3001", - _ => "127.0.0.1:3001", - }; - error!("{}", address); HttpServer::new(move || { + // Start server actor in separate thread + let addr = Arbiter::start(|_| server::Server::default()); + // Websocket sessions state - let state = WsSessionState { - addr: server.clone(), - }; + let state = WsSessionState { addr }; App::with_state(state) // websocket .resource("/", |r| r.route().f(chat_route)) - }).bind(address) + }).bind("0.0.0.0:3001") .unwrap() .start(); diff --git a/src/server.rs b/src/server.rs index 95a3ddc..8fbb4af 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,6 +2,7 @@ use actix::prelude::*; use std::collections::{HashMap}; use serde_json::{Result as JSON_Result}; use serde::{Deserialize, Serialize}; +use redis::Commands; use set::Set; /// Server sends this messages to session @@ -32,6 +33,7 @@ pub struct GameUpdateMessage { gameState: Game, } +#[derive(Serialize, Deserialize)] pub struct User { addr: Recipient, name: String, @@ -53,6 +55,7 @@ pub struct Game { previousSelection: Option } +#[derive(Serialize, Deserialize)] pub struct Lobby { users: HashMap, game_type: Option, @@ -63,12 +66,14 @@ pub struct Lobby { /// sessions. implementation is super primitive pub struct Server { sessions: HashMap, + redis: redis::Client } impl Default for Server { fn default() -> Server { Server { sessions: HashMap::new(), + redis: redis::Client::open("redis://redis:6379").unwrap(), } } } @@ -162,6 +167,23 @@ impl Handler for Server { fn handle(&mut self, msg: Join, _: &mut Context) { let Join { id, addr, username, room_name } = msg; + // let client = self.redis.clone(); + // let con = client.get_connection().unwrap(); + + // let lobby = con.get(room_name.clone()).unwrap_or(None); + // if lobby.is_none() { + // let blank_lobby = Lobby { + // users: HashMap::new(), + // game_type: None, + // game_state: None + // }; + // let blank_lobby_str = match serde_json::to_string(&blank_lobby) { + // JSON_Result::Ok(u) => u, + // _ => panic!("Unable to serialize blank lobby") + // }; + // let _ : () = con.set(&room_name, blank_lobby_str).unwrap(); + // } + if self.sessions.get_mut(&room_name).is_none() { self.sessions.insert( room_name.clone(), From 4dbdd83a530dd15e40281f6e5a7a3ffdbf89bfa0 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 2 Apr 2019 21:59:43 -0700 Subject: [PATCH 02/11] conver join to use redis instance --- src/server.rs | 109 +++++++++++++++++++++++--------------------------- 1 file changed, 50 insertions(+), 59 deletions(-) diff --git a/src/server.rs b/src/server.rs index 8fbb4af..02ec8a4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -35,7 +35,6 @@ pub struct GameUpdateMessage { #[derive(Serialize, Deserialize)] pub struct User { - addr: Recipient, name: String, points: isize, } @@ -66,6 +65,8 @@ pub struct Lobby { /// sessions. implementation is super primitive pub struct Server { sessions: HashMap, + // TODO: rename user_addrs to sessions + user_addrs: HashMap>, redis: redis::Client } @@ -73,6 +74,7 @@ impl Default for Server { fn default() -> Server { Server { sessions: HashMap::new(), + user_addrs: HashMap::new(), redis: redis::Client::open("redis://redis:6379").unwrap(), } } @@ -87,30 +89,28 @@ impl Actor for Server { #[allow(unused_must_use)] impl Server { - fn emit_users(&mut self, room_name: &String, _skip_id: usize) { - if let Some(session) = self.sessions.get_mut(room_name) { + fn emit_users(&mut self, room_name: &String, _skip_id: usize, users: HashMap) { + let mut message = UserMessage { + eventType: "users".to_string(), + users: Vec::new(), + }; - let mut message = UserMessage { - eventType: "users".to_string(), - users: Vec::new(), - }; - for user in session.users.values() { - message.users.push( - ClientUser { - name: user.name.clone(), - points: user.points.clone(), - } - ) - } - let message_string = match serde_json::to_string(&message) { - JSON_Result::Ok(u) => u, - _ => panic!("Not able to serialize users") - }; + for user in users.values() { + message.users.push( + ClientUser { + name: user.name.clone(), + points: user.points.clone(), + } + ) + } - for (_id, user) in &session.users { - // TODO continue if skip_id == user key - user.addr.do_send(Message(message_string.to_owned())); - } + let message_string = serde_json::to_string(&message).unwrap(); + for (id, user) in users { + // TODO continue if skip_id == user key + match self.user_addrs.get(&id) { + Some(addr) => addr.do_send(Message(message_string.to_owned())), + None => Ok(()) // user is not connected to this server + }; } } @@ -127,7 +127,7 @@ impl Server { }; for (_id, user) in &session.users { // TODO continue if skip_id == user key - user.addr.do_send(Message(message_string.to_owned())); + // user.addr.do_send(Message(message_string.to_owned())); } } @@ -145,7 +145,7 @@ impl Server { }; for (_id, user) in &session.users { // TODO continue if skip_id == user key - user.addr.do_send(Message(message_string.to_owned())); + // user.addr.do_send(Message(message_string.to_owned())); } } } @@ -166,44 +166,35 @@ impl Handler for Server { fn handle(&mut self, msg: Join, _: &mut Context) { let Join { id, addr, username, room_name } = msg; + + // add refrence to user addr to server session + self.user_addrs.insert(id, addr); + + // get/create lobby + let con = self.redis.get_connection().unwrap(); + let existing_lobby: redis::RedisResult = con.get("room_name"); + let mut new_lobby: Lobby = match existing_lobby { + Ok(l) => { + serde_json::from_str(&l).unwrap() + }, + _ => Lobby { + users: HashMap::new(), + game_type: None, + game_state: None + } + }; - // let client = self.redis.clone(); - // let con = client.get_connection().unwrap(); - - // let lobby = con.get(room_name.clone()).unwrap_or(None); - // if lobby.is_none() { - // let blank_lobby = Lobby { - // users: HashMap::new(), - // game_type: None, - // game_state: None - // }; - // let blank_lobby_str = match serde_json::to_string(&blank_lobby) { - // JSON_Result::Ok(u) => u, - // _ => panic!("Unable to serialize blank lobby") - // }; - // let _ : () = con.set(&room_name, blank_lobby_str).unwrap(); - // } - - if self.sessions.get_mut(&room_name).is_none() { - self.sessions.insert( - room_name.clone(), - Lobby { - users: HashMap::new(), - game_type: None, - game_state: None, - } - ); - } - - self.sessions.get_mut(&room_name).unwrap().users.insert( + // add new user to lobby + new_lobby.users.insert( id, User { - addr: addr, name: username, - points: 0, + points: 0 } ); - self.emit_users(&room_name, id); + + // TODO: call emits from redis subscription + self.emit_users(&room_name, id, new_lobby.users); } } @@ -290,7 +281,7 @@ impl Handler for Server { user.points = user.points + 1; } - self.emit_users(&room_name, id); + // self.emit_users(&room_name, id, self.sessions.get(&room_name).unwrap().users); let name = self.sessions.get(&room_name).unwrap().users.get(&id).unwrap().name.to_string(); @@ -325,7 +316,7 @@ impl Handler for Server { user.points = user.points - 1; } - self.emit_users(&room_name, id); + // self.emit_users(&room_name, id, self.sessions.get(&room_name).unwrap().users); let name = self.sessions.get(&room_name).unwrap().users.get(&id).unwrap().name.to_string(); From f4b91614a719d9a5ff7f60542c2b3272b4e4e3fc Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 2 Apr 2019 22:21:42 -0700 Subject: [PATCH 03/11] redis for game type --- src/main.rs | 1 - src/server.rs | 52 +++++++++++++++++++++++++++++++++------------------ 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/main.rs b/src/main.rs index 347fe1f..e223f81 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,6 @@ extern crate redis; extern crate set; extern crate rand; -use std::env; use rand::prelude::*; use serde_json::{Result as JSON_Result}; use serde::{Deserialize, Serialize}; diff --git a/src/server.rs b/src/server.rs index 02ec8a4..7bd6ef9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -89,7 +89,7 @@ impl Actor for Server { #[allow(unused_must_use)] impl Server { - fn emit_users(&mut self, room_name: &String, _skip_id: usize, users: HashMap) { + fn emit_users(&mut self, users: HashMap) { let mut message = UserMessage { eventType: "users".to_string(), users: Vec::new(), @@ -105,7 +105,7 @@ impl Server { } let message_string = serde_json::to_string(&message).unwrap(); - for (id, user) in users { + for (id, _user) in users { // TODO continue if skip_id == user key match self.user_addrs.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), @@ -114,9 +114,7 @@ impl Server { } } - fn emit_game_type(&mut self, room_name: &String) { - let session = self.sessions.get(room_name).unwrap(); - let game_type = session.game_type.as_ref().unwrap(); + fn emit_game_type(&mut self, game_type: &String, users: HashMap) { let message = GameTypeMessage { eventType: "setGameType".to_string(), gameType: game_type.to_string(), @@ -125,9 +123,12 @@ impl Server { JSON_Result::Ok(u) => u, _ => panic!("Not able to serialize users") }; - for (_id, user) in &session.users { + for (id, _user) in users { // TODO continue if skip_id == user key - // user.addr.do_send(Message(message_string.to_owned())); + match self.user_addrs.get(&id) { + Some(addr) => addr.do_send(Message(message_string.to_owned())), + None => Ok(()) // user is not connected to this server + }; } } @@ -143,10 +144,12 @@ impl Server { JSON_Result::Ok(u) => u, _ => panic!("Not able to serialize users") }; - for (_id, user) in &session.users { + for (id, _user) in &session.users { // TODO continue if skip_id == user key - // user.addr.do_send(Message(message_string.to_owned())); - } + match self.user_addrs.get(&id) { + Some(addr) => addr.do_send(Message(message_string.to_owned())), + None => Ok(()) // user is not connected to this server + }; } } } @@ -172,7 +175,7 @@ impl Handler for Server { // get/create lobby let con = self.redis.get_connection().unwrap(); - let existing_lobby: redis::RedisResult = con.get("room_name"); + let existing_lobby: redis::RedisResult = con.get(&room_name); let mut new_lobby: Lobby = match existing_lobby { Ok(l) => { serde_json::from_str(&l).unwrap() @@ -192,9 +195,11 @@ impl Handler for Server { points: 0 } ); + let lobby_json = serde_json::to_string(&new_lobby).unwrap(); + let _: () = con.set(&room_name, lobby_json).unwrap(); - // TODO: call emits from redis subscription - self.emit_users(&room_name, id, new_lobby.users); + // TODO: call emit functions from redis subscription + self.emit_users(new_lobby.users); } } @@ -212,12 +217,23 @@ impl Handler for Server { fn handle(&mut self, msg: SetGameType, _: &mut Context) { let SetGameType { game_type, room_name } = msg; - if self.sessions.get_mut(&room_name).is_none() { - return - } + // get lobby + let con = self.redis.get_connection().unwrap(); + let existing_lobby: redis::RedisResult = con.get("room_name"); + let mut new_lobby: Lobby = match existing_lobby { + Ok(l) => { + serde_json::from_str(&l).unwrap() + }, + _ => panic!("requested lobby does not exist") + }; + + // update lobby game type + new_lobby.game_type = Some(game_type.clone()); + let lobby_json = serde_json::to_string(&new_lobby).unwrap(); + let _: () = con.set(&room_name, lobby_json).unwrap(); - self.sessions.get_mut(&room_name).unwrap().game_type = Some(game_type.clone()); - self.emit_game_type(&room_name); + // TODO: call emit functions from redis subscription + self.emit_game_type(&game_type, new_lobby.users); } } From 8b001bf555efbb4e27bfe6f4363927ac383fb477 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 2 Apr 2019 23:03:32 -0700 Subject: [PATCH 04/11] update game state actions to use redis --- src/server.rs | 104 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 63 insertions(+), 41 deletions(-) diff --git a/src/server.rs b/src/server.rs index 7bd6ef9..6b067ec 100644 --- a/src/server.rs +++ b/src/server.rs @@ -89,7 +89,7 @@ impl Actor for Server { #[allow(unused_must_use)] impl Server { - fn emit_users(&mut self, users: HashMap) { + fn emit_users(&mut self, users: &HashMap) { let mut message = UserMessage { eventType: "users".to_string(), users: Vec::new(), @@ -132,10 +132,7 @@ impl Server { } } - fn emit_game_update(&mut self, room_name: &String) { - let session = self.sessions.get(room_name).unwrap(); - let game_state = session.game_state.as_ref().unwrap(); - + fn emit_game_update(&mut self, game_state: &Game, users: HashMap) { let message = GameUpdateMessage { eventType: "updateGame".to_string(), gameState: game_state.clone(), @@ -144,12 +141,13 @@ impl Server { JSON_Result::Ok(u) => u, _ => panic!("Not able to serialize users") }; - for (id, _user) in &session.users { + for (id, _user) in users { // TODO continue if skip_id == user key match self.user_addrs.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), None => Ok(()) // user is not connected to this server - }; } + }; + } } } @@ -197,9 +195,8 @@ impl Handler for Server { ); let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // TODO: call emit functions from redis subscription - self.emit_users(new_lobby.users); + self.emit_users(&new_lobby.users); } } @@ -219,7 +216,7 @@ impl Handler for Server { // get lobby let con = self.redis.get_connection().unwrap(); - let existing_lobby: redis::RedisResult = con.get("room_name"); + let existing_lobby: redis::RedisResult = con.get(&room_name); let mut new_lobby: Lobby = match existing_lobby { Ok(l) => { serde_json::from_str(&l).unwrap() @@ -227,7 +224,7 @@ impl Handler for Server { _ => panic!("requested lobby does not exist") }; - // update lobby game type + // update lobby with new game type new_lobby.game_type = Some(game_type.clone()); let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); @@ -250,10 +247,17 @@ impl Handler for Server { fn handle(&mut self, msg: StartGame, _: &mut Context) { let StartGame { room_name } = msg; - if self.sessions.get_mut(&room_name).is_none() { - return - } + // get lobby + let con = self.redis.get_connection().unwrap(); + let existing_lobby: redis::RedisResult = con.get(&room_name); + let mut new_lobby: Lobby = match existing_lobby { + Ok(l) => { + serde_json::from_str(&l).unwrap() + }, + _ => panic!("requested lobby does not exist") + }; + // init game state let set = Set::new(4, 3); let deck = set.init_deck(); let update_board = set.update_board(deck, "".to_string()); @@ -263,9 +267,14 @@ impl Handler for Server { numberOfSets: update_board.sets, previousSelection: None }; - self.sessions.get_mut(&room_name).unwrap().game_state = Some(game_state); - self.emit_game_update(&room_name); + + // update lobby with new game state + new_lobby.game_state = Some(game_state.clone()); + let lobby_json = serde_json::to_string(&new_lobby).unwrap(); + let _: () = con.set(&room_name, lobby_json).unwrap(); + // TODO: call emit functions from redis subscription + self.emit_game_update(&game_state, new_lobby.users); } } @@ -284,31 +293,39 @@ impl Handler for Server { fn handle(&mut self, msg: VerifySet, _: &mut Context) { let VerifySet { id, room_name, selected } = msg; - if self.sessions.get_mut(&room_name).is_none() { - return - } - + // get lobby + let con = self.redis.get_connection().unwrap(); + let existing_lobby: redis::RedisResult = con.get(&room_name); + let mut new_lobby: Lobby = match existing_lobby { + Ok(l) => { + serde_json::from_str(&l).unwrap() + }, + _ => panic!("requested lobby does not exist") + }; + + // check if selected cards are a set let set = Set::new(4, 3); let is_valid_set = set.is_set(selected.clone()); if is_valid_set { + // add a point to the user who selected the set { - let user = self.sessions.get_mut(&room_name).unwrap() - .users.get_mut(&id).unwrap(); + let user = new_lobby.users.get_mut(&id).unwrap(); user.points = user.points + 1; } + self.emit_users(&new_lobby.users); - // self.emit_users(&room_name, id, self.sessions.get(&room_name).unwrap().users); - - let name = self.sessions.get(&room_name).unwrap().users.get(&id).unwrap().name.to_string(); - - let old_board = self.sessions.get(&room_name).unwrap().game_state.as_ref().unwrap().board.to_string(); - let deck = self.sessions.get(&room_name).unwrap().game_state.as_ref().unwrap().deck.to_string(); + // get current game state + let name = new_lobby.users.get(&id).unwrap().name.to_string(); + let old_board = new_lobby.game_state.as_ref().unwrap().board.to_string(); + let deck = new_lobby.game_state.as_ref().unwrap().deck.to_string(); + // remove selected cards from the board let mut board: Vec<&str> = old_board.split(",").collect(); for card in selected.split(",") { board.remove_item(&card); } + // create updated game state let set = Set::new(4, 3); let update_board = set.update_board(deck, board.join(",")); let game_state = Game { @@ -322,29 +339,34 @@ impl Handler for Server { }), }; - self.sessions.get_mut(&room_name).unwrap().game_state = Some(game_state); - - self.emit_game_update(&room_name); + // update lobby with new game state + new_lobby.game_state = Some(game_state.clone()); + let lobby_json = serde_json::to_string(&new_lobby).unwrap(); + let _: () = con.set(&room_name, lobby_json).unwrap(); + // TODO: call emit functions from redis subscription + self.emit_game_update(&game_state, new_lobby.users); } else { + // subtract a point to the user who selected the set { - let user = self.sessions.get_mut(&room_name).unwrap() - .users.get_mut(&id).unwrap(); - user.points = user.points - 1; + let user = new_lobby.users.get_mut(&id).unwrap(); + user.points = user.points + 1; } + self.emit_users(&new_lobby.users); - // self.emit_users(&room_name, id, self.sessions.get(&room_name).unwrap().users); - - let name = self.sessions.get(&room_name).unwrap().users.get(&id).unwrap().name.to_string(); - + // create previous selection + let name = new_lobby.users.get(&id).unwrap().name.to_string(); let previousSelection = Some(Selection{ user: name, valid: false, selection: selected, }); - self.sessions.get_mut(&room_name).unwrap().game_state.as_mut().unwrap().previousSelection = previousSelection; - - self.emit_game_update(&room_name); + // update lobby with new previous selection + new_lobby.game_state.as_mut().unwrap().previousSelection = previousSelection; + let lobby_json = serde_json::to_string(&new_lobby).unwrap(); + let _: () = con.set(&room_name, lobby_json).unwrap(); + // TODO: call emit functions from redis subscription + self.emit_game_update(&new_lobby.game_state.unwrap(), new_lobby.users); } } } \ No newline at end of file From 3064babdd2d4b617d77619905e134679f13dd9ce Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 2 Apr 2019 23:10:50 -0700 Subject: [PATCH 05/11] use old session to store user addrs --- src/server.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/src/server.rs b/src/server.rs index 6b067ec..531cb80 100644 --- a/src/server.rs +++ b/src/server.rs @@ -64,9 +64,7 @@ pub struct Lobby { /// `Server` manages rooms and responsible for coordinating game /// sessions. implementation is super primitive pub struct Server { - sessions: HashMap, - // TODO: rename user_addrs to sessions - user_addrs: HashMap>, + sessions: HashMap>, redis: redis::Client } @@ -74,7 +72,6 @@ impl Default for Server { fn default() -> Server { Server { sessions: HashMap::new(), - user_addrs: HashMap::new(), redis: redis::Client::open("redis://redis:6379").unwrap(), } } @@ -107,7 +104,7 @@ impl Server { let message_string = serde_json::to_string(&message).unwrap(); for (id, _user) in users { // TODO continue if skip_id == user key - match self.user_addrs.get(&id) { + match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), None => Ok(()) // user is not connected to this server }; @@ -125,7 +122,7 @@ impl Server { }; for (id, _user) in users { // TODO continue if skip_id == user key - match self.user_addrs.get(&id) { + match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), None => Ok(()) // user is not connected to this server }; @@ -143,7 +140,7 @@ impl Server { }; for (id, _user) in users { // TODO continue if skip_id == user key - match self.user_addrs.get(&id) { + match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), None => Ok(()) // user is not connected to this server }; @@ -169,7 +166,7 @@ impl Handler for Server { let Join { id, addr, username, room_name } = msg; // add refrence to user addr to server session - self.user_addrs.insert(id, addr); + self.sessions.insert(id, addr); // get/create lobby let con = self.redis.get_connection().unwrap(); From 7f93fb41fd8f2eadbcc44701964f222581ae7383 Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 8 Apr 2019 21:13:16 -0700 Subject: [PATCH 06/11] redis pubsub demo --- docker-compose.yml | 1 + src/server.rs | 63 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 5dbbe01..e64f023 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -3,6 +3,7 @@ services: actix: environment: - RUST_ENV=docker + - RUST_LOG=debug build: . ports: - '3001:3001' diff --git a/src/server.rs b/src/server.rs index 531cb80..0f6fe1f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,8 +2,10 @@ use actix::prelude::*; use std::collections::{HashMap}; use serde_json::{Result as JSON_Result}; use serde::{Deserialize, Serialize}; -use redis::Commands; +use redis::{PubSubCommands, Commands, ControlFlow}; use set::Set; +use std::thread; +use std::time::Duration; /// Server sends this messages to session #[derive(Message)] @@ -86,13 +88,44 @@ impl Actor for Server { #[allow(unused_must_use)] impl Server { - fn emit_users(&mut self, users: &HashMap) { + fn subscribe( + client: redis::Client, + sessions: &HashMap>, + channel: String + ) -> thread::JoinHandle<()> { + + thread::spawn(move || { + let mut conn = client.get_connection().unwrap(); + + conn.subscribe(&[channel], |msg| { + let ch = msg.get_channel_name(); + let payload: String = msg.get_payload().unwrap(); + match payload.as_ref() { + "exit" => ControlFlow::Break(()), + "user" => { + error!("emit user update here"); + emit_users() + ControlFlow::Continue + }, + a => { + error!("Channel '{}' received '{}'.", ch, a); + ControlFlow::Continue + } + } + }).unwrap(); + }) + } + + fn emit_users(&mut self, room_name: &String) { + let con = self.redis.get_connection().unwrap(); + let lobby: Lobby = con.get(&room_name).unwrap(); + let mut message = UserMessage { eventType: "users".to_string(), users: Vec::new(), }; - for user in users.values() { + for user in lobby.users.values() { message.users.push( ClientUser { name: user.name.clone(), @@ -103,7 +136,6 @@ impl Server { let message_string = serde_json::to_string(&message).unwrap(); for (id, _user) in users { - // TODO continue if skip_id == user key match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), None => Ok(()) // user is not connected to this server @@ -146,6 +178,21 @@ impl Server { }; } } + + // fn redis_channel_router(msg: redis::RedisResult) { + // error!("something on the channel"); + // // redis::ControlFlow::Continue + // } +} + +fn publish(client: redis::Client, channel: String) { + thread::spawn(move || { + let conn = client.get_connection().unwrap(); + + thread::sleep(Duration::from_millis(500)); + error!("Publish {}.", 10); + let _: () = conn.publish(channel, 10).unwrap(); + }); } /// Join room, if room does not exists create new one. @@ -192,8 +239,12 @@ impl Handler for Server { ); let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // TODO: call emit functions from redis subscription - self.emit_users(&new_lobby.users); + // self.emit_users(&new_lobby.users); + + // subscribe to redis channel + let handle = subscribe(self.redis.clone(), &self.sessions, room_name.clone()); + publish(self.redis.clone(), room_name.clone()); + handle.join().unwrap(); } } From bba638c9164e124150572d15af89d2a1a77687c0 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 9 Apr 2019 15:15:30 -0700 Subject: [PATCH 07/11] redis pubsub for join working --- src/server.rs | 138 +++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 113 insertions(+), 25 deletions(-) diff --git a/src/server.rs b/src/server.rs index 0f6fe1f..b095725 100644 --- a/src/server.rs +++ b/src/server.rs @@ -89,14 +89,14 @@ impl Actor for Server { #[allow(unused_must_use)] impl Server { fn subscribe( - client: redis::Client, - sessions: &HashMap>, - channel: String - ) -> thread::JoinHandle<()> { + &mut self, + channel: String, + addr: Addr + ) -> () { + let client = self.redis.clone(); thread::spawn(move || { let mut conn = client.get_connection().unwrap(); - conn.subscribe(&[channel], |msg| { let ch = msg.get_channel_name(); let payload: String = msg.get_payload().unwrap(); @@ -104,7 +104,10 @@ impl Server { "exit" => ControlFlow::Break(()), "user" => { error!("emit user update here"); - emit_users() + addr.do_send(EmitUsers { + room_name: ch.to_string(), + }); + // emit_users(); ControlFlow::Continue }, a => { @@ -113,19 +116,16 @@ impl Server { } } }).unwrap(); - }) + }); } - fn emit_users(&mut self, room_name: &String) { - let con = self.redis.get_connection().unwrap(); - let lobby: Lobby = con.get(&room_name).unwrap(); - + fn emit_users(&mut self, users: &HashMap) { let mut message = UserMessage { eventType: "users".to_string(), users: Vec::new(), }; - for user in lobby.users.values() { + for user in users.values() { message.users.push( ClientUser { name: user.name.clone(), @@ -136,6 +136,7 @@ impl Server { let message_string = serde_json::to_string(&message).unwrap(); for (id, _user) in users { + // TODO continue if skip_id == user key match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), None => Ok(()) // user is not connected to this server @@ -143,6 +144,34 @@ impl Server { } } + + // fn emit_users(&mut self, room_name: &String) { + // let con = self.redis.get_connection().unwrap(); + // let lobby: Lobby = con.get(&room_name).unwrap(); + + // let mut message = UserMessage { + // eventType: "users".to_string(), + // users: Vec::new(), + // }; + + // for user in lobby.users.values() { + // message.users.push( + // ClientUser { + // name: user.name.clone(), + // points: user.points.clone(), + // } + // ) + // } + + // let message_string = serde_json::to_string(&message).unwrap(); + // for (id, _user) in users { + // match self.sessions.get(&id) { + // Some(addr) => addr.do_send(Message(message_string.to_owned())), + // None => Ok(()) // user is not connected to this server + // }; + // } + // } + fn emit_game_type(&mut self, game_type: &String, users: HashMap) { let message = GameTypeMessage { eventType: "setGameType".to_string(), @@ -185,15 +214,33 @@ impl Server { // } } -fn publish(client: redis::Client, channel: String) { - thread::spawn(move || { - let conn = client.get_connection().unwrap(); - - thread::sleep(Duration::from_millis(500)); - error!("Publish {}.", 10); - let _: () = conn.publish(channel, 10).unwrap(); - }); -} +// fn subscribe( +// client: redis::Client, +// channel: String +// ) -> () { + +// let mut conn = client.get_connection().unwrap(); + +// let _: () = conn.subscribe(&[channel], |msg| { +// let ch = msg.get_channel_name(); +// let payload: String = msg.get_payload().unwrap(); +// match payload.as_ref() { +// "exit" => ControlFlow::Break(()), +// a => { +// error!("Channel '{}' received '{}'.", ch, a); +// ControlFlow::Continue +// } +// } +// }).unwrap(); +// } + +// fn publish(client: redis::Client, channel: String) { +// let conn = client.get_connection().unwrap(); + +// thread::sleep(Duration::from_millis(500)); +// error!("Publish {}.", 10); +// let _: () = conn.publish(channel, 10).unwrap(); +// } /// Join room, if room does not exists create new one. #[derive(Message)] @@ -209,7 +256,7 @@ pub struct Join { impl Handler for Server { type Result = (); - fn handle(&mut self, msg: Join, _: &mut Context) { + fn handle(&mut self, msg: Join, ctx: &mut Context) { let Join { id, addr, username, room_name } = msg; // add refrence to user addr to server session @@ -242,9 +289,10 @@ impl Handler for Server { // self.emit_users(&new_lobby.users); // subscribe to redis channel - let handle = subscribe(self.redis.clone(), &self.sessions, room_name.clone()); - publish(self.redis.clone(), room_name.clone()); - handle.join().unwrap(); + let handle = self.subscribe(room_name.clone(), ctx.address()); + thread::sleep(Duration::from_millis(500)); + error!("Publish user."); + let _: () = con.publish(room_name.clone(), "user").unwrap(); } } @@ -417,4 +465,44 @@ impl Handler for Server { self.emit_game_update(&new_lobby.game_state.unwrap(), new_lobby.users); } } +} + +/// EmitUsers room, if room does not exists create new one. +#[derive(Message)] +pub struct EmitUsers { + room_name: String, +} + +impl Handler for Server { + type Result = (); + + fn handle(&mut self, msg: EmitUsers, _: &mut Context) { + let EmitUsers { room_name } = msg; + + let con = self.redis.get_connection().unwrap(); + let l: String = con.get(&room_name).unwrap(); + let lobby: Lobby = serde_json::from_str(&l).unwrap(); + + let mut message = UserMessage { + eventType: "users".to_string(), + users: Vec::new(), + }; + + for user in lobby.users.values() { + message.users.push( + ClientUser { + name: user.name.clone(), + points: user.points.clone(), + } + ) + } + + let message_string = serde_json::to_string(&message).unwrap(); + for (id, _user) in lobby.users { + match self.sessions.get(&id) { + Some(addr) => addr.do_send(Message(message_string.to_owned())), + None => Ok(()) // user is not connected to this server + }; + } + } } \ No newline at end of file From 1c6af5a512cd74cf8ea6985720c5787e840f51dd Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 9 Apr 2019 16:30:57 -0700 Subject: [PATCH 08/11] convert all the server action to use redis pubsub --- src/server.rs | 239 ++++++++++++++++++++------------------------------ 1 file changed, 95 insertions(+), 144 deletions(-) diff --git a/src/server.rs b/src/server.rs index b095725..85fbf4e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -103,144 +103,32 @@ impl Server { match payload.as_ref() { "exit" => ControlFlow::Break(()), "user" => { - error!("emit user update here"); - addr.do_send(EmitUsers { + addr.do_send(EmitUsers { room_name: ch.to_string(),}); + ControlFlow::Continue + }, + "game_type" => { + error!("emit gameType update here"); + addr.do_send(EmitGameType { room_name: ch.to_string(), }); - // emit_users(); ControlFlow::Continue }, - a => { - error!("Channel '{}' received '{}'.", ch, a); + "game_state" => { + error!("emit gameState update here"); + addr.do_send(EmitGameState { + room_name: ch.to_string(), + }); ControlFlow::Continue + }, + a => { + panic!("Channel '{}' received '{}'.", ch, a); } } }).unwrap(); }); } - - fn emit_users(&mut self, users: &HashMap) { - let mut message = UserMessage { - eventType: "users".to_string(), - users: Vec::new(), - }; - - for user in users.values() { - message.users.push( - ClientUser { - name: user.name.clone(), - points: user.points.clone(), - } - ) - } - - let message_string = serde_json::to_string(&message).unwrap(); - for (id, _user) in users { - // TODO continue if skip_id == user key - match self.sessions.get(&id) { - Some(addr) => addr.do_send(Message(message_string.to_owned())), - None => Ok(()) // user is not connected to this server - }; - } - } - - - // fn emit_users(&mut self, room_name: &String) { - // let con = self.redis.get_connection().unwrap(); - // let lobby: Lobby = con.get(&room_name).unwrap(); - - // let mut message = UserMessage { - // eventType: "users".to_string(), - // users: Vec::new(), - // }; - - // for user in lobby.users.values() { - // message.users.push( - // ClientUser { - // name: user.name.clone(), - // points: user.points.clone(), - // } - // ) - // } - - // let message_string = serde_json::to_string(&message).unwrap(); - // for (id, _user) in users { - // match self.sessions.get(&id) { - // Some(addr) => addr.do_send(Message(message_string.to_owned())), - // None => Ok(()) // user is not connected to this server - // }; - // } - // } - - fn emit_game_type(&mut self, game_type: &String, users: HashMap) { - let message = GameTypeMessage { - eventType: "setGameType".to_string(), - gameType: game_type.to_string(), - }; - let message_string = match serde_json::to_string(&message) { - JSON_Result::Ok(u) => u, - _ => panic!("Not able to serialize users") - }; - for (id, _user) in users { - // TODO continue if skip_id == user key - match self.sessions.get(&id) { - Some(addr) => addr.do_send(Message(message_string.to_owned())), - None => Ok(()) // user is not connected to this server - }; - } - } - - fn emit_game_update(&mut self, game_state: &Game, users: HashMap) { - let message = GameUpdateMessage { - eventType: "updateGame".to_string(), - gameState: game_state.clone(), - }; - let message_string = match serde_json::to_string(&message) { - JSON_Result::Ok(u) => u, - _ => panic!("Not able to serialize users") - }; - for (id, _user) in users { - // TODO continue if skip_id == user key - match self.sessions.get(&id) { - Some(addr) => addr.do_send(Message(message_string.to_owned())), - None => Ok(()) // user is not connected to this server - }; - } - } - - // fn redis_channel_router(msg: redis::RedisResult) { - // error!("something on the channel"); - // // redis::ControlFlow::Continue - // } } -// fn subscribe( -// client: redis::Client, -// channel: String -// ) -> () { - -// let mut conn = client.get_connection().unwrap(); - -// let _: () = conn.subscribe(&[channel], |msg| { -// let ch = msg.get_channel_name(); -// let payload: String = msg.get_payload().unwrap(); -// match payload.as_ref() { -// "exit" => ControlFlow::Break(()), -// a => { -// error!("Channel '{}' received '{}'.", ch, a); -// ControlFlow::Continue -// } -// } -// }).unwrap(); -// } - -// fn publish(client: redis::Client, channel: String) { -// let conn = client.get_connection().unwrap(); - -// thread::sleep(Duration::from_millis(500)); -// error!("Publish {}.", 10); -// let _: () = conn.publish(channel, 10).unwrap(); -// } /// Join room, if room does not exists create new one. #[derive(Message)] @@ -258,7 +146,7 @@ impl Handler for Server { fn handle(&mut self, msg: Join, ctx: &mut Context) { let Join { id, addr, username, room_name } = msg; - + // add refrence to user addr to server session self.sessions.insert(id, addr); @@ -286,12 +174,12 @@ impl Handler for Server { ); let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // self.emit_users(&new_lobby.users); - - // subscribe to redis channel + + // add user to redis channel let handle = self.subscribe(room_name.clone(), ctx.address()); - thread::sleep(Duration::from_millis(500)); - error!("Publish user."); + // TODO fix so publish happens after successfully subscribed + thread::sleep(Duration::from_millis(5)); + // publish updated state of the lobby to redis channel let _: () = con.publish(room_name.clone(), "user").unwrap(); } } @@ -325,8 +213,8 @@ impl Handler for Server { let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // TODO: call emit functions from redis subscription - self.emit_game_type(&game_type, new_lobby.users); + // publish game type update + let _: () = con.publish(room_name.clone(), "game_type").unwrap(); } } @@ -369,8 +257,9 @@ impl Handler for Server { new_lobby.game_state = Some(game_state.clone()); let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // TODO: call emit functions from redis subscription - self.emit_game_update(&game_state, new_lobby.users); + + // publish game update to redis channel + let _: () = con.publish(room_name.clone(), "game_state").unwrap(); } } @@ -408,7 +297,6 @@ impl Handler for Server { let user = new_lobby.users.get_mut(&id).unwrap(); user.points = user.points + 1; } - self.emit_users(&new_lobby.users); // get current game state let name = new_lobby.users.get(&id).unwrap().name.to_string(); @@ -439,30 +327,33 @@ impl Handler for Server { new_lobby.game_state = Some(game_state.clone()); let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // TODO: call emit functions from redis subscription - self.emit_game_update(&game_state, new_lobby.users); + + // publish game and user update to redis channel + let _: () = con.publish(room_name.clone(), "user").unwrap(); + let _: () = con.publish(room_name.clone(), "game_state").unwrap(); } else { // subtract a point to the user who selected the set { let user = new_lobby.users.get_mut(&id).unwrap(); - user.points = user.points + 1; + user.points = user.points - 1; } - self.emit_users(&new_lobby.users); // create previous selection let name = new_lobby.users.get(&id).unwrap().name.to_string(); let previousSelection = Some(Selection{ user: name, valid: false, - selection: selected, + selection: selected.to_string(), }); // update lobby with new previous selection new_lobby.game_state.as_mut().unwrap().previousSelection = previousSelection; let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // TODO: call emit functions from redis subscription - self.emit_game_update(&new_lobby.game_state.unwrap(), new_lobby.users); + + // publish game and user update to redis channel + let _: () = con.publish(room_name.clone(), "user").unwrap(); + let _: () = con.publish(room_name.clone(), "game_state").unwrap(); } } } @@ -497,6 +388,66 @@ impl Handler for Server { ) } + let message_string = serde_json::to_string(&message).unwrap(); + for (id, _user) in lobby.users { + match self.sessions.get(&id) { + Some(addr) => addr.do_send(Message(message_string.to_owned())), + None => Ok(()) // user is not connected to this server + }; + } + } +} + +#[derive(Message)] +pub struct EmitGameType { + room_name: String, +} + +impl Handler for Server { + type Result = (); + + fn handle(&mut self, msg: EmitGameType, _: &mut Context) { + let EmitGameType { room_name } = msg; + + let con = self.redis.get_connection().unwrap(); + let l: String = con.get(&room_name).unwrap(); + let lobby: Lobby = serde_json::from_str(&l).unwrap(); + + let message = GameTypeMessage { + eventType: "setGameType".to_string(), + gameType: lobby.game_type.unwrap(), + }; + + let message_string = serde_json::to_string(&message).unwrap(); + for (id, _user) in lobby.users { + match self.sessions.get(&id) { + Some(addr) => addr.do_send(Message(message_string.to_owned())), + None => Ok(()) // user is not connected to this server + }; + } + } +} + +#[derive(Message)] +pub struct EmitGameState { + room_name: String, +} + +impl Handler for Server { + type Result = (); + + fn handle(&mut self, msg: EmitGameState, _: &mut Context) { + let EmitGameState { room_name } = msg; + + let con = self.redis.get_connection().unwrap(); + let l: String = con.get(&room_name).unwrap(); + let lobby: Lobby = serde_json::from_str(&l).unwrap(); + + let message = GameUpdateMessage { + eventType: "updateGame".to_string(), + gameState: lobby.game_state.unwrap(), + }; + let message_string = serde_json::to_string(&message).unwrap(); for (id, _user) in lobby.users { match self.sessions.get(&id) { From bb8487f8143b6d16d6af2470f73297ded4263616 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 9 Apr 2019 17:13:36 -0700 Subject: [PATCH 09/11] remove duration --- src/server.rs | 33 ++++++++++++++++++++++----------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/src/server.rs b/src/server.rs index 85fbf4e..908e36d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,11 +1,9 @@ use actix::prelude::*; use std::collections::{HashMap}; -use serde_json::{Result as JSON_Result}; use serde::{Deserialize, Serialize}; use redis::{PubSubCommands, Commands, ControlFlow}; use set::Set; use std::thread; -use std::time::Duration; /// Server sends this messages to session #[derive(Message)] @@ -97,7 +95,7 @@ impl Server { thread::spawn(move || { let mut conn = client.get_connection().unwrap(); - conn.subscribe(&[channel], |msg| { + let _: () = conn.subscribe(&[channel.clone()], |msg| { let ch = msg.get_channel_name(); let payload: String = msg.get_payload().unwrap(); match payload.as_ref() { @@ -147,8 +145,10 @@ impl Handler for Server { fn handle(&mut self, msg: Join, ctx: &mut Context) { let Join { id, addr, username, room_name } = msg; + // add user to redis channel + self.subscribe(room_name.clone(), ctx.address()); // add refrence to user addr to server session - self.sessions.insert(id, addr); + self.sessions.insert(id, addr.clone()); // get/create lobby let con = self.redis.get_connection().unwrap(); @@ -175,12 +175,23 @@ impl Handler for Server { let lobby_json = serde_json::to_string(&new_lobby).unwrap(); let _: () = con.set(&room_name, lobby_json).unwrap(); - // add user to redis channel - let handle = self.subscribe(room_name.clone(), ctx.address()); - // TODO fix so publish happens after successfully subscribed - thread::sleep(Duration::from_millis(5)); // publish updated state of the lobby to redis channel let _: () = con.publish(room_name.clone(), "user").unwrap(); + // update client that joined with lobby + let mut message = UserMessage { + eventType: "users".to_string(), + users: Vec::new(), + }; + for user in new_lobby.users.values() { + message.users.push( + ClientUser { + name: user.name.clone(), + points: user.points.clone(), + } + ) + } + let message_string = serde_json::to_string(&message).unwrap(); + addr.do_send(Message(message_string.to_owned())); } } @@ -392,7 +403,7 @@ impl Handler for Server { for (id, _user) in lobby.users { match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), - None => Ok(()) // user is not connected to this server + _ => Ok(()) // user is not connected to this server }; } } @@ -422,7 +433,7 @@ impl Handler for Server { for (id, _user) in lobby.users { match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), - None => Ok(()) // user is not connected to this server + _ => Ok(()) // user is not connected to this server }; } } @@ -452,7 +463,7 @@ impl Handler for Server { for (id, _user) in lobby.users { match self.sessions.get(&id) { Some(addr) => addr.do_send(Message(message_string.to_owned())), - None => Ok(()) // user is not connected to this server + _ => Ok(()) // user is not connected to this server }; } } From 541b6c992dc30f4399c9bee34a5d88fd7993caf6 Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 29 Apr 2019 16:47:29 -0700 Subject: [PATCH 10/11] adds image name of the dockerhub repository this is needed so that kompose can pull it inot the kubernetes yaml files --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index e64f023..820d3f3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,7 @@ version: '3' services: actix: + image: generalset/multiplayer_server:init environment: - RUST_ENV=docker - RUST_LOG=debug From 4757f2115fc3414577908ce57e5c721bb17b57ae Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 29 Apr 2019 17:00:29 -0700 Subject: [PATCH 11/11] run on kubernetes using minikude --- README.md | 8 ++++++ actix-deployment.yaml | 32 ++++++++++++++++++++++++ actix-service.yaml | 19 +++++++++++++++ redis-data-persistentvolumeclaim.yaml | 14 +++++++++++ redis-deployment.yaml | 35 +++++++++++++++++++++++++++ redis-service.yaml | 19 +++++++++++++++ 6 files changed, 127 insertions(+) create mode 100644 actix-deployment.yaml create mode 100644 actix-service.yaml create mode 100644 redis-data-persistentvolumeclaim.yaml create mode 100644 redis-deployment.yaml create mode 100644 redis-service.yaml diff --git a/README.md b/README.md index 5f8b3f9..04cd3c4 100644 --- a/README.md +++ b/README.md @@ -9,4 +9,12 @@ cargo run ## Run With Docker ``` docker-compose up +``` + +## notes +``` +kompose convert +kubectl apply -f actix-service.yaml actix-doployment.yaml +kubectl apply -f redis-service.yaml redis-doployment.yaml redis-data-persistentvolumeclaim.yaml +kubectl expose deployment/actix --type="NodePort" --port 3001 --name=mp ``` \ No newline at end of file diff --git a/actix-deployment.yaml b/actix-deployment.yaml new file mode 100644 index 0000000..23df07e --- /dev/null +++ b/actix-deployment.yaml @@ -0,0 +1,32 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + annotations: + kompose.cmd: kompose convert + kompose.version: 1.18.0 () + creationTimestamp: null + labels: + io.kompose.service: actix + name: actix +spec: + replicas: 1 + strategy: {} + template: + metadata: + creationTimestamp: null + labels: + io.kompose.service: actix + spec: + containers: + - env: + - name: RUST_ENV + value: docker + - name: RUST_LOG + value: debug + image: generalset/multiplayer_server:init + name: actix + ports: + - containerPort: 3001 + resources: {} + restartPolicy: Always +status: {} diff --git a/actix-service.yaml b/actix-service.yaml new file mode 100644 index 0000000..5bd15de --- /dev/null +++ b/actix-service.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + annotations: + kompose.cmd: kompose convert + kompose.version: 1.18.0 () + creationTimestamp: null + labels: + io.kompose.service: actix + name: actix +spec: + ports: + - name: "3001" + port: 3001 + targetPort: 3001 + selector: + io.kompose.service: actix +status: + loadBalancer: {} diff --git a/redis-data-persistentvolumeclaim.yaml b/redis-data-persistentvolumeclaim.yaml new file mode 100644 index 0000000..f9ab537 --- /dev/null +++ b/redis-data-persistentvolumeclaim.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + creationTimestamp: null + labels: + io.kompose.service: redis-data + name: redis-data +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 100Mi +status: {} diff --git a/redis-deployment.yaml b/redis-deployment.yaml new file mode 100644 index 0000000..55fd5a9 --- /dev/null +++ b/redis-deployment.yaml @@ -0,0 +1,35 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + annotations: + kompose.cmd: kompose convert + kompose.version: 1.18.0 () + creationTimestamp: null + labels: + io.kompose.service: redis + name: redis +spec: + replicas: 1 + strategy: + type: Recreate + template: + metadata: + creationTimestamp: null + labels: + io.kompose.service: redis + spec: + containers: + - image: redis + name: redis + ports: + - containerPort: 6379 + resources: {} + volumeMounts: + - mountPath: /data + name: redis-data + restartPolicy: Always + volumes: + - name: redis-data + persistentVolumeClaim: + claimName: redis-data +status: {} diff --git a/redis-service.yaml b/redis-service.yaml new file mode 100644 index 0000000..3220ae9 --- /dev/null +++ b/redis-service.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + annotations: + kompose.cmd: kompose convert + kompose.version: 1.18.0 () + creationTimestamp: null + labels: + io.kompose.service: redis + name: redis +spec: + ports: + - name: "6379" + port: 6379 + targetPort: 6379 + selector: + io.kompose.service: redis +status: + loadBalancer: {}