Skip to content

Commit b684f88

Browse files
authored
DOC-1123 Add is_high_watermark field and metadata (#193)
1 parent 409390a commit b684f88

File tree

2 files changed

+25
-13
lines changed

2 files changed

+25
-13
lines changed

modules/components/pages/inputs/redpanda_migrator_offsets.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ This input adds the following metadata fields to each message:
8080
- `kafka_offset_partition`
8181
- `kafka_offset_commit_timestamp`
8282
- `kafka_offset_metadata`
83+
- `kafka_is_high_watermark`
8384

8485
== Fields
8586

modules/components/pages/outputs/redpanda_migrator_offsets.adoc

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111

1212
component_type_dropdown::[]
1313

14-
15-
Use the `redpanda_migrator_offsets` output in conjunction with a `kafka_franz` input that is configured to read the `__consumer_offsets` topic.
16-
This output uses the https://github.com/twmb/franz-go[Franz Kafka client library^].
14+
Migrates offset data to a Redpanda cluster. To achieve this, pair the `redpanda_migrator_offsets` output with a matching xref:components:outputs/redpanda_migrator_bundle.adoc[`redpanda_migrator_bundle` input].
1715

1816
Alternatively, use the `redpanda_migrator_bundle` xref:components:inputs/redpanda_migrator_bundle.adoc[input] and xref:components:outputs/redpanda_migrator_bundle.adoc[output] to complete the migration of topics, messages, and schemas to a Redpanda cluster.
1917

18+
This output uses the https://github.com/twmb/franz-go[Franz Kafka client library^].
19+
2020

2121
ifndef::env-cloud[]
2222
Introduced in version 4.37.0.
@@ -39,6 +39,7 @@ output:
3939
offset_partition: ${! @kafka_offset_partition }
4040
offset_commit_timestamp: ${! @kafka_offset_commit_timestamp }
4141
offset_metadata: ${! @kafka_offset_metadata }
42+
is_high_watermark: ${! @kafka_is_high_watermark }
4243
```
4344
4445
--
@@ -57,16 +58,17 @@ output:
5758
enabled: false
5859
skip_cert_verify: false
5960
enable_renegotiation: false
60-
root_cas: ""
61-
root_cas_file: ""
62-
client_certs: []
61+
root_cas: "" # Optional
62+
root_cas_file: "" # Optional
63+
client_certs: [] # Optional
6364
sasl: [] # No default (optional)
6465
metadata_max_age: 5m
6566
offset_topic: ${! @kafka_offset_topic }
6667
offset_group: ${! @kafka_offset_group }
6768
offset_partition: ${! @kafka_offset_partition }
6869
offset_commit_timestamp: ${! @kafka_offset_commit_timestamp }
6970
offset_metadata: ${! @kafka_offset_metadata }
71+
is_high_watermark: ${! @kafka_is_high_watermark }
7072
timeout: 10s
7173
max_message_bytes: 1MiB
7274
broker_write_max_bytes: 100MiB
@@ -301,7 +303,7 @@ The SASL mechanism to use.
301303
| `SCRAM-SHA-512`
302304
| SCRAM-based authentication as specified in RFC 5802.
303305
| `none`
304-
| Disable SASL authentication
306+
| Disable SASL authentication.
305307

306308
|===
307309

@@ -443,54 +445,63 @@ An external ID to provide when assuming a role.
443445
*Default*: `""`
444446

445447

446-
=== metadata_max_age
448+
=== `metadata_max_age`
447449

448450
The maximum period of time after which metadata is refreshed.
449451

450452
*Type*: `string`
451453

452454
*Default*: `5m`
453455

454-
=== offset_topic
456+
=== `offset_topic`
455457

456458
The name of the Kafka offset topic to process. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
457459

458460
*Type*: `string`
459461

460462
*Default*: `${! @kafka_offset_topic }`
461463

462-
=== offset_group
464+
=== `offset_group`
463465

464466
The name of the Kafka offset consumer group to process. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
465467

466468
*Type*: `string`
467469

468470
*Default*: `${! @kafka_offset_group }`
469471

470-
=== offset_partition
472+
=== `offset_partition`
471473

472474
The Kafka offset partition ID to process. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
473475

474476
*Type*: `string`
475477

476478
*Default*: `${! @kafka_offset_partition }`
477479

478-
=== offset_commit_timestamp
480+
=== `offset_commit_timestamp`
479481

480482
The Kafka offset commit timestamp to process. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
481483

482484
*Type*: `string`
483485

484486
*Default*: `${! @kafka_offset_commit_timestamp }`
485487

486-
=== offset_metadata
488+
=== `offset_metadata`
487489

488490
The Kafka offset metadata value to process. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
489491

490492
*Type*: `string`
491493

492494
*Default*: `${! @kafka_offset_metadata }`
493495

496+
=== `is_high_watermark`
497+
498+
Indicates whether the processed message is the high watermark, which is the offset of the last fully replicated message in the Kafka topic partition. This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
499+
500+
*Type*: `string`
501+
502+
*Default*: `${! @kafka_is_high_watermark }`
503+
504+
494505
=== `timeout`
495506

496507
The maximum period of time to wait for message sends before abandoning the request and retrying it.

0 commit comments

Comments
 (0)