Skip to content

Commit e0ee3b4

Browse files
Update ingestion status in index metadata xcontent and avoid retry for parsing error (opensearch-project#19320)
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 7c6b3c5 commit e0ee3b4

File tree

14 files changed

+228
-31
lines changed

14 files changed

+228
-31
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6767
- Remove unnecessary looping in field data cache clear ([#19116](https://github.com/opensearch-project/OpenSearch/pull/19116))
6868
- [Flaky Test] Fix flaky test IngestFromKinesisIT.testAllActiveIngestion ([#19380](https://github.com/opensearch-project/OpenSearch/pull/19380))
6969
- Fix lag metric for pull-based ingestion when streaming source is empty ([#19393](https://github.com/opensearch-project/OpenSearch/pull/19393))
70+
- Fix ingestion state xcontent serialization in IndexMetadata and fail fast on mapping errors ([#19320](https://github.com/opensearch-project/OpenSearch/pull/19320))
7071

7172
### Dependencies
7273
- Update to Gradle 9.1.0 ([#19329](https://github.com/opensearch-project/OpenSearch/pull/19329))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,7 @@ public void testKafkaIngestion_RewindByTimeStamp() {
107107
.put("ingestion_source.param.topic", "test")
108108
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
109109
.put("ingestion_source.param.auto.offset.reset", "latest")
110+
.put("ingestion_source.all_active", true)
110111
.build(),
111112
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
112113
);
@@ -134,6 +135,7 @@ public void testKafkaIngestion_RewindByOffset() {
134135
.put("ingestion_source.param.topic", "test")
135136
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
136137
.put("ingestion_source.param.auto.offset.reset", "latest")
138+
.put("ingestion_source.all_active", true)
137139
.build(),
138140
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
139141
);

plugins/ingestion-kinesis/src/internalClusterTest/java/org/opensearch/plugin/kinesis/IngestFromKinesisIT.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -127,6 +127,7 @@ public void testKinesisIngestion_RewindByOffset() throws InterruptedException {
127127
"ingestion_source.param.endpoint_override",
128128
localstack.getEndpointOverride(LocalStackContainer.Service.KINESIS).toString()
129129
)
130+
.put("ingestion_source.all_active", true)
130131
.build(),
131132
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
132133
);

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

Lines changed: 31 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -909,7 +909,8 @@ public Iterator<Setting<?>> settings() {
909909

910910
/**
911911
* Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the
912-
* streaming source and process the updates. This mode is currently not supported along with segment replication.
912+
* streaming source and process the updates. In the default document replication mode, this setting must be enabled.
913+
* This mode is currently not supported with segment replication.
913914
*/
914915
public static final String SETTING_INGESTION_SOURCE_ALL_ACTIVE_INGESTION = "index.ingestion_source.all_active";
915916
public static final Setting<Boolean> INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING = Setting.boolSetting(
@@ -923,21 +924,33 @@ public void validate(final Boolean value) {}
923924
@Override
924925
public void validate(final Boolean value, final Map<Setting<?>, Object> settings) {
925926
final Object replicationType = settings.get(INDEX_REPLICATION_TYPE_SETTING);
926-
if (ReplicationType.SEGMENT.equals(replicationType) && value) {
927+
final Object ingestionSourceType = settings.get(INGESTION_SOURCE_TYPE_SETTING);
928+
boolean isPullBasedIngestionEnabled = NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType) == false;
929+
930+
if (isPullBasedIngestionEnabled && ReplicationType.SEGMENT.equals(replicationType) && value) {
927931
throw new IllegalArgumentException(
928-
"To enable "
929-
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
930-
+ ", "
931-
+ INDEX_REPLICATION_TYPE_SETTING.getKey()
932-
+ " should not be set to "
932+
"Replication type "
933933
+ ReplicationType.SEGMENT
934+
+ " is not supported in pull-based ingestion when "
935+
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
936+
+ " is enabled"
937+
);
938+
}
939+
940+
if (isPullBasedIngestionEnabled && ReplicationType.DOCUMENT.equals(replicationType) && value == false) {
941+
throw new IllegalArgumentException(
942+
"Replication type "
943+
+ ReplicationType.DOCUMENT
944+
+ " is not supported in pull-based ingestion when "
945+
+ INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.getKey()
946+
+ " is not enabled"
934947
);
935948
}
936949
}
937950

938951
@Override
939952
public Iterator<Setting<?>> settings() {
940-
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING);
953+
final List<Setting<?>> settings = List.of(INDEX_REPLICATION_TYPE_SETTING, INGESTION_SOURCE_TYPE_SETTING);
941954
return settings.iterator();
942955
}
943956
},
@@ -981,6 +994,7 @@ public Iterator<Setting<?>> settings() {
981994
public static final String TRANSLOG_METADATA_KEY = "translog_metadata";
982995
public static final String CONTEXT_KEY = "context";
983996
public static final String INGESTION_SOURCE_KEY = "ingestion_source";
997+
public static final String INGESTION_STATUS_KEY = "ingestion_status";
984998

985999
public static final String INDEX_STATE_FILE_PREFIX = "state-";
9861000

@@ -2304,6 +2318,13 @@ public static void toXContent(IndexMetadata indexMetadata, XContentBuilder build
23042318
indexMetadata.context.toXContent(builder, params);
23052319
}
23062320

2321+
if (indexMetadata.getCreationVersion().onOrAfter(Version.V_3_3_0) && indexMetadata.ingestionStatus != null) {
2322+
// ingestionStatus field is introduced from OS 3.x. But this field is included in XContent serialization only from OS 3.3
2323+
// onwards.
2324+
builder.field(INGESTION_STATUS_KEY);
2325+
indexMetadata.ingestionStatus.toXContent(builder, params);
2326+
}
2327+
23072328
builder.endObject();
23082329
}
23092330

@@ -2387,6 +2408,8 @@ public static IndexMetadata fromXContent(XContentParser parser) throws IOExcepti
23872408
parser.skipChildren();
23882409
} else if (CONTEXT_KEY.equals(currentFieldName)) {
23892410
builder.context(Context.fromXContent(parser));
2411+
} else if (INGESTION_STATUS_KEY.equals(currentFieldName)) {
2412+
builder.ingestionStatus(IngestionStatus.fromXContent(parser));
23902413
} else {
23912414
// assume it's custom index metadata
23922415
builder.putCustom(currentFieldName, parser.mapStrings());

server/src/main/java/org/opensearch/cluster/metadata/IngestionStatus.java

Lines changed: 36 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,14 +12,18 @@
1212
import org.opensearch.core.common.io.stream.StreamInput;
1313
import org.opensearch.core.common.io.stream.StreamOutput;
1414
import org.opensearch.core.common.io.stream.Writeable;
15+
import org.opensearch.core.xcontent.ToXContent;
16+
import org.opensearch.core.xcontent.XContentBuilder;
17+
import org.opensearch.core.xcontent.XContentParser;
1518

1619
import java.io.IOException;
1720

1821
/**
1922
* Indicates pull-based ingestion status.
2023
*/
2124
@ExperimentalApi
22-
public record IngestionStatus(boolean isPaused) implements Writeable {
25+
public record IngestionStatus(boolean isPaused) implements Writeable, ToXContent {
26+
public static final String IS_PAUSED = "is_paused";
2327

2428
public IngestionStatus(StreamInput in) throws IOException {
2529
this(in.readBoolean());
@@ -30,6 +34,37 @@ public void writeTo(StreamOutput out) throws IOException {
3034
out.writeBoolean(isPaused);
3135
}
3236

37+
@Override
38+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
39+
builder.startObject();
40+
builder.field(IS_PAUSED, isPaused);
41+
builder.endObject();
42+
return builder;
43+
}
44+
45+
public static IngestionStatus fromXContent(XContentParser parser) throws IOException {
46+
boolean isPaused = false;
47+
48+
XContentParser.Token token = parser.currentToken();
49+
if (token == null) {
50+
token = parser.nextToken();
51+
}
52+
53+
if (token == XContentParser.Token.START_OBJECT) {
54+
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
55+
if (token == XContentParser.Token.FIELD_NAME) {
56+
String fieldName = parser.currentName();
57+
if (IS_PAUSED.equals(fieldName)) {
58+
parser.nextToken();
59+
isPaused = parser.booleanValue();
60+
}
61+
}
62+
}
63+
}
64+
65+
return new IngestionStatus(isPaused);
66+
}
67+
3368
public static IngestionStatus getDefaultValue() {
3469
return new IngestionStatus(false);
3570
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -244,7 +244,7 @@ protected void startPoll() {
244244
// Currently we do not have a good way to skip past the failing messages.
245245
// The user will have the option to manually update the offset and resume ingestion.
246246
// todo: support retry?
247-
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {}: {}", shardId, e);
247+
logger.error("Pausing ingestion. Fatal error occurred in polling the shard {} for index {}: {}", shardId, indexName, e);
248248
totalConsumerErrorCount.inc();
249249
pause();
250250
}
@@ -276,7 +276,13 @@ private IngestionShardPointer processRecords(
276276
result.getPointer().asString()
277277
);
278278
} catch (Exception e) {
279-
logger.error("Error in processing a record. Shard {}, pointer {}: {}", shardId, result.getPointer().asString(), e);
279+
logger.error(
280+
"[Default Poller] Error processing record. Index={}, Shard={}, pointer={}: error={}",
281+
indexName,
282+
shardId,
283+
result.getPointer().asString(),
284+
e
285+
);
280286
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
281287
totalPollerMessageFailureCount.inc();
282288

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

Lines changed: 31 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import org.opensearch.index.engine.IngestionEngine;
2727
import org.opensearch.index.engine.VersionConflictEngineException;
2828
import org.opensearch.index.mapper.IdFieldMapper;
29+
import org.opensearch.index.mapper.MapperParsingException;
2930
import org.opensearch.index.mapper.ParseContext;
3031
import org.opensearch.index.mapper.ParsedDocument;
3132
import org.opensearch.index.mapper.SourceToParse;
@@ -65,33 +66,50 @@ public class MessageProcessorRunnable implements Runnable, Closeable {
6566
private volatile boolean closed = false;
6667
private volatile IngestionErrorStrategy errorStrategy;
6768

69+
private final String indexName;
70+
private final int shardId;
71+
6872
/**
6973
* Constructor.
7074
*
7175
* @param blockingQueue the blocking queue to poll messages from
7276
* @param engine the ingestion engine
77+
* @param errorStrategy the error strategy/policy to use
7378
*/
7479
public MessageProcessorRunnable(
7580
BlockingQueue<ShardUpdateMessage<? extends IngestionShardPointer, ? extends Message>> blockingQueue,
7681
IngestionEngine engine,
7782
IngestionErrorStrategy errorStrategy
7883
) {
79-
this(blockingQueue, new MessageProcessor(engine), errorStrategy);
84+
this(
85+
blockingQueue,
86+
new MessageProcessor(engine),
87+
errorStrategy,
88+
engine.config().getShardId().getIndexName(),
89+
engine.config().getShardId().getId()
90+
);
8091
}
8192

8293
/**
8394
* Constructor visible for testing.
8495
* @param blockingQueue the blocking queue to poll messages from
8596
* @param messageProcessor the message processor
97+
* @param errorStrategy the error strategy/policy to use
98+
* @param indexName the index name
99+
* @param shardId the shard ID
86100
*/
87101
MessageProcessorRunnable(
88102
BlockingQueue<ShardUpdateMessage<? extends IngestionShardPointer, ? extends Message>> blockingQueue,
89103
MessageProcessor messageProcessor,
90-
IngestionErrorStrategy errorStrategy
104+
IngestionErrorStrategy errorStrategy,
105+
String indexName,
106+
int shardId
91107
) {
92108
this.blockingQueue = Objects.requireNonNull(blockingQueue);
93109
this.messageProcessor = messageProcessor;
94110
this.errorStrategy = errorStrategy;
111+
this.indexName = indexName;
112+
this.shardId = shardId;
95113
}
96114

97115
static class MessageProcessor {
@@ -309,9 +327,10 @@ public void run() {
309327
logger.debug("Dropping message due to version conflict. ShardPointer: " + shardUpdateMessage.pointer().asString(), e);
310328
shardUpdateMessage = null;
311329
} catch (Exception e) {
330+
logger.error("[Message Processor] Error processing message. Index={}, Shard={}, error={}", indexName, shardId, e);
312331
messageProcessorMetrics.failedMessageCounter.inc();
313332
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
314-
boolean retriesExhausted = retryCount >= MIN_RETRY_COUNT || e instanceof IllegalArgumentException;
333+
boolean retriesExhausted = hasExhaustedRetries(e, retryCount);
315334
if (retriesExhausted && errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
316335
logDroppedMessage(shardUpdateMessage);
317336
shardUpdateMessage = null;
@@ -336,6 +355,15 @@ private void waitBeforeRetry() {
336355
}
337356
}
338357

358+
private boolean hasExhaustedRetries(Exception e, int retryCount) {
359+
if (retryCount >= MIN_RETRY_COUNT) {
360+
return true;
361+
}
362+
363+
// Don't retry validation/parsing errors
364+
return e instanceof IllegalArgumentException || e instanceof MapperParsingException;
365+
}
366+
339367
public MessageProcessorMetrics getMessageProcessorMetrics() {
340368
return messageProcessorMetrics;
341369
}

server/src/test/java/org/opensearch/action/admin/cluster/reroute/ClusterRerouteResponseTests.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -146,7 +146,10 @@ public void testToXContent() throws IOException {
146146
+ " \"0\" : [ ]\n"
147147
+ " },\n"
148148
+ " \"rollover_info\" : { },\n"
149-
+ " \"system\" : false\n"
149+
+ " \"system\" : false,\n"
150+
+ " \"ingestion_status\" : {\n"
151+
+ " \"is_paused\" : false\n"
152+
+ " }\n"
150153
+ " }\n"
151154
+ " },\n"
152155
+ " \"index-graveyard\" : {\n"
@@ -252,7 +255,10 @@ public void testToXContent() throws IOException {
252255
+ " \"0\" : [ ]\n"
253256
+ " },\n"
254257
+ " \"rollover_info\" : { },\n"
255-
+ " \"system\" : false\n"
258+
+ " \"system\" : false,\n"
259+
+ " \"ingestion_status\" : {\n"
260+
+ " \"is_paused\" : false\n"
261+
+ " }\n"
256262
+ " }\n"
257263
+ " },\n"
258264
+ " \"index-graveyard\" : {\n"

server/src/test/java/org/opensearch/cluster/ClusterStateTests.java

Lines changed: 16 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -275,7 +275,10 @@ public void testToXContent() throws IOException {
275275
+ " \"time\" : 1\n"
276276
+ " }\n"
277277
+ " },\n"
278-
+ " \"system\" : false\n"
278+
+ " \"system\" : false,\n"
279+
+ " \"ingestion_status\" : {\n"
280+
+ " \"is_paused\" : false\n"
281+
+ " }\n"
279282
+ " }\n"
280283
+ " },\n"
281284
+ " \"index-graveyard\" : {\n"
@@ -477,7 +480,10 @@ public void testToXContent_FlatSettingTrue_ReduceMappingFalse() throws IOExcepti
477480
+ " \"time\" : 1\n"
478481
+ " }\n"
479482
+ " },\n"
480-
+ " \"system\" : false\n"
483+
+ " \"system\" : false,\n"
484+
+ " \"ingestion_status\" : {\n"
485+
+ " \"is_paused\" : false\n"
486+
+ " }\n"
481487
+ " }\n"
482488
+ " },\n"
483489
+ " \"index-graveyard\" : {\n"
@@ -686,7 +692,10 @@ public void testToXContent_FlatSettingFalse_ReduceMappingTrue() throws IOExcepti
686692
+ " \"time\" : 1\n"
687693
+ " }\n"
688694
+ " },\n"
689-
+ " \"system\" : false\n"
695+
+ " \"system\" : false,\n"
696+
+ " \"ingestion_status\" : {\n"
697+
+ " \"is_paused\" : false\n"
698+
+ " }\n"
690699
+ " }\n"
691700
+ " },\n"
692701
+ " \"index-graveyard\" : {\n"
@@ -835,7 +844,10 @@ public void testToXContentSameTypeName() throws IOException {
835844
+ " \"0\" : [ ]\n"
836845
+ " },\n"
837846
+ " \"rollover_info\" : { },\n"
838-
+ " \"system\" : false\n"
847+
+ " \"system\" : false,\n"
848+
+ " \"ingestion_status\" : {\n"
849+
+ " \"is_paused\" : false\n"
850+
+ " }\n"
839851
+ " }\n"
840852
+ " },\n"
841853
+ " \"index-graveyard\" : {\n"

0 commit comments

Comments
 (0)