Skip to content

Commit d936e5b

Browse files
shank9918 and Shashank Gowri
authored
Feature/datafusion (#20134)
* No-op load writer files implementation
* Implemented flush conditions and improved engine failure handling
---------
Co-authored-by: Shashank Gowri <[email protected]>
1 parent ed27e1d commit d936e5b

File tree

10 files changed

+314
-184
lines changed

10 files changed

+314
-184
lines changed

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,24 @@
1010
import org.apache.arrow.vector.types.pojo.Schema;
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
13-
import org.opensearch.common.settings.Setting;
1413
import org.opensearch.common.settings.Settings;
1514
import org.opensearch.index.engine.exec.DataFormat;
1615
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
1716
import org.opensearch.index.engine.exec.Merger;
1817
import org.opensearch.index.engine.exec.RefreshInput;
1918
import org.opensearch.index.engine.exec.RefreshResult;
2019
import org.opensearch.index.engine.exec.Writer;
21-
import org.opensearch.index.engine.exec.WriterFileSet;
20+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
2221
import org.opensearch.index.shard.ShardPath;
2322

24-
import java.io.Closeable;
2523
import java.io.IOException;
26-
import java.nio.file.DirectoryStream;
2724
import java.nio.file.Files;
2825
import java.nio.file.Path;
2926
import java.nio.file.Paths;
30-
import java.util.ArrayList;
3127
import java.util.Collection;
3228
import java.util.List;
3329
import java.util.Map;
3430
import java.util.function.Supplier;
35-
import java.util.regex.Matcher;
36-
import java.util.regex.Pattern;
37-
import java.util.stream.StreamSupport;
3831

3932
import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;
4033

@@ -72,11 +65,9 @@ public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDa
7265
private static final Logger logger = LogManager.getLogger(ParquetExecutionEngine.class);
7366

7467
public static final String FILE_NAME_PREFIX = "_parquet_file_generation";
75-
private static final Pattern FILE_PATTERN = Pattern.compile(".*_(\\d+)\\.parquet$", Pattern.CASE_INSENSITIVE);
7668
public static final String FILE_NAME_EXT = ".parquet";
7769

7870
private final Supplier<Schema> schema;
79-
private final List<WriterFileSet> filesWrittenAlready = new ArrayList<>();
8071
private final ShardPath shardPath;
8172
private final ParquetMerger parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH);
8273
private final ArrowBufferPool arrowBufferPool;
@@ -88,24 +79,12 @@ public ParquetExecutionEngine(Settings settings, Supplier<Schema> schema, ShardP
8879
}
8980

9081
@Override
91-
public void loadWriterFiles() throws IOException {
92-
try (DirectoryStream<Path> stream = Files.newDirectoryStream(shardPath.getDataPath(), "*" + FILE_NAME_EXT)) {
93-
StreamSupport.stream(stream.spliterator(), false)
94-
.map(Path::getFileName)
95-
.map(Path::toString)
96-
.map(FILE_PATTERN::matcher)
97-
.filter(Matcher::matches)
98-
.map(m -> WriterFileSet.builder()
99-
.directory(shardPath.getDataPath())
100-
.writerGeneration(Long.parseLong(m.group(1)))
101-
.addFile(m.group(0))
102-
.build())
103-
.forEach(filesWrittenAlready::add);
104-
}
82+
public void loadWriterFiles(CatalogSnapshot catalogSnapshot) {
83+
// Noop, as refresh is handled in layers above
10584
}
10685

10786
@Override
108-
public void deleteFiles(Map<String,Collection<String>> filesToDelete) throws IOException {
87+
public void deleteFiles(Map<String, Collection<String>> filesToDelete) {
10988
if (filesToDelete.get(PARQUET_DATA_FORMAT.name()) != null) {
11089
Collection<String> parquetFilesToDelete = filesToDelete.get(PARQUET_DATA_FORMAT.name());
11190
for (String fileName : parquetFilesToDelete) {
@@ -127,7 +106,7 @@ public List<String> supportedFieldTypes() {
127106
}
128107

129108
@Override
130-
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) throws IOException {
109+
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) {
131110
String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
132111
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool);
133112
}
@@ -138,10 +117,9 @@ public Merger getMerger() {
138117
}
139118

140119
@Override
141-
public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
142-
RefreshResult refreshResult = new RefreshResult();
143-
// NO-OP, as refresh is being handled at CompositeIndexingExecution Engin
144-
return refreshResult;
120+
public RefreshResult refresh(RefreshInput refreshInput) {
121+
// NO-OP, as refresh is being handled at CompositeIndexingExecutionEngine
122+
return new RefreshResult();
145123
}
146124

147125
@Override

server/src/main/java/org/opensearch/index/engine/Engine.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1149,34 +1149,13 @@ public abstract void forceMerge(
11491149
*/
11501150
public abstract SafeCommitInfo getSafeCommitInfo();
11511151

1152-
/**
1153-
* If the specified throwable contains a fatal error in the throwable graph, such a fatal error will be thrown. Callers should ensure
1154-
* that there are no catch statements that would catch an error in the stack as the fatal error here should go uncaught and be handled
1155-
* by the uncaught exception handler that we install during bootstrap. If the specified throwable does indeed contain a fatal error,
1156-
* the specified message will attempt to be logged before throwing the fatal error. If the specified throwable does not contain a fatal
1157-
* error, this method is a no-op.
1158-
*
1159-
* @param maybeMessage the message to maybe log
1160-
* @param maybeFatal the throwable that maybe contains a fatal error
1161-
*/
1162-
@SuppressWarnings("finally")
1163-
private void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
1164-
ExceptionsHelper.maybeError(maybeFatal).ifPresent(error -> {
1165-
try {
1166-
logger.error(maybeMessage, error);
1167-
} finally {
1168-
throw error;
1169-
}
1170-
});
1171-
}
1172-
11731152
/**
11741153
* fail engine due to some error. the engine will also be closed.
11751154
* The underlying store is marked corrupted iff failure is caused by index corruption
11761155
*/
11771156
public void failEngine(String reason, @Nullable Exception failure) {
11781157
if (failure != null) {
1179-
maybeDie(reason, failure);
1158+
maybeDie(logger, reason, failure);
11801159
}
11811160
if (failEngineLock.tryLock()) {
11821161
try {
@@ -1285,19 +1264,6 @@ protected boolean maybeFailEngine(String source, Exception e) {
12851264
return false;
12861265
}
12871266

1288-
/**
1289-
* Event listener for the engine
1290-
*
1291-
* @opensearch.api
1292-
*/
1293-
@PublicApi(since = "1.0.0")
1294-
public interface EventListener {
1295-
/**
1296-
* Called when a fatal exception occurred
1297-
*/
1298-
default void onFailedEngine(String reason, @Nullable Exception e) {}
1299-
}
1300-
13011267
/**
13021268
* Supplier for the searcher
13031269
*

server/src/main/java/org/opensearch/index/engine/exec/IndexingExecutionEngine.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.opensearch.index.engine.exec;
1010

11+
import org.opensearch.index.engine.exec.coord.CatalogSnapshot;
1112
import org.opensearch.index.shard.ShardPath;
1213

1314
import java.io.Closeable;
@@ -29,11 +30,12 @@ Writer<? extends DocumentInput<?>> createWriter(long writerGeneration)
2930

3031
DataFormat getDataFormat();
3132

32-
void loadWriterFiles() throws IOException;
33+
void loadWriterFiles(CatalogSnapshot catalogSnapshot)
34+
throws IOException; // Bootstrap hook to make engine aware of previously written files from CatalogSnapshot
3335

3436
default long getNativeBytesUsed() {
3537
return 0;
3638
}
3739

38-
void deleteFiles(Map<String,Collection<String>> filesToDelete) throws IOException;
40+
void deleteFiles(Map<String, Collection<String>> filesToDelete) throws IOException;
3941
}

server/src/main/java/org/opensearch/index/engine/exec/bridge/Indexer.java

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@
88

99
package org.opensearch.index.engine.exec.bridge;
1010

11+
import org.apache.logging.log4j.Logger;
12+
import org.opensearch.ExceptionsHelper;
13+
import org.opensearch.common.Nullable;
1114
import org.opensearch.common.annotation.PublicApi;
15+
import org.opensearch.common.unit.TimeValue;
16+
import org.opensearch.core.common.unit.ByteSizeValue;
1217
import org.opensearch.index.engine.Engine;
1318
import org.opensearch.index.engine.EngineException;
1419
import org.opensearch.index.engine.SafeCommitInfo;
@@ -27,10 +32,32 @@
2732
@PublicApi(since = "1.0.0")
2833
public interface Indexer {
2934

35+
/**
36+
* Perform document index operation on the engine
37+
* @param index operation to perform
38+
* @return {@link Engine.IndexResult} containing updated translog location, version and
39+
* document specific failures
40+
*
41+
* Note: engine level failures (i.e. persistent engine failures) are thrown
42+
*/
3043
Engine.IndexResult index(Engine.Index index) throws IOException;
3144

45+
/**
46+
* Perform document delete operation on the engine
47+
* @param delete operation to perform
48+
* @return {@link Engine.DeleteResult} containing updated translog location, version and
49+
* document specific failures
50+
*
51+
* Note: engine level failures (i.e. persistent engine failures) are thrown
52+
*/
3253
Engine.DeleteResult delete(Engine.Delete delete) throws IOException;
3354

55+
/**
56+
* Perform a no-op equivalent operation on the engine.
57+
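* @param noOp the no-op operation to perform
58+
* @return the {@link Engine.NoOpResult} for the operation
59+
* @throws IOException if an I/O error occurs while performing the operation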
60+
*/
3461
Engine.NoOpResult noOp(Engine.NoOp noOp) throws IOException;
3562

3663
/**
@@ -42,10 +69,22 @@ public interface Indexer {
4269
*/
4370
int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException;
4471

72+
/**
73+
* @param reason why the history is requested
74+
* @param startingSeqNo sequence number beyond which history should exist
75+
* @return true iff the minimum retained sequence number during indexing is not less than startingSeqNo
76+
*/
4577
boolean hasCompleteOperationHistory(String reason, long startingSeqNo);
4678

79+
/**
80+
* Total amount of RAM bytes used for active indexing to buffer unflushed documents.
81+
*/
4782
long getIndexBufferRAMBytesUsed();
4883

84+
/**
85+
* @param verbose unused param
86+
* @return list of segments the indexer is aware of (previously created + new ones)
87+
*/
4988
List<Segment> segments(boolean verbose);
5089

5190
/**
@@ -103,9 +142,22 @@ public interface Indexer {
103142
*/
104143
void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);
105144

145+
/**
146+
* @return Time in nanos for last write through the indexer, always increasing.
147+
*/
148+
long getLastWriteNanos();
149+
150+
/**
151+
* Fills up the local checkpoints history with no-ops until the local checkpoint
152+
* and the max seen sequence ID are identical.
153+
* @param primaryTerm the shard's primary term this indexer was created for
154+
* @return the number of no-ops added
155+
*/
106156
int fillSeqNoGaps(long primaryTerm) throws IOException;
107157

108-
// File format methods follow below
158+
/**
159+
* Performs a force merge operation on this engine.
160+
*/
109161
void forceMerge(
110162
boolean flush,
111163
int maxNumSegments,
@@ -115,12 +167,35 @@ void forceMerge(
115167
String forceMergeUUID
116168
) throws EngineException, IOException;
117169

170+
/**
171+
* Applies changes to input settings.
172+
*/
173+
void onSettingsChanged(TimeValue translogRetentionAge, ByteSizeValue translogRetentionSize, long softDeletesRetentionOps);
174+
175+
/**
176+
* Flushes active indexing buffer to disk.
177+
*/
118178
void writeIndexingBuffer() throws EngineException;
119179

180+
/**
181+
* Creates segments for data in buffers, and make them available for search.
182+
*/
120183
void refresh(String source) throws EngineException;
121184

185+
/**
186+
* Commits the data and state to disk, resulting in documents being persisted onto the underlying formats.
187+
*/
122188
void flush(boolean force, boolean waitIfOngoing) throws EngineException;
123189

190+
/**
191+
* Checks if data should be committed to disk, mainly based on translog thresholds.
192+
* @return true iff flush should trigger.
193+
*/
194+
boolean shouldPeriodicallyFlush();
195+
196+
/**
197+
* Returns info about the safe commit.
198+
*/
124199
SafeCommitInfo getSafeCommitInfo();
125200

126201
// Translog methods follow below
@@ -135,6 +210,42 @@ Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo
135210

136211
void flushAndClose() throws IOException;
137212

213+
void failEngine(String reason, @Nullable Exception failure);
214+
215+
/**
216+
* If the specified throwable contains a fatal error in the throwable graph, such a fatal error will be thrown. Callers should ensure
217+
* that there are no catch statements that would catch an error in the stack as the fatal error here should go uncaught and be handled
218+
* by the uncaught exception handler that we install during bootstrap. If the specified throwable does indeed contain a fatal error,
219+
* the specified message will attempt to be logged before throwing the fatal error. If the specified throwable does not contain a fatal
220+
* error, this method is a no-op.
221+
*
222+
* @param maybeMessage the message to maybe log
223+
* @param maybeFatal the throwable that maybe contains a fatal error
224+
*/
225+
@SuppressWarnings("finally")
226+
default void maybeDie(final Logger logger, final String maybeMessage, final Throwable maybeFatal) {
227+
ExceptionsHelper.maybeError(maybeFatal).ifPresent(error -> {
228+
try {
229+
logger.error(maybeMessage, error);
230+
} finally {
231+
throw error;
232+
}
233+
});
234+
}
235+
236+
/**
237+
* Event listener for the engine
238+
*
239+
* @opensearch.api
240+
*/
241+
@PublicApi(since = "1.0.0")
242+
public interface EventListener {
243+
/**
244+
* Called when a fatal exception occurred
245+
*/
246+
default void onFailedEngine(String reason, @Nullable Exception e) {}
247+
}
248+
138249
/**
139250
* Reads the current stored history ID from commit data.
140251
*/

server/src/main/java/org/opensearch/index/engine/exec/commit/Committer.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import java.io.Closeable;
1515
import java.io.IOException;
1616
import java.util.Map;
17-
import java.util.Optional;
1817

1918
public interface Committer extends Closeable {
2019

server/src/main/java/org/opensearch/index/engine/exec/commit/LuceneCommitEngine.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,8 @@
2626
import java.nio.file.Path;
2727
import java.util.Collection;
2828
import java.util.Map;
29-
import java.util.Optional;
3029
import java.util.function.LongSupplier;
3130

32-
import static org.opensearch.index.engine.exec.coord.CatalogSnapshot.CATALOG_SNAPSHOT_KEY;
33-
3431
public class LuceneCommitEngine implements Committer {
3532

3633
private final Logger logger;

0 commit comments

Comments
 (0)