Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions docs/src/main/sphinx/connector/hive.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,11 @@ Hive connector documentation.
catalog specific use.
- `false`
* - `hive.query-partition-filter-required-schemas`
- Allow specifying the list of schemas for which Trino will enforce that
queries use a filter on partition keys for source tables. The list can be
specified using the `hive.query-partition-filter-required-schemas`,
- Allow specifying the list of schema names or regex patterns for which Trino
will enforce that queries use a filter on partition keys for source tables.
Each entry can be an exact schema name (e.g., `schema1`) or a regular
expression pattern that must match the entire schema name (e.g., `schema.*` to match all schemas starting with "schema").
The list can be specified using the `hive.query-partition-filter-required-schemas` configuration property
or the `query_partition_filter_required_schemas` session property. The list
is taken into consideration only if the `hive.query-partition-filter-required`
configuration property or the `query_partition_filter_required` session
Expand Down
6 changes: 4 additions & 2 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,10 @@ implementation is used:
catalog session property is `query_partition_filter_required`.
- `false`
* - `iceberg.query-partition-filter-required-schemas`
- Specify the list of schemas for which Trino can enforce that queries use a
filter on partition keys for source tables. Equivalent session property is
- Specify the list of schema names or regex patterns for which Trino can enforce
that queries use a filter on partition keys for source tables. Each entry can be
an exact schema name (e.g., `schema1`) or a regular expression pattern that must match the entire schema name (e.g., `schema.*`
to match all schemas starting with "schema"). Equivalent session property is
`query_partition_filter_required_schemas`. The list is used if the
`iceberg.query-partition-filter-required` configuration property or the
`query_partition_filter_required` catalog session property is set to `true`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ public Set<String> getQueryPartitionFilterRequiredSchemas()
}

