Skip to content

Commit dc8d029

Browse files
committed
Add some fixes
1 parent 82ffb04 commit dc8d029

File tree

2 files changed

+20
-15
lines changed

2 files changed

+20
-15
lines changed

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine
5555

5656
private static final Logger logger = LogManager.getLogger(CompositeIndexingExecutionEngine.class);
5757

58-
private final IndexingExecutionEngine<? extends DataFormat, ? extends DocumentInput<?>> primaryEngine;
59-
private final List<IndexingExecutionEngine<? extends DataFormat, ? extends DocumentInput<?>>> secondaryEngines;
58+
private final IndexingExecutionEngine<?, ?> primaryEngine;
59+
private final List<IndexingExecutionEngine<?, ?>> secondaryEngines;
6060
private final CompositeDataFormat compositeDataFormat;
6161
private final CompositeDataFormatWriterPool dataFormatWriterPool;
6262
private final AtomicLong writerGenerationCounter;
@@ -278,7 +278,7 @@ public CompositeDocumentInput newDocumentInput() {
278278
*
279279
* @return the secondary engines
280280
*/
281-
public List<IndexingExecutionEngine<? extends DataFormat, ? extends DocumentInput<?>>> getSecondaryDelegates() {
281+
public List<IndexingExecutionEngine<?, ?>> getSecondaryDelegates() {
282282
return secondaryEngines;
283283
}
284284

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ public class CompositeWriter implements Writer<CompositeDocumentInput>, Lock {
4646

4747
private static final Logger logger = LogManager.getLogger(CompositeWriter.class);
4848

49-
private final Map.Entry<DataFormat, Writer<? extends DocumentInput<?>>> primaryWriter;
50-
private final List<Map.Entry<DataFormat, Writer<? extends DocumentInput<?>>>> secondaryWriters;
49+
private final Map.Entry<DataFormat, Writer<DocumentInput<?>>> primaryWriter;
50+
private final List<Map.Entry<DataFormat, Writer<DocumentInput<?>>>> secondaryWriters;
5151
private final ReentrantLock lock;
5252
private final long writerGeneration;
5353
private final RowIdGenerator rowIdGenerator;
@@ -64,6 +64,7 @@ public class CompositeWriter implements Writer<CompositeDocumentInput>, Lock {
6464
* @param engine the composite indexing execution engine
6565
* @param writerGeneration the writer generation number
6666
*/
67+
@SuppressWarnings("unchecked")
6768
public CompositeWriter(CompositeIndexingExecutionEngine engine, long writerGeneration) {
6869
this.lock = new ReentrantLock();
6970
this.aborted = false;
@@ -73,22 +74,26 @@ public CompositeWriter(CompositeIndexingExecutionEngine engine, long writerGener
7374
IndexingExecutionEngine<?, ?> primaryDelegate = engine.getPrimaryDelegate();
7475
this.primaryWriter = new AbstractMap.SimpleImmutableEntry<>(
7576
primaryDelegate.getDataFormat(),
76-
primaryDelegate.createWriter(writerGeneration)
77+
(Writer<DocumentInput<?>>) primaryDelegate.createWriter(writerGeneration)
7778
);
7879

79-
List<Map.Entry<DataFormat, Writer<?>>> secondaries = new ArrayList<>();
80+
List<Map.Entry<DataFormat, Writer<DocumentInput<?>>>> secondaries = new ArrayList<>();
8081
for (IndexingExecutionEngine<?, ?> delegate : engine.getSecondaryDelegates()) {
81-
secondaries.add(new AbstractMap.SimpleImmutableEntry<>(delegate.getDataFormat(), delegate.createWriter(writerGeneration)));
82+
secondaries.add(
83+
new AbstractMap.SimpleImmutableEntry<>(
84+
delegate.getDataFormat(),
85+
(Writer<DocumentInput<?>>) delegate.createWriter(writerGeneration)
86+
)
87+
);
8288
}
8389
this.secondaryWriters = List.copyOf(secondaries);
8490
this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName());
8591
}
8692

87-
@SuppressWarnings("unchecked")
8893
@Override
8994
public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
9095
// Write to primary first
91-
WriteResult primaryResult = ((Writer<DocumentInput<?>>) primaryWriter.getValue()).addDoc(doc.getPrimaryInput());
96+
WriteResult primaryResult = primaryWriter.getValue().addDoc(doc.getPrimaryInput());
9297
switch (primaryResult) {
9398
case WriteResult.Success s -> logger.trace(
9499
"Successfully added document in primary format [{}]",
@@ -103,9 +108,9 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
103108
// Then write to each secondary
104109
List<DocumentInput<?>> secondaryInputs = new ArrayList<>(doc.getSecondaryInputs().values());
105110
for (int i = 0; i < secondaryWriters.size(); i++) {
106-
Map.Entry<DataFormat, Writer<?>> entry = secondaryWriters.get(i);
111+
Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry = secondaryWriters.get(i);
107112
DocumentInput<?> input = secondaryInputs.get(i);
108-
WriteResult result = ((Writer<DocumentInput<?>>) entry.getValue()).addDoc(input);
113+
WriteResult result = entry.getValue().addDoc(input);
109114
switch (result) {
110115
case WriteResult.Success s -> logger.trace(
111116
"Successfully added document in secondary format [{}]",
@@ -128,7 +133,7 @@ public FileInfos flush() throws IOException {
128133
Optional<WriterFileSet> primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey());
129134
primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet));
130135
// Flush secondaries
131-
for (Map.Entry<DataFormat, Writer<?>> entry : secondaryWriters) {
136+
for (Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry : secondaryWriters) {
132137
Optional<WriterFileSet> wfs = entry.getValue().flush().getWriterFileSet(entry.getKey());
133138
wfs.ifPresent(writerFileSet -> builder.putWriterFileSet(entry.getKey(), writerFileSet));
134139
}
@@ -138,7 +143,7 @@ public FileInfos flush() throws IOException {
138143
@Override
139144
public void sync() throws IOException {
140145
primaryWriter.getValue().sync();
141-
for (Map.Entry<DataFormat, Writer<?>> entry : secondaryWriters) {
146+
for (Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry : secondaryWriters) {
142147
entry.getValue().sync();
143148
}
144149
}
@@ -150,7 +155,7 @@ public void close() {
150155
} catch (Exception e) {
151156
logger.warn("Failed to close primary Writer for format [{}]", primaryWriter.getKey().name(), e);
152157
}
153-
for (Map.Entry<DataFormat, Writer<?>> entry : secondaryWriters) {
158+
for (Map.Entry<DataFormat, Writer<DocumentInput<?>>> entry : secondaryWriters) {
154159
try {
155160
entry.getValue().close();
156161
} catch (Exception e) {

0 commit comments

Comments
 (0)