Skip to content

Commit 9ed54af

Browse files
committed
More refactoring
1 parent b3f4e8a commit 9ed54af

File tree

6 files changed

+317
-123
lines changed

6 files changed

+317
-123
lines changed

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDataFormatWriterPool.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,15 @@
2323
import java.util.Set;
2424
import java.util.function.Supplier;
2525

26-
public class CompositeWriterPool implements Iterable<CompositeWriter>, Closeable {
26+
public class CompositeDataFormatWriterPool implements Iterable<CompositeWriter>, Closeable {
2727

2828
private final Set<CompositeWriter> writers;
2929
private final LockableConcurrentQueue<CompositeWriter> availableWriters;
3030
private final Supplier<CompositeWriter> writerSupplier;
3131
private volatile boolean closed;
3232

33-
public CompositeWriterPool(
33+
34+
public CompositeDataFormatWriterPool(
3435
Supplier<CompositeWriter> writerSupplier,
3536
Supplier<Queue<CompositeWriter>> queueSupplier,
3637
int concurrency
@@ -79,6 +80,7 @@ public void releaseAndUnlock(CompositeWriter state) {
7980
availableWriters.addAndUnlock(state);
8081
}
8182

83+
8284
/**
8385
* Lock and checkout all CompositeWriters from the pool for flush.
8486
*

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
import org.opensearch.index.engine.dataformat.FieldTypeCapabilities;
1717
import org.opensearch.index.mapper.MappedFieldType;
1818

19+
import java.util.ArrayList;
1920
import java.util.Collections;
2021
import java.util.HashMap;
2122
import java.util.HashSet;
23+
import java.util.List;
2224
import java.util.Map;
2325
import java.util.Objects;
2426
import java.util.Set;
@@ -36,13 +38,13 @@
3638
* @opensearch.experimental
3739
*/
3840
@ExperimentalApi
39-
public class CompositeDocumentInput implements DocumentInput<Map<DataFormat, Object>> {
41+
public class CompositeDocumentInput implements DocumentInput<List<? extends DocumentInput<?>>> {
4042

4143
private static final Logger logger = LogManager.getLogger(CompositeDocumentInput.class);
4244

43-
private final DocumentInput<?> primaryInput;
45+
private final DocumentInput<?> primaryDocumentInput;
4446
private final DataFormat primaryFormat;
45-
private final Map<DataFormat, DocumentInput<?>> secondaryInputs;
47+
private final Map<DataFormat, DocumentInput<?>> secondaryDocumentInputs;
4648
private final Map<String, Set<DataFormat>> fieldTypeToFormats;
4749

4850
/**
@@ -53,21 +55,23 @@ public class CompositeDocumentInput implements DocumentInput<Map<DataFormat, Obj
5355
* {@link DataFormat#supportedFields()}.
5456
*
5557
* @param primaryFormat the primary data format
56-
* @param primaryInput the document input for the primary format
57-
* @param secondaryInputs a map of secondary data formats to their corresponding document inputs
58+
* @param primaryDocumentInput the document input for the primary format
59+
* @param secondaryDocumentInputs a map of secondary data formats to their corresponding document inputs
5860
*/
5961
public CompositeDocumentInput(
6062
DataFormat primaryFormat,
61-
DocumentInput<?> primaryInput,
62-
Map<DataFormat, DocumentInput<?>> secondaryInputs
63+
DocumentInput<?> primaryDocumentInput,
64+
Map<DataFormat, DocumentInput<?>> secondaryDocumentInputs
6365
) {
6466
this.primaryFormat = Objects.requireNonNull(primaryFormat, "primaryFormat must not be null");
65-
this.primaryInput = Objects.requireNonNull(primaryInput, "primaryInput must not be null");
66-
this.secondaryInputs = Map.copyOf(Objects.requireNonNull(secondaryInputs, "secondaryInputs must not be null"));
67+
this.primaryDocumentInput = Objects.requireNonNull(primaryDocumentInput, "primaryDocumentInput must not be null");
68+
this.secondaryDocumentInputs = Map.copyOf(
69+
Objects.requireNonNull(secondaryDocumentInputs, "secondaryDocumentInputs must not be null")
70+
);
6771

6872
Map<String, Set<DataFormat>> routing = new HashMap<>();
6973
addFormatToRouting(routing, primaryFormat);
70-
for (DataFormat format : this.secondaryInputs.keySet()) {
74+
for (DataFormat format : this.secondaryDocumentInputs.keySet()) {
7175
addFormatToRouting(routing, format);
7276
}
7377
this.fieldTypeToFormats = Collections.unmodifiableMap(routing);
@@ -88,9 +92,9 @@ public void addField(MappedFieldType fieldType, Object value) {
8892
return;
8993
}
9094
if (formats.contains(primaryFormat)) {
91-
primaryInput.addField(fieldType, value);
95+
primaryDocumentInput.addField(fieldType, value);
9296
}
93-
for (Map.Entry<DataFormat, DocumentInput<?>> entry : secondaryInputs.entrySet()) {
97+
for (Map.Entry<DataFormat, DocumentInput<?>> entry : secondaryDocumentInputs.entrySet()) {
9498
if (formats.contains(entry.getKey())) {
9599
entry.getValue().addField(fieldType, value);
96100
}
@@ -99,30 +103,25 @@ public void addField(MappedFieldType fieldType, Object value) {
99103

100104
@Override
101105
public void setRowId(String rowIdFieldName, long rowId) {
102-
primaryInput.setRowId(rowIdFieldName, rowId);
103-
for (DocumentInput<?> input : secondaryInputs.values()) {
106+
primaryDocumentInput.setRowId(rowIdFieldName, rowId);
107+
for (DocumentInput<?> input : secondaryDocumentInputs.values()) {
104108
input.setRowId(rowIdFieldName, rowId);
105109
}
106110
}
107111

108112
@Override
109-
public Map<DataFormat, Object> getFinalInput() {
110-
Map<DataFormat, Object> result = new HashMap<>();
111-
result.put(primaryFormat, primaryInput.getFinalInput());
112-
for (Map.Entry<DataFormat, DocumentInput<?>> entry : secondaryInputs.entrySet()) {
113-
result.put(entry.getKey(), entry.getValue().getFinalInput());
114-
}
115-
return Collections.unmodifiableMap(result);
113+
public List<? extends DocumentInput<?>> getFinalInput() {
114+
return null;
116115
}
117116

118117
@Override
119118
public void close() {
120119
try {
121-
primaryInput.close();
120+
primaryDocumentInput.close();
122121
} catch (Exception e) {
123122
logger.warn("Failed to close primary DocumentInput", e);
124123
}
125-
for (DocumentInput<?> input : secondaryInputs.values()) {
124+
for (DocumentInput<?> input : secondaryDocumentInputs.values()) {
126125
try {
127126
input.close();
128127
} catch (Exception e) {
@@ -137,7 +136,7 @@ public void close() {
137136
* @return the primary document input
138137
*/
139138
public DocumentInput<?> getPrimaryInput() {
140-
return primaryInput;
139+
return primaryDocumentInput;
141140
}
142141

143142
/**
@@ -155,6 +154,6 @@ public DataFormat getPrimaryFormat() {
155154
* @return the secondary inputs
156155
*/
157156
public Map<DataFormat, DocumentInput<?>> getSecondaryInputs() {
158-
return secondaryInputs;
157+
return secondaryDocumentInputs;
159158
}
160159
}

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 50 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,12 @@
2828
import java.io.IOException;
2929
import java.util.ArrayList;
3030
import java.util.Collection;
31-
import java.util.HashMap;
3231
import java.util.LinkedHashMap;
3332
import java.util.List;
3433
import java.util.Map;
3534
import java.util.Objects;
35+
import java.util.concurrent.ConcurrentLinkedQueue;
36+
import java.util.concurrent.atomic.AtomicLong;
3637

3738
/**
3839
* A composite {@link IndexingExecutionEngine} that orchestrates indexing across
@@ -52,10 +53,11 @@ public class CompositeIndexingExecutionEngine implements IndexingExecutionEngine
5253

5354
private static final Logger logger = LogManager.getLogger(CompositeIndexingExecutionEngine.class);
5455

55-
private final IndexingExecutionEngine<?, ?> primaryEngine;
56-
private final List<IndexingExecutionEngine<?, ?>> secondaryEngines;
57-
private final List<IndexingExecutionEngine<?, ?>> allEngines;
56+
private final IndexingExecutionEngine<? extends DataFormat, ? extends DocumentInput<?>> primaryEngine;
57+
private final List<IndexingExecutionEngine<? extends DataFormat, ? extends DocumentInput<?>>> secondaryEngines;
5858
private final CompositeDataFormat compositeDataFormat;
59+
private final CompositeDataFormatWriterPool dataFormatWriterPool;
60+
private final AtomicLong writerGenerationCounter;
5961

6062
/**
6163
* Constructs a CompositeIndexingExecutionEngine by reading index settings to
@@ -110,16 +112,18 @@ public CompositeIndexingExecutionEngine(
110112
}
111113
this.secondaryEngines = List.copyOf(secondaries);
112114

113-
List<IndexingExecutionEngine<?, ?>> all = new ArrayList<>();
114-
all.add(this.primaryEngine);
115-
all.addAll(this.secondaryEngines);
116-
this.allEngines = List.copyOf(all);
117-
118115
List<DataFormat> allFormats = new ArrayList<>();
119-
for (IndexingExecutionEngine<?, ?> engine : this.allEngines) {
116+
allFormats.add(primaryEngine.getDataFormat());
117+
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
120118
allFormats.add(engine.getDataFormat());
121119
}
122120
this.compositeDataFormat = new CompositeDataFormat(allFormats);
121+
this.writerGenerationCounter = new AtomicLong(0);
122+
this.dataFormatWriterPool = new CompositeDataFormatWriterPool(
123+
() -> new CompositeWriter(this, primaryEngine.getDataFormat(), writerGenerationCounter.getAndIncrement()),
124+
ConcurrentLinkedQueue::new,
125+
Runtime.getRuntime().availableProcessors()
126+
);
123127
}
124128

125129
/**
@@ -160,12 +164,7 @@ static void validateFormatsRegistered(
160164

161165
@Override
162166
public Writer<CompositeDocumentInput> createWriter(long writerGeneration) {
163-
Map<DataFormat, Writer<?>> writerMap = new LinkedHashMap<>();
164-
for (IndexingExecutionEngine<?, ?> engine : allEngines) {
165-
Writer<?> writer = engine.createWriter(writerGeneration);
166-
writerMap.put(engine.getDataFormat(), writer);
167-
}
168-
return new CompositeWriter(writerMap);
167+
return new CompositeWriter(this, primaryEngine.getDataFormat(), writerGeneration);
169168
}
170169

171170
@Override
@@ -176,7 +175,9 @@ public Merger getMerger() {
176175
@Override
177176
public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
178177
List<Segment> allSegments = new ArrayList<>();
179-
for (IndexingExecutionEngine<?, ?> engine : allEngines) {
178+
RefreshResult primaryResult = primaryEngine.refresh(refreshInput);
179+
allSegments.addAll(primaryResult.refreshedSegments());
180+
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
180181
RefreshResult result = engine.refresh(refreshInput);
181182
allSegments.addAll(result.refreshedSegments());
182183
}
@@ -191,27 +192,55 @@ public CompositeDataFormat getDataFormat() {
191192

192193
@Override
193194
public long getNativeBytesUsed() {
194-
long total = 0;
195-
for (IndexingExecutionEngine<?, ?> engine : allEngines) {
195+
long total = primaryEngine.getNativeBytesUsed();
196+
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
196197
total += engine.getNativeBytesUsed();
197198
}
198199
return total;
199200
}
200201

201202
@Override
202203
public void deleteFiles(Map<String, Collection<String>> filesToDelete) throws IOException {
203-
for (IndexingExecutionEngine<?, ?> engine : allEngines) {
204+
primaryEngine.deleteFiles(filesToDelete);
205+
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
204206
engine.deleteFiles(filesToDelete);
205207
}
206208
}
207209

208210
@Override
209211
public CompositeDocumentInput newDocumentInput() {
210212
DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
211-
Map<DataFormat, DocumentInput<?>> secondaryInputMap = new HashMap<>();
213+
Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
212214
for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
213215
secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
214216
}
215217
return new CompositeDocumentInput(primaryEngine.getDataFormat(), primaryInput, secondaryInputMap);
216218
}
219+
220+
/**
221+
* Returns the primary delegate engine.
222+
*
223+
* @return the primary engine
224+
*/
225+
public IndexingExecutionEngine<?, ?> getPrimaryDelegate() {
226+
return primaryEngine;
227+
}
228+
229+
/**
230+
* Returns the secondary delegate engines.
231+
*
232+
* @return the secondary engines
233+
*/
234+
public List<IndexingExecutionEngine<? extends DataFormat, ? extends DocumentInput<?>>> getSecondaryDelegates() {
235+
return secondaryEngines;
236+
}
237+
238+
/**
239+
* Returns the composite writer pool for this engine.
240+
*
241+
* @return the writer pool
242+
*/
243+
public CompositeDataFormatWriterPool getDataFormatWriterPool() {
244+
return dataFormatWriterPool;
245+
}
217246
}

0 commit comments

Comments
 (0)