@Config("hive.query-partition-filter-required-schemas")
@ConfigDescription("List of schemas for which filter on partition column is enforced")
@ConfigDescription("List of schema names or regex patterns for which filter on partition column is enforced")
public HiveConfig setQueryPartitionFilterRequiredSchemas(List<String> queryPartitionFilterRequiredSchemas)
{
this.queryPartitionFilterRequiredSchemas = queryPartitionFilterRequiredSchemas.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4071,6 +4071,7 @@ private static boolean isQueryPartitionFilterRequiredForTable(ConnectorSession s
Set<String> requiredSchemas = getQueryPartitionFilterRequiredSchemas(session);
// If query_partition_filter_required_schemas is empty, then we would apply partition filter for all tables.
return isQueryPartitionFilterRequired(session) &&
(requiredSchemas.isEmpty() || requiredSchemas.contains(schemaTableName.getSchemaName()));
(requiredSchemas.isEmpty() || requiredSchemas.contains(schemaTableName.getSchemaName()) ||
requiredSchemas.stream().anyMatch(pattern -> schemaTableName.getSchemaName().matches(pattern)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public HiveSessionProperties(
false),
new PropertyMetadata<>(
QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS,
"List of schemas for which filter on partition column is enforced.",
"List of schema names or regex patterns for which filter on partition column is enforced.",
new ArrayType(VARCHAR),
Set.class,
hiveConfig.getQueryPartitionFilterRequiredSchemas(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,65 @@ public void testIgnoreQueryPartitionFilterRequiredSchemas()
assertUpdate(session, "DROP SCHEMA " + schemaName + " CASCADE");
}

/**
 * Checks that an entry of the {@code query_partition_filter_required_schemas} session property
 * is honored as a regular expression: tables in schemas whose names match the pattern must be
 * queried with a filter on a partition column, while tables in non-matching schemas are not
 * subject to that requirement.
 */
@Test
public void testQueryPartitionFilterRequiredSchemasWithRegex()
{
    String schemaPrefix = "test_regex_filter_";
    String schema1 = schemaPrefix + "schema1_" + randomNameSuffix();
    String schema2 = schemaPrefix + "schema2_" + randomNameSuffix();
    String otherSchema = "other_schema_" + randomNameSuffix();

    // The only configured entry is the pattern "<schemaPrefix>.*", so schema1 and schema2 match and otherSchema does not
    Session session = Session.builder(getSession())
            .setIdentity(Identity.forUser("hive")
                    .withRole("hive", new SelectedRole(ROLE, Optional.of("admin")))
                    .build())
            .setCatalogSessionProperty("hive", "query_partition_filter_required", "true")
            .setCatalogSessionProperty("hive", "query_partition_filter_required_schemas", format("[\"%s.*\"]", schemaPrefix))
            .build();

    try {
        // Schemas are created inside the try block so that a failure part-way through setup still reaches the cleanup below
        getQueryRunner().execute("CREATE SCHEMA " + schema1);
        getQueryRunner().execute("CREATE SCHEMA " + schema2);
        getQueryRunner().execute("CREATE SCHEMA " + otherSchema);

        // Create tables in schemas matching the regex pattern
        assertUpdate(session, format("CREATE TABLE %s.test_table (id integer, ds varchar) WITH (partitioned_by = ARRAY['ds'])", schema1));
        assertUpdate(session, format("INSERT INTO %s.test_table VALUES (1, 'a')", schema1), 1);

        assertUpdate(session, format("CREATE TABLE %s.test_table (id integer, ds varchar) WITH (partitioned_by = ARRAY['ds'])", schema2));
        assertUpdate(session, format("INSERT INTO %s.test_table VALUES (2, 'b')", schema2), 1);

        // Create table in schema NOT matching the regex pattern
        assertUpdate(session, format("CREATE TABLE %s.test_table (id integer, ds varchar) WITH (partitioned_by = ARRAY['ds'])", otherSchema));
        assertUpdate(session, format("INSERT INTO %s.test_table VALUES (3, 'c')", otherSchema), 1);

        // Queries on tables in schemas matching regex should require partition filter
        assertQueryFails(
                session,
                format("SELECT id FROM %s.test_table WHERE id = 1", schema1),
                format("Filter required on %s\\.test_table for at least one partition column: ds", schema1));

        assertQueryFails(
                session,
                format("SELECT id FROM %s.test_table WHERE id = 2", schema2),
                format("Filter required on %s\\.test_table for at least one partition column: ds", schema2));

        // Queries on tables in schemas NOT matching regex should succeed without partition filter
        assertQuery(session, format("SELECT id FROM %s.test_table WHERE id = 3", otherSchema), "SELECT 3");

        // Queries with partition filter should succeed
        assertQuery(session, format("SELECT id FROM %s.test_table WHERE ds = 'a'", schema1), "SELECT 1");
        assertQuery(session, format("SELECT id FROM %s.test_table WHERE ds = 'b'", schema2), "SELECT 2");
    }
    finally {
        // IF EXISTS keeps cleanup from throwing (and masking the original failure) when setup did not create every schema
        getQueryRunner().execute("DROP SCHEMA IF EXISTS " + schema1 + " CASCADE");
        getQueryRunner().execute("DROP SCHEMA IF EXISTS " + schema2 + " CASCADE");
        getQueryRunner().execute("DROP SCHEMA IF EXISTS " + otherSchema + " CASCADE");
    }
}

@Test
public void testInvalidValueForQueryPartitionFilterRequiredSchemas()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public Set<String> getQueryPartitionFilterRequiredSchemas()
}

@Config("iceberg.query-partition-filter-required-schemas")
@ConfigDescription("List of schemas for which filter on partition column is enforced")
@ConfigDescription("List of schemas (or regex patterns) for which filter on partition column is enforced")
public IcebergConfig setQueryPartitionFilterRequiredSchemas(Set<String> queryPartitionFilterRequiredSchemas)
{
this.queryPartitionFilterRequiredSchemas = queryPartitionFilterRequiredSchemas.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,8 @@ private static boolean isQueryPartitionFilterRequiredForTable(ConnectorSession s
Set<String> requiredSchemas = getQueryPartitionFilterRequiredSchemas(session);
// If query_partition_filter_required_schemas is empty then we would apply partition filter for all tables.
return isQueryPartitionFilterRequired(session) &&
(requiredSchemas.isEmpty() || requiredSchemas.contains(table.getSchemaName()));
(requiredSchemas.isEmpty() || requiredSchemas.contains(table.getSchemaName()) ||
requiredSchemas.stream().anyMatch(pattern -> table.getSchemaName().matches(pattern)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ public IcebergSessionProperties(
false))
.add(new PropertyMetadata<>(
QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS,
"List of schemas for which filter on partition column is enforced.",
"List of schema names or regex patterns for which filter on partition column is enforced.",
new ArrayType(VARCHAR),
Set.class,
icebergConfig.getQueryPartitionFilterRequiredSchemas(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8858,6 +8858,57 @@ public void testPartitionFilterRequiredSchemas()
assertUpdate(session, "DROP SCHEMA " + schemaName + " CASCADE");
}

/**
 * Checks that an entry of the {@code query_partition_filter_required_schemas} session property
 * is honored as a regular expression: tables in schemas whose names match the pattern must be
 * queried with a filter on a partition column, while tables in non-matching schemas are not
 * subject to that requirement.
 */
@Test
public void testPartitionFilterRequiredSchemasWithRegex()
{
    String schemaPrefix = "test_regex_filter_";
    String schema1 = schemaPrefix + "schema1_" + randomNameSuffix();
    String schema2 = schemaPrefix + "schema2_" + randomNameSuffix();
    String otherSchema = "other_schema_" + randomNameSuffix();

    // The only configured entry is the pattern "<schemaPrefix>.*", so schema1 and schema2 match and otherSchema does not
    Session session = Session.builder(withPartitionFilterRequired(getSession()))
            .setCatalogSessionProperty("iceberg", "query_partition_filter_required_schemas", "[\"" + schemaPrefix + ".*\"]")
            .build();

    try {
        // Schemas are created inside the try block so that a failure part-way through setup still reaches the cleanup below
        assertUpdate(session, "CREATE SCHEMA " + schema1);
        assertUpdate(session, "CREATE SCHEMA " + schema2);
        assertUpdate(session, "CREATE SCHEMA " + otherSchema);

        // Create tables in schemas matching the regex pattern
        assertUpdate(session, format("CREATE TABLE %s.test_table (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", schema1), 1);
        assertUpdate(session, format("CREATE TABLE %s.test_table (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 2, '2', '2'", schema2), 1);

        // Create table in schema NOT matching the regex pattern
        assertUpdate(session, format("CREATE TABLE %s.test_table (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 3, '3', '3'", otherSchema), 1);

        // Queries on tables in schemas matching regex should require partition filter
        assertQueryFails(
                session,
                format("SELECT id FROM %s.test_table WHERE a = '1'", schema1),
                format("Filter required for %s\\.test_table on at least one of the partition columns: ds", schema1));

        assertQueryFails(
                session,
                format("SELECT id FROM %s.test_table WHERE a = '2'", schema2),
                format("Filter required for %s\\.test_table on at least one of the partition columns: ds", schema2));

        // Queries on tables in schemas NOT matching regex should succeed without partition filter
        assertQuerySucceeds(session, format("SELECT id FROM %s.test_table WHERE a = '3'", otherSchema));

        // Queries with partition filter should succeed
        assertQuery(session, format("SELECT id FROM %s.test_table WHERE ds = '1'", schema1), "SELECT 1");
        assertQuery(session, format("SELECT id FROM %s.test_table WHERE ds = '2'", schema2), "SELECT 2");
    }
    finally {
        // IF EXISTS keeps cleanup from throwing (and masking the original failure) when setup did not create every schema
        assertUpdate(session, "DROP SCHEMA IF EXISTS " + schema1 + " CASCADE");
        assertUpdate(session, "DROP SCHEMA IF EXISTS " + schema2 + " CASCADE");
        assertUpdate(session, "DROP SCHEMA IF EXISTS " + otherSchema + " CASCADE");
    }
}

@Test
public void testIgnorePartitionFilterRequiredSchemas()
{
Expand Down