Skip to content

Commit 893590e

Browse files
committed
Added additional unit tests
1 parent d23b786 commit 893590e

File tree

5 files changed

+303
-11
lines changed

5 files changed

+303
-11
lines changed

contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilBatchReader.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,30 @@ public DaffodilBatchReader(DaffodilReaderConfig readerConfig, EasySubScan scan,
5757
errorContext = negotiator.parentErrorContext();
5858
DaffodilFormatConfig dafConfig = readerConfig.plugin.getConfig();
5959

60-
String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
60+
String schemaFile = dafConfig.getSchemaFile();
61+
String schemaURIString = dafConfig.getSchemaURI();
6162
String rootName = dafConfig.getRootName();
6263
String rootNamespace = dafConfig.getRootNamespace();
6364
boolean validationMode = dafConfig.getValidationMode();
6465

66+
// Determine the schema URI:
67+
// - If schemaFile is provided, it takes precedence and is looked up in the registry area
68+
// - Otherwise, use schemaURI (full path)
6569
URI dfdlSchemaURI;
6670
try {
67-
dfdlSchemaURI = new URI(schemaURIString);
71+
if (schemaFile != null && !schemaFile.isEmpty()) {
72+
// schemaFile takes precedence - construct path from registry area
73+
Path registryArea = readerConfig.plugin.getContext()
74+
.getRemoteDaffodilSchemaRegistry().getRegistryArea();
75+
Path schemaPath = new Path(registryArea, schemaFile);
76+
dfdlSchemaURI = schemaPath.toUri();
77+
} else if (schemaURIString != null && !schemaURIString.isEmpty()) {
78+
// Use the provided schemaURI
79+
dfdlSchemaURI = new URI(schemaURIString);
80+
} else {
81+
// Neither provided - will result in empty URI
82+
dfdlSchemaURI = new URI("");
83+
}
6884
} catch (URISyntaxException e) {
6985
throw UserException.validationError(e).build(logger);
7086
}

contrib/format-daffodil/src/main/java/org/apache/drill/exec/store/daffodil/DaffodilFormatConfig.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public class DaffodilFormatConfig implements FormatPluginConfig {
3636

3737
public final List<String> extensions;
3838
public final String schemaURI;
39+
public final String schemaFile;
3940
public final boolean validationMode;
4041
public final String rootName;
4142
public final String rootNamespace;
@@ -45,8 +46,11 @@ public class DaffodilFormatConfig implements FormatPluginConfig {
4546
* It creates problems with defaulting them (they default to null) which cannot be unboxed.
4647
*/
4748
@JsonCreator
48-
public DaffodilFormatConfig(@JsonProperty("extensions") List<String> extensions,
49-
@JsonProperty("schemaURI") String schemaURI, @JsonProperty("rootName") String rootName,
49+
public DaffodilFormatConfig(
50+
@JsonProperty("extensions") List<String> extensions,
51+
@JsonProperty("schemaURI") String schemaURI,
52+
@JsonProperty("schemaFile") String schemaFile,
53+
@JsonProperty("rootName") String rootName,
5054
@JsonProperty("rootNamespace") String rootNamespace,
5155
@JsonProperty("validationMode") boolean validationMode) {
5256

@@ -56,6 +60,7 @@ public DaffodilFormatConfig(@JsonProperty("extensions") List<String> extensions,
5660
this.rootName = rootName;
5761
this.rootNamespace = rootNamespace;
5862
this.schemaURI = schemaURI;
63+
this.schemaFile = schemaFile;
5964
// no default. Users must pick.
6065
this.validationMode = validationMode;
6166
}
@@ -69,6 +74,10 @@ public String getSchemaURI() {
6974
return schemaURI;
7075
}
7176

77+
/**
 * Returns the configured schema file name (may be null or empty when not set).
 * In DaffodilBatchReader a non-empty value takes precedence over schemaURI and is
 * resolved against the Daffodil schema registry area.
 */
public String getSchemaFile() {
78+
return schemaFile;
79+
}
80+
7281
public String getRootName() {
7382
return rootName;
7483
}
@@ -88,7 +97,7 @@ public DaffodilReaderConfig getReaderConfig(DaffodilFormatPlugin plugin) {
8897

8998
/**
 * Hashes the fields that equals() compares: schemaURI, schemaFile, validationMode,
 * rootName and rootNamespace. The extensions list is not part of the hash.
 */
@Override
9099
public int hashCode() {
91-
return Objects.hash(schemaURI, validationMode, rootName, rootNamespace);
100+
return Objects.hash(schemaURI, schemaFile, validationMode, rootName, rootNamespace);
92101
}
93102

94103
@Override
@@ -100,14 +109,21 @@ public boolean equals(Object obj) {
100109
return false;
101110
}
102111
DaffodilFormatConfig other = (DaffodilFormatConfig) obj;
103-
return Objects.equals(schemaURI, other.schemaURI) && Objects.equals(rootName,
104-
other.rootName) && Objects.equals(rootNamespace, other.rootNamespace) && Objects.equals(
105-
validationMode, other.validationMode);
112+
return Objects.equals(schemaURI, other.schemaURI)
113+
&& Objects.equals(schemaFile, other.schemaFile)
114+
&& Objects.equals(rootName, other.rootName)
115+
&& Objects.equals(rootNamespace, other.rootNamespace)
116+
&& Objects.equals(validationMode, other.validationMode);
106117
}
107118

108119
/**
 * Renders this config through PlanStringBuilder, listing schemaURI, schemaFile, rootName,
 * rootNamespace and validationMode. The extensions list is not included.
 */
@Override
109120
public String toString() {
110-
return new PlanStringBuilder(this).field("schemaURI", schemaURI).field("rootName", rootName)
111-
.field("rootNamespace", rootNamespace).field("validationMode", validationMode).toString();
121+
return new PlanStringBuilder(this)
122+
.field("schemaURI", schemaURI)
123+
.field("schemaFile", schemaFile)
124+
.field("rootName", rootName)
125+
.field("rootNamespace", rootNamespace)
126+
.field("validationMode", validationMode)
127+
.toString();
112128
}
113129
}

contrib/format-daffodil/src/test/java/org/apache/drill/exec/store/daffodil/TestDaffodilReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public static void setup() throws Exception {
5555
// boilerplate call to start test rig
5656
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
5757

58-
DaffodilFormatConfig formatConfig = new DaffodilFormatConfig(null, "", "", "", false);
58+
DaffodilFormatConfig formatConfig = new DaffodilFormatConfig(null, "", "", "", "", false);
5959

6060
cluster.defineFormat("dfs", "daffodil", formatConfig);
6161

Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.drill.exec.store.daffodil;
20+
21+
import org.apache.commons.io.FileUtils;
22+
import org.apache.drill.categories.RowSetTest;
23+
import org.apache.drill.common.types.TypeProtos.MinorType;
24+
import org.apache.drill.exec.physical.rowSet.RowSet;
25+
import org.apache.drill.exec.record.metadata.SchemaBuilder;
26+
import org.apache.drill.exec.record.metadata.TupleMetadata;
27+
import org.apache.drill.test.ClusterFixture;
28+
import org.apache.drill.test.ClusterTest;
29+
import org.apache.drill.test.QueryBuilder;
30+
import org.apache.drill.test.rowSet.RowSetComparison;
31+
import org.junit.BeforeClass;
32+
import org.junit.Test;
33+
import org.junit.experimental.categories.Category;
34+
35+
import java.io.File;
36+
import java.io.IOException;
37+
import java.net.URISyntaxException;
38+
import java.nio.file.Path;
39+
import java.nio.file.Paths;
40+
import java.util.List;
41+
42+
import static org.apache.drill.test.HadoopUtils.hadoopToJavaPath;
43+
import static org.junit.Assert.assertEquals;
44+
import static org.junit.Assert.assertFalse;
45+
import static org.junit.Assert.assertTrue;
46+
47+
/**
48+
* Integration test for the complete Daffodil schema registration workflow.
49+
* This test verifies:
50+
* 1. Registering a DFDL schema XSD file using CREATE DAFFODIL SCHEMA (the SQL syntax is USING JAR even for XSD files)
51+
* 2. Querying data files using the registered schema
52+
* 3. Unregistering the schema using DROP DAFFODIL SCHEMA
53+
*/
54+
@Category(RowSetTest.class)
55+
public class TestDaffodilSchemaRegistrationIntegration extends ClusterTest {
56+
57+
private static Path stagingArea;
58+
private static File schemaResourceDir;
59+
60+
@BeforeClass
61+
public static void setup() throws Exception {
62+
// Start the test cluster
63+
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
64+
65+
// Define the Daffodil format
66+
DaffodilFormatConfig formatConfig = new DaffodilFormatConfig(List.of("dat"), "", "", "", "", false);
67+
cluster.defineFormat("dfs", "daffodil", formatConfig);
68+
69+
// Copy test data and schema files to the test directory
70+
dirTestWatcher.copyResourceToRoot(Paths.get("data/"));
71+
dirTestWatcher.copyResourceToRoot(Paths.get("schema/"));
72+
73+
// Get the staging area for schema files
74+
stagingArea = hadoopToJavaPath(cluster.drillbit().getContext()
75+
.getRemoteDaffodilSchemaRegistry().getStagingArea());
76+
77+
// Locate the schema resource directory
78+
try {
79+
schemaResourceDir = Paths.get(
80+
TestDaffodilSchemaRegistrationIntegration.class
81+
.getClassLoader()
82+
.getResource("schema/")
83+
.toURI()
84+
).toFile();
85+
} catch (URISyntaxException e) {
86+
throw new RuntimeException("Failed to locate test schema directory", e);
87+
}
88+
}
89+
90+
/**
91+
* End-to-end test that:
92+
* 1. Registers a DFDL schema XSD file using CREATE DAFFODIL SCHEMA
93+
* 2. Queries a data file using the registered schema
94+
* 3. Verifies the query results
95+
*/
96+
@Test
97+
public void testRegisterXsdSchemaAndQuery() throws Exception {
98+
String schemaFileName = "simple.dfdl.xsd";
99+
File sourceSchema = new File(schemaResourceDir, schemaFileName);
100+
101+
// Copy the schema XSD to the staging area
102+
copyFileToStaging(sourceSchema, stagingArea, schemaFileName);
103+
104+
// Step 1: Register the schema
105+
client.testBuilder()
106+
.sqlQuery("CREATE DAFFODIL SCHEMA USING JAR '%s'", schemaFileName)
107+
.unOrdered()
108+
.baselineColumns("ok", "summary")
109+
.baselineValues(true, String.format("Daffodil schema jar %s has been registered successfully.", schemaFileName))
110+
.go();
111+
112+
// Verify schema was moved from staging to registry
113+
File stagingFile = stagingArea.resolve(schemaFileName).toFile();
114+
Path registryArea = hadoopToJavaPath(cluster.drillbit().getContext()
115+
.getRemoteDaffodilSchemaRegistry().getRegistryArea());
116+
File registryFile = registryArea.resolve(schemaFileName).toFile();
117+
118+
assertFalse("Schema file should be removed from staging after registration", stagingFile.exists());
119+
assertTrue("Schema file should exist in registry after registration", registryFile.exists());
120+
121+
// Step 2: Query data using the registered schema
122+
// After CREATE DAFFODIL SCHEMA, the schema file is moved from staging to the registry area
123+
// Use schemaFile parameter (just the filename) - Drill will automatically look it up in the registry
124+
String query = "SELECT * FROM table(dfs.`data/data01Int.dat` " +
125+
"(type => 'daffodil', " +
126+
"validationMode => 'true', " +
127+
"schemaFile => '" + schemaFileName + "', " +
128+
"rootName => 'row', " +
129+
"rootNamespace => null))";
130+
131+
QueryBuilder qb = client.queryBuilder();
132+
RowSet results = qb.sql(query).rowSet();
133+
134+
// Step 3: Verify results
135+
assertEquals(1, results.rowCount());
136+
137+
TupleMetadata expectedSchema = new SchemaBuilder()
138+
.add("col", MinorType.INT)
139+
.buildSchema();
140+
141+
RowSet expected = client.rowSetBuilder(expectedSchema)
142+
.addRow(0x00000101) // aka 257
143+
.build();
144+
145+
new RowSetComparison(expected).verifyAndClearAll(results);
146+
147+
// Clean up - drop the schema
148+
client.testBuilder()
149+
.sqlQuery("DROP DAFFODIL SCHEMA USING JAR '%s'", schemaFileName)
150+
.unOrdered()
151+
.baselineColumns("ok", "summary")
152+
.baselineValues(true, String.format("Daffodil schema jar %s has been unregistered successfully.", schemaFileName))
153+
.go();
154+
}
155+
156+
/**
157+
* Test the complete lifecycle: register, query, then unregister
158+
*/
159+
@Test
160+
public void testCompleteSchemaLifecycle() throws Exception {
161+
String schemaFileName = "complex1.dfdl.xsd";
162+
File sourceSchema = new File(schemaResourceDir, schemaFileName);
163+
164+
// Copy to staging area
165+
copyFileToStaging(sourceSchema, stagingArea, schemaFileName);
166+
167+
// Register the schema
168+
client.testBuilder()
169+
.sqlQuery("CREATE DAFFODIL SCHEMA USING JAR '%s'", schemaFileName)
170+
.unOrdered()
171+
.baselineColumns("ok", "summary")
172+
.baselineValues(true, String.format("Daffodil schema jar %s has been registered successfully.", schemaFileName))
173+
.go();
174+
175+
// Unregister the schema
176+
client.testBuilder()
177+
.sqlQuery("DROP DAFFODIL SCHEMA USING JAR '%s'", schemaFileName)
178+
.unOrdered()
179+
.baselineColumns("ok", "summary")
180+
.baselineValues(true, String.format("Daffodil schema jar %s has been unregistered successfully.", schemaFileName))
181+
.go();
182+
183+
// Verify the file is removed from the registry
184+
Path registryArea = hadoopToJavaPath(cluster.drillbit().getContext()
185+
.getRemoteDaffodilSchemaRegistry().getRegistryArea());
186+
File registeredFile = registryArea.resolve(schemaFileName).toFile();
187+
assertTrue("Schema file should be removed from registry after dropping", !registeredFile.exists());
188+
}
189+
190+
/**
191+
* Test querying with multiple rows of data using a registered schema
192+
*/
193+
@Test
194+
public void testQueryMultipleRowsWithRegisteredSchema() throws Exception {
195+
String schemaFileName = "simple.dfdl.xsd";
196+
File sourceSchema = new File(schemaResourceDir, schemaFileName);
197+
198+
// Copy to staging area and register
199+
copyFileToStaging(sourceSchema, stagingArea, schemaFileName);
200+
201+
client.testBuilder()
202+
.sqlQuery("CREATE DAFFODIL SCHEMA USING JAR '%s'", schemaFileName)
203+
.unOrdered()
204+
.baselineColumns("ok", "summary")
205+
.baselineValues(true, String.format("Daffodil schema jar %s has been registered successfully.", schemaFileName))
206+
.go();
207+
208+
// Query data with 6 rows using the registered schema
209+
// Use schemaFile parameter (just the filename) - Drill will automatically look it up in the registry
210+
String query = "SELECT * FROM table(dfs.`data/data06Int.dat` " +
211+
"(type => 'daffodil', " +
212+
"validationMode => 'true', " +
213+
"schemaFile => '" + schemaFileName + "', " +
214+
"rootName => 'row', " +
215+
"rootNamespace => null))";
216+
217+
QueryBuilder qb = client.queryBuilder();
218+
RowSet results = qb.sql(query).rowSet();
219+
220+
// Verify results
221+
assertEquals(6, results.rowCount());
222+
223+
TupleMetadata expectedSchema = new SchemaBuilder()
224+
.add("col", MinorType.INT)
225+
.buildSchema();
226+
227+
RowSet expected = client.rowSetBuilder(expectedSchema)
228+
.addRow(0x00000101)
229+
.addRow(0x00000102)
230+
.addRow(0x00000103)
231+
.addRow(0x00000104)
232+
.addRow(0x00000105)
233+
.addRow(0x00000106)
234+
.build();
235+
236+
new RowSetComparison(expected).verifyAndClearAll(results);
237+
238+
// Clean up - drop the schema
239+
client.testBuilder()
240+
.sqlQuery("DROP DAFFODIL SCHEMA USING JAR '%s'", schemaFileName)
241+
.unOrdered()
242+
.baselineColumns("ok", "summary")
243+
.baselineValues(true, String.format("Daffodil schema jar %s has been unregistered successfully.", schemaFileName))
244+
.go();
245+
}
246+
247+
/**
248+
* Helper method to copy a file to the staging area
249+
*/
250+
private void copyFileToStaging(File sourceFile, Path destination, String fileName) throws IOException {
251+
File destFile = destination.resolve(fileName).toFile();
252+
253+
// Ensure the staging directory exists
254+
if (!destination.toFile().exists()) {
255+
destination.toFile().mkdirs();
256+
}
257+
258+
FileUtils.copyFile(sourceFile, destFile);
259+
}
260+
}
Binary file not shown.

0 commit comments

Comments
 (0)