Skip to content

Commit 7830acb

Browse files
authored
SOLR-18077: CrossDC Consumer - out-of-order Kafka partition processing (cherry-pick from #4125) (#4152)
1 parent 1fd74e1 commit 7830acb

File tree

4 files changed

+59
-4
lines changed

4 files changed

+59
-4
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
2+
title: "CrossDC Consumer: fix potential out-of-order Kafka partition processing"
3+
type: fixed # added, changed, fixed, deprecated, removed, dependency_update, security, other
4+
authors:
5+
- name: Andrzej Bialecki
6+
nick: ab
7+
links:
8+
- name: SOLR-18077
9+
url: https://issues.apache.org/jira/browse/SOLR-18077

solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/SolrAndKafkaIntegrationTest.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.lang.invoke.MethodHandles;
2626
import java.nio.charset.StandardCharsets;
2727
import java.util.ArrayList;
28+
import java.util.Collections;
2829
import java.util.Date;
2930
import java.util.HashMap;
3031
import java.util.List;
@@ -33,6 +34,7 @@
3334
import java.util.Properties;
3435
import java.util.concurrent.ExecutorService;
3536
import java.util.concurrent.Future;
37+
import java.util.stream.IntStream;
3638
import org.apache.commons.io.IOUtils;
3739
import org.apache.kafka.clients.producer.KafkaProducer;
3840
import org.apache.kafka.clients.producer.Producer;
@@ -122,8 +124,10 @@ public String bootstrapServers() {
122124
};
123125
kafkaCluster.start();
124126

125-
kafkaCluster.createTopic(TOPIC, 1, 1);
127+
kafkaCluster.createTopic(TOPIC, 10, 1);
126128

129+
// ensure small batches to test multi-partition ordering
130+
System.setProperty("batchSizeBytes", "128");
127131
System.setProperty("solr.crossdc.topicName", TOPIC);
128132
System.setProperty("solr.crossdc.bootstrapServers", kafkaCluster.bootstrapServers());
129133
System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");
@@ -183,6 +187,7 @@ public void afterSolrAndKafkaIntegrationTest() throws Exception {
183187
Thread.setDefaultUncaughtExceptionHandler(uceh);
184188
}
185189

190+
@Test
186191
public void testFullCloudToCloud() throws Exception {
187192
CloudSolrClient client = solrCluster1.getSolrClient(COLLECTION);
188193
SolrInputDocument doc = new SolrInputDocument();
@@ -198,6 +203,7 @@ public void testFullCloudToCloud() throws Exception {
198203
assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1);
199204
}
200205

206+
@Test
201207
public void testProducerToCloud() throws Exception {
202208
Properties properties = new Properties();
203209
properties.put("bootstrap.servers", kafkaCluster.bootstrapServers());
@@ -228,6 +234,39 @@ public void testProducerToCloud() throws Exception {
228234
producer.close();
229235
}
230236

237+
private static final String LOREM_IPSUM =
238+
"Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.";
239+
240+
@Test
241+
public void testStrictOrdering() throws Exception {
242+
CloudSolrClient client = solrCluster1.getSolrClient();
243+
int NUM_DOCS = 5000;
244+
// delay deletes by this many docs
245+
int DELTA = 100;
246+
for (int i = 0; i < NUM_DOCS; i++) {
247+
SolrInputDocument doc = new SolrInputDocument();
248+
doc.addField("id", "id-" + i);
249+
doc.addField("text", "some test with a relatively long field. " + LOREM_IPSUM);
250+
251+
client.add(COLLECTION, doc);
252+
if (i >= DELTA) {
253+
client.deleteById(COLLECTION, "id-" + (i - DELTA));
254+
}
255+
}
256+
257+
// send the remaining deletes in random order
258+
ArrayList<Integer> ids = new ArrayList<>(DELTA);
259+
IntStream.range(0, DELTA).forEach(i -> ids.add(i));
260+
Collections.shuffle(ids, random());
261+
for (Integer id : ids) {
262+
client.deleteById(COLLECTION, "id-" + (NUM_DOCS - DELTA + id));
263+
}
264+
265+
client.commit(COLLECTION);
266+
267+
assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 0);
268+
}
269+
231270
@Test
232271
@Ignore("This relies on collection properties and I don't see where they are read anymore")
233272
public void testMirroringUpdateProcessor() throws Exception {

solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/KafkaMirroringSink.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,15 @@ private void submitRequest(MirroredSolrRequest<?> request, String topicName)
9292
}
9393

9494
final long enqueueStartNanos = System.nanoTime();
95+
// required for multi-partition topics to preserve ordering of requests for a collection
96+
final String recordKey =
97+
request.getSolrRequest() != null ? request.getSolrRequest().getCollection() : null;
9598

9699
// Create Producer record
97100
try {
98101

99102
producer.send(
100-
new ProducerRecord<>(topicName, request),
103+
new ProducerRecord<>(topicName, recordKey, request),
101104
(metadata, exception) -> {
102105
if (exception != null) {
103106
log.error(

solr/solr-ref-guide/modules/deployment-guide/pages/cross-dc-replication.adoc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Optional configuration properties:
130130
`solr.crossdc.retryBackoffMs` _<integer>_:: The amount of time to wait before attempting to retry a failed request to a given topic partition.
131131
`solr.crossdc.deliveryTimeoutMS` _<integer>_:: Updates sent to the Kafka queue will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first
132132
`solr.crossdc.maxRequestSizeBytes` _<integer>_:: The maximum size of a Kafka queue request in bytes - limits the number of requests that will be sent over the queue in a single batch.
133-
`solr.crossdc.dlqTopicName` _<string>_: If not empty then requests that failed processing `maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (must exist if configured).
133+
`solr.crossdc.dlqTopicName` _<string>_:: If not empty then requests that failed processing `maxAttempts` times will be sent to a "dead letter queue" topic in Kafka (must exist if configured).
134134
`solr.crossdc.mirrorCommits` _<boolean>_:: If `true` then standalone commit requests will be mirrored, otherwise they will be processed only locally.
135135
`solr.crossdc.expandDbq` _<enum>_ :: If set to `expand` (default) then Delete-By-Query will be expanded before mirroring into series of Delete-By-Id, which may help with correct processing of out-of-order requests on the consumer side.
136136
If set to `none` then Delete-By-Query requests will be mirrored as-is.
@@ -212,4 +212,8 @@ Setting the `solr.crossdc.enabled` system property or xref:collection-management
212212
- When `solr.crossdc.expandDbq` property is set to `expand` (default) then Delete-By-Query converts to a series of Delete-By-Id, which can be much less efficient for queries matching large numbers of documents.
213213
Setting this property to `none` results in forwarding a real Delete-By-Query - this reduces the amount of data to mirror but may cause different results due to the potential re-ordering of failed & re-submitted requests between Consumer and the target Solr.
214214
- When `solr.crossdc.collapseUpdates` is set to `all` then multiple requests containing a mix of add and delete ops will be collapsed into a single outgoing request.
215-
This will cause the original ordering of add / delete ops to be lost (because Solr processing of an update request always processes all add ops first, and only then the delete ops), which may affect the final outcome when some of the ops refer to the same document ids.
215+
This will cause the original ordering of add / delete ops to be lost (because Solr processing of an update request always processes all add ops first, and only then the delete ops), which may affect the final outcome when some of the ops refer to the same document ids.
216+
- When the Kafka topic used for mirroring has multiple partitions the CrossDC Producer and Consumer guarantee strict ordering of updates ONLY within the same collection.
217+
In other words, when a multi-partition topic is used for mirroring there's no guarantee of a strict global request ordering across
218+
collections, which normally should not be an issue. However, if a strict global ordering across collections is required then
219+
the mirroring topic must use a single partition.

0 commit comments

Comments (0)