diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 6cc79a7a0325..3700163b2999 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,3 @@ { - "comment": "Modify this file in a trivial way to cause this test suite to run ", - "modification": 1 + "https://github.com/apache/beam/pull/36890": "fixing some null errors" } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java index c76ae79dd45d..6ef6e82e6a70 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -40,7 +42,6 @@ * org.apache.beam.sdk.extensions.sql.meta.store.MetaStore}. In Beam SQL, a DATABASE refers to a * {@link BeamCalciteSchema}. */ -@SuppressWarnings({"keyfor", "nullness"}) // TODO(https://github.com/apache/beam/issues/20497) public class BeamCalciteSchema implements Schema { private JdbcConnection connection; private TableProvider tableProvider; @@ -94,7 +95,9 @@ public Schema snapshot(SchemaVersion version) { } @Override - public Expression getExpression(SchemaPlus parentSchema, String name) { + public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) { + checkArgumentNotNull( + parentSchema, "Cannot convert BeamCalciteSchema to Expression without parent schema"); return Schemas.subSchemaExpression(parentSchema, name, getClass()); } @@ -114,7 +117,7 @@ public Set getTypeNames() { } @Override - public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table getTable( + public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.@Nullable Table getTable( String name) { Table table = tableProvider.getTable(name); if (table == null) { @@ -148,14 +151,20 @@ public Set getSubSchemaNames() { *

