Skip to content

Commit c537209

Browse files
varunbharadwajkolchfa-awsnatebower
authored
[Pull-based Ingestion] Update pull-based ingestion metrics and reset settings (opensearch-project#9993)
* Update pull-based ingestion documentation with new metrics and reset settings in resume API Signed-off-by: Varun Bharadwaj <[email protected]> * Update resume api description Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update reset settings description Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: kolchfa-aws <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion-management.md Co-authored-by: Nathan Bower <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: Nathan Bower <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: Nathan Bower <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: Nathan Bower <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: Nathan Bower <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Update _api-reference/document-apis/pull-based-ingestion.md Co-authored-by: Nathan Bower <[email protected]> Signed-off-by: Varun Bharadwaj <[email protected]> * Address review comments Signed-off-by: Varun Bharadwaj <[email protected]> --------- Signed-off-by: Varun Bharadwaj <[email protected]> Co-authored-by: kolchfa-aws <[email protected]> Co-authored-by: Nathan Bower <[email protected]>
1 parent cc225df commit c537209

File tree

2 files changed

+49
-8
lines changed

2 files changed

+49
-8
lines changed

_api-reference/document-apis/pull-based-ingestion-management.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ POST /my-index/ingestion/_pause
5454

5555
Resumes ingestion for one or more indexes. When resumed, OpenSearch continues consuming data from the streaming source for all shards in the specified indexes.
5656

57+
As part of the resume operation, you can optionally reset the stream consumer to start reading from a specific offset or timestamp. If reset settings are specified, all consumers for the selected shards are reset before the resume operation is applied to the index. Resetting a consumer also triggers an internal flush to persist the changes.
58+
5759
### Endpoint
5860

5961
```json
@@ -77,10 +79,39 @@ The following table lists the available query parameters. All query parameters a
7779
| `cluster_manager_timeout` | Time units | The amount of time to wait for a connection to the cluster manager node. Default is `30s`. |
7880
| `timeout` | Time units | The amount of time to wait for a response from the cluster. Default is `30s`. |
7981

82+
### Request body fields
83+
84+
The following table lists the available request body fields.
85+
86+
| Field | Data type | Required/Optional | Description |
87+
| :--- | :--- | :--- | :--- |
88+
| `reset_settings` | Array | Optional | A list of reset settings for each shard. If not provided, OpenSearch resumes ingestion from the current position for each shard in the specified index. |
89+
| `reset_settings.shard` | Integer | Required | The shard to reset. |
90+
| `reset_settings.mode` | String | Required | The reset mode. Valid values are `offset` (a positive integer offset) and `timestamp` (a Unix timestamp in milliseconds). |
91+
| `reset_settings.value` | String | Required | &ensp;&#x2022; `offset`: The Apache Kafka offset or Amazon Kinesis sequence number<br>&ensp;&#x2022; `timestamp`: A Unix timestamp in milliseconds. |
92+
8093
### Example request
8194

95+
To resume ingestion without specifying reset settings, send the following request:
96+
97+
```json
98+
POST /my-index/ingestion/_resume
99+
```
100+
{% include copy-curl.html %}
101+
102+
To provide reset settings when resuming ingestion, send the following request:
103+
82104
```json
83105
POST /my-index/ingestion/_resume
106+
{
107+
"reset_settings": [
108+
{
109+
"shard": 0,
110+
"mode": "offset",
111+
"value": "1"
112+
}
113+
]
114+
}
84115
```
85116
{% include copy-curl.html %}
86117

_api-reference/document-apis/pull-based-ingestion.md

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ The `ingestion_source` parameters control how OpenSearch pulls data from the str
6868
| Parameter | Description |
6969
| :--- | :--- |
7070
| `type` | The streaming source type. Required. Valid values are `kafka` or `kinesis`. |
71-
| `pointer.init.reset` | Determines where to start reading from the stream. Optional. Valid values are `earliest`, `latest`, `rewind_by_offset`, `rewind_by_timestamp`, or `none`. See [Stream position](#stream-position). |
72-
| `pointer.init.reset.value` | Required only for `rewind_by_offset` or `rewind_by_timestamp`. Specifies the offset value or timestamp in milliseconds. See [Stream position](#stream-position). |
71+
| `pointer.init.reset` | Determines the stream location from which to start reading. Optional. Valid values are `earliest`, `latest`, `reset_by_offset`, `reset_by_timestamp`, or `none`. See [Stream position](#stream-position). |
72+
| `pointer.init.reset.value` | Required only for `reset_by_offset` or `reset_by_timestamp`. Specifies the offset value or timestamp in milliseconds. See [Stream position](#stream-position). |
7373
| `error_strategy` | How to handle failed messages. Optional. Valid values are `DROP` (failed messages are skipped and ingestion continues) and `BLOCK` (when a message fails, ingestion stops). Default is `DROP`. We recommend using `DROP` for the current experimental release. |
7474
| `max_batch_size` | The maximum number of records to retrieve in each poll operation. Optional. |
7575
| `poll.timeout` | The maximum time to wait for data in each poll operation. Optional. |
@@ -84,11 +84,11 @@ The following table provides the valid `pointer.init.reset` values and their cor
8484

8585
| `pointer.init.reset` | Starting ingestion point | `pointer.init.reset.value` |
8686
| :--- | :--- | :--- |
87-
| `earliest` | Beginning of stream | None |
88-
| `latest` | Current end of stream | None |
89-
| `rewind_by_offset` | Specific offset in the stream | A positive integer offset. Required. |
90-
| `rewind_by_timestamp` | Specific point in time | A Unix timestamp in milliseconds. Required. <br> For Kafka streams, defaults to Kafka's `auto.offset.reset` policy if no messages are found for the given timestamp. |
91-
| `none` | Last committed position for existing indexes | None |
87+
| `earliest` | The beginning of the stream | None |
88+
| `latest` | The current end of the stream | None |
89+
| `reset_by_offset` | A specific offset in the stream | A positive integer offset. Required. |
90+
| `reset_by_timestamp` | A specific point in time | A Unix timestamp in milliseconds. Required. <br> For Kafka streams, defaults to Kafka's `auto.offset.reset` policy if no messages are found for the given timestamp. |
91+
| `none` | The last committed position for existing indexes | None |
9292

9393
### Stream partitioning
9494

@@ -131,7 +131,7 @@ Each data unit in the streaming source (Kafka message or Kinesis record) must in
131131
| :--- | :--- | :--- | :--- |
132132
| `_id` | String | No | A unique identifier for a document. If not provided, OpenSearch auto-generates an ID. Required for document updates or deletions. |
133133
| `_version` | Long | No | A document version number, which must be maintained externally. If provided, OpenSearch drops messages with versions earlier than the current document version. If not provided, no version checking occurs. |
134-
| `_op_type` | String | No | The operation to perform. Valid values are:<br>- `index`: Creates a new document or updates an existing one<br>- `delete`: Soft deletes a document |
134+
| `_op_type` | String | No | The operation to perform. Valid values are:<br>- `index`: Creates a new document or updates an existing one.<br>- `create`: Creates a new document in append mode. Note that this will not update existing documents. <br>- `delete`: Soft deletes a document. |
135135
| `_source` | Object | Yes | The message payload containing the document data. |
136136

137137
## Pull-based ingestion metrics
@@ -143,7 +143,17 @@ The following table lists the available `polling_ingest_stats` metrics.
143143
| Metric | Description |
144144
| :--- | :--- |
145145
| `message_processor_stats.total_processed_count` | The total number of messages processed by the message processor. |
146+
| `message_processor_stats.total_invalid_message_count` | The number of invalid messages encountered. |
147+
| `message_processor_stats.total_version_conflicts_count` | The number of version conflicts due to which older version messages will be dropped. |
148+
| `message_processor_stats.total_failed_count` | The total number of failed messages, which error out during processing. |
149+
| `message_processor_stats.total_failures_dropped_count` | The total number of failed messages, which are dropped after exhausting retries. Note that messages are only dropped when the DROP error policy is used. |
150+
| `message_processor_stats.total_processor_thread_interrupt_count` | Indicates the number of thread interruptions on the processor thread. |
146151
| `consumer_stats.total_polled_count` | The total number of messages polled from the stream consumer. |
152+
| `consumer_stats.total_consumer_error_count` | The total number of fatal consumer read errors. |
153+
| `consumer_stats.total_poller_message_failure_count` | The total number of failed messages on the poller. |
154+
| `consumer_stats.total_poller_message_dropped_count` | The total number of failed messages on the poller that were dropped. |
155+
| `consumer_stats.total_duplicate_message_skipped_count` | The total number of skipped messages that were previously processed. |
156+
| `consumer_stats.lag_in_millis` | Lag in milliseconds, computed as the time elapsed since the last processed message timestamp. |
147157

148158
To retrieve shard-level pull-based ingestion metrics, use the [Nodes Stats API]({{site.url}}{{site.baseurl}}/api-reference/index-apis/update-settings/):
149159

0 commit comments

Comments
 (0)