Skip to content

Commit ce2f839

Browse files
author
DogLooksGood
committed
Add credential consolidator
1 parent a924182 commit ce2f839

File tree

7 files changed

+196
-178
lines changed

7 files changed

+196
-178
lines changed

encryptor/src/lib.rs

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -366,36 +366,9 @@ impl EncryptorT for Encryptor {
366366
Ok(())
367367
}
368368

369-
// fn add_public_key(&self, addr: String, raw: &NodePublicKeyRaw) -> EncryptorResult<()> {
370-
// let ec = import_ec_public(&raw.ec)?;
371-
// let rsa = import_rsa_public(&raw.rsa)?;
372-
373-
// let mut public_keys = self
374-
// .publics
375-
// .lock()
376-
// .map_err(|_| EncryptorError::AddPublicKeyError)?;
377-
378-
// public_keys.insert(addr, NodePublicKey { rsa, ec });
379-
// Ok(())
380-
// }
381-
382369
fn digest(&self, text: &[u8]) -> SecretDigest {
383370
Sha256::digest(text).to_vec()
384371
}
385-
386-
// fn export_public_key(&self, addr: Option<&str>) -> EncryptorResult<NodePublicKeyRaw> {
387-
// let publics = self
388-
// .publics
389-
// .lock()
390-
// .map_err(|_| EncryptorError::ReadPublicKeyError)?;
391-
// Ok(match addr {
392-
// Some(addr) => publics
393-
// .get(addr)
394-
// .ok_or(EncryptorError::PublicKeyNotfound)?
395-
// .try_into()?,
396-
// None => (&self.private).try_into()?,
397-
// })
398-
// }
399372
}
400373

401374
#[cfg(test)]
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
//! This component read the raw Sync event and update the encryptor to include their credentials.
2+
3+
use std::collections::HashMap;
4+
use std::sync::Arc;
5+
6+
use async_trait::async_trait;
7+
use race_transactor_frames::EventFrame;
8+
use race_core::credentials::Credentials;
9+
use race_core::transport::TransportT;
10+
use race_core::encryptor::EncryptorT;
11+
use race_core::types::GameAccount;
12+
use tracing::{info, error};
13+
use borsh::BorshDeserialize;
14+
use super::{common::PipelinePorts, ComponentEnv};
15+
use crate::Component;
16+
use crate::CloseReason;
17+
18+
pub async fn maybe_fetch_server_credentials(
19+
server_addr: &str,
20+
cached_server_credentials: &mut HashMap<String, Credentials>,
21+
transport: Arc<dyn TransportT>,
22+
env: &ComponentEnv,
23+
) -> Credentials {
24+
if let Some(credentials) = cached_server_credentials.get(server_addr) {
25+
return credentials.clone();
26+
} else {
27+
loop {
28+
if let Ok(Some(profile)) = transport.get_server_account(&server_addr).await {
29+
info!("{} Load server credentials: {}", env.log_prefix, server_addr);
30+
let credentials = Credentials::try_from_slice(&profile.credentials).expect("Failed to deserialize Credentials");
31+
cached_server_credentials.insert(server_addr.to_string(), credentials.clone());
32+
return credentials;
33+
} else {
34+
error!("Failed to fetch server profile for {}, will retry.", server_addr);
35+
}
36+
}
37+
}
38+
}
39+
40+
pub async fn maybe_fetch_player_credentials(
41+
player_addr: &str,
42+
cached_player_credentials: &mut HashMap<String, Credentials>,
43+
transport: Arc<dyn TransportT>,
44+
env: &ComponentEnv,
45+
) -> Credentials {
46+
if let Some(credentials) = cached_player_credentials.get(player_addr) {
47+
return credentials.clone();
48+
} else {
49+
loop {
50+
if let Ok(Some(profile)) = transport.get_player_profile(player_addr).await {
51+
info!("{} Load server credentials: {}", env.log_prefix, player_addr);
52+
let credentials = Credentials::try_from_slice(&profile.credentials).expect("Failed to deserialize Credentials");
53+
cached_player_credentials.insert(player_addr.to_string(), credentials.clone());
54+
return credentials;
55+
} else {
56+
error!("Failed to fetch player profile for {}, will retry.", player_addr);
57+
}
58+
}
59+
}
60+
}
61+
62+
pub struct CredentialConsolidatorContext {
63+
transport: Arc<dyn TransportT>,
64+
encryptor: Arc<dyn EncryptorT>,
65+
game_addr: String,
66+
}
67+
68+
pub struct CredentialConsolidator {}
69+
70+
impl CredentialConsolidator {
71+
pub fn init(
72+
transport: Arc<dyn TransportT>,
73+
encryptor: Arc<dyn EncryptorT>,
74+
game_account: &GameAccount
75+
) -> (Self, CredentialConsolidatorContext) {
76+
(
77+
Self {},
78+
CredentialConsolidatorContext {
79+
transport,
80+
encryptor,
81+
game_addr: game_account.addr.clone(),
82+
},
83+
)
84+
}
85+
}
86+
87+
#[async_trait]
88+
impl Component<PipelinePorts, CredentialConsolidatorContext> for CredentialConsolidator {
89+
fn name() -> &'static str {
90+
"Credential Consolidator"
91+
}
92+
93+
async fn run(
94+
mut ports: PipelinePorts,
95+
ctx: CredentialConsolidatorContext,
96+
env: ComponentEnv,
97+
) -> CloseReason {
98+
let CredentialConsolidatorContext {
99+
transport, encryptor, game_addr: _
100+
} = ctx;
101+
102+
let mut cached_player_credentials = HashMap::<String, Credentials>::default();
103+
let mut cached_server_credentials = HashMap::<String, Credentials>::default();
104+
105+
while let Some(event_frame) = ports.recv().await {
106+
match event_frame {
107+
EventFrame::Shutdown => {
108+
return CloseReason::Complete;
109+
}
110+
EventFrame::Sync {
111+
new_players,
112+
new_servers,
113+
new_deposits,
114+
access_version,
115+
transactor_addr,
116+
} => {
117+
for p in new_players.iter() {
118+
let credentials = maybe_fetch_player_credentials(&p.addr, &mut cached_player_credentials, transport.clone(), &env).await;
119+
if let Err(e) = encryptor.import_credentials(&p.addr, credentials) {
120+
return CloseReason::Fault(e.into());
121+
}
122+
}
123+
124+
for s in new_servers.iter() {
125+
let credentials = maybe_fetch_server_credentials(&s.addr, &mut cached_server_credentials, transport.clone(), &env).await;
126+
if let Err(e) = encryptor.import_credentials(&s.addr, credentials) {
127+
return CloseReason::Fault(e.into());
128+
}
129+
}
130+
131+
ports.send(EventFrame::SyncWithCredentials {
132+
new_players,
133+
new_servers,
134+
new_deposits,
135+
access_version,
136+
transactor_addr,
137+
}).await;
138+
}
139+
_ => {},
140+
}
141+
}
142+
143+
CloseReason::Complete
144+
}
145+
}

