Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run ",
"modification": 1
"modification": 2
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PreCommit_SQL.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
"modification": 3
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
Expand Down Expand Up @@ -71,6 +72,11 @@ public boolean createDatabase(String database) {
return catalogConfig.createNamespace(database);
}

@Override
public Collection<String> databases() {
return catalogConfig.listNamespaces();
}

@Override
public void useDatabase(String database) {
checkArgument(databaseExists(database), "Database '%s' does not exist.");
Expand Down
10 changes: 10 additions & 0 deletions sdks/java/extensions/sql/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ data: {
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseCatalog"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseDatabase"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlShowCatalogs"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlShowDatabases"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlShowTables"
"org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam"
"org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
"org.apache.beam.sdk.schemas.Schema"
Expand All @@ -50,6 +53,9 @@ data: {
"TBLPROPERTIES"
"PROPERTIES"
"PARTITIONED"
"CATALOGS"
"DATABASES"
"TABLES"
"USE"
]

Expand Down Expand Up @@ -422,6 +428,10 @@ data: {
# Return type of method implementation should be 'SqlNode'.
# Example: SqlShowDatabases(), SqlShowTables().
statementParserMethods: [
"SqlShowTables(Span.of())"
"SqlShowDatabases(Span.of())"
"SqlShowCatalogs(Span.of())"
"SqlShowCurrent(Span.of())"
"SqlUseCatalog(Span.of(), null)"
"SqlUseDatabase(Span.of(), null)"
"SqlSetOptionBeam(Span.of(), null)"
Expand Down
67 changes: 67 additions & 0 deletions sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,22 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) :
}
}

/**
* SHOW CATALOGS [ LIKE regex_pattern ]
*/
SqlCall SqlShowCatalogs(Span s) :
{
SqlNode regex = null;
}
{
<SHOW> <CATALOGS> { s.add(this); }
[ <LIKE> regex = StringLiteral() ]
{
return new SqlShowCatalogs(s.end(this), false, regex);
}
}


/**
* CREATE DATABASE ( IF NOT EXISTS )? ( catalog_name '.' )? database_name
*/
Expand Down Expand Up @@ -331,6 +347,39 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) :
}
}

/**
* SHOW DATABASES [ ( FROM | IN )? catalog_name ] [LIKE regex_pattern ]
*/
SqlCall SqlShowDatabases(Span s) :
{
SqlIdentifier catalogName = null;
SqlNode regex = null;
}
{
<SHOW> <DATABASES> { s.add(this); }
[ ( <FROM> | <IN> ) catalogName = SimpleIdentifier() ]
[ <LIKE> regex = StringLiteral() ]
{
return new SqlShowDatabases(s.end(this), false, catalogName, regex);
}
}

SqlCall SqlShowCurrent(Span s) :
{
}
{
<SHOW> <CURRENT> { s.add(this); }
(
<CATALOG> {
return new SqlShowCatalogs(s.end(this), true, null);
}
|
<DATABASE> {
return new SqlShowDatabases(s.end(this), true, null, null);
}
)
}


SqlNodeList PartitionFieldList() :
{
Expand Down Expand Up @@ -456,6 +505,24 @@ SqlDrop SqlDropTable(Span s, boolean replace) :
}
}

/**
* SHOW TABLES [ ( FROM | IN )? [ catalog_name '.' ] database_name ] [ LIKE regex_pattern ]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would [catalog_name '.' ] database_name already exist somewhere as a general construction, or could share between grammar rules? It seems like it is a parsing rule for "database reference"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have it as a general construction yet (catalogs and databases are relatively new concepts in Beam SQL).

I could put a comment at the top of the doc, something like databaseRef: [ catalog_name '.' ] database_name, and replace the occurrences with databaseRef

Lmk if that's what you had in mind?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well.. we've have tables named like bigquery.my_dataset.my_tablename and pubsub.my_topic for a long time. Ideally we can merge these two things soonish since they have identical syntax and meaning.

*/
SqlCall SqlShowTables(Span s) :
{
SqlIdentifier database = null;
SqlNode regex = null;
}
{
<SHOW> <TABLES> { s.add(this); }
[ (<FROM> | <IN>) database = CompoundIdentifier() ]
[ <LIKE> regex = StringLiteral() ]
{
return new SqlShowTables(s.end(this), database, regex);
}
}


Schema.FieldType FieldType() :
{
final SqlTypeName collectionTypeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table ge
connection.getPipelineOptions());
}

public Collection<Table> getTables() {
return tableProvider.getTables().values();
}

