Skip to content

Commit e330197

Browse files
authored
Merge pull request #241 from AdExNetwork/issue-105-validator_worker-logging
Issue #105 validator worker logging
2 parents 12908f2 + f8824e1 commit e330197

File tree

11 files changed

+65
-30
lines changed

11 files changed

+65
-30
lines changed

.travis.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
language: rust
22
rust:
33
- stable
4+
os: linux
45
# add nodejs for ganche-cli
56
node_js: "12.12.0"
67
# Need to cache the whole `.cargo` directory to keep .crates.toml for
@@ -13,15 +14,14 @@ env:
1314
- CARGO_MAKE_RUN_CHECK_FORMAT="true"
1415
- CARGO_MAKE_RUN_CLIPPY="true"
1516
services:
16-
- redis-server
17-
matrix:
18-
fast_finish: true
17+
- redis
1918

2019
stages:
2120
- test
2221
- deploy
2322

2423
jobs:
24+
fast_finish: true
2525
include:
2626
- stage: test
2727
script:

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Components:
1414
## Local & Testing setup
1515

1616
#### Linux
17+
- `build-essentials` is required to build the project (error: `linker ``cc`` not found`)
1718
- The crate `openssl-sys` requires `libssl-dev` and `pkg-config` for Ubuntu.
1819

1920
### Run Postgres

primitives/src/channel.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ impl FromHex for ChannelId {
4545
}
4646
}
4747

48+
impl fmt::Display for ChannelId {
49+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
50+
write!(f, "0x{}", hex::encode(self.0))
51+
}
52+
}
53+
4854
#[derive(Serialize, Deserialize, Debug, Clone)]
4955
#[serde(rename_all = "camelCase")]
5056
pub struct Channel {

sentry/src/main.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,11 @@ use hyper::service::{make_service_fn, service_fn};
88
use hyper::{Error, Server};
99
use primitives::adapter::{Adapter, DummyAdapterOptions, KeystoreOptions};
1010
use primitives::config::configuration;
11-
use primitives::util::logging::{Async, PrefixedCompactFormat, TermDecorator};
1211
use primitives::util::tests::prep_db::{AUTH, IDS};
1312
use primitives::ValidatorId;
1413
use sentry::db::{postgres_connection, redis_connection, setup_migrations};
1514
use sentry::Application;
16-
use slog::{error, info, o, Drain, Logger};
15+
use slog::{error, info, Logger};
1716
use std::convert::TryFrom;
1817

1918
const DEFAULT_PORT: u16 = 8005;
@@ -146,11 +145,14 @@ async fn run<A: Adapter + 'static>(app: Application<A>, _clustered: bool, port:
146145
let server = Server::bind(&addr).serve(make_service);
147146

148147
if let Err(e) = server.await {
149-
error!(&logger, "server error: {}", e);
148+
error!(&logger, "server error: {}", e; "main" => "run");
150149
}
151150
}
152151

153152
fn logger() -> Logger {
153+
use primitives::util::logging::{Async, PrefixedCompactFormat, TermDecorator};
154+
use slog::{o, Drain};
155+
154156
let decorator = TermDecorator::new().build();
155157
let drain = PrefixedCompactFormat::new("sentry", decorator).fuse();
156158
let drain = Async::new(drain).build().fuse();

sentry/src/routes/analytics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,6 @@ async fn cache(
111111
.query_async::<_, ()>(&mut redis.clone())
112112
.await
113113
{
114-
error!(&logger, "server error: {}", err);
114+
error!(&logger, "Server error: {}", err; "module" => "analytics-cache");
115115
}
116116
}

validator_worker/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ num-traits = "0.2.0"
1818
# To/From Hex
1919
hex = "0.3.2"
2020
byteorder = "1.3"
21+
# Logging
22+
slog = { version = "^2.5.2" , features = ["max_level_trace"] }
2123
# Futures
2224
futures = "0.3"
2325
# Concurrency

validator_worker/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ mod test {
4747
use primitives::config::configuration;
4848
use primitives::util::tests::prep_db::{AUTH, DUMMY_CHANNEL, IDS};
4949
use primitives::{BalancesMap, Channel};
50+
use slog::{o, Discard, Logger};
5051

5152
fn setup_iface(channel: &Channel) -> SentryApi<DummyAdapter> {
5253
let adapter_options = DummyAdapterOptions {
@@ -56,8 +57,9 @@ mod test {
5657
};
5758
let config = configuration("development", None).expect("Dev config should be available");
5859
let dummy_adapter = DummyAdapter::init(adapter_options, &config);
60+
let logger = Logger::root(Discard, o!());
5961

60-
SentryApi::init(dummy_adapter, &channel, &config, false).expect("should succeed")
62+
SentryApi::init(dummy_adapter, &channel, &config, logger).expect("should succeed")
6163
}
6264

6365
#[test]

validator_worker/src/main.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use primitives::adapter::{Adapter, DummyAdapterOptions, KeystoreOptions};
1515
use primitives::config::{configuration, Config};
1616
use primitives::util::tests::prep_db::{AUTH, IDS};
1717
use primitives::{Channel, SpecValidator, ValidatorId};
18+
use slog::{error, Logger};
1819
use validator_worker::error::ValidatorWorker as ValidatorWorkerError;
1920
use validator_worker::{all_channels, follower, leader, SentryApi};
2021

@@ -105,12 +106,14 @@ fn main() -> Result<(), Box<dyn Error>> {
105106
_ => panic!("We don't have any other adapters implemented yet!"),
106107
};
107108

109+
let logger = logger();
110+
108111
match adapter {
109112
AdapterTypes::EthereumAdapter(ethadapter) => {
110-
run(is_single_tick, &sentry_url, &config, *ethadapter)
113+
run(is_single_tick, &sentry_url, &config, *ethadapter, &logger)
111114
}
112115
AdapterTypes::DummyAdapter(dummyadapter) => {
113-
run(is_single_tick, &sentry_url, &config, *dummyadapter)
116+
run(is_single_tick, &sentry_url, &config, *dummyadapter, &logger)
114117
}
115118
}
116119
}
@@ -120,6 +123,7 @@ fn run<A: Adapter + 'static>(
120123
sentry_url: &str,
121124
config: &Config,
122125
mut adapter: A,
126+
logger: &Logger,
123127
) -> Result<(), Box<dyn Error>> {
124128
// unlock adapter
125129
adapter.unlock()?;
@@ -134,29 +138,29 @@ fn run<A: Adapter + 'static>(
134138
let mut rt = Runtime::new()?;
135139

136140
if is_single_tick {
137-
rt.block_on(iterate_channels(args));
141+
rt.block_on(iterate_channels(args, &logger));
138142
} else {
139-
rt.block_on(infinite(args));
143+
rt.block_on(infinite(args, &logger));
140144
}
141145

142146
Ok(())
143147
}
144148

145-
async fn infinite<A: Adapter + 'static>(args: Args<A>) {
149+
async fn infinite<A: Adapter + 'static>(args: Args<A>, logger: &Logger) {
146150
loop {
147151
let arg = args.clone();
148152
let delay_future = delay_for(Duration::from_secs(arg.config.wait_time as u64));
149-
let _result = join(iterate_channels(arg), delay_future).await;
153+
let _result = join(iterate_channels(arg, logger), delay_future).await;
150154
}
151155
}
152156

