Skip to content

Commit 175ebce

Browse files
lutterJannis Pohlmann
authored andcommitted
runtime/wasm (ipfs_map): log progress every 15s on a dedicated logger
1 parent b28e294 commit 175ebce

File tree

2 files changed

+36
-10
lines changed

2 files changed

+36
-10
lines changed

graph/src/components/link_resolver.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,15 @@ const MAX_IPFS_MAP_FILE_SIZE_ENV_VAR: &str = "GRAPH_MAX_IPFS_MAP_FILE_SIZE";
1717
// 256MB
1818
const MAX_IPFS_MAP_FILE_SIZE: u64 = 256 * 1024 * 1024;
1919

20-
type ValueStream = Box<Stream<Item = Value, Error = failure::Error> + Send + 'static>;
20+
/// The values that `json_stream` returns. The struct contains the deserialized
21+
/// JSON value from the input stream, together with the line number from which
22+
/// the value was read.
23+
pub struct StreamValue {
24+
pub value: Value,
25+
pub line: usize,
26+
}
27+
28+
type ValueStream = Box<Stream<Item = StreamValue, Error = failure::Error> + Send + 'static>;
2129

2230
/// Resolves links to subgraph manifests and resources referenced by them.
2331
pub trait LinkResolver: Send + Sync + 'static {
@@ -90,7 +98,7 @@ impl LinkResolver for ipfs_api::IpfsClient {
9098
/// Supports links of the form `/ipfs/ipfs_hash` or just `ipfs_hash`.
9199
fn cat(&self, link: &Link) -> Box<Future<Item = Vec<u8>, Error = failure::Error> + Send> {
92100
// Grab env vars.
93-
let max_file_bytes = read_u64_env_var(MAX_IPFS_FILE_BYTES_ENV_VAR);
101+
let max_file_bytes = read_u64_from_env(MAX_IPFS_FILE_BYTES_ENV_VAR);
94102

95103
// Discard the `/ipfs/` prefix (if present) to get the hash.
96104
let path = link.link.trim_start_matches("/ipfs/").to_owned();
@@ -120,16 +128,19 @@ impl LinkResolver for ipfs_api::IpfsClient {
120128
// to the line number in the overall file
121129
let mut count = 0;
122130

123-
let stream: ValueStream =
124-
Box::new(poll_fn(move || -> Poll<Option<Value>, failure::Error> {
131+
let stream: ValueStream = Box::new(poll_fn(
132+
move || -> Poll<Option<StreamValue>, failure::Error> {
125133
loop {
126134
if let Some(offset) = buf.iter().position(|b| *b == b'\n') {
127135
let line_bytes = buf.split_to(offset + 1);
128136
count += 1;
129137
if line_bytes.len() > 1 {
130138
let line = std::str::from_utf8(&line_bytes)?;
131139
let res = match serde_json::from_str::<Value>(line) {
132-
Ok(v) => Ok(Async::Ready(Some(v))),
140+
Ok(v) => Ok(Async::Ready(Some(StreamValue {
141+
value: v,
142+
line: count,
143+
}))),
133144
Err(e) => {
134145
// Adjust the line number in the serde error. This
135146
// is fun because we can only get at the full error
@@ -162,7 +173,8 @@ impl LinkResolver for ipfs_api::IpfsClient {
162173
}
163174
}
164175
}
165-
}));
176+
},
177+
));
166178
// Check the size of the file
167179
let max_file_bytes =
168180
read_u64_from_env(MAX_IPFS_MAP_FILE_SIZE_ENV_VAR).unwrap_or(MAX_IPFS_MAP_FILE_SIZE);
@@ -209,7 +221,7 @@ mod tests {
209221
let link = runtime.block_on(client.add(text.as_bytes())).unwrap().hash;
210222
runtime.block_on(
211223
LinkResolver::json_stream(&client, &Link { link: link.clone() })
212-
.and_then(|stream| stream.collect()),
224+
.and_then(|stream| stream.map(|sv| sv.value).collect()),
213225
)
214226
}
215227

runtime/wasm/src/host_exports.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -369,17 +369,31 @@ where
369369
&*callback, &link
370370
);
371371

372+
let start = Instant::now();
373+
let mut last_log = Instant::now();
374+
let logger = ctx.logger.new(o!("ipfs_map" => link.clone()));
372375
let operations = self.block_on(
373376
self.link_resolver
374377
.json_stream(&Link { link })
375-
.and_then(|stream| {
378+
.and_then(move |stream| {
376379
stream
377-
.and_then(move |v| {
380+
.and_then(move |sv| {
378381
let module = WasmiModule::from_valid_module_with_ctx(
379382
valid_module.clone(),
380383
ctx.clone(),
381384
)?;
382-
module.handle_json_callback(&*callback, &v)
385+
let result = module.handle_json_callback(&*callback, &sv.value);
386+
// Log progress every 15s
387+
if last_log.elapsed() > Duration::from_secs(15) {
388+
debug!(
389+
logger,
390+
"Processed {} lines in {}s so far",
391+
sv.line,
392+
start.elapsed().as_secs()
393+
);
394+
last_log = Instant::now();
395+
}
396+
result
383397
})
384398
.collect()
385399
})

0 commit comments

Comments
 (0)