Commit d60b7ae

cocosz and Tanvir Alam authored
Added unit tests for parquet-dataformat plugin (#20180)
* Added unit tests for ParquetWriter and ParquetExecutionEngine classes
* Added unit tests to cover the e2e flow
* Removed excessive mocking from unit tests
* Added changes to test using real objects
* Added changes to formalize tests

Signed-off-by: Tanvir Alam <tanvralm@amazon.com>
Co-authored-by: Tanvir Alam <tanvralm@amazon.com>
1 parent 46002a9 commit d60b7ae
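
The tests in this commit exercise the plugin with real objects rather than mocks: each test constructs a ParquetExecutionEngine against a temporary shard directory and asserts on the actual on-disk Parquet output. A condensed, statement-level sketch of that setup, paraphrased from the setUp() method in the diff below (constructor shapes and names are taken from the test code itself, imports and the test-class wrapper are as in that file):

// Condensed from ParquetExecutionEngineTests.setUp(); illustrates the real-object test style.
Settings settings = Settings.builder().build();
Supplier<Schema> schemaSupplier = () -> new Schema(List.of(
    Field.nullable("user_id", Types.MinorType.BIGINT.getType())));
Index index = new Index("test-index", "test-uuid");
ShardId shardId = new ShardId(index, 0);
Path shardDir = createTempDir();  // OpenSearchTestCase helper
ShardPath shardPath = new ShardPath(false, shardDir, shardDir, shardId);
Files.createDirectories(shardDir.resolve(PARQUET_DATA_FORMAT.name()));
ParquetExecutionEngine engine = new ParquetExecutionEngine(settings, schemaSupplier, shardPath);
assertEquals(PARQUET_DATA_FORMAT.name(), engine.getDataFormat().name());
engine.close();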

File tree

6 files changed: +1416, -0 lines changed

ParquetExecutionEngineTests.java
Lines changed: 225 additions & 0 deletions
@@ -0,0 +1,225 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 */

package com.parquet.parquetdataformat.engine;

import com.parquet.parquetdataformat.merge.ParquetMergeExecutor;
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
import com.parquet.parquetdataformat.writer.ParquetWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.FileInfos;
import org.opensearch.index.engine.exec.FlushIn;
import org.opensearch.index.engine.exec.Merger;
import org.opensearch.index.engine.exec.RefreshInput;
import org.opensearch.index.engine.exec.RefreshResult;
import org.opensearch.index.engine.exec.WriteResult;
import org.opensearch.index.engine.exec.Writer;
import org.opensearch.index.engine.exec.WriterFileSet;
import org.opensearch.index.engine.exec.composite.CompositeDataFormatWriter;
import org.opensearch.index.mapper.BooleanFieldMapper;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.mapper.TextFieldMapper;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

import static com.parquet.parquetdataformat.engine.ParquetDataFormat.PARQUET_DATA_FORMAT;

/**
 * Unit Tests for ParquetExecutionEngine covering all must-have scenarios.
 */
public class ParquetExecutionEngineTests extends OpenSearchTestCase {

    private static final String TEST_INDEX_NAME = "test-index";
    private static final String TEST_INDEX_UUID = "test-uuid";
    private static final int TEST_SHARD_ID = 0;
    private static final String PRIMARY_TERM_FIELD_NAME = "_primary";
    private static final String ROW_ID_FIELD_NAME = "_id";

    private static final String USER_ID_FIELD_NAME = "user_id";
    private static final String COUNT_FIELD_NAME = "count";
    private static final String ENABLED_FIELD_NAME = "enabled";
    private static final String ID_FIELD_NAME = "id";
    private static final String NAME_FIELD_NAME = "name";
    private static final String ACTIVE_FIELD_NAME = "active";
    private static final String MESSAGE_FIELD_NAME = "message";
    private static final String STATUS_FIELD_NAME = "status";
    private static final String PRICE_FIELD_NAME = "price";

    private static final long TEST_GENERATION = 42L;
    private static final long PRIMARY_TERM_VALUE = 1L;
    private static final long FIRST_ROW_ID = 1001L;
    private static final long SECOND_ROW_ID = 1002L;
    private static final long THIRD_ROW_ID = 1003L;
    private static final long USER_ID_VALUE = 12345L;
    private static final int COUNT_VALUE = 42;
    private static final boolean ENABLED_VALUE = true;

    private static final String PARQUET_FILE_PREFIX = "test_parquet";
    private static final String PARQUET_FILE_EXTENSION = ".parquet";
    private static final String OTHER_FILE_PREFIX = "test_other";
    private static final String OTHER_FILE_EXTENSION = ".txt";
    private static final String OTHER_FORMAT_NAME = "OTHER_FORMAT";
    private static final String NON_EXISTENT_FILE_PATH = "/non/existent/file.parquet";
    private static final String PARQUET_FILE_GENERATION_PATTERN = "_parquet_file_generation_";

    private static final int EXPECTED_TEST_DATA_COUNT = 3;

    private Settings settings;
    private Supplier<Schema> schemaSupplier;
    private ShardPath shardPath;
    private Schema testSchema;
    private ParquetExecutionEngine engine;

    @Override
    public void setUp() throws Exception {
        super.setUp();
        settings = Settings.builder().build();
        testSchema = createTestSchema();
        schemaSupplier = () -> testSchema;
        Path tempDir = createTempDir();
        Index index = new Index(TEST_INDEX_NAME, TEST_INDEX_UUID);
        ShardId shardId = new ShardId(index, TEST_SHARD_ID);
        Path shardDataPath = tempDir.resolve(index.getUUID()).resolve(String.valueOf(TEST_SHARD_ID));
        Path shardStatePath = tempDir.resolve(index.getUUID()).resolve(String.valueOf(TEST_SHARD_ID));
        shardPath = new ShardPath(false, shardDataPath, shardStatePath, shardId);

        Path dataFormatDir = shardDataPath.resolve(PARQUET_DATA_FORMAT.name());
        Files.createDirectories(dataFormatDir);

        engine = new ParquetExecutionEngine(settings, schemaSupplier, shardPath);
    }

    @Override
    public void tearDown() throws Exception {
        if (engine != null) {
            engine.close();
            engine = null;
        }
        super.tearDown();
    }

    private Schema createTestSchema() {
        List<Field> fields = Arrays.asList(
            Field.nullable(CompositeDataFormatWriter.ROW_ID, Types.MinorType.BIGINT.getType()),
            Field.nullable(PRIMARY_TERM_FIELD_NAME, Types.MinorType.BIGINT.getType()),
            Field.nullable(ID_FIELD_NAME, Types.MinorType.BIGINT.getType()),
            Field.nullable(NAME_FIELD_NAME, Types.MinorType.VARCHAR.getType()),
            Field.nullable(ACTIVE_FIELD_NAME, Types.MinorType.BIT.getType()),
            Field.nullable(USER_ID_FIELD_NAME, Types.MinorType.BIGINT.getType()),
            Field.nullable(MESSAGE_FIELD_NAME, Types.MinorType.VARCHAR.getType()),
            Field.nullable(COUNT_FIELD_NAME, Types.MinorType.INT.getType()),
            Field.nullable(STATUS_FIELD_NAME, Types.MinorType.VARCHAR.getType()),
            Field.nullable(PRICE_FIELD_NAME, Types.MinorType.FLOAT8.getType()),
            Field.nullable(ENABLED_FIELD_NAME, Types.MinorType.BIT.getType())
        );
        return new Schema(fields);
    }

    public void testDeleteFilesOnlyDeletesParquetFiles() throws IOException {
        Path parquetFile = createTempFile(PARQUET_FILE_PREFIX, PARQUET_FILE_EXTENSION);
        Path otherFile = createTempFile(OTHER_FILE_PREFIX, OTHER_FILE_EXTENSION);
        Map<String, Collection<String>> filesToDelete = new HashMap<>();
        filesToDelete.put(PARQUET_DATA_FORMAT.name(), List.of(parquetFile.toString()));
        filesToDelete.put(OTHER_FORMAT_NAME, List.of(otherFile.toString()));
        engine.deleteFiles(filesToDelete);
        assertFalse(Files.exists(parquetFile));
        assertTrue(Files.exists(otherFile));
        Files.deleteIfExists(otherFile);
    }

    public void testDeleteFilesThrowsWhenFileDoesNotExist() {
        Map<String, Collection<String>> filesToDelete = Map.of(
            PARQUET_DATA_FORMAT.name(),
            List.of(NON_EXISTENT_FILE_PATH)
        );
        RuntimeException ex = expectThrows(RuntimeException.class, () -> engine.deleteFiles(filesToDelete));
        assertNotNull(ex.getCause());
    }

    public void testGetDataFormatReturnsParquetDataFormat() {
        DataFormat dataFormat = engine.getDataFormat();
        assertNotNull(dataFormat);
        assertEquals(PARQUET_DATA_FORMAT.name(), dataFormat.name());
    }

    public void testGetNativeBytesUsedReturnsNonNegative() {
        long nativeBytes = engine.getNativeBytesUsed();
        assertTrue(nativeBytes >= 0);
    }

    /**
     * Tests complete writer workflow with multiple documents.
     */
    public void testCompleteWriterWorkflowWithMultipleDocuments() throws Exception {
        Writer<ParquetDocumentInput> writer = engine.createWriter(TEST_GENERATION);

        Object[][] testData = {
            { FIRST_ROW_ID, new NumberFieldMapper.NumberFieldType(USER_ID_FIELD_NAME, NumberFieldMapper.NumberType.LONG), USER_ID_FIELD_NAME, USER_ID_VALUE },
            { SECOND_ROW_ID, new NumberFieldMapper.NumberFieldType(COUNT_FIELD_NAME, NumberFieldMapper.NumberType.INTEGER), COUNT_FIELD_NAME, COUNT_VALUE },
            { THIRD_ROW_ID, new BooleanFieldMapper.BooleanFieldType(ENABLED_FIELD_NAME), ENABLED_FIELD_NAME, ENABLED_VALUE }
        };

        List<WriteResult> writeResults = new ArrayList<>();
        List<ParquetDocumentInput> documentInputs = new ArrayList<>();

        for (Object[] data : testData) {
            ParquetDocumentInput doc = writer.newDocumentInput();
            doc.addRowIdField(ROW_ID_FIELD_NAME, (Long) data[0]);
            doc.setPrimaryTerm(PRIMARY_TERM_FIELD_NAME, PRIMARY_TERM_VALUE);
            doc.addField((MappedFieldType) data[1], data[3]);
            WriteResult result = doc.addToWriter();
            assertTrue(result.success());
            writeResults.add(result);
            documentInputs.add(doc);
        }

        FileInfos fileInfos = writer.flush(new FlushIn() {});
        WriterFileSet parquetFileSet = fileInfos.getWriterFileSet(PARQUET_DATA_FORMAT).orElse(null);

        assertNotNull(parquetFileSet);
        assertFalse(parquetFileSet.getFiles().isEmpty());

        boolean hasParquetFile = parquetFileSet.getFiles().stream()
            .anyMatch(f -> f.endsWith(PARQUET_FILE_EXTENSION) && f.contains(PARQUET_FILE_GENERATION_PATTERN + TEST_GENERATION));
        assertTrue(hasParquetFile);

        assertEquals(TEST_GENERATION, parquetFileSet.getWriterGeneration());
        assertEquals(EXPECTED_TEST_DATA_COUNT, writeResults.size());

        String parquetFileName = parquetFileSet.getFiles().stream()
            .filter(f -> f.endsWith(PARQUET_FILE_EXTENSION))
            .findFirst()
            .orElse(null);
        assertNotNull("Parquet file name should be present in file set", parquetFileName);

        Path parquetFilePath = Path.of(parquetFileSet.getDirectory(), parquetFileName);
        assertTrue("Parquet file should exist on disk: " + parquetFilePath, Files.exists(parquetFilePath));
        assertTrue("Parquet file should be a regular file: " + parquetFilePath, Files.isRegularFile(parquetFilePath));
        assertTrue("Parquet file should have content (size > 0): " + parquetFilePath, Files.size(parquetFilePath) > 0);

        writer.close();
    }
}
ArrowFieldRegistryTests.java
Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package com.parquet.parquetdataformat.fields;

import com.parquet.parquetdataformat.fields.core.data.number.LongParquetField;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Set;

/**
 * Unit test suite for {@link ArrowFieldRegistry} functionality.
 */
public class ArrowFieldRegistryTests extends OpenSearchTestCase {

    private static final String PRIMARY_TERM_FIELD_NAME = SeqNoFieldMapper.PRIMARY_TERM_NAME;
    private static final String UNKNOWN_FIELD_NAME = "unknown_field_type";

    public void testRegistryInitialization() {
        ArrowFieldRegistry.RegistryStats stats = ArrowFieldRegistry.getStats();

        assertNotNull(stats);
        assertTrue(stats.getTotalFields() > 0);
        assertTrue(stats.getAllFieldTypes().contains(PRIMARY_TERM_FIELD_NAME));
    }

    public void testFieldRetrieval() {
        ParquetField primaryTermField = ArrowFieldRegistry.getParquetField(PRIMARY_TERM_FIELD_NAME);

        assertNotNull(primaryTermField);
        assertTrue(primaryTermField instanceof LongParquetField);
        assertNotNull(primaryTermField.getArrowType());
        assertNotNull(primaryTermField.getFieldType());

        assertNull(ArrowFieldRegistry.getParquetField(UNKNOWN_FIELD_NAME));
    }

    public void testFieldNameCollectionImmutability() {
        Set<String> fieldNames = ArrowFieldRegistry.getRegisteredFieldNames();

        assertNotNull(fieldNames);
        expectThrows(UnsupportedOperationException.class, () -> fieldNames.add("test_field"));
    }

    public void testAllRegisteredFieldsAreValid() {
        Set<String> fieldNames = ArrowFieldRegistry.getRegisteredFieldNames();

        for (String fieldName : fieldNames) {
            ParquetField field = ArrowFieldRegistry.getParquetField(fieldName);
            assertNotNull(field);
            assertNotNull(field.getArrowType());
            assertNotNull(field.getFieldType());
        }
    }
}