153-
async fn iterate_channels<A: Adapter + 'static>(args: Args<A>) {
157+
async fn iterate_channels<A: Adapter + 'static>(args: Args<A>, logger: &Logger) {
154158
let result = all_channels(&args.sentry_url, args.adapter.whoami()).await;
155159

156160
let channels = match result {
157161
Ok(channels) => channels,
158162
Err(e) => {
159-
eprintln!("Failed to get channels {}", e);
163+
error!(logger, "Failed to get channels {}", &e; "main" => "iterate_channels");
160164
return;
161165
}
162166
};
@@ -166,29 +170,28 @@ async fn iterate_channels<A: Adapter + 'static>(args: Args<A>) {
166170
let tick = try_join_all(
167171
channels
168172
.into_iter()
169-
.map(|channel| validator_tick(args.adapter.clone(), channel, &args.config)),
173+
.map(|channel| validator_tick(args.adapter.clone(), channel, &args.config, logger)),
170174
)
171175
.await;
172176

173177
if let Err(e) = tick {
174-
eprintln!("An occurred while processing channels {}", e);
178+
error!(logger, "An occurred while processing channels {}", &e; "main" => "iterate_channels");
175179
}
176180

177181
if channels_size >= args.config.max_channels as usize {
178-
eprintln!(
179-
"WARNING: channel limit cfg.MAX_CHANNELS={} reached",
180-
args.config.max_channels
181-
)
182+
error!(logger, "WARNING: channel limit cfg.MAX_CHANNELS={} reached", &args.config.max_channels; "main" => "iterate_channels");
182183
}
183184
}
184185

185186
async fn validator_tick<A: Adapter + 'static>(
186187
adapter: A,
187188
channel: Channel,
188189
config: &Config,
190+
logger: &Logger,
189191
) -> Result<(), ValidatorWorkerError> {
190192
let whoami = adapter.whoami().clone();
191-
let sentry = SentryApi::init(adapter, &channel, &config, true)?;
193+
// Cloning the `Logger` is cheap, see documentation for more info
194+
let sentry = SentryApi::init(adapter, &channel, &config, logger.clone())?;
192195
let duration = Duration::from_secs(config.validator_tick_timeout as u64);
193196

194197
match channel.spec.validators.find(&whoami) {
@@ -210,3 +213,14 @@ async fn validator_tick<A: Adapter + 'static>(
210213
};
211214
Ok(())
212215
}
216+
217+
fn logger() -> Logger {
218+
use primitives::util::logging::{Async, PrefixedCompactFormat, TermDecorator};
219+
use slog::{o, Drain};
220+
221+
let decorator = TermDecorator::new().build();
222+
let drain = PrefixedCompactFormat::new("validator_worker", decorator).fuse();
223+
let drain = Async::new(drain).build().fuse();
224+
225+
Logger::root(drain, o!())
226+
}

validator_worker/src/producer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use primitives::BalancesMap;
88

99
use crate::core::events::merge_aggrs;
1010
use crate::sentry_interface::SentryApi;
11+
use slog::info;
1112

1213
pub type Result = std::result::Result<(BalancesMap, Option<Accounting>), Box<dyn Error>>;
1314

@@ -28,7 +29,12 @@ pub async fn tick<A: Adapter + 'static>(iface: &SentryApi<A>) -> Result {
2829
.await?;
2930

3031
if !aggrs.events.is_empty() {
31-
// TODO: Log the merge
32+
info!(
33+
iface.logger,
34+
"channel {}: processing {} event aggregates",
35+
iface.channel.id,
36+
aggrs.events.len()
37+
);
3238
let (balances, new_accounting) = merge_aggrs(&accounting, &aggrs.events, &iface.channel)?;
3339

3440
let message_types = MessageTypes::Accounting(new_accounting.clone());

0 commit comments

Comments
 (0)