Skip to content

Commit 0ae251f

Browse files
committed
Added SQL
1 parent 0854cc8 commit 0ae251f

File tree

10 files changed

+1054
-176
lines changed

10 files changed

+1054
-176
lines changed

exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
3636
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
3737
import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
38-
import org.apache.drill.exec.planner.common.DaffodilSchemaRegistry;
38+
import org.apache.drill.exec.schema.daffodil.RemoteDaffodilSchemaRegistry;
3939
import org.apache.drill.exec.planner.physical.PlannerSettings;
4040
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
4141
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
@@ -318,8 +318,8 @@ public RemoteFunctionRegistry getRemoteFunctionRegistry() {
318318
return drillbitContext.getRemoteFunctionRegistry();
319319
}
320320

321-
public DaffodilSchemaRegistry getDaffodilSchemaRegistry() {
322-
return drillbitContext.getDaffodilSchemaProvider();
321+
public RemoteDaffodilSchemaRegistry getDaffodilSchemaRegistry() {
322+
return drillbitContext.getRemoteDaffodilSchemaRegistry();
323323
}
324324

325325
@Override

exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateDaffodilSchemaHandler.java

Lines changed: 61 additions & 136 deletions
Large diffs are not rendered by default.
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.drill.exec.planner.sql.handlers;
19+
20+
import com.google.common.collect.Lists;
21+
import org.apache.calcite.sql.SqlCharStringLiteral;
22+
import org.apache.calcite.sql.SqlNode;
23+
import org.apache.drill.common.exceptions.DrillRuntimeException;
24+
import org.apache.drill.exec.exception.VersionMismatchException;
25+
import org.apache.drill.exec.physical.PhysicalPlan;
26+
import org.apache.drill.exec.planner.sql.DirectPlan;
27+
import org.apache.drill.exec.planner.sql.parser.SqlDropDaffodilSchema;
28+
import org.apache.drill.exec.proto.UserBitShared.Jar;
29+
import org.apache.drill.exec.proto.UserBitShared.Registry;
30+
import org.apache.drill.exec.schema.daffodil.RemoteDaffodilSchemaRegistry;
31+
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
32+
import org.apache.drill.exec.work.foreman.ForemanSetupException;
33+
import org.apache.hadoop.fs.FileSystem;
34+
import org.apache.hadoop.fs.Path;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
import java.io.IOException;
39+
import java.util.List;
40+
41+
public class DropDaffodilSchemaHandler extends DefaultSqlHandler {
42+
private static Logger logger = LoggerFactory.getLogger(DropDaffodilSchemaHandler.class);
43+
44+
public DropDaffodilSchemaHandler(SqlHandlerConfig config) {
45+
super(config);
46+
}
47+
48+
/**
49+
* Unregisters Daffodil schema JARs dynamically. Process consists of several steps:
50+
* <ol>
51+
* <li>Registering jar in jar registry to ensure that the jar is not being unregistered elsewhere.</li>
52+
* <li>Starts remote unregistration process, gets list of all jars and excludes jar to be deleted.</li>
53+
* <li>Removes jar from registry area.</li>
54+
* </ol>
55+
*
56+
* Only jars registered dynamically can be unregistered.
57+
*
58+
* Limitation: before jar unregistration make sure no one is using schemas from this jar.
59+
* There is no guarantee that running queries will finish successfully or give correct result.
60+
*
61+
* @return - Single row indicating successful unregistration, raise exception otherwise
62+
*/
63+
@Override
64+
public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException {
65+
SqlDropDaffodilSchema node = unwrap(sqlNode, SqlDropDaffodilSchema.class);
66+
String jarName = ((SqlCharStringLiteral) node.getJar()).toValue();
67+
RemoteDaffodilSchemaRegistry remoteSchemaRegistry = context.getDaffodilSchemaRegistry();
68+
69+
boolean inProgress = false;
70+
try {
71+
final String action = remoteSchemaRegistry.addToJars(jarName, RemoteDaffodilSchemaRegistry.Action.UNREGISTRATION);
72+
if (!(inProgress = action == null)) {
73+
return DirectPlan.createDirectPlan(context, false, String.format("Jar with %s name is used. Action: %s", jarName, action));
74+
}
75+
76+
Jar deletedJar = unregister(jarName, remoteSchemaRegistry);
77+
if (deletedJar == null) {
78+
return DirectPlan.createDirectPlan(context, false, String.format("Jar %s is not registered in remote registry", jarName));
79+
}
80+
81+
removeJarFromArea(jarName, remoteSchemaRegistry.getFs(), remoteSchemaRegistry.getRegistryArea());
82+
83+
return DirectPlan.createDirectPlan(context, true,
84+
String.format("Daffodil schema jar %s has been unregistered successfully.", jarName));
85+
86+
} catch (Exception e) {
87+
logger.error("Error during Daffodil schema unregistration", e);
88+
return DirectPlan.createDirectPlan(context, false, e.getMessage());
89+
} finally {
90+
if (inProgress) {
91+
remoteSchemaRegistry.removeFromJars(jarName);
92+
}
93+
}
94+
}
95+
96+
/**
97+
* Gets remote schema registry with version.
98+
* Version is used to ensure that we update the same registry we removed jars from.
99+
* Looks for a jar to be deleted, if found,
100+
* attempts to update remote registry with list of jars, that excludes jar to be deleted.
101+
* If during update {@link VersionMismatchException} was detected,
102+
* attempts to repeat unregistration process till retry attempts exceeds the limit.
103+
* If retry attempts number hits 0, throws exception that failed to update remote schema registry.
104+
*
105+
* @param jarName jar name
106+
* @param remoteSchemaRegistry remote schema registry
107+
* @return jar that was unregistered, null otherwise
108+
*/
109+
private Jar unregister(String jarName, RemoteDaffodilSchemaRegistry remoteSchemaRegistry) {
110+
int retryAttempts = remoteSchemaRegistry.getRetryAttempts();
111+
while (retryAttempts >= 0) {
112+
DataChangeVersion version = new DataChangeVersion();
113+
Registry registry = remoteSchemaRegistry.getRegistry(version);
114+
Jar jarToBeDeleted = null;
115+
List<Jar> jars = Lists.newArrayList();
116+
for (Jar j : registry.getJarList()) {
117+
if (j.getName().equals(jarName)) {
118+
jarToBeDeleted = j;
119+
} else {
120+
jars.add(j);
121+
}
122+
}
123+
if (jarToBeDeleted == null) {
124+
return null;
125+
}
126+
Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build();
127+
try {
128+
remoteSchemaRegistry.updateRegistry(updatedRegistry, version);
129+
return jarToBeDeleted;
130+
} catch (VersionMismatchException ex) {
131+
logger.debug("Failed to update schema registry during unregistration, version mismatch was detected.", ex);
132+
retryAttempts--;
133+
}
134+
}
135+
throw new DrillRuntimeException("Failed to update remote schema registry. Exceeded retry attempts limit.");
136+
}
137+
138+
/**
139+
* Removes jar from indicated area, in case of error log it and proceeds.
140+
*
141+
* @param jarName jar name
142+
* @param fs file system
143+
* @param area path to area
144+
*/
145+
private void removeJarFromArea(String jarName, FileSystem fs, Path area) {
146+
try {
147+
fs.delete(new Path(area, jarName), false);
148+
} catch (IOException e) {
149+
logger.error("Error removing jar {} from area {}", jarName, area.toUri().getPath());
150+
}
151+
}
152+
}

exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropDaffodilSchema.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.calcite.sql.SqlWriter;
2828
import org.apache.calcite.sql.parser.SqlParserPos;
2929
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
30-
import org.apache.drill.exec.planner.sql.handlers.DropFunctionHandler;
30+
import org.apache.drill.exec.planner.sql.handlers.DropDaffodilSchemaHandler;
3131
import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
3232

3333
import java.util.List;
@@ -72,7 +72,7 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
7272

7373
@Override
7474
public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
75-
return new DropFunctionHandler(config);
75+
return new DropDaffodilSchemaHandler(config);
7676
}
7777