Otherwise, the sub-schema is derived from the {@link TableProvider} implementation. */ @Override - public Schema getSubSchema(String name) { - if (!subSchemas.containsKey(name)) { - BeamCalciteSchema subSchema; - @Nullable TableProvider subProvider = tableProvider.getSubProvider(name); - subSchema = subProvider != null ? new BeamCalciteSchema(name, connection, subProvider) : null; - subSchemas.put(name, subSchema); + public @Nullable Schema getSubSchema(String name) { + BeamCalciteSchema subSchema = subSchemas.get(name); + + if (subSchema != null) { + return subSchema; + } + + @Nullable TableProvider subProvider = tableProvider.getSubProvider(name); + if (subProvider == null) { + return null; } - return subSchemas.get(name); + subSchema = new BeamCalciteSchema(name, connection, subProvider); + subSchemas.put(name, subSchema); + return subSchema; } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java index ce25610422c1..ab1d07eec0a1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java @@ -40,6 +40,7 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Factory classes that Calcite uses to create initial schema for JDBC connection. @@ -57,9 +58,6 @@ *

{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider, * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class BeamCalciteSchemaFactory { /** @@ -67,12 +65,14 @@ class BeamCalciteSchemaFactory { * actual {@link BeamCalciteSchema}. */ static TableProvider fromInitialEmptySchema(JdbcConnection jdbcConnection) { - InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema(); + InitialEmptySchema initialEmptySchema = + (InitialEmptySchema) jdbcConnection.getCurrentBeamSchema(); return initialEmptySchema.getTableProvider(); } static CatalogManager catalogFromInitialEmptySchema(JdbcConnection jdbcConnection) { - InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema(); + InitialEmptySchema initialEmptySchema = + (InitialEmptySchema) jdbcConnection.getCurrentBeamSchema(); return initialEmptySchema.getCatalogManager(); } @@ -209,7 +209,7 @@ public Set getSubSchemaNames() { } @Override - public Expression getExpression(SchemaPlus parentSchema, String name) { + public Expression getExpression(@Nullable SchemaPlus parentSchema, String name) { return illegal(); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java index eb2c384b1e6f..9f3ff6478ad6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java @@ -42,24 +42,21 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.TranslatableTable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.checkerframework.checker.nullness.qual.Nullable; /** Adapter from {@link BeamSqlTable} to a calcite Table. */ -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class BeamCalciteTable extends AbstractQueryableTable implements ModifiableTable, TranslatableTable { private final BeamSqlTable beamTable; // These two options should be unified. // https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7590 private final Map pipelineOptionsMap; - private PipelineOptions pipelineOptions; + private @Nullable PipelineOptions pipelineOptions; BeamCalciteTable( BeamSqlTable beamTable, Map pipelineOptionsMap, - PipelineOptions pipelineOptions) { + @Nullable PipelineOptions pipelineOptions) { super(Object[].class); this.beamTable = beamTable; this.pipelineOptionsMap = pipelineOptionsMap; @@ -117,7 +114,7 @@ public Queryable asQueryable( } @Override - public Collection getModifiableCollection() { + public @Nullable Collection getModifiableCollection() { return null; } @@ -128,8 +125,8 @@ public TableModify toModificationRel( Prepare.CatalogReader catalogReader, RelNode child, TableModify.Operation operation, - List updateColumnList, - List sourceExpressionList, + @Nullable List updateColumnList, + @Nullable List sourceExpressionList, boolean flattened) { return new BeamIOSinkRel( cluster, diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java index c77ff1f85b79..d84783118bbd 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.extensions.sql.impl; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.sql.SQLException; import java.util.AbstractMap.SimpleEntry; @@ -51,17 +50,13 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Contains the metadata of tables/UDF functions, and exposes APIs to * query/validate/optimize/translate SQL statements. */ @Internal -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class BeamSqlEnv { JdbcConnection connection; QueryPlanner planner; @@ -151,16 +146,14 @@ public static class BeamSqlEnvBuilder { "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner"; private String queryPlannerClassName; private CatalogManager catalogManager; - private String currentSchemaName; + private @Nullable String currentSchemaName = null; private Map schemaMap; private Set> functionSet; private boolean autoLoadUdfs; - private PipelineOptions pipelineOptions; + private @Nullable PipelineOptions pipelineOptions; private Collection ruleSets; private BeamSqlEnvBuilder(TableProvider tableProvider) { - checkNotNull(tableProvider, "Table provider for the default schema must be sets."); - if (tableProvider instanceof MetaStore) { catalogManager = new InMemoryCatalogManager((MetaStore) tableProvider); } else { @@ -176,8 +169,6 @@ private BeamSqlEnvBuilder(TableProvider tableProvider) { } private BeamSqlEnvBuilder(CatalogManager catalogManager) { - checkNotNull(catalogManager, "Catalog manager for the default schema must be set."); - this.catalogManager = catalogManager; this.queryPlannerClassName = CALCITE_PLANNER; this.schemaMap = new HashMap<>(); @@ -287,7 +278,9 @@ private void configureSchemas(JdbcConnection jdbcConnection) { // Does not update the current default schema. schemaMap.forEach(jdbcConnection::setSchema); - if (Strings.isNullOrEmpty(currentSchemaName)) { + // Fix it in a local variable so static analysis knows it cannot be mutated. + @Nullable String currentSchemaName = this.currentSchemaName; + if (currentSchemaName == null || currentSchemaName.isEmpty()) { return; } @@ -328,9 +321,18 @@ private QueryPlanner instantiatePlanner( "Cannot find requested QueryPlanner class: " + queryPlannerClassName, exc); } + // This try/catch kept deliberately tight to ensure that we _only_ catch exceptions due to + // this reflective access. QueryPlanner.Factory factory; try { - factory = (QueryPlanner.Factory) queryPlannerClass.getField("FACTORY").get(null); + // See https://github.com/typetools/jdk/pull/235#pullrequestreview-3400922783 + @SuppressWarnings("nullness") + Object queryPlannerFactoryObj = + checkStateNotNull( + queryPlannerClass.getField("FACTORY").get(null), + "Static field %s.FACTORY is null. It must be a QueryPlanner.Factory instance.", + queryPlannerClass); + factory = (QueryPlanner.Factory) queryPlannerFactoryObj; } catch (NoSuchFieldException | IllegalAccessException exc) { throw new RuntimeException( String.format( diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java index baea5ae155b4..9c54f059f214 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + import java.sql.SQLException; import java.util.Collections; import java.util.Map; @@ -27,6 +29,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteConnection; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.checkerframework.checker.nullness.qual.Nullable; @@ -38,9 +41,6 @@ * {@link BeamCalciteSchema BeamCalciteSchemas} keep reference to this connection. Pipeline options * are stored here. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class JdbcConnection extends CalciteConnectionWrapper { /** * Connection string parameters that begin with {@code "beam."} will be interpreted as {@link @@ -49,7 +49,7 @@ public class JdbcConnection extends CalciteConnectionWrapper { private static final String PIPELINE_OPTION_PREFIX = "beam."; private Map pipelineOptionsMap; - private PipelineOptions pipelineOptions; + private @Nullable PipelineOptions pipelineOptions; private JdbcConnection(CalciteConnection connection) throws SQLException { super(connection); @@ -62,16 +62,16 @@ private JdbcConnection(CalciteConnection connection) throws SQLException { *

Sets the pipeline options, replaces the initial non-functional top-level schema with schema * created by {@link BeamCalciteSchemaFactory}. */ - static @Nullable JdbcConnection initialize(CalciteConnection connection) { + static JdbcConnection initialize(CalciteConnection connection) { try { - if (connection == null) { - return null; - } + String currentSchemaName = + checkStateNotNull( + connection.getSchema(), "When trying to initialize JdbcConnection: No schema set."); JdbcConnection jdbcConnection = new JdbcConnection(connection); jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection)); jdbcConnection.setSchema( - connection.getSchema(), + currentSchemaName, BeamCalciteSchemaFactory.catalogFromInitialEmptySchema(jdbcConnection)); return jdbcConnection; } catch (SQLException e) { @@ -107,27 +107,29 @@ public void setPipelineOptions(PipelineOptions pipelineOptions) { this.pipelineOptions = pipelineOptions; } - public PipelineOptions getPipelineOptions() { + public @Nullable PipelineOptions getPipelineOptions() { return this.pipelineOptions; } /** Get the current default schema from the root schema. */ - @SuppressWarnings("TypeParameterUnusedInFormals") - T getCurrentBeamSchema() { - try { - return (T) CalciteSchema.from(getRootSchema().getSubSchema(getSchema())).schema; - } catch (SQLException e) { - throw new RuntimeException(e); - } + Schema getCurrentBeamSchema() { + return CalciteSchema.from(getCurrentSchemaPlus()).schema; } /** Calcite-created {@link SchemaPlus} wrapper for the current schema. */ public SchemaPlus getCurrentSchemaPlus() { + String currentSchema; try { - return getRootSchema().getSubSchema(getSchema()); + currentSchema = checkStateNotNull(getSchema(), "Current schema not set"); } catch (SQLException e) { throw new RuntimeException(e); } + + return checkStateNotNull( + getRootSchema().getSubSchema(currentSchema), + "SubSchema not found in `%s`: %s", + getRootSchema().getName(), + currentSchema); } /** diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java index b23dee607cc1..ddc6c9e7500b 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java @@ -45,6 +45,7 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.rules.CoreRules; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.runtime.Hook; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.tools.RuleSet; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Calcite JDBC driver with Beam defaults. @@ -56,10 +57,6 @@ *

The querystring-style parameters are parsed as {@link PipelineOptions}. */ @AutoService(java.sql.Driver.class) -@SuppressWarnings({ - "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class JdbcDriver extends Driver { public static final JdbcDriver INSTANCE = new JdbcDriver(); public static final String CONNECT_STRING_PREFIX = "jdbc:beam:"; @@ -129,10 +126,18 @@ protected String getConnectStringPrefix() { * CalciteConnection}. */ @Override - public Connection connect(String url, Properties info) throws SQLException { + @SuppressWarnings("override.return") // https://github.com/typetools/jdk/pull/246 + public @Nullable Connection connect(String url, Properties info) throws SQLException { + @Nullable CalciteConnection connection = (CalciteConnection) super.connect(url, info); + + // null here means that CalciteConnection is not a "suitable driver" based on the parameters + if (connection == null) { + return null; + } + // calciteConnection is initialized with an empty Beam schema, // we need to populate it with pipeline options, load table providers, etc - return JdbcConnection.initialize((CalciteConnection) super.connect(url, info)); + return JdbcConnection.initialize(connection); } /** @@ -176,6 +181,12 @@ private static JdbcConnection getConnection(PipelineOptions options) { JdbcConnection connection; try { connection = (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties); + // Normally, #connect is allowed to return null when the URL is not suitable. Here, however, + // we are + // deliberately passing a bogus URL to instantiate a connection, so it should never be null. + if (connection == null) { + throw new SQLException("Unexpected null when creating synthetic Beam JdbcDriver"); + } } catch (SQLException e) { throw new RuntimeException(e); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java index b92e2ce12556..c195a5f67af5 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java @@ -101,10 +101,10 @@ public static ImmutableMultimap createAll(Class clazz) { /** * Creates {@link org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function} from - * given method. When {@code eval} method does not suit, {@code null} is returned. + * given method. * * @param method method that is used to implement the function - * @return created {@link Function} or null + * @return created {@link Function} */ public static Function create(Method method) { return create(method, ""); @@ -112,11 +112,11 @@ public static Function create(Method method) { /** * Creates {@link org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.Function} from - * given method. When {@code eval} method does not suit, {@code null} is returned. + * given method. * * @param method method that is used to implement the function * @param jarPath Path to jar that contains the method. - * @return created {@link Function} or null + * @return created {@link Function} */ public static Function create(Method method, String jarPath) { validateMethod(method); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java index 3b816c0a7b1c..46be7b61077f 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TVFSlidingWindowFn.java @@ -17,8 +17,9 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import com.google.auto.value.AutoValue; -import java.util.Arrays; import java.util.Collection; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.extensions.sql.impl.utils.TVFStreamingUtils; @@ -27,16 +28,15 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; +import org.joda.time.ReadableDateTime; /** * TVFSlidingWindowFn assigns window based on input row's "window_start" and "window_end" * timestamps. */ @AutoValue -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public abstract class TVFSlidingWindowFn extends NonMergingWindowFn { /** Size of the generated windows. */ public abstract Duration getSize(); @@ -53,10 +53,19 @@ public Collection assignWindows(AssignContext c) throws Exceptio Row curRow = (Row) c.element(); // In sliding window as TVF syntax, each row contains's its window's start and end as metadata, // thus we can assign a window directly based on window's start and end metadata. - return Arrays.asList( - new IntervalWindow( - curRow.getDateTime(TVFStreamingUtils.WINDOW_START).toInstant(), - curRow.getDateTime(TVFStreamingUtils.WINDOW_END).toInstant())); + ReadableDateTime windowStart = + checkArgumentNotNull( + curRow.getDateTime(TVFStreamingUtils.WINDOW_START), + "When assigning a sliding window to row: %s cannot be null", + TVFStreamingUtils.WINDOW_START); + + ReadableDateTime windowEnd = + checkArgumentNotNull( + curRow.getDateTime(TVFStreamingUtils.WINDOW_END), + "When assigning a sliding window to row: %s is null", + TVFStreamingUtils.WINDOW_END); + + return ImmutableList.of(new IntervalWindow(windowStart.toInstant(), windowEnd.toInstant())); } @Override diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java index 3196667cb8cb..8dbe82b30ab6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/TableResolutionUtils.java @@ -19,6 +19,8 @@ import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.sql.SQLException; @@ -31,11 +33,9 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; /** Utils to wire up the custom table resolution into Calcite's planner. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class TableResolutionUtils { /** @@ -68,7 +68,10 @@ class TableResolutionUtils { */ static void setupCustomTableResolution(JdbcConnection connection, SqlNode parsed) { List tableNames = TableNameExtractionUtils.extractTableNamesFromNode(parsed); - String currentSchemaName = getCurrentSchemaName(connection); + String currentSchemaName = + checkStateNotNull( + getCurrentSchemaName(connection), + "When trying to set up custom table resolution: current schema is null"); SchemaWithName defaultSchema = SchemaWithName.create(connection, currentSchemaName); @@ -80,7 +83,7 @@ static void setupCustomTableResolution(JdbcConnection connection, SqlNode parsed } /** Current (default) schema name in the JdbcConnection. */ - private static String getCurrentSchemaName(JdbcConnection connection) { + private static @Nullable String getCurrentSchemaName(JdbcConnection connection) { try { return connection.getSchema(); } catch (SQLException e) { @@ -170,12 +173,22 @@ private static class SchemaWithName { String name; org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema schema; + private SchemaWithName( + String name, + org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema schema) { + this.name = name; + this.schema = schema; + } + static SchemaWithName create(JdbcConnection connection, String name) { - SchemaWithName schemaWithName = new SchemaWithName(); - schemaWithName.name = name; - schemaWithName.schema = - CalciteSchema.from(connection.getRootSchema().getSubSchema(name)).schema; - return schemaWithName; + return new SchemaWithName( + name, + CalciteSchema.from( + checkArgumentNotNull( + connection.getRootSchema().getSubSchema(name), + "Sub-schema not found: %s", + name)) + .schema); } /** Whether this schema/table provider supports custom table resolution. */ diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java index 95fddf680279..7ebd3faea782 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/UdfImpl.java @@ -17,15 +17,15 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import java.lang.reflect.Method; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.TranslatableTable; import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.impl.TableMacroImpl; +import org.checkerframework.checker.nullness.qual.Nullable; /** Beam-customized facade behind {@link Function} to address BEAM-5921. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) class UdfImpl { private UdfImpl() {} @@ -38,13 +38,18 @@ private UdfImpl() {} * * @param clazz class that is used to implement the function * @param methodName Method name (typically "eval") - * @return created {@link Function} or null + * @return created {@link Function} */ public static Function create(Class clazz, String methodName) { - final Method method = findMethod(clazz, methodName); + final @Nullable Method method = findMethod(clazz, methodName); + if (method == null) { - return null; + throw new RuntimeException( + String.format( + "Cannot create UDF from method: method %s.%s not found", + clazz.getCanonicalName(), methodName)); } + return create(method); } @@ -57,7 +62,8 @@ public static Function create(Class clazz, String methodName) { */ public static Function create(Method method) { if (TranslatableTable.class.isAssignableFrom(method.getReturnType())) { - return TableMacroImpl.create(method); + return checkArgumentNotNull( + TableMacroImpl.create(method), "Could not create function from method: %s", method); } else { return ScalarFunctionImpl.create(method); } @@ -69,7 +75,7 @@ public static Function create(Method method) { * @param name name of the method to find * @return the first method with matching name or null when no method found */ - static Method findMethod(Class clazz, String name) { + static @Nullable Method findMethod(Class clazz, String name) { for (Method method : clazz.getMethods()) { if (method.getName().equals(name) && !method.isBridge()) { return method; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java index 67d415f24183..abe31b8f9cf6 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/FullNameTableProvider.java @@ -32,9 +32,6 @@ * Base class for table providers that look up table metadata using full table names, instead of * querying it by parts of the name separately. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public abstract class FullNameTableProvider implements TableProvider, CustomTableResolver { private List knownTables; @@ -51,7 +48,7 @@ public void registerKnownTableNames(List tableNames) { } @Override - public TableProvider getSubProvider(String name) { + public @Nullable TableProvider getSubProvider(String name) { // TODO: implement with trie // If 'name' matches a sub-schema/sub-provider we start tracking @@ -103,7 +100,7 @@ class TableNameTrackingProvider extends InMemoryMetaTableProvider { } @Override - public TableProvider getSubProvider(String name) { + public @Nullable TableProvider getSubProvider(String name) { // Find if any of the parsed table names have 'name' as part // of their path at current index. // diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/ReadOnlyTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/ReadOnlyTableProvider.java index ad8ba3ead5ce..861cc01d94d0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/ReadOnlyTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/ReadOnlyTableProvider.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.extensions.sql.meta.provider; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; + import java.util.Map; import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; import org.apache.beam.sdk.extensions.sql.meta.Table; @@ -27,9 +29,6 @@ * A {@code ReadOnlyTableProvider} provides in-memory read only set of {@code BeamSqlTable * BeamSqlTables}. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public class ReadOnlyTableProvider implements TableProvider { private final String typeName; private final Map tables; @@ -73,6 +72,6 @@ public Map getTables() { @Override public BeamSqlTable buildBeamSqlTable(Table table) { - return tables.get(table.getName()); + return checkArgumentNotNull(tables.get(table.getName()), "Table not found: " + table.getName()); } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java index 7bb29955b4f4..9be8c96b7c99 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java @@ -36,9 +36,6 @@ * automatically loaded by CLI or other cases when {@link JdbcDriver} is used with default * connection parameters. */ -@SuppressWarnings({ - "nullness" // TODO(https://github.com/apache/beam/issues/20497) -}) public interface TableProvider { /** Gets the table type this provider handles. */ String getTableType(); @@ -76,7 +73,7 @@ default Set getSubProviders() { * Returns a sub-provider, e.g. sub-schema. Temporary, this logic needs to live in {@link * BeamCalciteSchema}. */ - default TableProvider getSubProvider(String name) { + default @Nullable TableProvider getSubProvider(String name) { return null; } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java index 2265759126fb..4133fb8b0700 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java @@ -127,7 +127,7 @@ public void testDriverManager_defaultUserAgent() throws Exception { public void testDriverManager_hasUserAgent() throws Exception { JdbcConnection connection = (JdbcConnection) DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX); - CatalogManagerSchema schema = connection.getCurrentBeamSchema(); + CatalogManagerSchema schema = (CatalogManagerSchema) connection.getCurrentBeamSchema(); assertThat( schema.connection().getPipelineOptionsMap().get("userAgent"), equalTo("BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion()));