Skip to content

Commit 2418aa1

Browse files
committed
fix(core, graph): minor adjustments after rebase
1 parent 353f6e2 commit 2418aa1

File tree

11 files changed

+362
-160
lines changed

11 files changed

+362
-160
lines changed

core/src/amp_subgraph/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ where
146146
metrics,
147147
);
148148

149-
let runner_result = runner::new_runner(runner_context)(cancel_token).await;
149+
let runner_result = runner::new_runner(runner_context, cancel_token).await;
150150

151151
match manager.subgraph_store.stop_subgraph(&deployment).await {
152152
Ok(()) => {

core/src/amp_subgraph/monitor.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
//!
33
//! # Terminology used in this module
44
//!
5-
//! `active subgraph` - A subgraph that was started and is still tracked.
5+
//! `active subgraph` - A subgraph that was started and is still kept in memory in the list of started subgraphs.
66
//! `running subgraph` - A subgraph that has an instance that is making progress or stopping.
7-
//! `subgraph instance` - A background process that executes the subgraph runner future.
7+
//! `subgraph instance` - A background task that executes the subgraph runner future.
88
99
use std::{
1010
collections::{hash_map::Entry, HashMap},
@@ -65,7 +65,7 @@ pub(super) struct Monitor {
6565
/// The channel that is used to send subgraph commands.
6666
///
6767
/// Every subgraph start and stop request results in a command that is sent to the
68-
/// background process that manages the subgraph instances.
68+
/// background task that manages the subgraph instances.
6969
command_tx: mpsc::UnboundedSender<Command>,
7070

7171
/// When a subgraph starts it is assigned a sequential ID.
@@ -87,10 +87,10 @@ pub(super) struct Monitor {
8787
impl Monitor {
8888
/// Creates a new subgraph monitor.
8989
///
90-
/// Spawns a background process that manages the subgraph start and stop requests.
90+
/// Spawns a background task that manages the subgraph start and stop requests.
9191
///
9292
/// A new cancel token is derived from the `cancel_token` and only the derived token is used by the
93-
/// subgraph monitor and its background process.
93+
/// subgraph monitor and its background task.
9494
pub(super) fn new(logger_factory: &LoggerFactory, cancel_token: &CancellationToken) -> Self {
9595
let logger = logger_factory.component_logger("AmpSubgraphMonitor", None);
9696
let logger_factory = Arc::new(logger_factory.with_parent(logger));
@@ -394,9 +394,9 @@ impl Monitor {
394394
}
395395
}
396396

397-
/// Spawns a background process that executes the subgraph runner future.
397+
/// Spawns a background task that executes the subgraph runner future.
398398
///
399-
/// An additional background process is spawned to handle the graceful shutdown of the subgraph runner,
399+
/// An additional background task is spawned to handle the graceful shutdown of the subgraph runner,
400400
/// and to ensure correct behaviour even if the subgraph runner panics.
401401
fn start_subgraph(
402402
logger: Logger,
@@ -502,7 +502,7 @@ impl Drop for Monitor {
502502
}
503503
}
504504

505-
/// Represents a background process that executes the subgraph runner future.
505+
/// Represents a background task that executes the subgraph runner future.
506506
struct SubgraphInstance {
507507
id: u32,
508508
handle: JoinHandle<()>,

core/src/amp_subgraph/runner/compat.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
//! This is a temporary compatibility module until the graph-node is fully migrated to `alloy`.
2+
13
use alloy::primitives::{BlockHash, BlockNumber};
24
use chrono::{DateTime, Utc};
35

core/src/amp_subgraph/runner/data_processing.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,14 @@ async fn process_record_batch_group<AC>(
103103
return Ok(entity_cache);
104104
}
105105

106-
let block_timestamp = decode_block_timestamp(&record_batches)
107-
.map_err(|e| e.context("failed to decode block timestamp"))?;
106+
let block_timestamp = if cx.manifest.schema.has_aggregations() {
107+
decode_block_timestamp(&record_batches)
108+
.map_err(|e| e.context("failed to decode block timestamp"))?
109+
} else {
110+
// TODO: Block timestamp is only required for subgraph aggregations.
111+
// Make it optional at the store level.
112+
DateTime::<Utc>::MIN_UTC
113+
};
108114

109115
for record_batch in record_batches {
110116
let StreamRecordBatch {
@@ -231,6 +237,14 @@ async fn process_record_batch<AC>(
231237
Ok(())
232238
}
233239

240+
/// Decodes the block timestamp from the first matching column in `record_batches`.
241+
///
242+
/// Iterates through the provided record batches and returns the timestamp from
243+
/// the first batch that contains a valid block timestamp column.
244+
///
245+
/// # Preconditions
246+
///
247+
/// All entries in `record_batches` must belong to the same record batch group.
234248
fn decode_block_timestamp(record_batches: &[StreamRecordBatch]) -> Result<DateTime<Utc>, Error> {
235249
let mut last_error: Option<Error> = None;
236250

core/src/amp_subgraph/runner/mod.rs

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ mod reorg_handler;
99
use std::time::{Duration, Instant};
1010

1111
use anyhow::Result;
12-
use futures::{future::BoxFuture, StreamExt};
12+
use futures::StreamExt;
1313
use graph::{
1414
amp::Client, cheap_clone::CheapClone, components::store::EntityCache,
1515
data::subgraph::schema::SubgraphError,
@@ -24,48 +24,45 @@ use self::{
2424

2525
pub(super) use self::context::Context;
2626

27-
pub(super) fn new_runner<AC>(
27+
pub(super) async fn new_runner<AC>(
2828
mut cx: Context<AC>,
29-
) -> Box<dyn FnOnce(CancellationToken) -> BoxFuture<'static, Result<()>> + Send + 'static>
29+
cancel_token: CancellationToken,
30+
) -> Result<()>
3031
where
3132
AC: Client + Send + Sync + 'static,
3233
{
33-
Box::new(move |cancel_token| {
34-
Box::pin(async move {
35-
let indexing_duration_handle = tokio::spawn({
36-
let mut instant = Instant::now();
37-
let indexing_duration = cx.metrics.indexing_duration.clone();
38-
39-
async move {
40-
loop {
41-
tokio::time::sleep(Duration::from_secs(1)).await;
42-
43-
let prev_instant = std::mem::replace(&mut instant, Instant::now());
44-
indexing_duration.record(prev_instant.elapsed());
45-
}
46-
}
47-
});
48-
49-
let result = cancel_token
50-
.run_until_cancelled(run_indexing_with_retries(&mut cx))
51-
.await;
34+
let indexing_duration_handle = tokio::spawn({
35+
let mut instant = Instant::now();
36+
let indexing_duration = cx.metrics.indexing_duration.clone();
5237

53-
indexing_duration_handle.abort();
38+
async move {
39+
loop {
40+
tokio::time::sleep(Duration::from_secs(1)).await;
5441

55-
match result {
56-
Some(result) => result?,
57-
None => {
58-
debug!(cx.logger, "Processed cancel signal");
59-
}
42+
let prev_instant = std::mem::replace(&mut instant, Instant::now());
43+
indexing_duration.record(prev_instant.elapsed());
6044
}
45+
}
46+
});
47+
48+
let result = cancel_token
49+
.run_until_cancelled(run_indexing_with_retries(&mut cx))
50+
.await;
51+
52+
indexing_duration_handle.abort();
53+
54+
match result {
55+
Some(result) => result?,
56+
None => {
57+
debug!(cx.logger, "Processed cancel signal");
58+
}
59+
}
6160

62-
cx.metrics.deployment_status.stopped();
61+
cx.metrics.deployment_status.stopped();
6362

64-
debug!(cx.logger, "Waiting for the store to finish processing");
65-
cx.store.flush().await?;
66-
Ok(())
67-
})
68-
})
63+
debug!(cx.logger, "Waiting for the store to finish processing");
64+
cx.store.flush().await?;
65+
Ok(())
6966
}
7067

7168
async fn run_indexing<AC>(cx: &mut Context<AC>) -> Result<(), Error>

0 commit comments

Comments
 (0)