7878
public SqlNode getJar() { return jar; }

exec/java-exec/src/main/java/org/apache/drill/exec/schema/daffodil/DaffodilSchemaProvider.java

Lines changed: 12 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,60 +17,40 @@
1717
*/
1818
package org.apache.drill.exec.schema.daffodil;
1919

20-
import org.apache.commons.lang3.StringUtils;
2120
import org.apache.drill.common.AutoCloseables;
2221
import org.apache.drill.common.config.DrillConfig;
2322
import org.apache.drill.common.scanner.ClassPathScanner;
2423
import org.apache.drill.common.scanner.persistence.ScanResult;
25-
import org.apache.drill.exec.oauth.PersistentTokenRegistry;
26-
import org.apache.drill.exec.oauth.TokenRegistry;
24+
import org.apache.drill.exec.coord.ClusterCoordinator;
2725
import org.apache.drill.exec.server.DrillbitContext;
26+
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
2827

2928
/**
30-
* Class for managing daffodil schemata. Schemata will be obtained via INSTALL/CREATE SCHEMA queries.
29+
* Class for managing Daffodil schemata. Schemata will be obtained via CREATE DAFFODIL SCHEMA queries.
3130
*/
3231
public class DaffodilSchemaProvider implements AutoCloseable {
33-
private static final String STORAGE_REGISTRY_PATH = "daffodil_schema";
3432

35-
private final DrillbitContext context;
36-
37-
private PersistentTokenRegistry daffodilSchemaRegistry;
33+
private RemoteDaffodilSchemaRegistry remoteDaffodilSchemaRegistry;
3834

3935
public DaffodilSchemaProvider(DrillbitContext context) {
40-
this(context.getConfig(), ClassPathScanner.fromPrescan(context.getConfig()));
36+
this(context.getConfig(), context.getStoreProvider(), context.getClusterCoordinator());
4137
}
4238

4339
public DaffodilSchemaProvider(DrillConfig config, ScanResult classpathScan) {
44-
this(config, classpathScan, null);
40+
// This constructor is incomplete - needs StoreProvider and ClusterCoordinator
4541
}
4642

47-
public DaffodilSchemaProvider(DrillConfig config, ScanResult classpathScan, String username) {
48-
43+
public DaffodilSchemaProvider(DrillConfig config, PersistentStoreProvider storeProvider, ClusterCoordinator coordinator) {
44+
this.remoteDaffodilSchemaRegistry = new RemoteDaffodilSchemaRegistry();
45+
this.remoteDaffodilSchemaRegistry.init(config, storeProvider, coordinator);
4946
}
5047

51-
public TokenRegistry getDaffodilSchemaRegistry(String username) {
52-
if (daffodilSchemaRegistry == null) {
53-
initRemoteRegistries(username);
54-
}
55-
return daffodilSchemaRegistry;
56-
}
57-
58-
private synchronized void initRemoteRegistries(String username) {
59-
// Add the username to the path if present
60-
String finalpath;
61-
if (StringUtils.isNotEmpty(username)) {
62-
finalpath = STORAGE_REGISTRY_PATH + "/" + username;
63-
} else {
64-
finalpath = STORAGE_REGISTRY_PATH;
65-
}
66-
67-
if (daffodilSchemaRegistry == null) {
68-
daffodilSchemaRegistry = new PersistentTokenRegistry(context, finalpath);
69-
}
48+
public RemoteDaffodilSchemaRegistry getRemoteDaffodilSchemaRegistry() {
49+
return remoteDaffodilSchemaRegistry;
7050
}
7151

7252
@Override
7353
public void close() throws Exception {
74-
AutoCloseables.closeSilently(daffodilSchemaRegistry);
54+
AutoCloseables.closeSilently(remoteDaffodilSchemaRegistry);
7555
}
7656
}

0 commit comments

Comments
 (0)