Skip to content

Commit 23eb0d9

Browse files
authored
Introduce support for Opensearch data streams (#6249)
Add OpenSearch Data Stream support with automatic action selection Signed-off-by: Jonah Calvo <[email protected]>
1 parent ca1552e commit 23eb0d9

File tree

9 files changed

+616
-4
lines changed

9 files changed

+616
-4
lines changed

data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1637,6 +1637,80 @@ public void testOpenSearchIndexWithInvalidChars() throws IOException, Interrupte
16371637
Assert.assertThrows(RuntimeException.class, () -> sink.doInitialize());
16381638
}
16391639

1640+
@Test
1641+
@DisabledIf(value = "isDataStreamNotSupported", disabledReason = "Data streams require OpenSearch 1.3.0+")
1642+
public void testDataStreamDetection() throws IOException, InterruptedException {
1643+
final String dataStreamName = "test-data-stream-" + UUID.randomUUID();
1644+
final String templateName = dataStreamName + "-template";
1645+
final File tempDirectory = Files.createTempDirectory("").toFile();
1646+
final String dlqFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt";
1647+
1648+
try {
1649+
// Create an index template for the data stream first
1650+
final Request createTemplateRequest = new Request(HttpMethod.PUT, "/_index_template/" + templateName);
1651+
final String templateBody = "{" +
1652+
"\"index_patterns\": [\"" + dataStreamName + "\"]," +
1653+
"\"data_stream\": {}," +
1654+
"\"template\": {" +
1655+
"\"mappings\": {" +
1656+
"\"properties\": {" +
1657+
"\"@timestamp\": {\"type\": \"date\"}" +
1658+
"}" +
1659+
"}" +
1660+
"}" +
1661+
"}";
1662+
createTemplateRequest.setJsonEntity(templateBody);
1663+
client.performRequest(createTemplateRequest);
1664+
1665+
// Create a data stream
1666+
final Request createDataStreamRequest = new Request(HttpMethod.PUT, "/_data_stream/" + dataStreamName);
1667+
client.performRequest(createDataStreamRequest);
1668+
1669+
// Initialize sink AFTER creating the data stream so detection works
1670+
Map<String, Object> metadata = initializeConfigurationMetadata(null, dataStreamName, null);
1671+
metadata.put(RetryConfiguration.DLQ_FILE, dlqFile);
1672+
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
1673+
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
1674+
1675+
// Test that the data stream is detected
1676+
final String testIdField = "someId";
1677+
final String testId = "foo";
1678+
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
1679+
1680+
sink.output(testRecords);
1681+
sink.shutdown();
1682+
1683+
// Wait for indexing to complete
1684+
Thread.sleep(2000);
1685+
1686+
// Verify the document was written to the data stream
1687+
final List<Map<String, Object>> retSources = getSearchResponseDocSources(dataStreamName);
1688+
assertThat("Expected 1 document in data stream " + dataStreamName + " but found " + retSources.size(),
1689+
retSources.size(), equalTo(1));
1690+
} catch (Exception e) {
1691+
throw e;
1692+
} finally {
1693+
// Clean up the data stream
1694+
final Request deleteDataStreamRequest = new Request(HttpMethod.DELETE, "/_data_stream/" + dataStreamName);
1695+
try {
1696+
client.performRequest(deleteDataStreamRequest);
1697+
} catch (IOException e) {
1698+
// Ignore cleanup errors
1699+
}
1700+
1701+
// Clean up the index template
1702+
final Request deleteTemplateRequest = new Request(HttpMethod.DELETE, "/_index_template/" + templateName);
1703+
try {
1704+
client.performRequest(deleteTemplateRequest);
1705+
} catch (IOException e) {
1706+
// Ignore cleanup errors
1707+
}
1708+
1709+
// Clean up DLQ
1710+
FileUtils.deleteQuietly(tempDirectory);
1711+
}
1712+
}
1713+
16401714
@Test
16411715
@Timeout(value = 1, unit = TimeUnit.MINUTES)
16421716
@DisabledIf(value = "isES6",
@@ -1962,6 +2036,96 @@ private static boolean isES6() {
19622036
return DeclaredOpenSearchVersion.OPENDISTRO_0_10.compareTo(OpenSearchIntegrationHelper.getVersion()) >= 0;
19632037
}
19642038

2039+
private static boolean isDataStreamNotSupported() {
2040+
// Data streams require OpenSearch 1.3.0+
2041+
return OpenSearchIntegrationHelper.getVersion().compareTo(DeclaredOpenSearchVersion.parse("opensearch:1.3.0")) < 0;
2042+
}
2043+
2044+
@Test
2045+
@DisabledIf(value = "isDataStreamNotSupported", disabledReason = "Data streams require OpenSearch 1.3.0+")
2046+
public void testDataStreamFirstWriteWinsBehavior() throws IOException, InterruptedException {
2047+
final String dataStreamName = "test-first-write-wins-" + UUID.randomUUID();
2048+
final String templateName = dataStreamName + "-template";
2049+
final File tempDirectory = Files.createTempDirectory("").toFile();
2050+
final String dlqFile = tempDirectory.getAbsolutePath() + "/test-dlq.txt";
2051+
2052+
try {
2053+
// Create an index template for the data stream
2054+
final Request createTemplateRequest = new Request(HttpMethod.PUT, "/_index_template/" + templateName);
2055+
final String templateBody = "{" +
2056+
"\"index_patterns\": [\"" + dataStreamName + "\"]," +
2057+
"\"data_stream\": {}," +
2058+
"\"template\": {" +
2059+
"\"mappings\": {" +
2060+
"\"properties\": {" +
2061+
"\"@timestamp\": {\"type\": \"date\"}," +
2062+
"\"value\": {\"type\": \"keyword\"}" +
2063+
"}" +
2064+
"}" +
2065+
"}" +
2066+
"}";
2067+
createTemplateRequest.setJsonEntity(templateBody);
2068+
client.performRequest(createTemplateRequest);
2069+
2070+
// Create the data stream
2071+
final Request createDataStreamRequest = new Request(HttpMethod.PUT, "/_data_stream/" + dataStreamName);
2072+
client.performRequest(createDataStreamRequest);
2073+
2074+
// Initialize sink with document_id configuration
2075+
final String testIdField = "someId";
2076+
final String testId = "duplicate-id";
2077+
Map<String, Object> metadata = initializeConfigurationMetadata(null, dataStreamName, null);
2078+
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testIdField);
2079+
metadata.put(RetryConfiguration.DLQ_FILE, dlqFile);
2080+
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
2081+
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
2082+
2083+
// Write first document with value "first"
2084+
final String firstDoc = "{\"" + testIdField + "\": \"" + testId + "\", \"value\": \"first\"}";
2085+
final List<Record<Event>> firstRecords = Collections.singletonList(jsonStringToRecord(firstDoc));
2086+
sink.output(firstRecords);
2087+
2088+
// Wait for indexing
2089+
Thread.sleep(1000);
2090+
2091+
// Write second document with same ID but value "second"
2092+
final String secondDoc = "{\"" + testIdField + "\": \"" + testId + "\", \"value\": \"second\"}";
2093+
final List<Record<Event>> secondRecords = Collections.singletonList(jsonStringToRecord(secondDoc));
2094+
sink.output(secondRecords);
2095+
2096+
sink.shutdown();
2097+
2098+
// Wait for indexing to complete
2099+
Thread.sleep(2000);
2100+
2101+
// Verify only one document exists
2102+
final List<Map<String, Object>> retSources = getSearchResponseDocSources(dataStreamName);
2103+
assertThat("Expected exactly 1 document due to first-write-wins", retSources.size(), equalTo(1));
2104+
2105+
// Verify the document has the FIRST value (first-write-wins)
2106+
final Map<String, Object> document = retSources.get(0);
2107+
assertThat("Expected first write to win", document.get("value"), equalTo("first"));
2108+
2109+
} finally {
2110+
// Clean up
2111+
final Request deleteDataStreamRequest = new Request(HttpMethod.DELETE, "/_data_stream/" + dataStreamName);
2112+
try {
2113+
client.performRequest(deleteDataStreamRequest);
2114+
} catch (IOException e) {
2115+
// Ignore cleanup errors
2116+
}
2117+
2118+
final Request deleteTemplateRequest = new Request(HttpMethod.DELETE, "/_index_template/" + templateName);
2119+
try {
2120+
client.performRequest(deleteTemplateRequest);
2121+
} catch (IOException e) {
2122+
// Ignore cleanup errors
2123+
}
2124+
2125+
FileUtils.deleteQuietly(tempDirectory);
2126+
}
2127+
}
2128+
19652129
private static Stream<Object> getAttributeTestSpecialAndExtremeValues() {
19662130
return Stream.of(
19672131
null,

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@
6161
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedBulkOperationConverter;
6262
import org.opensearch.dataprepper.plugins.sink.opensearch.dlq.FailedDlqData;
6363
import org.opensearch.dataprepper.plugins.sink.opensearch.index.ClusterSettingsParser;
64+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamDetector;
65+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DataStreamIndex;
66+
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexCache;
6467
import org.opensearch.dataprepper.plugins.sink.opensearch.index.DocumentBuilder;
6568
import org.opensearch.dataprepper.plugins.sink.opensearch.index.ExistingDocumentQueryManager;
6669
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager;
@@ -150,6 +153,9 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
150153
private final ExpressionEvaluator expressionEvaluator;
151154

152155
private FailedBulkOperationConverter failedBulkOperationConverter;
156+
private DataStreamDetector dataStreamDetector;
157+
private DataStreamIndex dataStreamIndex;
158+
IndexCache indexCache;
153159

154160
private DlqProvider dlqProvider;
155161
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
@@ -306,6 +312,10 @@ private void doInitializeInternal() throws IOException {
306312
queryExecutorService.submit(existingDocumentQueryManager);
307313
}
308314

315+
this.indexCache = new IndexCache();
316+
this.dataStreamDetector = new DataStreamDetector(openSearchClient, indexCache);
317+
this.dataStreamIndex = new DataStreamIndex(dataStreamDetector, openSearchSinkConfig.getIndexConfiguration());
318+
309319
this.initialized = true;
310320
LOG.info("Initialized OpenSearch sink");
311321
}
@@ -436,7 +446,6 @@ public void doOutput(final Collection<Record<Event>> records) {
436446

437447
for (final Record<Event> record : records) {
438448
final Event event = record.getData();
439-
final SerializedJson document = getDocument(event);
440449
String indexName = configuredIndexAlias;
441450
try {
442451
indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator));
@@ -446,6 +455,9 @@ public void doOutput(final Collection<Record<Event>> records) {
446455
logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e);
447456
continue;
448457
}
458+
459+
dataStreamIndex.ensureTimestamp(event, indexName);
460+
final SerializedJson document = getDocument(event);
449461

450462
Long version = null;
451463
String versionExpressionEvaluationResult = null;
@@ -483,6 +495,10 @@ public void doOutput(final Collection<Record<Event>> records) {
483495
if (eventAction.contains("${")) {
484496
eventAction = event.formatString(eventAction, expressionEvaluator);
485497
}
498+
499+
if (dataStreamDetector.isDataStream(indexName)) {
500+
eventAction = dataStreamIndex.determineAction(eventAction, indexName);
501+
}
486502
if (OpenSearchBulkActions.fromOptionValue(eventAction) == null) {
487503
LOG.error("Unknown action {}, skipping the event", eventAction);
488504
invalidActionErrorsCounter.increment();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
12+
13+
import org.opensearch.client.opensearch.OpenSearchClient;
14+
import org.opensearch.client.opensearch.indices.GetDataStreamRequest;
15+
import org.opensearch.client.opensearch.indices.GetDataStreamResponse;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
19+
import java.io.IOException;
20+
21+
/**
22+
* Utility class to detect if an index name refers to a Data Stream
23+
*/
24+
public class DataStreamDetector {
25+
private static final Logger LOG = LoggerFactory.getLogger(DataStreamDetector.class);
26+
27+
private final OpenSearchClient openSearchClient;
28+
private final IndexCache indexCache;
29+
30+
public DataStreamDetector(final OpenSearchClient openSearchClient, final IndexCache indexCache) {
31+
this.openSearchClient = openSearchClient;
32+
this.indexCache = indexCache;
33+
}
34+
35+
/**
36+
* Determines if the given index name refers to a Data Stream
37+
* @param indexName the index name to check
38+
* @return true if it's a Data Stream, false otherwise
39+
*/
40+
public boolean isDataStream(final String indexName) {
41+
final Boolean cached = indexCache.getDataStreamResult(indexName);
42+
if (cached != null) {
43+
return cached;
44+
}
45+
46+
final boolean result = checkDataStream(indexName);
47+
indexCache.putDataStreamResult(indexName, result);
48+
return result;
49+
}
50+
51+
private boolean checkDataStream(final String indexName) {
52+
try {
53+
final GetDataStreamRequest request = GetDataStreamRequest.of(r -> r.name(indexName));
54+
final GetDataStreamResponse response = openSearchClient.indices().getDataStream(request);
55+
56+
// If we get a response without exception, it's a data stream
57+
return response.dataStreams() != null && !response.dataStreams().isEmpty();
58+
59+
} catch (final IOException e) {
60+
// If we get a 404 or similar, it's not a data stream
61+
LOG.debug("Index '{}' is not a Data Stream: {}", indexName, e.getMessage());
62+
return false;
63+
} catch (final Exception e) {
64+
LOG.debug("Data Stream detection not supported or failed for index '{}': {}", indexName, e.getMessage());
65+
return false;
66+
}
67+
}
68+
69+
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.sink.opensearch.index;
7+
8+
import org.opensearch.dataprepper.model.event.Event;
9+
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
14+
public class DataStreamIndex {
15+
private static final Logger LOG = LoggerFactory.getLogger(DataStreamIndex.class);
16+
private static final String TIMESTAMP_FIELD = "@timestamp";
17+
18+
private final DataStreamDetector dataStreamDetector;
19+
private final IndexConfiguration indexConfiguration;
20+
21+
public DataStreamIndex(final DataStreamDetector dataStreamDetector, final IndexConfiguration indexConfiguration) {
22+
this.dataStreamDetector = dataStreamDetector;
23+
this.indexConfiguration = indexConfiguration;
24+
}
25+
26+
27+
public String determineAction(final String configuredAction, final String indexName) {
28+
if (dataStreamDetector.isDataStream(indexName)) {
29+
validateConfigurationForDataStream(indexName);
30+
31+
// Only warn if user explicitly configured a non-create action (excluding the default "index" action)
32+
if (configuredAction != null &&
33+
!configuredAction.equals(OpenSearchBulkActions.CREATE.toString()) &&
34+
!configuredAction.equals(OpenSearchBulkActions.INDEX.toString())) {
35+
LOG.warn("Data Stream '{}' requires 'create' action, but '{}' was configured. Using 'create' action.",
36+
indexName, configuredAction);
37+
}
38+
return OpenSearchBulkActions.CREATE.toString();
39+
}
40+
return configuredAction != null ? configuredAction : OpenSearchBulkActions.INDEX.toString();
41+
}
42+
43+
44+
public void ensureTimestamp(final Event event, final String indexName) {
45+
if (dataStreamDetector.isDataStream(indexName) && !event.containsKey(TIMESTAMP_FIELD)) {
46+
event.put(TIMESTAMP_FIELD, event.getEventHandle().getInternalOriginationTime().toEpochMilli());
47+
}
48+
}
49+
50+
private void validateConfigurationForDataStream(final String indexName) {
51+
if (indexConfiguration.getDocumentIdField() != null || indexConfiguration.getDocumentId() != null) {
52+
LOG.warn("Data Stream '{}' with document ID configuration uses first-write-wins behavior. Subsequent writes to the same ID will be ignored.", indexName);
53+
}
54+
if (indexConfiguration.getRoutingField() != null || indexConfiguration.getRouting() != null) {
55+
LOG.warn("Data Stream '{}' does not support routing. Routing configuration will be ignored.", indexName);
56+
}
57+
}
58+
59+
60+
}

0 commit comments

Comments
 (0)