@Override
public Set<String> getFunctionNames() {
return Collections.emptySet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;

import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;

Expand Down Expand Up @@ -142,7 +143,8 @@ public void dropCatalog(SqlIdentifier identifier, boolean ifExists) {
// will attempt to do so.
public void maybeRegisterProvider(TableName path, String type) {
type = type.toLowerCase();
CatalogSchema catalogSchema = getCatalogSchema(path);
CatalogSchema catalogSchema =
path.catalog() != null ? getCatalogSchema(path) : getCurrentCatalogSchema();
BeamCalciteSchema beamCalciteSchema = catalogSchema.getDatabaseSchema(path);

if (beamCalciteSchema.getTableProvider() instanceof MetaStore) {
Expand Down Expand Up @@ -177,14 +179,16 @@ public Set<String> getTableNames() {
}

public CatalogSchema getCatalogSchema(TableName tablePath) {
@Nullable Schema catalogSchema = getSubSchema(tablePath.catalog());
if (catalogSchema == null) {
catalogSchema = getCurrentCatalogSchema();
}
return getCatalogSchema(tablePath.catalog());
}

public CatalogSchema getCatalogSchema(@Nullable String catalog) {
Schema catalogSchema =
checkArgumentNotNull(getSubSchema(catalog), "Catalog '%s' not found.", catalog);
Preconditions.checkState(
catalogSchema instanceof CatalogSchema,
"Unexpected Schema type for Catalog '%s': %s",
tablePath.catalog(),
catalog,
catalogSchema.getClass());
return (CatalogSchema) catalogSchema;
}
Expand Down Expand Up @@ -222,7 +226,11 @@ public CatalogSchema getCurrentCatalogSchema() {

@Override
public Set<String> getSubSchemaNames() {
return catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet());
return catalogs().stream().map(Catalog::name).collect(Collectors.toSet());
}

public Collection<Catalog> catalogs() {
return catalogManager.catalogs();
}

public void setPipelineOption(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ public Catalog getCatalog() {
return getSubSchema(catalog.currentDatabase());
}

public Collection<String> databases() {
return catalog.databases();
}

public BeamCalciteSchema getDatabaseSchema(TableName tablePath) {
@Nullable BeamCalciteSchema beamCalciteSchema = getSubSchema(tablePath.database());
if (beamCalciteSchema == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,10 @@ public void execute(CalcitePrepare.Context context) {
CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
catalogManagerSchema.maybeRegisterProvider(pathOverride, SqlDdlNodes.getString(type));

CatalogSchema catalogSchema = catalogManagerSchema.getCatalogSchema(pathOverride);
CatalogSchema catalogSchema =
pathOverride.catalog() != null
? catalogManagerSchema.getCatalogSchema(pathOverride)
: catalogManagerSchema.getCurrentCatalogSchema();
beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
} else if (schema instanceof BeamCalciteSchema) {
beamCalciteSchema = (BeamCalciteSchema) schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ static Pair<CalciteSchema, String> schema(
return Pair.of(checkStateNotNull(schema, "Got null sub-schema for path '%s'", path), name(id));
}

static CalciteSchema schema(CalcitePrepare.Context context, boolean mutable) {
CalciteSchema rootSchema = mutable ? context.getMutableRootSchema() : context.getRootSchema();
List<String> path = context.getDefaultSchemaPath();
@Nullable CalciteSchema schema = childSchema(rootSchema, path);
return checkStateNotNull(schema, "Got null sub-schema for path '%s'", path);
}

private static @Nullable CalciteSchema childSchema(CalciteSchema rootSchema, List<String> path) {
@Nullable CalciteSchema schema = rootSchema;
for (String p : path) {
Expand All @@ -85,7 +92,7 @@ public static String name(SqlIdentifier id) {
}
}

static @Nullable String getString(SqlNode n) {
static @Nullable String getString(@Nullable SqlNode n) {
if (n == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ public void execute(CalcitePrepare.Context context) {

List<String> components = Lists.newArrayList(Splitter.on(".").split(databaseName.toString()));
TableName pathOverride = TableName.create(components, "");
CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
CatalogSchema catalogSchema =
pathOverride.catalog() != null
? ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride)
: ((CatalogManagerSchema) schema).getCurrentCatalogSchema();
catalogSchema.dropDatabase(databaseName, cascade, ifExists);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public void execute(CalcitePrepare.Context context) {

BeamCalciteSchema beamCalciteSchema;
if (schema instanceof CatalogManagerSchema) {
CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
CatalogSchema catalogSchema =
pathOverride.catalog() != null
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto - and I do think doing it in the parse layer is the right way to go. But we should probably be able to make a common "catalog reference" and "table reference" construct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we should probably be able to make a common "catalog reference" and "table reference" construct.

Not sure I understand this, but lmk if the new change takes care of it

? ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride)
: ((CatalogManagerSchema) schema).getCurrentCatalogSchema();
beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
} else if (schema instanceof BeamCalciteSchema) {
beamCalciteSchema = (BeamCalciteSchema) schema;
Expand Down
Loading
Loading