Skip to content

Commit 1d8bcd0

Browse files
mbeckerlecgivre
authored andcommitted
Adding Daffodil to Drill as a 'contrib'
Requires Daffodil version 3.7.0 or higher. New format-daffodil module created Still uses absolute paths for the schemaFileURI. (which is cheating. Wouldn't work in a true distributed drill environment.) We have yet to work out how to enable Drill to provide access for DFDL schemas in XML form with include/import to be resolved. The input data stream is, however, being accessed in the proper Drill manner. Gunzip happened automatically. Nice. Note: Fix boxed Boolean vs. boolean problem. Don't use boxed primitives in Format config objects. Test show this works for data as complex as having nested repeating sub-records. These DFDL types are supported: - int - long - short - byte - boolean - double - float (does not work. Bug DAFFODIL-2367) - hexBinary - string #2835
1 parent b4dd738 commit 1d8bcd0

36 files changed

+2480
-3
lines changed

contrib/format-daffodil/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Daffodil 'Format' Reader
2+
This plugin enables Drill to read DFDL-described data from files by way of the Apache Daffodil DFDL implementation.
3+
4+
## Validation
5+
6+
Data read by Daffodil is always validated using Daffodil's Limited Validation mode.
7+
8+
TBD: do we need an option to control escalating validation errors to fatal? Currently this is not provided.
9+
10+
## Limitations: TBD
11+
12+
At the moment, the DFDL schema is found on the local file system, which won't support Drill's distributed architecture.
13+
14+
There are restrictions on the DFDL schemas that this can handle.
15+
16+
In particular, all element children must have distinct element names, including across choice branches.
17+
(This rules out a number of large DFDL schemas.)
18+
19+
TBD: Auto renaming as part of the Daffodil-to-Drill metadata mapping?
20+
21+
The data is parsed fully from its native form into a Drill data structure held in memory.
22+
No attempt is made to avoid access to parts of the DFDL-described data that are not needed to answer the query.
23+
24+
If the data is not well-formed, an error occurs and the query fails.
25+
26+
If the data is invalid, and validity checking by Daffodil is enabled, then an error occurs and the query fails.
27+

contrib/format-daffodil/pom.xml

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?xml version="1.0"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one
5+
or more contributor license agreements. See the NOTICE file
6+
distributed with this work for additional information
7+
regarding copyright ownership. The ASF licenses this file
8+
to you under the Apache License, Version 2.0 (the
9+
"License"); you may not use this file except in compliance
10+
with the License. You may obtain a copy of the License at
11+
12+
http://www.apache.org/licenses/LICENSE-2.0
13+
14+
Unless required by applicable law or agreed to in writing, software
15+
distributed under the License is distributed on an "AS IS" BASIS,
16+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
See the License for the specific language governing permissions and
18+
limitations under the License.
19+
20+
-->
21+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<modelVersion>4.0.0</modelVersion>
24+
25+
<parent>
26+
<artifactId>drill-contrib-parent</artifactId>
27+
<groupId>org.apache.drill.contrib</groupId>
28+
<version>1.22.0-SNAPSHOT</version>
29+
</parent>
30+
31+
<artifactId>drill-format-daffodil</artifactId>
32+
<name>Drill : Contrib : Format : Daffodil</name>
33+
34+
<dependencies>
35+
<dependency>
36+
<groupId>org.apache.drill.exec</groupId>
37+
<artifactId>drill-java-exec</artifactId>
38+
<version>${project.version}</version>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.apache.daffodil</groupId>
42+
<artifactId>daffodil-japi_2.12</artifactId>
43+
<version>3.7.0</version>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.apache.daffodil</groupId>
47+
<artifactId>daffodil-runtime1_2.12</artifactId>
48+
<version>3.7.0</version>
49+
</dependency>
50+
<!-- Test dependencies -->
51+
<dependency>
52+
<groupId>org.apache.drill.exec</groupId>
53+
<artifactId>drill-java-exec</artifactId>
54+
<classifier>tests</classifier>
55+
<version>${project.version}</version>
56+
<scope>test</scope>
57+
</dependency>
58+
59+
<dependency>
60+
<groupId>org.apache.drill</groupId>
61+
<artifactId>drill-common</artifactId>
62+
<classifier>tests</classifier>
63+
<version>${project.version}</version>
64+
<scope>test</scope>
65+
</dependency>
66+
</dependencies>
67+
68+
<build>
69+
<plugins>
70+
<plugin>
71+
<artifactId>maven-resources-plugin</artifactId>
72+
<executions>
73+
<execution>
74+
<id>copy-java-sources</id>
75+
<phase>process-sources</phase>
76+
<goals>
77+
<goal>copy-resources</goal>
78+
</goals>
79+
<configuration>
80+
<outputDirectory>${basedir}/target/classes/org/apache/drill/exec/store/daffodil
81+
</outputDirectory>
82+
<resources>
83+
<resource>
84+
<directory>src/main/java/org/apache/drill/exec/store/daffodil</directory>
85+
<filtering>true</filtering>
86+
</resource>
87+
</resources>
88+
</configuration>
89+
</execution>
90+
</executions>
91+
</plugin>
92+
</plugins>
93+
</build>
94+
</project>
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.drill.exec.store.daffodil;
20+
21+
import org.apache.daffodil.japi.DataProcessor;
22+
import org.apache.drill.common.AutoCloseables;
23+
import org.apache.drill.common.exceptions.CustomErrorContext;
24+
import org.apache.drill.common.exceptions.UserException;
25+
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
26+
import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
27+
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
28+
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
29+
import org.apache.drill.exec.record.metadata.TupleMetadata;
30+
import org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory;
31+
import org.apache.drill.exec.store.dfs.DrillFileSystem;
32+
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
33+
import org.apache.hadoop.fs.Path;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import java.io.IOException;
38+
import java.io.InputStream;
39+
import java.net.URI;
40+
import java.net.URISyntaxException;
41+
import java.util.Objects;
42+
43+
import static org.apache.drill.exec.store.daffodil.schema.DaffodilDataProcessorFactory.*;
44+
import static org.apache.drill.exec.store.daffodil.schema.DrillDaffodilSchemaUtils.daffodilDataProcessorToDrillSchema;
45+
46+
public class DaffodilBatchReader implements ManagedReader {
47+
48+
private static final Logger logger = LoggerFactory.getLogger(DaffodilBatchReader.class);
49+
private final RowSetLoader rowSetLoader;
50+
private final CustomErrorContext errorContext;
51+
private final DaffodilMessageParser dafParser;
52+
private final InputStream dataInputStream;
53+
54+
public DaffodilBatchReader(DaffodilReaderConfig readerConfig, EasySubScan scan,
55+
FileSchemaNegotiator negotiator) {
56+
57+
errorContext = negotiator.parentErrorContext();
58+
DaffodilFormatConfig dafConfig = readerConfig.plugin.getConfig();
59+
60+
String schemaURIString = dafConfig.getSchemaURI(); // "schema/complexArray1.dfdl.xsd";
61+
String rootName = dafConfig.getRootName();
62+
String rootNamespace = dafConfig.getRootNamespace();
63+
boolean validationMode = dafConfig.getValidationMode();
64+
65+
URI dfdlSchemaURI;
66+
try {
67+
dfdlSchemaURI = new URI(schemaURIString);
68+
} catch (URISyntaxException e) {
69+
throw UserException.validationError(e).build(logger);
70+
}
71+
72+
FileDescrip file = negotiator.file();
73+
DrillFileSystem fs = file.fileSystem();
74+
URI fsSchemaURI = fs.getUri().resolve(dfdlSchemaURI);
75+
76+
DaffodilDataProcessorFactory dpf = new DaffodilDataProcessorFactory();
77+
DataProcessor dp;
78+
try {
79+
dp = dpf.getDataProcessor(fsSchemaURI, validationMode, rootName, rootNamespace);
80+
} catch (CompileFailure e) {
81+
throw UserException.dataReadError(e)
82+
.message(String.format("Failed to get Daffodil DFDL processor for: %s", fsSchemaURI))
83+
.addContext(errorContext).addContext(e.getMessage()).build(logger);
84+
}
85+
// Create the corresponding Drill schema.
86+
// Note: this could be a very large schema. Think of a large complex RDBMS schema,
87+
// all of it, hundreds of tables, but all part of the same metadata tree.
88+
TupleMetadata drillSchema = daffodilDataProcessorToDrillSchema(dp);
89+
// Inform Drill about the schema
90+
negotiator.tableSchema(drillSchema, true);
91+
92+
//
93+
// DATA TIME: Next we construct the runtime objects, and open files.
94+
//
95+
// We get the DaffodilMessageParser, which is a stateful driver for daffodil that
96+
// actually does the parsing.
97+
rowSetLoader = negotiator.build().writer();
98+
99+
// We construct the Daffodil InfosetOutputter which the daffodil parser uses to
100+
// convert infoset event calls to fill in a Drill row via a rowSetLoader.
101+
DaffodilDrillInfosetOutputter outputter = new DaffodilDrillInfosetOutputter(rowSetLoader);
102+
103+
// Now we can set up the dafParser with the outputter it will drive with
104+
// the parser-produced infoset.
105+
dafParser = new DaffodilMessageParser(dp); // needs further initialization after this.
106+
dafParser.setInfosetOutputter(outputter);
107+
108+
Path dataPath = file.split().getPath();
109+
// Lastly, we open the data stream
110+
try {
111+
dataInputStream = fs.openPossiblyCompressedStream(dataPath);
112+
} catch (IOException e) {
113+
throw UserException.dataReadError(e)
114+
.message(String.format("Failed to open input file: %s", dataPath.toString()))
115+
.addContext(errorContext).addContext(e.getMessage()).build(logger);
116+
}
117+
// And lastly,... tell daffodil the input data stream.
118+
dafParser.setInputStream(dataInputStream);
119+
}
120+
121+
/**
122+
* This is the core of actual processing - data movement from Daffodil to Drill.
123+
* <p>
124+
* If there is space in the batch, and there is data available to parse then this calls the
125+
* daffodil parser, which parses data, delivering it to the rowWriter by way of the infoset
126+
* outputter.
127+
* <p>
128+
* Repeats until the rowWriter is full (a batch is full), or there is no more data, or a parse
129+
* error ends execution with a throw.
130+
* <p>
131+
* Validation errors and other warnings are not errors and are logged but do not cause parsing to
132+
* fail/throw.
133+
*
134+
* @return true if there are rows retrieved, false if no rows were retrieved, which means no more
135+
* will ever be retrieved (end of data).
136+
* @throws RuntimeException
137+
* on parse errors.
138+
*/
139+
@Override
140+
public boolean next() {
141+
// Check assumed invariants
142+
// We don't know if there is data or not. This could be called on an empty data file.
143+
// We DO know that this won't be called if there is no space in the batch for even 1
144+
// row.
145+
if (dafParser.isEOF()) {
146+
return false; // return without even checking for more rows or trying to parse.
147+
}
148+
while (rowSetLoader.start() && !dafParser.isEOF()) { // we never zero-trip this loop.
149+
// the predicate is always true once.
150+
dafParser.parse();
151+
if (dafParser.isProcessingError()) {
152+
assert (Objects.nonNull(dafParser.getDiagnostics()));
153+
throw UserException.dataReadError().message(dafParser.getDiagnosticsAsString())
154+
.addContext(errorContext).build(logger);
155+
}
156+
if (dafParser.isValidationError()) {
157+
logger.warn(dafParser.getDiagnosticsAsString());
158+
// Note that even if daffodil is set to not validate, validation errors may still occur
159+
// from DFDL's "recoverableError" assertions.
160+
}
161+
rowSetLoader.save();
162+
}
163+
int nRows = rowSetLoader.rowCount();
164+
assert nRows > 0; // This cannot be zero. If the parse failed we will have already thrown out
165+
// of here.
166+
return true;
167+
}
168+
169+
@Override
170+
public void close() {
171+
AutoCloseables.closeSilently(dataInputStream);
172+
}
173+
}
174+
175+
class DaffodilReaderConfig {
176+
final DaffodilFormatPlugin plugin;
177+
178+
DaffodilReaderConfig(DaffodilFormatPlugin plugin) {
179+
this.plugin = plugin;
180+
}
181+
}

0 commit comments

Comments
 (0)