Skip to content

Commit 0044a0d

Browse files
authored
Add metadata for document version to OpenSearch source (opensearch-project#6416)
Signed-off-by: Taylor Gray <tylgry@amazon.com>
1 parent d858776 commit 0044a0d

File tree

8 files changed

+84
-2
lines changed

8 files changed

+84
-2
lines changed

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/NoSearchContextWorker.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -183,6 +183,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
183183
}
184184
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());
185185

186+
LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
187+
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());
188+
186189
try {
187190
bufferAccumulator.flush();
188191
} catch (final Exception e) {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/PitWorker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
221221
}
222222
} while (searchWithSearchAfterResults.getDocuments().size() == searchConfiguration.getBatchSize());
223223

224+
LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
225+
searchWithSearchAfterResults.getDocuments().size(), searchConfiguration.getBatchSize());
226+
224227
try {
225228
bufferAccumulator.flush();
226229
} catch (final Exception e) {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/ScrollWorker.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
188188
} while (searchScrollResponse.getDocuments().size() == batchSize);
189189
}
190190

191+
LOG.info("Received {} documents in latest search request, and batch size is {}, exiting pagination",
192+
searchScrollResponse.getDocuments().size(), batchSize);
193+
191194
deleteScroll(createScrollResponse.getScrollId());
192195

193196
try {

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555

5656
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE;
5757
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
58+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME;
5859
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;
5960

6061
public class ElasticsearchAccessor implements SearchAccessor, ClusterClientFactory<ElasticsearchClient> {
@@ -122,6 +123,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
122123
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
123124
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
124125
)
126+
.version(true)
125127
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
126128

127129
if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
@@ -161,6 +163,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
161163
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
162164
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
163165
.size(createScrollRequest.getSize())
166+
.version(true)
164167
.index(createScrollRequest.getIndex())), ObjectNode.class);
165168
} catch (final ElasticsearchException e) {
166169
if (isDueToNoIndexFound(e)) {
@@ -233,6 +236,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
233236
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
234237
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSortBuilder -> fieldSortBuilder.field("_id").order(SortOrder.Asc)))))
235238
)
239+
.version(true)
236240
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
237241

238242
if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
@@ -295,7 +299,10 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
295299
return searchResponse.hits().hits().stream()
296300
.map(hit -> JacksonEvent.builder()
297301
.withData(hit.source())
298-
.withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
302+
.withEventMetadataAttributes(
303+
Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(),
304+
INDEX_METADATA_ATTRIBUTE_NAME, hit.index(),
305+
DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME, hit.version()))
299306
.withEventType(EventType.DOCUMENT.toString()).build())
300307
.collect(Collectors.toList());
301308
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchAccessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.stream.Collectors;
5656

5757
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
58+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME;
5859
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;
5960

6061
public class OpenSearchAccessor implements SearchAccessor, ClusterClientFactory<OpenSearchClient> {
@@ -120,6 +121,7 @@ public SearchWithSearchAfterResults searchWithPit(final SearchPointInTimeRequest
120121
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
121122
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
122123
)
124+
.version(true)
123125
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
124126

125127
if (Objects.nonNull(searchPointInTimeRequest.getSearchAfter())) {
@@ -157,6 +159,7 @@ public CreateScrollResponse createScroll(final CreateScrollRequest createScrollR
157159
.scroll(Time.of(time -> time.time(createScrollRequest.getScrollTime())))
158160
.sort(SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))))
159161
.size(createScrollRequest.getSize())
162+
.version(true)
160163
.index(createScrollRequest.getIndex())), ObjectNode.class);
161164
} catch (final OpenSearchException e) {
162165
if (isDueToNoIndexFound(e)) {
@@ -227,6 +230,7 @@ public SearchWithSearchAfterResults searchWithoutSearchContext(final NoSearchCon
227230
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.doc(ScoreSort.of(scoreSort -> scoreSort.order(SortOrder.Asc)))),
228231
SortOptions.of(sortOptionsBuilder -> sortOptionsBuilder.field(FieldSort.of(fieldSort -> fieldSort.field("_id").order(SortOrder.Asc)))))
229232
)
233+
.version(true)
230234
.query(Query.of(query -> query.matchAll(MatchAllQuery.of(matchAllQuery -> matchAllQuery))));
231235

