128 changes: 2 additions & 126 deletions README.md
@@ -19,132 +19,8 @@ aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos

# Get started

We’ve created a [Quickstart Guide to Aptos Indexer SDK](https://github.com/aptos-labs/aptos-indexer-processor-example) which gets you set up and running an events processor that indexes events on the Aptos blockchain.
We’ve created a [Quickstart Guide to Aptos Indexer SDK](https://aptos.dev/build/indexer/indexer-sdk/quickstart) which gets you set up and running an events processor that indexes events on the Aptos blockchain.

# Documentation

## Creating a step

To create a step in the SDK, implement these traits:

1. **Processable**

```rust
#[async_trait]
impl Processable for MyExtractorStep {
type Input = Transaction;
type Output = ExtractedDataModel;
type RunType = AsyncRunType;

// Processes a batch of input items and returns a batch of output items.
async fn process(
&mut self,
input: TransactionContext<Transaction>,
) -> Result<Option<TransactionContext<ExtractedDataModel>>, ProcessorError> {
// Extract data from input.data
let extracted_data = ...

Ok(Some(TransactionContext {
data: extracted_data,
start_version: input.start_version,
end_version: input.end_version,
start_transaction_timestamp: input.start_transaction_timestamp,
end_transaction_timestamp: input.end_transaction_timestamp,
total_size_in_bytes: input.total_size_in_bytes,
}))
}
}
```

2. **NamedStep**

```rust
impl NamedStep for MyExtractorStep {
fn name(&self) -> String {
"MyExtractorStep".to_string()
}
}
```

3. Either **AsyncStep** or **PollableAsyncStep**, which defines how the step will be run in the processor.
1. The most basic step is an `AsyncStep`, which processes a batch of input items and returns a batch of output items.

```rust
#[async_trait]
impl Processable for MyExtractorStep {
...
type RunType = AsyncRunType;
...
}

impl AsyncStep for MyExtractorStep {}
```

2. A `PollableAsyncStep` does the same as `AsyncStep`, but it also periodically polls its internal state and returns a batch of output items if available.

```rust
#[async_trait]
impl<T> Processable for MyPollStep<T>
where
Self: Sized + Send + 'static,
T: Send + 'static,
{
...
type RunType = PollableAsyncRunType;
...
}

#[async_trait]
impl<T: Send + 'static> PollableAsyncStep for MyPollStep<T>
where
Self: Sized + Send + Sync + 'static,
T: Send + 'static,
{
/// Returns the duration between polls
fn poll_interval(&self) -> std::time::Duration {
// Define duration
}

/// Polls the internal state and returns a batch of output items if available.
async fn poll(&mut self) -> Result<Option<Vec<TransactionContext<T>>>, ProcessorError> {
// Define polling logic
}
}
```


## Common steps

The SDK provides several common steps to use in your processor; a short wiring sketch follows the list below.

1. `TransactionStreamStep` provides a stream of Aptos transactions to the processor
2. `TimedBufferStep` buffers a batch of items and periodically polls to release the items to the next step
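
The common steps are instantiated like any custom step and handed to the builder. The snippet below is a minimal sketch only: the constructor names and arguments are assumptions, so check the SDK source for the exact signatures.

```rust
use std::time::Duration;

// Minimal sketch (constructor names/arguments are assumptions, not the exact
// SDK API): stream transactions from a previously built
// `transaction_stream_config` and buffer them for 10 seconds before releasing
// them to the next step.
let transaction_stream = TransactionStreamStep::new(transaction_stream_config).await?;
let timed_buffer = TimedBufferStep::new(Duration::from_secs(10));
```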

## Connecting steps

When `ProcessorBuilder` connects two steps, it creates a channel linking them, so the output of the first step becomes the input of the next.

```rust
let (pb, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
first_step.into_runnable_step(),
)
.connect_to(second_step.into_runnable_step(), channel_size)
.connect_to(third_step.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);
```

## Adding a new processor

1. Use [aptos-indexer-processor-example](https://github.com/aptos-labs/aptos-indexer-processor-example) as a starting point
2. Add the new processor to [ProcessorConfig](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/a8bbb23056d55b86b4ded6822c9120e5e8763d50/aptos-indexer-processor-example/src/config/processor_config.rs#L34) and [Processor](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/a8bbb23056d55b86b4ded6822c9120e5e8763d50/aptos-indexer-processor-example/src/config/processor_config.rs#L58) (an illustrative sketch follows after this list)
3. Add the processor to [RunnableConfig](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/a8bbb23056d55b86b4ded6822c9120e5e8763d50/aptos-indexer-processor-example/src/config/indexer_processor_config.rs#L25)
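
As an illustrative sketch only (the real enum lives in the example repo's `processor_config.rs`; `DefaultProcessorConfig` and `MyNewProcessor` are hypothetical names), adding a processor to `ProcessorConfig` amounts to adding a new enum variant:

```rust
use serde::{Deserialize, Serialize};

// Illustrative sketch, not the exact code in the example repo: the serde
// attributes and the config payload type are assumptions.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ProcessorConfig {
    EventsProcessor(DefaultProcessorConfig),
    // The new processor's variant; its name becomes the `type` value in config.
    MyNewProcessor(DefaultProcessorConfig),
}
```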

## Running a processor

To run the processor, we recommend using the example in [aptos-indexer-processor-example](https://github.com/aptos-labs/aptos-indexer-processor-example) and following this [configuration guide](https://github.com/aptos-labs/aptos-indexer-processor-example?tab=readme-ov-file#configuring-your-processor).
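
As a minimal sketch of an entry point (assuming the example repo's `ServerArgs`/`IndexerProcessorConfig` pattern and the server framework's `run` signature; verify against the example before relying on it), the processor is typically launched from a small `main` that parses the config path and hands it to the server framework:

```rust
use anyhow::Result;
use aptos_indexer_processor_sdk_server_framework::ServerArgs;
use clap::Parser;

// Minimal sketch, assuming the example repo's setup: `--config-path` points at
// the YAML described in the configuration guide, and IndexerProcessorConfig is
// the example repo's RunnableConfig implementation (not defined here).
#[tokio::main]
async fn main() -> Result<()> {
    let args = ServerArgs::parse();
    args.run::<IndexerProcessorConfig>(tokio::runtime::Handle::current())
        .await
}
```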

## Advanced features (experimental)

1. Fanout + ArcifyStep
2. Fan in
Full documentation can be found [here](https://aptos.dev/build/indexer/indexer-sdk/documentation)

@@ -79,7 +79,7 @@ where
}
}

async fn run_processor<F, Fut>(
pub async fn run_processor<F, Fut>(
processor_name: String,
transaction_stream_config: TransactionStreamConfig,
postgres_config: PostgresConfig,
@@ -1,4 +1,4 @@
pub mod basic_processor_function;
pub mod basic_processor_step;

pub use basic_processor_function::process;
pub use basic_processor_function::{process, run_processor};
@@ -44,7 +44,7 @@ pub fn clean_data_for_db<T: serde::Serialize + for<'de> serde::Deserialize<'de>>
}
}

fn establish_connection(database_url: &str) -> BoxFuture<ConnectionResult<AsyncPgConnection>> {
fn establish_connection(database_url: &str) -> BoxFuture<'_, ConnectionResult<AsyncPgConnection>> {
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;

154 changes: 76 additions & 78 deletions aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs
@@ -114,89 +114,87 @@ where
let poll_step_name = step_name.clone();
let poll_output_sender = output_sender.clone();
let mut polling_task = tokio::spawn(async move {
let mut last_poll = tokio::time::Instant::now();
let poll_duration = poll_step.lock().await.poll_interval();

while poll_step.lock().await.should_continue_polling().await {
// It's possible that the channel always has items, so we need to ensure we call `poll` manually if we need to
if last_poll.elapsed() >= poll_duration {
let polling_duration_for_logging = Instant::now();
let result = match poll_step.lock().await.poll().await {
Ok(result) => result,
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Failed to poll"
);
break;
},
};
match StepMetricsBuilder::default()
.labels(StepMetricLabels {
step_name: poll_step_name.clone(),
})
.polling_duration_in_secs(
polling_duration_for_logging.elapsed().as_secs_f64(),
)
.build()
{
Ok(mut metrics) => metrics.log_metrics(),
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Failed to log metrics"
);
break;
},
}
if let Some(outputs_with_context) = result {
for output_with_context in outputs_with_context {
match StepMetricsBuilder::default()
.labels(StepMetricLabels {
step_name: poll_step_name.clone(),
})
.latest_polled_version(output_with_context.metadata.end_version)
.latest_polled_transaction_timestamp(
output_with_context.get_start_transaction_timestamp_unix(),
)
.polled_transaction_latency(
output_with_context.get_transaction_latency(),
)
.num_polled_transactions_count(
output_with_context.get_num_transactions(),
)
.polled_size_in_bytes(
output_with_context.metadata.total_size_in_bytes,
)
.build()
{
Ok(mut metrics) => metrics.log_metrics(),
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Failed to log metrics"
);
break;
},
}
match poll_output_sender.send(output_with_context).await {
Ok(_) => {},
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Error sending output to channel"
);
break;
},
}
}
};
last_poll = tokio::time::Instant::now();
let polling_duration_for_logging = Instant::now();
let result = match poll_step.lock().await.poll().await {
Ok(result) => result,
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Failed to poll"
);
break;
},
};
match StepMetricsBuilder::default()
.labels(StepMetricLabels {
step_name: poll_step_name.clone(),
})
.polling_duration_in_secs(
polling_duration_for_logging.elapsed().as_secs_f64(),
)
.build()
{
Ok(mut metrics) => metrics.log_metrics(),
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Failed to log metrics"
);
break;
},
}
if let Some(outputs_with_context) = result {
for output_with_context in outputs_with_context {
match StepMetricsBuilder::default()
.labels(StepMetricLabels {
step_name: poll_step_name.clone(),
})
.latest_polled_version(output_with_context.metadata.end_version)
.latest_polled_transaction_timestamp(
output_with_context.get_start_transaction_timestamp_unix(),
)
.polled_transaction_latency(
output_with_context.get_transaction_latency(),
)
.num_polled_transactions_count(
output_with_context.get_num_transactions(),
)
.polled_size_in_bytes(
output_with_context.metadata.total_size_in_bytes,
)
.build()
{
Ok(mut metrics) => metrics.log_metrics(),
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Failed to log metrics"
);
break;
},
}
match poll_output_sender.send(output_with_context).await {
Ok(_) => {},
Err(e) => {
error!(
step_name = poll_step_name,
error = e.to_string(),
"Error sending output to channel"
);
break;
},
}
}
};

tokio::time::sleep(poll_duration).await;
}
});
