Commit 14578f6

[Pull-based Ingestion] Fix poller bug and remove persisted pointers (#19607)
* fix poller bug and remove persisted pointers
* update kafka consumer to use opensearch max_batch_size as max.poll.records when not explicitly configured
* fix kinesis tests

---------

Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent f15cbbd · commit 14578f6

20 files changed: +122 -224 lines changed


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
 - Fix flaky test FieldDataLoadingIT.testIndicesFieldDataCacheSizeSetting ([#19571](https://github.com/opensearch-project/OpenSearch/pull/19571))
 - Avoid primary shard failure caused by merged segment warmer exceptions ([#19436](https://github.com/opensearch-project/OpenSearch/pull/19436))
+- Fix pull-based ingestion out-of-bounds offset scenarios and remove persisted offsets ([#19607](https://github.com/opensearch-project/OpenSearch/pull/19607))
 
 ### Dependencies
 - Update to Gradle 9.1 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575))

plugins/ingestion-fs/src/main/java/org/opensearch/plugin/ingestion/fs/FileConsumerFactory.java

Lines changed: 3 additions & 4 deletions
@@ -8,10 +8,9 @@
 
 package org.opensearch.plugin.ingestion.fs;
 
+import org.opensearch.cluster.metadata.IngestionSource;
 import org.opensearch.index.IngestionConsumerFactory;
 
-import java.util.Map;
-
 /**
  * Factory for creating file-based ingestion consumers.
  */
@@ -25,8 +24,8 @@ public class FileConsumerFactory implements IngestionConsumerFactory<FilePartiti
     public FileConsumerFactory() {}
 
     @Override
-    public void initialize(Map<String, Object> params) {
-        this.config = new FileSourceConfig(params);
+    public void initialize(IngestionSource ingestionSource) {
+        this.config = new FileSourceConfig(ingestionSource.params());
     }
 
     @Override

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 6 additions & 6 deletions
@@ -493,14 +493,14 @@ public void testSnapshotRestoreOnAllActiveIngestion() throws Exception {
         refresh(indexName);
         waitForSearchableDocs(40, List.of(nodeA, nodeB));
 
-        // Verify both primary and replica have polled only remaining 20 messages
+        // Verify both primary and replica have indexed remaining messages
         Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
         assertNotNull(shardTypeToStats.get("primary"));
         assertNotNull(shardTypeToStats.get("replica"));
-        assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPolledCount(), is(20L));
+        assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPolledCount(), is(21L));
         assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
         assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
-        assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPolledCount(), is(20L));
+        assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPolledCount(), is(21L));
         assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
         assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
     }
@@ -557,13 +557,13 @@ public void testResetPollerInAllActiveIngestion() throws Exception {
             );
         });
 
-        // validate there are 8 duplicate messages encountered after reset
+        // validate there are 8 messages polled after reset
        waitForState(() -> {
             Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
             assertNotNull(shardTypeToStats.get("primary"));
             assertNotNull(shardTypeToStats.get("replica"));
-            return shardTypeToStats.get("primary").getConsumerStats().totalDuplicateMessageSkippedCount() == 8
-                && shardTypeToStats.get("replica").getConsumerStats().totalDuplicateMessageSkippedCount() == 8;
+            return shardTypeToStats.get("primary").getConsumerStats().totalPolledCount() == 8
+                && shardTypeToStats.get("replica").getConsumerStats().totalPolledCount() == 8;
         });
     }
 

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 7 additions & 4 deletions
@@ -568,7 +568,8 @@ public void testOffsetUpdateOnBlockErrorPolicy() throws Exception {
         waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
         PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
             .getPollingIngestStats();
-        assertThat(stats.getConsumerStats().totalDuplicateMessageSkippedCount(), is(0L));
+        assertThat(stats.getConsumerStats().totalPolledCount(), is(3L));
+        assertThat(stats.getConsumerStats().totalPollerMessageFailureCount(), is(0L));
     }
 
     public void testConsumerResetByTimestamp() throws Exception {
@@ -625,10 +626,12 @@ public void testConsumerResetByTimestamp() throws Exception {
         resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP, "102");
         assertTrue(resumeResponse.isAcknowledged());
         assertTrue(resumeResponse.isShardsAcknowledged());
+
+        waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
         waitForState(() -> {
             PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
                 .getPollingIngestStats();
-            return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 3;
+            return stats.getConsumerStats().totalPolledCount() == 3;
         });
     }
 
@@ -694,10 +697,10 @@ public void testRemoteSnapshotRestore() throws Exception {
         waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
         assertHitCount(client().prepareSearch(indexName).get(), 4);
 
-        // after index is restored, it should only poll remaining 2 messages
+        // after index is restored, it should resume ingestion from batchStartPointer available in the latest commit
         PollingIngestStats stats2 = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
             .getPollingIngestStats();
-        assertEquals(2, stats2.getConsumerStats().totalPolledCount());
+        assertEquals(3, stats2.getConsumerStats().totalPolledCount());
         assertEquals(0, stats2.getMessageProcessorStats().totalVersionConflictsCount());
     }
 

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaConsumerFactory.java

Lines changed: 3 additions & 4 deletions
@@ -8,10 +8,9 @@
 
 package org.opensearch.plugin.kafka;
 
+import org.opensearch.cluster.metadata.IngestionSource;
 import org.opensearch.index.IngestionConsumerFactory;
 
-import java.util.Map;
-
 /**
  * Factory for creating Kafka consumers
  */
@@ -28,8 +27,8 @@ public class KafkaConsumerFactory implements IngestionConsumerFactory<KafkaParti
     public KafkaConsumerFactory() {}
 
     @Override
-    public void initialize(Map<String, Object> params) {
-        config = new KafkaSourceConfig(params);
+    public void initialize(IngestionSource ingestionSource) {
+        config = new KafkaSourceConfig((int) ingestionSource.getMaxPollSize(), ingestionSource.params());
    }
 
     @Override

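For orientation, here is a minimal, hypothetical sketch (not part of the commit) of how the reworked factory is now driven by an IngestionSource instead of a raw parameter map; the builder usage mirrors KafkaConsumerFactoryTests further down, and the class name and parameter values are placeholders only:

import java.util.HashMap;
import java.util.Map;

import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.plugin.kafka.KafkaConsumerFactory;

// Sketch only: wiring the factory by hand with the new initialize(IngestionSource) contract.
public class KafkaFactoryInitExample {
    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("topic", "test-topic");                  // required Kafka topic
        params.put("bootstrap_servers", "localhost:9092");  // required broker list

        // Same builder usage as KafkaConsumerFactoryTests below.
        IngestionSource source = new IngestionSource.Builder("KAFKA").setParams(params).build();

        // The factory now derives KafkaSourceConfig from the IngestionSource,
        // passing getMaxPollSize() through as the default max.poll.records.
        KafkaConsumerFactory factory = new KafkaConsumerFactory();
        factory.initialize(source);
    }
}
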
plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 39 additions & 36 deletions
@@ -9,6 +9,7 @@
 package org.opensearch.plugin.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -89,25 +90,32 @@ protected KafkaPartitionConsumer(String clientId, KafkaSourceConfig config, int
     }
 
     /**
-     * Create a Kafka consumer. visible for testing
+     * Create consumer properties with default configurations and apply user provided overrides on top.
      * @param clientId the client id
      * @param config the Kafka source config
-     * @return the Kafka consumer
+     * @return the consumer properties
      */
-    protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaSourceConfig config) {
+    protected static Properties createConsumerProperties(String clientId, KafkaSourceConfig config) {
         Properties consumerProp = new Properties();
-        consumerProp.put("bootstrap.servers", config.getBootstrapServers());
-        consumerProp.put("client.id", clientId);
+        consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
+        consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
 
-        logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), config.getConsumerConfigurations());
+        // apply user provided overrides
         consumerProp.putAll(config.getConsumerConfigurations());
 
-        // TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
-        // consumerProp.put("key.deserializer",
-        // "org.apache.kafka.common.serialization.StringDeserializer");
-        // consumerProp.put("value.deserializer",
-        // "org.apache.kafka.common.serialization.StringDeserializer");
-        //
+        logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), consumerProp);
+        return consumerProp;
+    }
+
+    /**
+     * Create a Kafka consumer. visible for testing
+     * @param clientId the client id
+     * @param config the Kafka source config
+     * @return the Kafka consumer
+     */
+    protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaSourceConfig config) {
+        Properties consumerProp = createConsumerProperties(clientId, config);
+
         // wrap the kafka consumer creation in a privileged block to apply plugin security policies
         final ClassLoader restore = Thread.currentThread().getContextClassLoader();
         try {
@@ -124,6 +132,15 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
         }
     }
 
+    /**
+     * Read the next batch of messages from Kafka, starting from the provided offset.
+     * @param offset the pointer to start reading from,
+     * @param includeStart whether to include the start pointer in the read
+     * @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization.
+     * @param timeoutMillis the maximum time to wait for messages
+     * @return
+     * @throws TimeoutException
+     */
     @Override
     public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
         KafkaOffset offset,
@@ -132,25 +149,22 @@ public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
         int timeoutMillis
     ) throws TimeoutException {
         List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
-            (PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
-                offset.getOffset(),
-                includeStart,
-                maxMessages,
-                timeoutMillis
-            )
+            (PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(offset.getOffset(), includeStart, timeoutMillis)
         );
         return records;
     }
 
+    /**
+     * Read the next batch of messages from Kafka.
+     * @param maxMessages this setting is not honored for Kafka at this stage. maxMessages is instead set at consumer initialization.
+     * @param timeoutMillis the maximum time to wait for messages
+     * @return
+     * @throws TimeoutException
+     */
     @Override
     public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(long maxMessages, int timeoutMillis) throws TimeoutException {
         List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
-            (PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
-                lastFetchedOffset,
-                false,
-                maxMessages,
-                timeoutMillis
-            )
+            (PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(lastFetchedOffset, false, timeoutMillis)
         );
         return records;
     }
@@ -209,12 +223,7 @@ public IngestionShardPointer pointerFromOffset(String offset) {
         return new KafkaOffset(offsetValue);
     }
 
-    private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
-        long startOffset,
-        boolean includeStart,
-        long maxMessages,
-        int timeoutMillis
-    ) {
+    private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, boolean includeStart, int timeoutMillis) {
         long kafkaStartOffset = startOffset;
         if (!includeStart) {
             kafkaStartOffset += 1;
@@ -229,16 +238,10 @@ private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
 
         ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(timeoutMillis));
         List<ConsumerRecord<byte[], byte[]>> messageAndOffsets = consumerRecords.records(topicPartition);
-
-        long endOffset = kafkaStartOffset + maxMessages;
         List<ReadResult<KafkaOffset, KafkaMessage>> results = new ArrayList<>();
 
         for (ConsumerRecord<byte[], byte[]> messageAndOffset : messageAndOffsets) {
             long currentOffset = messageAndOffset.offset();
-            if (currentOffset >= endOffset) {
-                // fetched more message than max
-                break;
-            }
             lastFetchedOffset = currentOffset;
             KafkaOffset kafkaOffset = new KafkaOffset(currentOffset);
             KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value(), messageAndOffset.timestamp());

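To make the new batching behavior concrete, here is a hedged, test-style sketch (not part of the commit). It lives in the same org.opensearch.plugin.kafka package because createConsumerProperties is protected; the point is that the per-call maxMessages argument is no longer enforced in fetch, and the batch size is instead bounded by max.poll.records baked into the consumer properties at creation time. The class name and values are placeholders:

package org.opensearch.plugin.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Sketch only: shows where the Kafka batch limit now comes from.
public class ConsumerPropertiesExample {
    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("topic", "test-topic");
        params.put("bootstrap_servers", "localhost:9092");

        // No explicit 'max.poll.records': the engine-level maxPollSize (500 here) is used,
        // and 'auto.offset.reset' defaults to "none" so out-of-bounds offsets fail fast.
        KafkaSourceConfig config = new KafkaSourceConfig(500, params);

        Properties props = KafkaPartitionConsumer.createConsumerProperties("client-1", config);
        System.out.println(props.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));   // expected: 500
        System.out.println(props.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));  // expected: none
    }
}
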
plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java

Lines changed: 18 additions & 5 deletions
@@ -8,6 +8,7 @@
 
 package org.opensearch.plugin.kafka;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.opensearch.core.util.ConfigurationUtils;
 
 import java.util.HashMap;
@@ -19,27 +20,39 @@
 public class KafkaSourceConfig {
     private final String PROP_TOPIC = "topic";
     private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
-    private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";
 
     private final String topic;
     private final String bootstrapServers;
     private final String autoOffsetResetConfig;
+    private final int maxPollRecords;
 
     private final Map<String, Object> consumerConfigsMap;
 
     /**
      * Extracts and look for required and optional kafka consumer configurations.
+     * @param maxPollSize the maximum batch size to read in a single poll
      * @param params the configuration parameters
      */
-    public KafkaSourceConfig(Map<String, Object> params) {
+    public KafkaSourceConfig(int maxPollSize, Map<String, Object> params) {
+        this.consumerConfigsMap = new HashMap<>(params);
         this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
         this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
-        this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
-        this.consumerConfigsMap = new HashMap<>(params);
 
-        // remove above configurations
+        // 'auto.offset.reset' is handled differently for Kafka sources, with the default set to none.
+        // This ensures out-of-bounds offsets throw an error, unless the user explicitly sets different value.
+        this.autoOffsetResetConfig = ConfigurationUtils.readStringProperty(params, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+        // OpenSearch supports 'maxPollSize' setting for consumers. If user did not provide a 'max.poll.records' setting,
+        // maxPollSize will be used instead.
+        this.maxPollRecords = ConfigurationUtils.readIntProperty(params, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollSize);
+
+        // remove metadata configurations
         consumerConfigsMap.remove(PROP_TOPIC);
         consumerConfigsMap.remove(PROP_BOOTSTRAP_SERVERS);
+
+        // add or overwrite required configurations with defaults if not present
+        consumerConfigsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
+        consumerConfigsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
     }
 
     /**

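For completeness, a hedged sketch (not from the commit) of the override path: user-supplied max.poll.records and auto.offset.reset values in the source params take precedence over the maxPollSize and "none" defaults. The example class is placed in the plugin package so the package-visible accessors resolve; names and values are illustrative only:

package org.opensearch.plugin.kafka;

import java.util.HashMap;
import java.util.Map;

// Sketch only: explicit Kafka consumer settings win over the engine defaults.
public class KafkaSourceConfigOverrideExample {
    public static void main(String[] args) {
        Map<String, Object> params = new HashMap<>();
        params.put("topic", "test-topic");
        params.put("bootstrap_servers", "localhost:9092");
        params.put("max.poll.records", 50);          // overrides the maxPollSize default (1000)
        params.put("auto.offset.reset", "earliest"); // overrides the "none" default

        KafkaSourceConfig config = new KafkaSourceConfig(1000, params);

        // The resulting consumer configurations carry the user's values.
        System.out.println(config.getConsumerConfigurations());
    }
}
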
plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaConsumerFactoryTests.java

Lines changed: 2 additions & 1 deletion
@@ -8,6 +8,7 @@
 
 package org.opensearch.plugin.kafka;
 
+import org.opensearch.cluster.metadata.IngestionSource;
 import org.opensearch.test.OpenSearchTestCase;
 import org.junit.Assert;
 
@@ -21,7 +22,7 @@ public void testInitialize() {
         params.put("topic", "test-topic");
         params.put("bootstrap_servers", "localhost:9092");
 
-        factory.initialize(params);
+        factory.initialize(new IngestionSource.Builder("KAFKA").setParams(params).build());
 
         KafkaSourceConfig config = factory.config;
         Assert.assertNotNull("Config should be initialized", config);

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java

Lines changed: 2 additions & 2 deletions
@@ -44,7 +44,7 @@ public void setUp() throws Exception {
         params.put("topic", "test-topic");
         params.put("bootstrap_servers", "localhost:9092");
 
-        config = new KafkaSourceConfig(params);
+        config = new KafkaSourceConfig(1000, params);
         mockConsumer = mock(KafkaConsumer.class);
         // Mock the partitionsFor method
         PartitionInfo partitionInfo = new PartitionInfo("test-topic", 0, null, null, null);
@@ -109,7 +109,7 @@ public void testTopicDoesNotExist() {
         Map<String, Object> params = new HashMap<>();
         params.put("topic", "non-existent-topic");
         params.put("bootstrap_servers", "localhost:9092");
-        var kafkaSourceConfig = new KafkaSourceConfig(params);
+        var kafkaSourceConfig = new KafkaSourceConfig(1000, params);
         when(mockConsumer.partitionsFor(eq("non-existent-topic"), any(Duration.class))).thenReturn(null);
         try {
             new KafkaPartitionConsumer("client1", kafkaSourceConfig, 0, mockConsumer);

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaSingleNodeTests.java

Lines changed: 7 additions & 2 deletions
@@ -125,11 +125,16 @@ public void testPauseAndResumeAPIs() throws Exception {
             );
         });
 
-        // validate duplicate messages are skipped
+        RangeQueryBuilder query = new RangeQueryBuilder("age").gte(29);
         waitForState(() -> {
+            SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
             PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
                 .getPollingIngestStats();
-            return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 2;
+
+            return response.getHits().getTotalHits().value() == 2
+                && stats != null
+                && stats.getConsumerStats().totalPolledCount() == 4
+                && stats.getConsumerStats().totalPollerMessageFailureCount() == 0;
         });
     }