232236
if (Objects.nonNull(noSearchContextSearchRequest.getSearchAfter())) {
@@ -294,7 +298,10 @@ private List<Event> getDocumentsFromResponse(final SearchResponse<ObjectNode> se
294298
return searchResponse.hits().hits().stream()
295299
.map(hit -> JacksonEvent.builder()
296300
.withData(hit.source())
297-
.withEventMetadataAttributes(Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(), INDEX_METADATA_ATTRIBUTE_NAME, hit.index()))
301+
.withEventMetadataAttributes(
302+
Map.of(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME, hit.id(),
303+
INDEX_METADATA_ATTRIBUTE_NAME, hit.index(),
304+
DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME, hit.version()))
298305
.withEventType(EventType.DOCUMENT.toString()).build())
299306
.collect(Collectors.toList());
300307
}

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/model/MetadataKeyAttributes.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@
88
/**
 * Names of the event metadata attributes that the search accessors attach to each
 * document event: the document id, the index it was read from, and the document version.
 *
 * NOTE(review): the version key is spelled "opensearch_document_version" (underscore after
 * the "opensearch" prefix), while the id and index keys use a hyphen ("opensearch-...").
 * Confirm the differing separator is intentional before anything depends on the key string.
 */
public class MetadataKeyAttributes {
99
public static final String DOCUMENT_ID_METADATA_ATTRIBUTE_NAME = "opensearch-document_id";
1010
public static final String INDEX_METADATA_ATTRIBUTE_NAME = "opensearch-index";
11+
public static final String DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME = "opensearch_document_version";
1112
}

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/ElasticsearchAccessorTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.INDEX_NOT_FOUND_EXCEPTION;
6868
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.PIT_RESOURCE_LIMIT_ERROR_TYPE;
6969
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchAccessor.SCROLL_RESOURCE_LIMIT_EXCEPTION_MESSAGE;
70+
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME;
7071

7172
@ExtendWith(MockitoExtension.class)
7273
public class ElasticsearchAccessorTest {
@@ -127,11 +128,13 @@ void create_scroll_returns_expected_create_scroll_response() throws IOException
127128
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
128129
when(firstHit.index()).thenReturn(UUID.randomUUID().toString());
129130
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
131+
when(firstHit.version()).thenReturn(1L);
130132

131133
final Hit<ObjectNode> secondHit = mock(Hit.class);
132134
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
133135
when(secondHit.index()).thenReturn(UUID.randomUUID().toString());
134136
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
137+
when(secondHit.version()).thenReturn(2L);
135138

136139
hits.add(firstHit);
137140
hits.add(secondHit);
@@ -148,6 +151,14 @@ void create_scroll_returns_expected_create_scroll_response() throws IOException
148151
assertThat(createScrollResponse.getScrollId(), equalTo(scrollId));
149152
assertThat(createScrollResponse.getDocuments(), notNullValue());
150153
assertThat(createScrollResponse.getDocuments().size(), equalTo(2));
154+
assertThat(createScrollResponse.getDocuments().get(0), notNullValue());
155+
assertThat(createScrollResponse.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
156+
assertThat(createScrollResponse.getDocuments().get(1), notNullValue());
157+
assertThat(createScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));
158+
159+
final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
160+
assertThat(searchRequest, notNullValue());
161+
assertThat(searchRequest.version(), equalTo(true));
151162
}
152163

153164
@Test
@@ -452,12 +463,14 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha
452463
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
453464
when(firstHit.index()).thenReturn(UUID.randomUUID().toString());
454465
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
466+
when(firstHit.version()).thenReturn(1L);
455467

456468
final Hit<ObjectNode> secondHit = mock(Hit.class);
457469
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
458470
when(secondHit.index()).thenReturn(UUID.randomUUID().toString());
459471
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
460472
when(secondHit.sort()).thenReturn(searchAfter);
473+
when(secondHit.version()).thenReturn(2L);
461474

