Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/trigger_files/beam_PostCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run ",
"modification": 1
"https://github.com/apache/beam/pull/36890": "fixing some null errors"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -40,7 +42,6 @@
* org.apache.beam.sdk.extensions.sql.meta.store.MetaStore}. In Beam SQL, a DATABASE refers to a
* {@link BeamCalciteSchema}.
*/
@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497)
public class BeamCalciteSchema implements Schema {
private JdbcConnection connection;
private TableProvider tableProvider;
Expand Down Expand Up @@ -94,7 +95,9 @@ public Schema snapshot(SchemaVersion version) {
}

@Override
public Expression getExpression(SchemaPlus parentSchema, String name) {
public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
checkArgumentNotNull(
parentSchema, "Cannot convert BeamCalciteSchema to Expression without parent schema");
return Schemas.subSchemaExpression(parentSchema, name, getClass());
}

Expand All @@ -114,7 +117,7 @@ public Set<String> getTypeNames() {
}

@Override
public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table getTable(
public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.@Nullable Table getTable(
String name) {
Table table = tableProvider.getTable(name);
if (table == null) {
Expand Down Expand Up @@ -148,14 +151,20 @@ public Set<String> getSubSchemaNames() {
* <p>Otherwise, the sub-schema is derived from the {@link TableProvider} implementation.
*/
@Override
public Schema getSubSchema(String name) {
if (!subSchemas.containsKey(name)) {
BeamCalciteSchema subSchema;
@Nullable TableProvider subProvider = tableProvider.getSubProvider(name);
subSchema = subProvider != null ? new BeamCalciteSchema(name, connection, subProvider) : null;
subSchemas.put(name, subSchema);
public @Nullable Schema getSubSchema(String name) {
BeamCalciteSchema subSchema = subSchemas.get(name);

if (subSchema != null) {
return subSchema;
}

@Nullable TableProvider subProvider = tableProvider.getSubProvider(name);
if (subProvider == null) {
return null;
}

return subSchemas.get(name);
subSchema = new BeamCalciteSchema(name, connection, subProvider);
subSchemas.put(name, subSchema);
return subSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Factory classes that Calcite uses to create initial schema for JDBC connection.
Expand All @@ -57,22 +58,21 @@
* <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider,
* org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class BeamCalciteSchemaFactory {

/**
* Called by {@link JdbcConnection} when initializing to convert the initial empty schema to
* actual {@link BeamCalciteSchema}.
*/
static TableProvider fromInitialEmptySchema(JdbcConnection jdbcConnection) {
InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema();
InitialEmptySchema initialEmptySchema =
(InitialEmptySchema) jdbcConnection.getCurrentBeamSchema();
return initialEmptySchema.getTableProvider();
}

static CatalogManager catalogFromInitialEmptySchema(JdbcConnection jdbcConnection) {
InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema();
InitialEmptySchema initialEmptySchema =
(InitialEmptySchema) jdbcConnection.getCurrentBeamSchema();
return initialEmptySchema.getCatalogManager();
}

Expand Down Expand Up @@ -209,7 +209,7 @@ public Set<String> getSubSchemaNames() {
}

@Override
public Expression getExpression(SchemaPlus parentSchema, String name) {
public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) {
return illegal();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,21 @@
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.TranslatableTable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Adapter from {@link BeamSqlTable} to a calcite Table. */
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class BeamCalciteTable extends AbstractQueryableTable
implements ModifiableTable, TranslatableTable {
private final BeamSqlTable beamTable;
// These two options should be unified.
// https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7590
private final Map<String, String> pipelineOptionsMap;
private PipelineOptions pipelineOptions;
private @Nullable PipelineOptions pipelineOptions;

BeamCalciteTable(
BeamSqlTable beamTable,
Map<String, String> pipelineOptionsMap,
PipelineOptions pipelineOptions) {
@Nullable PipelineOptions pipelineOptions) {
super(Object[].class);
this.beamTable = beamTable;
this.pipelineOptionsMap = pipelineOptionsMap;
Expand Down Expand Up @@ -117,7 +114,7 @@ public <T> Queryable<T> asQueryable(
}

@Override
public Collection getModifiableCollection() {
public @Nullable Collection getModifiableCollection() {
return null;
}

Expand All @@ -128,8 +125,8 @@ public TableModify toModificationRel(
Prepare.CatalogReader catalogReader,
RelNode child,
TableModify.Operation operation,
List<String> updateColumnList,
List<RexNode> sourceExpressionList,
@Nullable List<String> updateColumnList,
@Nullable List<RexNode> sourceExpressionList,
boolean flattened) {
return new BeamIOSinkRel(
cluster,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.sdk.extensions.sql.impl;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.sql.SQLException;
import java.util.AbstractMap.SimpleEntry;
Expand Down Expand Up @@ -51,17 +50,13 @@
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Contains the metadata of tables/UDF functions, and exposes APIs to
* query/validate/optimize/translate SQL statements.
*/
@Internal
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class BeamSqlEnv {
JdbcConnection connection;
QueryPlanner planner;
Expand Down Expand Up @@ -151,16 +146,14 @@ public static class BeamSqlEnvBuilder {
"org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner";
private String queryPlannerClassName;
private CatalogManager catalogManager;
private String currentSchemaName;
private @Nullable String currentSchemaName = null;
private Map<String, TableProvider> schemaMap;
private Set<Map.Entry<String, Function>> functionSet;
private boolean autoLoadUdfs;
private PipelineOptions pipelineOptions;
private @Nullable PipelineOptions pipelineOptions;
private Collection<RuleSet> ruleSets;

private BeamSqlEnvBuilder(TableProvider tableProvider) {
checkNotNull(tableProvider, "Table provider for the default schema must be sets.");

if (tableProvider instanceof MetaStore) {
catalogManager = new InMemoryCatalogManager((MetaStore) tableProvider);
} else {
Expand All @@ -176,8 +169,6 @@ private BeamSqlEnvBuilder(TableProvider tableProvider) {
}

private BeamSqlEnvBuilder(CatalogManager catalogManager) {
checkNotNull(catalogManager, "Catalog manager for the default schema must be set.");

this.catalogManager = catalogManager;
this.queryPlannerClassName = CALCITE_PLANNER;
this.schemaMap = new HashMap<>();
Expand Down Expand Up @@ -287,7 +278,9 @@ private void configureSchemas(JdbcConnection jdbcConnection) {
// Does not update the current default schema.
schemaMap.forEach(jdbcConnection::setSchema);

if (Strings.isNullOrEmpty(currentSchemaName)) {
// Fix it in a local variable so static analysis knows it cannot be mutated.
@Nullable String currentSchemaName = this.currentSchemaName;
if (currentSchemaName == null || currentSchemaName.isEmpty()) {
return;
}

Expand Down Expand Up @@ -328,9 +321,18 @@ private QueryPlanner instantiatePlanner(
"Cannot find requested QueryPlanner class: " + queryPlannerClassName, exc);
}

// This try/catch kept deliberately tight to ensure that we _only_ catch exceptions due to
// this reflective access.
QueryPlanner.Factory factory;
try {
factory = (QueryPlanner.Factory) queryPlannerClass.getField("FACTORY").get(null);
// See https://github.com/typetools/jdk/pull/235#pullrequestreview-3400922783
@SuppressWarnings("nullness")
Object queryPlannerFactoryObj =
checkStateNotNull(
queryPlannerClass.getField("FACTORY").get(null),
"Static field %s.FACTORY is null. It must be a QueryPlanner.Factory instance.",
queryPlannerClass);
factory = (QueryPlanner.Factory) queryPlannerFactoryObj;
} catch (NoSuchFieldException | IllegalAccessException exc) {
throw new RuntimeException(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import java.sql.SQLException;
import java.util.Collections;
import java.util.Map;
Expand All @@ -27,6 +29,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteConnection;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -38,9 +41,6 @@
* {@link BeamCalciteSchema BeamCalciteSchemas} keep reference to this connection. Pipeline options
* are stored here.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class JdbcConnection extends CalciteConnectionWrapper {
/**
* Connection string parameters that begin with {@code "beam."} will be interpreted as {@link
Expand All @@ -49,7 +49,7 @@ public class JdbcConnection extends CalciteConnectionWrapper {
private static final String PIPELINE_OPTION_PREFIX = "beam.";

private Map<String, String> pipelineOptionsMap;
private PipelineOptions pipelineOptions;
private @Nullable PipelineOptions pipelineOptions;

private JdbcConnection(CalciteConnection connection) throws SQLException {
super(connection);
Expand All @@ -62,16 +62,16 @@ private JdbcConnection(CalciteConnection connection) throws SQLException {
* <p>Sets the pipeline options, replaces the initial non-functional top-level schema with schema
* created by {@link BeamCalciteSchemaFactory}.
*/
static @Nullable JdbcConnection initialize(CalciteConnection connection) {
static JdbcConnection initialize(CalciteConnection connection) {
try {
if (connection == null) {
return null;
}
String currentSchemaName =
checkStateNotNull(
connection.getSchema(), "When trying to initialize JdbcConnection: No schema set.");

JdbcConnection jdbcConnection = new JdbcConnection(connection);
jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
jdbcConnection.setSchema(
connection.getSchema(),
currentSchemaName,
BeamCalciteSchemaFactory.catalogFromInitialEmptySchema(jdbcConnection));
return jdbcConnection;
} catch (SQLException e) {
Expand Down Expand Up @@ -107,27 +107,29 @@ public void setPipelineOptions(PipelineOptions pipelineOptions) {
this.pipelineOptions = pipelineOptions;
}

public PipelineOptions getPipelineOptions() {
public @Nullable PipelineOptions getPipelineOptions() {
return this.pipelineOptions;
}

/** Get the current default schema from the root schema. */
@SuppressWarnings("TypeParameterUnusedInFormals")
<T> T getCurrentBeamSchema() {
try {
return (T) CalciteSchema.from(getRootSchema().getSubSchema(getSchema())).schema;
} catch (SQLException e) {
throw new RuntimeException(e);
}
Schema getCurrentBeamSchema() {
return CalciteSchema.from(getCurrentSchemaPlus()).schema;
}

/** Calcite-created {@link SchemaPlus} wrapper for the current schema. */
public SchemaPlus getCurrentSchemaPlus() {
String currentSchema;
try {
return getRootSchema().getSubSchema(getSchema());
currentSchema = checkStateNotNull(getSchema(), "Current schema not set");
} catch (SQLException e) {
throw new RuntimeException(e);
}

return checkStateNotNull(
getRootSchema().getSubSchema(currentSchema),
"SubSchema not found in `%s`: %s",
getRootSchema().getName(),
currentSchema);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.rules.CoreRules;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.runtime.Hook;
import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* Calcite JDBC driver with Beam defaults.
Expand All @@ -56,10 +57,6 @@
* <p>The querystring-style parameters are parsed as {@link PipelineOptions}.
*/
@AutoService(java.sql.Driver.class)
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class JdbcDriver extends Driver {
public static final JdbcDriver INSTANCE = new JdbcDriver();
public static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
Expand Down Expand Up @@ -129,10 +126,18 @@ protected String getConnectStringPrefix() {
* CalciteConnection}.
*/
@Override
public Connection connect(String url, Properties info) throws SQLException {
@SuppressWarnings("override.return") // https://github.com/typetools/jdk/pull/246
public @Nullable Connection connect(String url, Properties info) throws SQLException {
@Nullable CalciteConnection connection = (CalciteConnection) super.connect(url, info);

// null here means that CalciteConnection is not a "suitable driver" based on the parameters
if (connection == null) {
return null;
}

// calciteConnection is initialized with an empty Beam schema,
// we need to populate it with pipeline options, load table providers, etc
return JdbcConnection.initialize((CalciteConnection) super.connect(url, info));
return JdbcConnection.initialize(connection);
}

/**
Expand Down Expand Up @@ -176,6 +181,12 @@ private static JdbcConnection getConnection(PipelineOptions options) {
JdbcConnection connection;
try {
connection = (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
// Normally, #connect is allowed to return null when the URL is not suitable. Here, however,
// we are
// deliberately passing a bogus URL to instantiate a connection, so it should never be null.
if (connection == null) {
throw new SQLException("Unexpected null when creating synthetic Beam JdbcDriver");
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
Expand Down
Loading
Loading