diff --git a/README.md b/README.md index 48d603e..97853ad 100644 --- a/README.md +++ b/README.md @@ -19,132 +19,8 @@ aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos # Get started -We’ve created a [Quickstart Guide to Aptos Indexer SDK](https://github.com/aptos-labs/aptos-indexer-processor-example) which gets you setup and running an events processor that indexes events on the Aptos blockchain. +We’ve created a [Quickstart Guide to Aptos Indexer SDK](https://aptos.dev/build/indexer/indexer-sdk/quickstart) which gets you set up and running an events processor that indexes events on the Aptos blockchain. # Documentation - -## Creating a step - -To create a step in the SDK, implement these traits: - -1. **Processable** - - ```rust - #[async_trait] - impl Processable for MyExtractorStep { - type Input = Transaction; - type Output = ExtractedDataModel; - type RunType = AsyncRunType; - - // Processes a batch of input items and returns a batch of output items. - async fn process( - &mut self, - input: TransactionContext, - ) -> Result>, ProcessorError> { - let extracted_data = ... - // Extract data from items.data - - Ok(Some(TransactionContext { - data: extracted_data, - start_version: input.start_version, - end_version: input.end_version, - start_transaction_timestamp: input.start_transaction_timestamp, - end_transaction_timestamp: input.end_transaction_timestamp, - total_size_in_bytes: input.total_size_in_bytes, - })) - } - } - ``` - -2. **NamedStep** - - ```rust - impl NamedStep for MyExtractorStep { - fn name(&self) -> String { - "MyExtractorStep".to_string() - } - } - ``` - -3. Either **AsyncStep** or **PollableAsyncStep**, which defines how the step will be run in the processor. - 1. The most basic step is an `AsyncStep`, which processes a batch of input items and returns a batch of output items. - - ```rust - #[async_trait] - impl Processable for MyExtractorStep { - ... - type RunType = AsyncRunType; - ... 
- } - - impl AsyncStep for MyExtractorStep {} - ``` - - 2. A `PollableAsyncStep` does the same as `AsyncStep`, but it also periodically polls its internal state and returns a batch of output items if available. - - ```rust - #[async_trait] - impl Processable for MyPollStep - where - Self: Sized + Send + 'static, - T: Send + 'static, - { - ... - type RunType = PollableAsyncRunType; - ... - } - - #[async_trait] - impl PollableAsyncStep for MyPollStep - where - Self: Sized + Send + Sync + 'static, - T: Send + 'static, - { - /// Returns the duration between polls - fn poll_interval(&self) -> std::time::Duration { - // Define duration - } - - /// Polls the internal state and returns a batch of output items if available. - async fn poll(&mut self) -> Result>>, ProcessorError> { - // Define polling logic - } - } - ``` - - -## Common steps - -The SDK provides several common steps to use in your processor. - -1. `TransactionStreamStep` provides a stream of Aptos transactions to the processor -2. `TimedBufferStep` buffers a batch of items and periodically polls to release the items to the next step - -## Connecting steps - -When `ProcessorBuilder` connects two steps, a channel is created linking the two steps and the output of the first step becomes the input of the next step. - -```rust -let (pb, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( - first_step.into_runnable_step(), - ) - .connect_to(second_step.into_runnable_step(), channel_size) - .connect_to(third_step.into_runnable_step(), channel_size) - .end_and_return_output_receiver(channel_size); -``` - -## Adding a new processor - -1. Use [aptos-indexer-processor-example](https://github.com/aptos-labs/aptos-indexer-processor-example) as a starting point -2. 
Add the new processor to [ProcessorConfig](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/a8bbb23056d55b86b4ded6822c9120e5e8763d50/aptos-indexer-processor-example/src/config/processor_config.rs#L34) and [Processor](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/a8bbb23056d55b86b4ded6822c9120e5e8763d50/aptos-indexer-processor-example/src/config/processor_config.rs#L58) -3. Add the processor to [RunnableConfig](https://github.com/aptos-labs/aptos-indexer-processor-example/blob/a8bbb23056d55b86b4ded6822c9120e5e8763d50/aptos-indexer-processor-example/src/config/indexer_processor_config.rs#L25) - -## Running a processor - -To run the processor, we recommend using the example in [aptos-indexer-processor-example](https://github.com/aptos-labs/aptos-indexer-processor-example) and following this [configuration guide](https://github.com/aptos-labs/aptos-indexer-processor-example?tab=readme-ov-file#configuring-your-processor). - -## Advanced features (experimental) - -1. Fanout + ArcifyStep -2. 
Fan in +Full documentation can be found [here](https://aptos.dev/build/indexer/indexer-sdk/documentation) diff --git a/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/basic_processor_function.rs b/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/basic_processor_function.rs index 89e608e..c6abfc7 100644 --- a/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/basic_processor_function.rs +++ b/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/basic_processor_function.rs @@ -79,7 +79,7 @@ where } } -async fn run_processor( +pub async fn run_processor( processor_name: String, transaction_stream_config: TransactionStreamConfig, postgres_config: PostgresConfig, diff --git a/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/mod.rs b/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/mod.rs index 6a1b3c3..d502cd5 100644 --- a/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/mod.rs +++ b/aptos-indexer-processors-sdk/sdk/src/postgres/basic_processor/mod.rs @@ -1,4 +1,4 @@ pub mod basic_processor_function; pub mod basic_processor_step; -pub use basic_processor_function::process; +pub use basic_processor_function::{process, run_processor}; diff --git a/aptos-indexer-processors-sdk/sdk/src/postgres/utils/database.rs b/aptos-indexer-processors-sdk/sdk/src/postgres/utils/database.rs index 4cdf070..ac45e7d 100644 --- a/aptos-indexer-processors-sdk/sdk/src/postgres/utils/database.rs +++ b/aptos-indexer-processors-sdk/sdk/src/postgres/utils/database.rs @@ -44,7 +44,7 @@ pub fn clean_data_for_db serde::Deserialize<'de>> } } -fn establish_connection(database_url: &str) -> BoxFuture> { +fn establish_connection(database_url: &str) -> BoxFuture<'_, ConnectionResult> { use native_tls::{Certificate, TlsConnector}; use postgres_native_tls::MakeTlsConnector; diff --git a/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs b/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs 
index 07e7104..0b24116 100644 --- a/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs +++ b/aptos-indexer-processors-sdk/sdk/src/traits/pollable_async_step.rs @@ -114,89 +114,87 @@ where let poll_step_name = step_name.clone(); let poll_output_sender = output_sender.clone(); let mut polling_task = tokio::spawn(async move { - let mut last_poll = tokio::time::Instant::now(); let poll_duration = poll_step.lock().await.poll_interval(); while poll_step.lock().await.should_continue_polling().await { // It's possible that the channel always has items, so we need to ensure we call `poll` manually if we need to - if last_poll.elapsed() >= poll_duration { - let polling_duration_for_logging = Instant::now(); - let result = match poll_step.lock().await.poll().await { - Ok(result) => result, - Err(e) => { - error!( - step_name = poll_step_name, - error = e.to_string(), - "Failed to poll" - ); - break; - }, - }; - match StepMetricsBuilder::default() - .labels(StepMetricLabels { - step_name: poll_step_name.clone(), - }) - .polling_duration_in_secs( - polling_duration_for_logging.elapsed().as_secs_f64(), - ) - .build() - { - Ok(mut metrics) => metrics.log_metrics(), - Err(e) => { - error!( - step_name = poll_step_name, - error = e.to_string(), - "Failed to log metrics" - ); - break; - }, - } - if let Some(outputs_with_context) = result { - for output_with_context in outputs_with_context { - match StepMetricsBuilder::default() - .labels(StepMetricLabels { - step_name: poll_step_name.clone(), - }) - .latest_polled_version(output_with_context.metadata.end_version) - .latest_polled_transaction_timestamp( - output_with_context.get_start_transaction_timestamp_unix(), - ) - .polled_transaction_latency( - output_with_context.get_transaction_latency(), - ) - .num_polled_transactions_count( - output_with_context.get_num_transactions(), - ) - .polled_size_in_bytes( - output_with_context.metadata.total_size_in_bytes, - ) - .build() - { - Ok(mut metrics) => 
metrics.log_metrics(), - Err(e) => { - error!( - step_name = poll_step_name, - error = e.to_string(), - "Failed to log metrics" - ); - break; - }, - } - match poll_output_sender.send(output_with_context).await { - Ok(_) => {}, - Err(e) => { - error!( - step_name = poll_step_name, - error = e.to_string(), - "Error sending output to channel" - ); - break; - }, - } - } - }; - last_poll = tokio::time::Instant::now(); + let polling_duration_for_logging = Instant::now(); + let result = match poll_step.lock().await.poll().await { + Ok(result) => result, + Err(e) => { + error!( + step_name = poll_step_name, + error = e.to_string(), + "Failed to poll" + ); + break; + }, + }; + match StepMetricsBuilder::default() + .labels(StepMetricLabels { + step_name: poll_step_name.clone(), + }) + .polling_duration_in_secs( + polling_duration_for_logging.elapsed().as_secs_f64(), + ) + .build() + { + Ok(mut metrics) => metrics.log_metrics(), + Err(e) => { + error!( + step_name = poll_step_name, + error = e.to_string(), + "Failed to log metrics" + ); + break; + }, } + if let Some(outputs_with_context) = result { + for output_with_context in outputs_with_context { + match StepMetricsBuilder::default() + .labels(StepMetricLabels { + step_name: poll_step_name.clone(), + }) + .latest_polled_version(output_with_context.metadata.end_version) + .latest_polled_transaction_timestamp( + output_with_context.get_start_transaction_timestamp_unix(), + ) + .polled_transaction_latency( + output_with_context.get_transaction_latency(), + ) + .num_polled_transactions_count( + output_with_context.get_num_transactions(), + ) + .polled_size_in_bytes( + output_with_context.metadata.total_size_in_bytes, + ) + .build() + { + Ok(mut metrics) => metrics.log_metrics(), + Err(e) => { + error!( + step_name = poll_step_name, + error = e.to_string(), + "Failed to log metrics" + ); + break; + }, + } + match poll_output_sender.send(output_with_context).await { + Ok(_) => {}, + Err(e) => { + error!( + step_name = 
poll_step_name, + error = e.to_string(), + "Error sending output to channel" + ); + break; + }, + } + } + }; + + tokio::time::sleep(poll_duration).await; } });