462475
hits.add(firstHit);
463476
hits.add(secondHit);
@@ -476,6 +489,14 @@ void search_with_pit_returns_expected_SearchPointInTimeResponse(final boolean ha
476489
assertThat(searchWithSearchAfterResults.getDocuments().size(), equalTo(2));
477490

478491
assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort()));
492+
assertThat(searchWithSearchAfterResults.getDocuments().get(0), notNullValue());
493+
assertThat(searchWithSearchAfterResults.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
494+
assertThat(searchWithSearchAfterResults.getDocuments().get(1), notNullValue());
495+
assertThat(searchWithSearchAfterResults.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));
496+
497+
final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
498+
assertThat(searchRequest, notNullValue());
499+
assertThat(searchRequest.version(), equalTo(true));
479500
}
480501

481502
@ParameterizedTest
@@ -502,12 +523,14 @@ void search_without_search_context_returns_expected_SearchPointInTimeResponse(fi
502523
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
503524
when(firstHit.index()).thenReturn(index);
504525
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
526+
when(firstHit.version()).thenReturn(1L);
505527

506528
final Hit<ObjectNode> secondHit = mock(Hit.class);
507529
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
508530
when(secondHit.index()).thenReturn(index);
509531
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
510532
when(secondHit.sort()).thenReturn(searchAfter);
533+
when(secondHit.version()).thenReturn(2L);
511534

512535
hits.add(firstHit);
513536
hits.add(secondHit);
@@ -524,8 +547,16 @@ void search_without_search_context_returns_expected_SearchPointInTimeResponse(fi
524547
assertThat(searchWithSearchAfterResults, notNullValue());
525548
assertThat(searchWithSearchAfterResults.getDocuments(), notNullValue());
526549
assertThat(searchWithSearchAfterResults.getDocuments().size(), equalTo(2));
550+
assertThat(searchWithSearchAfterResults.getDocuments().get(0), notNullValue());
551+
assertThat(searchWithSearchAfterResults.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
552+
assertThat(searchWithSearchAfterResults.getDocuments().get(1), notNullValue());
553+
assertThat(searchWithSearchAfterResults.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));
527554

528555
assertThat(searchWithSearchAfterResults.getNextSearchAfter(), equalTo(secondHit.sort()));
556+
557+
final SearchRequest searchRequest = searchRequestArgumentCaptor.getValue();
558+
assertThat(searchRequest, notNullValue());
559+
assertThat(searchRequest.version(), equalTo(true));
529560
}
530561

531562
@Test
@@ -544,11 +575,13 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti
544575
when(firstHit.id()).thenReturn(UUID.randomUUID().toString());
545576
when(firstHit.index()).thenReturn(UUID.randomUUID().toString());
546577
when(firstHit.source()).thenReturn(mock(ObjectNode.class));
578+
when(firstHit.version()).thenReturn(1L);
547579

548580
final Hit<ObjectNode> secondHit = mock(Hit.class);
549581
when(secondHit.id()).thenReturn(UUID.randomUUID().toString());
550582
when(secondHit.index()).thenReturn(UUID.randomUUID().toString());
551583
when(secondHit.source()).thenReturn(mock(ObjectNode.class));
584+
when(secondHit.version()).thenReturn(2L);
552585

553586
hits.add(firstHit);
554587
hits.add(secondHit);
@@ -567,5 +600,9 @@ void search_with_scroll_returns_expected_SearchScrollResponse() throws IOExcepti
567600
assertThat(searchScrollResponse.getDocuments(), notNullValue());
568601
assertThat(searchScrollResponse.getDocuments().size(), equalTo(2));
569602
assertThat(searchScrollResponse.getScrollId(), equalTo(scrollId));
603+
assertThat(searchScrollResponse.getDocuments().get(0), notNullValue());
604+
assertThat(searchScrollResponse.getDocuments().get(0).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(1L));
605+
assertThat(searchScrollResponse.getDocuments().get(1), notNullValue());
606+
assertThat(searchScrollResponse.getDocuments().get(1).getMetadata().getAttribute(DOCUMENT_VERSION_METADATA_ATTRIBUTE_NAME), equalTo(2L));
570607
}
571608
}

0 commit comments

Comments
 (0)