transactor-components/src/event_loop.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl Component<PipelinePorts, EventLoopContext> for EventLoop {
147147
}
148148
}
149149

150-
EventFrame::Sync {
150+
EventFrame::SyncWithCredentials {
151151
new_players,
152152
new_servers,
153153
new_deposits,

transactor-components/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod subscriber;
88
mod synchronizer;
99
mod refunder;
1010
mod voter;
11+
mod credential_consolidator;
1112
mod wrapped_client;
1213
mod wrapped_transport;
1314
mod wrapped_storage;
@@ -28,6 +29,7 @@ pub use subscriber::Subscriber;
2829
pub use synchronizer::GameSynchronizer;
2930
#[allow(unused)]
3031
pub use recorder::Recorder;
32+
pub use credential_consolidator::CredentialConsolidator;
3133
pub use voter::Voter;
3234
pub use wrapped_client::WrappedClient;
3335
pub use wrapped_transport::WrappedTransport;

transactor-components/src/subscriber.rs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -89,25 +89,21 @@ async fn handle_frame(frame: BroadcastFrame, ports: &mut PipelinePorts, env: &Co
8989

9090
new_deposits.retain(|d| d.status == DepositStatus::Pending);
9191

92-
// XXX parse to the version with credentials
93-
// if let Err(e) = ports
94-
// .try_send(EventFrame::Sync {
95-
// new_players,
96-
// new_servers,
97-
// new_deposits,
98-
// transactor_addr,
99-
// access_version,
100-
// })
101-
// .await
102-
// {
103-
// error!("{} Send update node error: {}", env.log_prefix, e);
104-
// Some(CloseReason::Complete)
105-
// } else {
106-
// None
107-
// }
108-
109-
// REMOVE THIS
110-
None
92+
if let Err(e) = ports
93+
.try_send(EventFrame::Sync {
94+
new_players,
95+
new_servers,
96+
new_deposits,
97+
transactor_addr,
98+
access_version,
99+
})
100+
.await
101+
{
102+
error!("{} Send update node error: {}", env.log_prefix, e);
103+
Some(CloseReason::Complete)
104+
} else {
105+
None
106+
}
111107
}
112108

113109
BroadcastFrame::Message { .. } => {

transactor-components/src/synchronizer.rs

Lines changed: 9 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,8 @@ use async_trait::async_trait;
44
use race_core::error::Error;
55
use tokio::select;
66
use tokio_stream::StreamExt;
7-
use std::collections::HashMap;
8-
use borsh::BorshDeserialize;
9-
use race_core::credentials::Credentials;
107

11-
use race_transactor_frames::{EventFrame, PlayerJoinSync, ServerJoinSync};
8+
use race_transactor_frames::EventFrame;
129
use race_core::{
1310
transport::TransportT,
1411
types::{GameAccount, PlayerDeposit, PlayerJoin, ServerJoin},
@@ -25,70 +22,7 @@ pub struct GameSynchronizerContext {
2522
game_addr: String,
2623
}
2724

28-
async fn fetch_server_with_credentials(
29-
server_join: ServerJoin,
30-
cached_server_credentials: &mut HashMap<String, Credentials>,
31-
transport: Arc<dyn TransportT>,
32-
) -> ServerJoinSync {
33-
if let Some(credentials) = cached_server_credentials.get(&server_join.addr) {
34-
return ServerJoinSync {
35-
addr: server_join.addr,
36-
endpoint: server_join.endpoint,
37-
access_version: server_join.access_version,
38-
credentials: credentials.clone(),
39-
}
40-
} else {
41-
loop {
42-
if let Ok(Some(profile)) = transport.get_player_profile(&server_join.addr).await {
43-
let credentials = Credentials::try_from_slice(&profile.credentials).expect("Failed to deserialize Credentials");
44-
cached_server_credentials.insert(server_join.addr.clone(), credentials.clone());
45-
return ServerJoinSync {
46-
addr: server_join.addr,
47-
endpoint: server_join.endpoint,
48-
access_version: server_join.access_version,
49-
credentials,
50-
}
51-
} else {
52-
error!("Failed to fetch server profile for {}, will retry.", server_join.addr);
53-
}
54-
}
55-
}
56-
}
57-
58-
async fn fetch_player_with_credentials(
59-
player_join: PlayerJoin,
60-
cached_player_credentials: &mut HashMap<String, Credentials>,
61-
transport: Arc<dyn TransportT>,
62-
) -> PlayerJoinSync {
63-
if let Some(credentials) = cached_player_credentials.get(&player_join.addr) {
64-
return PlayerJoinSync {
65-
addr: player_join.addr,
66-
position: player_join.position,
67-
access_version: player_join.access_version,
68-
credentials: credentials.clone(),
69-
}
70-
} else {
71-
loop {
72-
if let Ok(Some(profile)) = transport.get_player_profile(&player_join.addr).await {
73-
let credentials = Credentials::try_from_slice(&profile.credentials).expect("Failed to deserialize Credentials");
74-
cached_player_credentials.insert(player_join.addr.clone(), credentials.clone());
75-
return PlayerJoinSync {
76-
addr: player_join.addr,
77-
position: player_join.position,
78-
access_version: player_join.access_version,
79-
credentials,
80-
}
81-
} else {
82-
error!("Failed to fetch player profile for {}, will retry.", player_join.addr);
83-
}
84-
}
85-
}
86-
}
87-
8825
async fn maybe_send_sync(
89-
transport: Arc<dyn TransportT>,
90-
cached_player_credentials: &mut HashMap<String, Credentials>,
91-
cached_server_credentials: &mut HashMap<String, Credentials>,
9226
prev_access_version: u64,
9327
game_account: GameAccount,
9428
ports: &mut PipelinePorts,
@@ -113,22 +47,15 @@ async fn maybe_send_sync(
11347
env.log_prefix, game_account.access_version, game_account.settle_version,
11448
);
11549

116-
let mut new_players: Vec<PlayerJoinSync> = Vec::with_capacity(players.len());
117-
let mut new_servers: Vec<ServerJoinSync> = Vec::with_capacity(servers.len());
118-
119-
for p in players {
120-
if p.access_version > prev_access_version { // We care only new players
121-
let player_sync = fetch_player_with_credentials(p, cached_player_credentials, transport.clone()).await;
122-
new_players.push(player_sync);
123-
}
124-
}
50+
let new_players: Vec<PlayerJoin> = players
51+
.into_iter()
52+
.filter(|p| p.access_version > prev_access_version)
53+
.collect();
12554

126-
for s in servers {
127-
if s.access_version > prev_access_version { // We care only new servers
128-
let server_sync = fetch_server_with_credentials(s, cached_server_credentials, transport.clone()).await;
129-
new_servers.push(server_sync);
130-
}
131-
}
55+
let new_servers: Vec<ServerJoin> = servers
56+
.into_iter()
57+
.filter(|s| s.access_version > prev_access_version)
58+
.collect();
13259

13360
let new_deposits: Vec<PlayerDeposit> = deposits
13461
.into_iter()
@@ -216,9 +143,6 @@ impl Component<PipelinePorts, GameSynchronizerContext> for GameSynchronizer {
216143
ctx: GameSynchronizerContext,
217144
env: ComponentEnv,
218145
) -> CloseReason {
219-
let mut cached_player_credentials = HashMap::<String, Credentials>::default();
220-
let mut cached_server_credentials = HashMap::<String, Credentials>::default();
221-
222146
let mut prev_access_version = ctx.access_version;
223147

224148
let mut sub = match ctx.transport.subscribe_game_account(&ctx.game_addr).await {
@@ -253,9 +177,6 @@ impl Component<PipelinePorts, GameSynchronizerContext> for GameSynchronizer {
253177
match sub_item {
254178
Some(Ok(game_account)) => {
255179
let (new_access_version, close_reason) = maybe_send_sync(
256-
ctx.transport.clone(),
257-
&mut cached_player_credentials,
258-
&mut cached_server_credentials,
259180
prev_access_version,
260181
game_account,
261182
&mut ports,

0 commit comments

Comments
 (0)