Skip to content

Commit 1c21cf4

Browse files
authored
Validate query engine presence when table format engine is configured (#1491)
1 parent 081a30e commit 1c21cf4

File tree

21 files changed

+221
-129
lines changed

21 files changed

+221
-129
lines changed

sqrl-cli/src/main/java/com/datasqrl/cli/AbstractCmd.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.datasqrl.error.ErrorCollector;
2121
import com.datasqrl.error.ErrorPrinter;
2222
import com.datasqrl.util.OsProcessManager;
23+
import com.google.inject.ProvisionException;
2324
import java.nio.file.Path;
2425
import java.util.concurrent.atomic.AtomicInteger;
2526
import lombok.SneakyThrows;
@@ -47,10 +48,14 @@ public void run() {
4748
try {
4849
runInternal(collector);
4950
cli.statusHook.onSuccess(collector);
50-
} catch (CollectedException e) {
51-
if (e.isInternalError()) e.printStackTrace();
52-
e.printStackTrace();
53-
cli.statusHook.onFailure(e, collector);
51+
52+
} catch (ProvisionException | CollectedException e) {
53+
var ce = unwrapCollectedException(e);
54+
if (ce.isInternalError()) {
55+
ce.printStackTrace();
56+
}
57+
cli.statusHook.onFailure(ce, collector);
58+
5459
} catch (Throwable e) { // unknown exception
5560
collector.getCatcher().handle(e);
5661
e.printStackTrace();
@@ -90,4 +95,28 @@ protected Path getTargetDir() {
9095
public int getExitCode() {
9196
return exitCode.get();
9297
}
98+
99+
/**
100+
* Unwraps {@link CollectedException} from Guice {@link ProvisionException} wrappers to provide
101+
* clean error messages.
102+
*
103+
* <p>Validation errors during dependency injection (e.g., in pipeline configuration) are thrown
104+
* as {@link CollectedException} with clear messages. Guice wraps them in {@link
105+
* ProvisionException}, obscuring the original error with verbose DI stack traces.
106+
*
107+
* @param e the runtime exception that may be a CollectedException or contain one as a cause
108+
* @return the unwrapped CollectedException
109+
* @throws RuntimeException the original exception if it doesn't contain a CollectedException
110+
*/
111+
private CollectedException unwrapCollectedException(RuntimeException e) {
112+
if (e instanceof CollectedException ce) {
113+
return ce;
114+
}
115+
116+
if (e.getCause() instanceof CollectedException ce) {
117+
return ce;
118+
}
119+
120+
throw e;
121+
}
93122
}

sqrl-cli/src/main/java/com/datasqrl/compile/CompilationProcess.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public Pair<PhysicalPlan, TestPlan> executeCompilation(Optional<Path> testsPath)
6262
var environment =
6363
new Sqrl2FlinkSQLTranslator(
6464
buildPath,
65-
(FlinkStreamEngine) planner.getStreamStage().getEngine(),
65+
(FlinkStreamEngine) planner.getStreamStage().engine(),
6666
config.getCompilerConfig());
6767
planner.planMain(mainScript, environment);
6868
var dagBuilder = planner.getDagBuilder();

sqrl-cli/src/main/java/com/datasqrl/packager/Packager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ public void postprocess(Path targetDir, PhysicalPlan plan, TestPlan testPlan) {
291291
// We'll write a single asset for each folder in the physical plan stage, plus any deployment
292292
// artifacts that the plan has
293293
for (PhysicalStagePlan stagePlan : plan.getStagePlans()) {
294-
writePlan(stagePlan.getStage().getName(), stagePlan.getPlan(), planDir);
294+
writePlan(stagePlan.getStage().name(), stagePlan.getPlan(), planDir);
295295
}
296296

297297
if (testPlan != null) {

sqrl-planner/src/main/java/com/datasqrl/config/PipelineFactory.java

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import com.datasqrl.cli.EngineIds;
1919
import com.datasqrl.engine.ExecutionEngine;
20-
import com.datasqrl.engine.IExecutionEngine;
2120
import com.datasqrl.engine.database.DatabaseEngine;
2221
import com.datasqrl.engine.database.QueryEngine;
2322
import com.datasqrl.engine.export.PrintEngineFactory;
@@ -35,13 +34,8 @@
3534
import java.util.Set;
3635
import lombok.Getter;
3736
import lombok.NonNull;
38-
import org.apache.commons.lang3.tuple.Pair;
3937

40-
/**
41-
* Configuration for the engines
42-
*
43-
* <p>
44-
*/
38+
/** Configuration for the engines */
4539
public class PipelineFactory {
4640

4741
public static final List<String> defaultEngines = List.of(PrintEngineFactory.NAME);
@@ -59,18 +53,28 @@ public PipelineFactory(
5953
this.engineConfig = engineConfig;
6054
}
6155

56+
public ExecutionPipeline createPipeline() {
57+
var errorCollector = injector.getInstance(ErrorCollector.class);
58+
return SimplePipeline.of(getEngines(), errorCollector);
59+
}
60+
61+
public Map<String, ExecutionEngine> getEngines() {
62+
return getEngines(Optional.empty());
63+
}
64+
6265
private Map<String, ExecutionEngine> getEngines(Optional<EngineType> engineType) {
6366
Map<String, ExecutionEngine> engines = new HashMap<>();
6467
Set<String> allEngines = new HashSet<>(enabledEngines);
6568
allEngines.addAll(defaultEngines);
69+
6670
for (String engineId : allEngines) {
6771
if (engineId.equalsIgnoreCase(EngineIds.TEST)) {
6872
continue;
6973
}
7074
var engineFactory =
7175
ServiceLoaderDiscovery.get(EngineFactory.class, EngineFactory::getEngineName, engineId);
7276

73-
IExecutionEngine engine = injector.getInstance(engineFactory.getFactoryClass());
77+
var engine = injector.getInstance(engineFactory.getFactoryClass());
7478
if (engineType.map(type -> engine.getType() == type).orElse(true)) {
7579
engines.put(engineId, (ExecutionEngine) engine);
7680
}
@@ -86,25 +90,4 @@ private Map<String, ExecutionEngine> getEngines(Optional<EngineType> engineType)
8690
.forEach(databaseEngine::addQueryEngine));
8791
return engines;
8892
}
89-
90-
public Map<String, ExecutionEngine> getEngines() {
91-
return getEngines(Optional.empty());
92-
}
93-
94-
public Pair<String, ExecutionEngine> getEngine(EngineType type) {
95-
var engines = getEngines(Optional.of(type));
96-
// Todo: error collector
97-
var errors = ErrorCollector.root();
98-
errors.checkFatal(
99-
!engines.isEmpty(), "Need to configure a %s engine", type.name().toLowerCase());
100-
errors.checkFatal(
101-
engines.size() == 1,
102-
"Currently support only a single %s engine",
103-
type.name().toLowerCase());
104-
return Pair.of(engines.entrySet().iterator().next());
105-
}
106-
107-
public ExecutionPipeline createPipeline() {
108-
return SimplePipeline.of(getEngines(), /*todo*/ ErrorCollector.root());
109-
}
11093
}

sqrl-planner/src/main/java/com/datasqrl/engine/pipeline/EngineStage.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,11 @@
1717

1818
import com.datasqrl.engine.EngineFeature;
1919
import com.datasqrl.engine.ExecutionEngine;
20-
import lombok.Value;
2120

22-
@Value
23-
public class EngineStage implements ExecutionStage {
24-
25-
String name;
26-
ExecutionEngine engine;
21+
public record EngineStage(String name, ExecutionEngine engine) implements ExecutionStage {
2722

2823
@Override
2924
public boolean supportsFeature(EngineFeature capability) {
3025
return engine.supports(capability);
3126
}
32-
33-
// @Override
34-
// public boolean supportsFunction(FunctionDefinition function) {
35-
// return engine.supports(function);
36-
// }
37-
3827
}

sqrl-planner/src/main/java/com/datasqrl/engine/pipeline/ExecutionPipeline.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626

2727
public interface ExecutionPipeline {
2828

29-
List<ExecutionStage> getStages();
29+
List<ExecutionStage> stages();
3030

3131
default List<ExecutionStage> getReadStages() {
32-
return getStages().stream().filter(ExecutionStage::isRead).collect(Collectors.toList());
32+
return stages().stream().filter(ExecutionStage::isRead).collect(Collectors.toList());
3333
}
3434

3535
/**
@@ -40,12 +40,12 @@ default List<ExecutionStage> getReadStages() {
4040
default Optional<ServerEngine> getServerEngine() {
4141
return StreamUtil.getOnlyElement(
4242
getStagesByType(EngineType.SERVER).stream()
43-
.map(stage -> (ServerEngine) stage.getEngine())
43+
.map(stage -> (ServerEngine) stage.engine())
4444
.distinct());
4545
}
4646

4747
default boolean hasReadStages() {
48-
return getStages().stream().anyMatch(ExecutionStage::isRead);
48+
return stages().stream().anyMatch(ExecutionStage::isRead);
4949
}
5050

5151
Set<ExecutionStage> getUpStreamFrom(ExecutionStage stage);
@@ -54,18 +54,18 @@ default boolean hasReadStages() {
5454

5555
default Optional<ExecutionStage> getStage(String name) {
5656
return StreamUtil.getOnlyElement(
57-
getStages().stream().filter(s -> s.getName().equalsIgnoreCase(name)));
57+
stages().stream().filter(s -> s.name().equalsIgnoreCase(name)));
5858
}
5959

6060
default Optional<ExecutionStage> getMutationStage() {
6161
return StreamUtil.getOnlyElement(
6262
getStagesByType(EngineType.LOG).stream()
63-
.filter(stage -> stage.getEngine().supports(EngineFeature.MUTATIONS)));
63+
.filter(stage -> stage.engine().supports(EngineFeature.MUTATIONS)));
6464
}
6565

6666
default Optional<ExecutionStage> getStageByType(EngineType type) {
6767
return StreamUtil.getOnlyElement(
68-
getStages().stream().filter(s -> s.getEngine().getType().equals(type)));
68+
stages().stream().filter(s -> s.engine().getType().equals(type)));
6969
}
7070

7171
/**
@@ -77,8 +77,8 @@ default Optional<ExecutionStage> getStageByType(EngineType type) {
7777
* @return the stage for a given {@link EngineType}.
7878
*/
7979
default List<ExecutionStage> getStagesByType(EngineType type) {
80-
return getStages().stream()
81-
.filter(s -> s.getEngine().getType().equals(type))
80+
return stages().stream()
81+
.filter(s -> s.engine().getType().equals(type))
8282
.collect(Collectors.toList());
8383
}
8484
}

sqrl-planner/src/main/java/com/datasqrl/engine/pipeline/ExecutionStage.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,33 +23,31 @@
2323

2424
public interface ExecutionStage {
2525

26-
String getName();
26+
String name();
2727

2828
default boolean supportsAllFeatures(Collection<EngineFeature> capabilities) {
2929
return capabilities.stream().allMatch(this::supportsFeature);
3030
}
3131

3232
default EngineType getType() {
33-
return getEngine().getType();
33+
return engine().getType();
3434
}
3535

3636
boolean supportsFeature(EngineFeature capability);
3737

38-
// boolean supportsFunction(FunctionDefinition function);
39-
4038
default boolean isRead() {
41-
return getEngine().getType().isRead();
39+
return engine().getType().isRead();
4240
}
4341

4442
default boolean isWrite() {
45-
return getEngine().getType().isWrite();
43+
return engine().getType().isWrite();
4644
}
4745

4846
default boolean isCompute() {
49-
return getEngine().getType().isCompute();
47+
return engine().getType().isCompute();
5048
}
5149

52-
ExecutionEngine getEngine();
50+
ExecutionEngine engine();
5351

5452
default boolean supportsFunction(SqlOperator operator) {
5553
return true;

0 commit comments

Comments
 (0)