Skip to content

Commit 58fd531

Browse files
authored
feat: Refresh materialized view without predicates (#26483)
Summary: Following #26292, add logic for refreshing a materialized view (MV) without predicates. Introduces a new session property that controls whether an MV may be refreshed from scratch, because a full refresh can process a large amount of data. Differential Revision: D85647785 ``` == RELEASE NOTES == Materialized view * Allow refresh without WHERE predicates * New session property materialized_view_allow_full_refresh_enabled to allow a potentially costly full refresh ```
1 parent b60e795 commit 58fd531

File tree

9 files changed

+307
-6
lines changed

9 files changed

+307
-6
lines changed

presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2469,9 +2469,6 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session
24692469
Map<SchemaTableName, Map<String, String>> viewToBasePartitionMap = getViewToBasePartitionMap(materializedViewTable, baseTables, directColumnMappings);
24702470

24712471
MaterializedDataPredicates materializedDataPredicates = getMaterializedDataPredicates(metastore, metastoreContext, typeManager, materializedViewTable, timeZone);
2472-
if (materializedDataPredicates.getPredicateDisjuncts().isEmpty()) {
2473-
return new MaterializedViewStatus(NOT_MATERIALIZED);
2474-
}
24752472

24762473
// Partitions to keep track of for materialized view freshness are the partitions of every base table
24772474
// that are not available/updated to the materialized view yet.
@@ -2501,6 +2498,9 @@ public MaterializedViewStatus getMaterializedViewStatus(ConnectorSession session
25012498
}
25022499
}
25032500

2501+
if (materializedDataPredicates.getPredicateDisjuncts().isEmpty()) {
2502+
return new MaterializedViewStatus(NOT_MATERIALIZED, partitionsFromBaseTables);
2503+
}
25042504
if (missingPartitions > HiveSessionProperties.getMaterializedViewMissingPartitionsThreshold(session)) {
25052505
return new MaterializedViewStatus(TOO_MANY_PARTITIONS_MISSING, partitionsFromBaseTables);
25062506
}

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6106,7 +6106,25 @@ public void testRefreshMaterializedView()
61066106
// Test invalid predicates
61076107
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE nationname = 'UNITED STATES'", ".*Refresh materialized view by column nationname is not supported.*");
61086108
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5 WHERE regionkey + nationkey = 25", ".*Only column references are supported on the left side of comparison expressions in WHERE clause.*");
6109-
assertQueryFails("REFRESH MATERIALIZED VIEW test_customer_view_5", ".*Refresh Materialized View without predicates is not supported.");
6109+
}
6110+
6111+
@Test
6112+
public void testAutoRefreshMaterializedViewFailsWithoutFlag()
6113+
{
6114+
QueryRunner queryRunner = getQueryRunner();
6115+
6116+
computeActual("CREATE TABLE test_orders_no_flag WITH (partitioned_by = ARRAY['orderstatus']) " +
6117+
"AS SELECT orderkey, totalprice, orderstatus FROM orders WHERE orderkey < 100");
6118+
computeActual(
6119+
"CREATE MATERIALIZED VIEW test_orders_no_flag_view WITH (partitioned_by = ARRAY['orderstatus']" + retentionDays(30) + ") " +
6120+
"AS SELECT SUM(totalprice) AS total, orderstatus FROM test_orders_no_flag GROUP BY orderstatus");
6121+
6122+
assertQueryFails(
6123+
"REFRESH MATERIALIZED VIEW test_orders_no_flag_view",
6124+
".*misses too many partitions or is never refreshed and may incur high cost.*");
6125+
6126+
computeActual("DROP MATERIALIZED VIEW test_orders_no_flag_view");
6127+
computeActual("DROP TABLE test_orders_no_flag");
61106128
}
61116129

61126130
@Test

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2725,6 +2725,147 @@ public void testRefreshMaterializedViewAccessControl()
27252725
}
27262726
}
27272727

2728+
@Test
2729+
public void testAutoRefreshMaterializedViewWithoutPredicates()
2730+
{
2731+
QueryRunner queryRunner = getQueryRunner();
2732+
String table = "test_orders_auto_refresh_source";
2733+
String view = "test_orders_auto_refresh_target_mv";
2734+
String view2 = "test_orders_auto_refresh_target_mv2";
2735+
2736+
Session nonFullRefreshSession = getSession();
2737+
2738+
Session fullRefreshSession = Session.builder(getSession())
2739+
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
2740+
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
2741+
.build();
2742+
2743+
queryRunner.execute(
2744+
fullRefreshSession,
2745+
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['orderstatus']) " +
2746+
"AS SELECT orderkey, custkey, totalprice, orderstatus FROM orders WHERE orderkey < 100", table));
2747+
2748+
queryRunner.execute(
2749+
fullRefreshSession,
2750+
format("CREATE MATERIALIZED VIEW %s " +
2751+
"WITH (partitioned_by = ARRAY['orderstatus']) " +
2752+
"AS SELECT SUM(totalprice) AS total, COUNT(*) AS cnt, orderstatus " +
2753+
"FROM %s GROUP BY orderstatus", view, table));
2754+
2755+
queryRunner.execute(
2756+
nonFullRefreshSession,
2757+
format("CREATE MATERIALIZED VIEW %s " +
2758+
"WITH (partitioned_by = ARRAY['orderstatus']) " +
2759+
"AS SELECT SUM(totalprice) AS total, COUNT(*) AS cnt, orderstatus " +
2760+
"FROM %s GROUP BY orderstatus", view2, table));
2761+
2762+
try {
2763+
// Test that refresh without predicates succeeds when flag is enabled
2764+
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));
2765+
2766+
// Verify all partitions are refreshed
2767+
MaterializedResult result = queryRunner.execute(fullRefreshSession,
2768+
format("SELECT COUNT(DISTINCT orderstatus) FROM %s", view));
2769+
assertTrue(((Long) result.getOnlyValue()) > 0, "Materialized view should contain data after auto-refresh");
2770+
2771+
// Test that refresh without predicates fails when flag is not enabled
2772+
assertQueryFails(
2773+
nonFullRefreshSession,
2774+
format("REFRESH MATERIALIZED VIEW %s", view2),
2775+
".*misses too many partitions or is never refreshed and may incur high cost.*");
2776+
}
2777+
finally {
2778+
queryRunner.execute(fullRefreshSession, format("DROP MATERIALIZED VIEW %s", view));
2779+
queryRunner.execute(fullRefreshSession, format("DROP TABLE %s", table));
2780+
}
2781+
}
2782+
2783+
@Test
2784+
public void testAutoRefreshMaterializedViewWithJoinWithoutPredicates()
2785+
{
2786+
QueryRunner queryRunner = getQueryRunner();
2787+
2788+
String table1 = "test_customer_auto_refresh";
2789+
String table2 = "test_orders_join_auto_refresh";
2790+
String view = "test_auto_refresh_join_target_mv";
2791+
2792+
Session fullRefreshSession = Session.builder(getSession())
2793+
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
2794+
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
2795+
.build();
2796+
Session ownerSession = getSession();
2797+
2798+
queryRunner.execute(
2799+
fullRefreshSession,
2800+
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['nationkey']) " +
2801+
"AS SELECT custkey, name, nationkey FROM customer WHERE custkey < 100", table1));
2802+
queryRunner.execute(
2803+
fullRefreshSession,
2804+
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['orderstatus']) " +
2805+
"AS SELECT orderkey, custkey, totalprice, orderstatus FROM orders WHERE orderkey < 100", table2));
2806+
queryRunner.execute(
2807+
fullRefreshSession,
2808+
format("CREATE MATERIALIZED VIEW %s " +
2809+
"WITH (partitioned_by = ARRAY['nationkey', 'orderstatus']) " +
2810+
"AS SELECT c.name, SUM(o.totalprice) AS total, c.nationkey, o.orderstatus " +
2811+
"FROM %s c JOIN %s o ON c.custkey = o.custkey " +
2812+
"GROUP BY c.name, c.nationkey, o.orderstatus", view, table1, table2));
2813+
2814+
try {
2815+
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));
2816+
2817+
MaterializedResult result = queryRunner.execute(fullRefreshSession,
2818+
format("SELECT COUNT(*) FROM %s", view));
2819+
assertTrue(((Long) result.getOnlyValue()) > 0,
2820+
"Materialized view with join should contain data after auto-refresh");
2821+
}
2822+
finally {
2823+
queryRunner.execute(ownerSession, format("DROP MATERIALIZED VIEW %s", view));
2824+
queryRunner.execute(ownerSession, format("DROP TABLE %s", table1));
2825+
queryRunner.execute(ownerSession, format("DROP TABLE %s", table2));
2826+
}
2827+
}
2828+
2829+
@Test
2830+
public void testAutoRefreshMaterializedViewFullyRefreshed()
2831+
{
2832+
QueryRunner queryRunner = getQueryRunner();
2833+
2834+
String table = "test_customer_auto_refresh";
2835+
String view = "test_auto_refresh_join_target_mv";
2836+
2837+
Session fullRefreshSession = Session.builder(getSession())
2838+
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
2839+
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
2840+
.build();
2841+
Session ownerSession = getSession();
2842+
2843+
queryRunner.execute(
2844+
fullRefreshSession,
2845+
format("CREATE TABLE %s WITH (partitioned_by = ARRAY['nationkey']) " +
2846+
"AS SELECT custkey, name, nationkey FROM customer WHERE custkey < 100", table));
2847+
2848+
queryRunner.execute(
2849+
fullRefreshSession,
2850+
format("CREATE MATERIALIZED VIEW %s " +
2851+
"WITH (partitioned_by = ARRAY['nationkey']) " +
2852+
"AS SELECT custkey, nationkey FROM %s", view, table));
2853+
2854+
try {
2855+
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));
2856+
2857+
MaterializedResult result = queryRunner.execute(fullRefreshSession,
2858+
format("REFRESH MATERIALIZED VIEW %s", view));
2859+
2860+
assertEquals(result.getWarnings().size(), 1);
2861+
assertTrue(result.getWarnings().get(0).getMessage().matches("Materialized view .* is already fully refreshed"));
2862+
}
2863+
finally {
2864+
queryRunner.execute(ownerSession, format("DROP MATERIALIZED VIEW %s", view));
2865+
queryRunner.execute(ownerSession, format("DROP TABLE %s", table));
2866+
}
2867+
}
2868+
27282869
private void setReferencedMaterializedViews(DistributedQueryRunner queryRunner, String tableName, List<String> referencedMaterializedViews)
27292870
{
27302871
appendTableParameter(replicateHiveMetastore(queryRunner),

presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ public final class SystemSessionProperties
245245
public static final String CONSIDER_QUERY_FILTERS_FOR_MATERIALIZED_VIEW_PARTITIONS = "consider-query-filters-for-materialized-view-partitions";
246246
public static final String QUERY_OPTIMIZATION_WITH_MATERIALIZED_VIEW_ENABLED = "query_optimization_with_materialized_view_enabled";
247247
public static final String LEGACY_MATERIALIZED_VIEWS = "legacy_materialized_views";
248+
public static final String MATERIALIZED_VIEW_ALLOW_FULL_REFRESH_ENABLED = "materialized_view_allow_full_refresh_enabled";
248249
public static final String AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY = "aggregation_if_to_filter_rewrite_strategy";
249250
public static final String JOINS_NOT_NULL_INFERENCE_STRATEGY = "joins_not_null_inference_strategy";
250251
public static final String RESOURCE_AWARE_SCHEDULING_STRATEGY = "resource_aware_scheduling_strategy";
@@ -1360,6 +1361,11 @@ public SystemSessionProperties(
13601361
"or be removed at any time. Do not disable in production environments.",
13611362
featuresConfig.isLegacyMaterializedViews(),
13621363
true),
1364+
booleanProperty(
1365+
MATERIALIZED_VIEW_ALLOW_FULL_REFRESH_ENABLED,
1366+
"Allow full refresh of MV when it's empty - potentially high cost.",
1367+
featuresConfig.isMaterializedViewAllowFullRefreshEnabled(),
1368+
true),
13631369
stringProperty(
13641370
DISTRIBUTED_TRACING_MODE,
13651371
"Mode for distributed tracing. NO_TRACE, ALWAYS_TRACE, or SAMPLE_BASED",
@@ -2894,6 +2900,11 @@ public static boolean isLegacyMaterializedViews(Session session)
28942900
return session.getSystemProperty(LEGACY_MATERIALIZED_VIEWS, Boolean.class);
28952901
}
28962902

2903+
public static boolean isMaterializedViewAllowFullRefreshEnabled(Session session)
2904+
{
2905+
return session.getSystemProperty(MATERIALIZED_VIEW_ALLOW_FULL_REFRESH_ENABLED, Boolean.class);
2906+
}
2907+
28972908
public static boolean isVerboseRuntimeStatsEnabled(Session session)
28982909
{
28992910
return session.getSystemProperty(VERBOSE_RUNTIME_STATS_ENABLED, Boolean.class);

presto-main-base/src/main/java/com/facebook/presto/sql/MaterializedViewUtils.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,27 @@
2727
import com.facebook.presto.spi.relation.VariableReferenceExpression;
2828
import com.facebook.presto.spi.security.Identity;
2929
import com.facebook.presto.sql.analyzer.SemanticException;
30+
import com.facebook.presto.sql.planner.ExpressionDomainTranslator;
3031
import com.facebook.presto.sql.planner.LiteralEncoder;
3132
import com.facebook.presto.sql.tree.ArithmeticBinaryExpression;
3233
import com.facebook.presto.sql.tree.BooleanLiteral;
3334
import com.facebook.presto.sql.tree.Cast;
3435
import com.facebook.presto.sql.tree.ComparisonExpression;
3536
import com.facebook.presto.sql.tree.Expression;
37+
import com.facebook.presto.sql.tree.ExpressionRewriter;
38+
import com.facebook.presto.sql.tree.ExpressionTreeRewriter;
3639
import com.facebook.presto.sql.tree.FunctionCall;
3740
import com.facebook.presto.sql.tree.Identifier;
3841
import com.facebook.presto.sql.tree.IsNullPredicate;
3942
import com.facebook.presto.sql.tree.LogicalBinaryExpression;
4043
import com.facebook.presto.sql.tree.QualifiedName;
44+
import com.facebook.presto.sql.tree.SymbolReference;
4145
import com.google.common.collect.ImmutableList;
4246
import com.google.common.collect.ImmutableMap;
4347
import com.google.common.collect.ImmutableSet;
4448
import com.google.common.collect.Sets;
4549

50+
import java.util.ArrayList;
4651
import java.util.HashMap;
4752
import java.util.HashSet;
4853
import java.util.Iterator;
@@ -61,6 +66,7 @@
6166
import static com.facebook.presto.sql.tree.ComparisonExpression.Operator.EQUAL;
6267
import static com.facebook.presto.sql.tree.LogicalBinaryExpression.Operator.AND;
6368
import static com.facebook.presto.sql.tree.LogicalBinaryExpression.Operator.OR;
69+
import static com.google.common.base.Preconditions.checkState;
6470
import static com.google.common.collect.ImmutableList.toImmutableList;
6571
import static com.google.common.collect.ImmutableMap.toImmutableMap;
6672

@@ -327,4 +333,82 @@ public boolean validate(Identifier baseTableColumn, Map<Expression, Identifier>
327333
return baseToViewColumnMap.containsKey(new Cast(new FunctionCall(APPROX_SET, ImmutableList.of(baseTableColumn)), VARBINARY));
328334
}
329335
}
336+
337+
/**
338+
* Generate WHERE predicates for missing partitions from MaterializedDataPredicates.
339+
* Used for auto-refresh of materialized views without explicit WHERE clause.
340+
*/
341+
public static Map<SchemaTableName, Expression> generatePredicatesForMissingPartitions(
342+
Map<SchemaTableName, MaterializedViewStatus.MaterializedDataPredicates> missingPartitionsPerTable,
343+
Metadata metadata)
344+
{
345+
Map<SchemaTableName, Expression> predicates = new HashMap<>();
346+
347+
for (Map.Entry<SchemaTableName, MaterializedViewStatus.MaterializedDataPredicates> entry :
348+
missingPartitionsPerTable.entrySet()) {
349+
SchemaTableName tableName = entry.getKey();
350+
MaterializedViewStatus.MaterializedDataPredicates missingPartitions = entry.getValue();
351+
352+
Expression predicate = convertMaterializedDataPredicatesToExpression(missingPartitions, metadata);
353+
354+
predicates.put(tableName, predicate);
355+
}
356+
357+
return predicates;
358+
}
359+
360+
/**
361+
* Convert MaterializedDataPredicates to a SQL Expression tree.
362+
* Builds an OR expression of partition predicates, where each partition is an AND expression of column filters.
363+
*/
364+
public static Expression convertMaterializedDataPredicatesToExpression(
365+
MaterializedViewStatus.MaterializedDataPredicates predicates,
366+
Metadata metadata)
367+
{
368+
List<String> columnNames = predicates.getColumnNames();
369+
List<TupleDomain<String>> predicateDisjuncts = predicates.getPredicateDisjuncts();
370+
371+
ExpressionDomainTranslator translator = new ExpressionDomainTranslator(
372+
new LiteralEncoder(metadata.getBlockEncodingSerde()));
373+
374+
List<Expression> disjuncts = new ArrayList<>();
375+
376+
for (TupleDomain<String> tupleDomain : predicateDisjuncts) {
377+
checkState(!tupleDomain.isAll(), "TupleDomain.isAll() should not appear in MaterializedDataPredicates");
378+
if (tupleDomain.isNone()) {
379+
continue;
380+
}
381+
382+
Expression conjunction = translator.toPredicate(tupleDomain);
383+
conjunction = convertSymbolReferencesToIdentifiers(conjunction);
384+
385+
disjuncts.add(conjunction);
386+
}
387+
388+
if (disjuncts.isEmpty()) {
389+
throw new IllegalStateException("No predicates generated for missing partitions");
390+
}
391+
392+
if (disjuncts.size() == 1) {
393+
return disjuncts.get(0);
394+
}
395+
else {
396+
return disjuncts.stream()
397+
.reduce((left, right) -> new LogicalBinaryExpression(
398+
LogicalBinaryExpression.Operator.OR, left, right))
399+
.get();
400+
}
401+
}
402+
403+
private static Expression convertSymbolReferencesToIdentifiers(Expression expression)
404+
{
405+
return ExpressionTreeRewriter.rewriteWith(new ExpressionRewriter<Void>()
406+
{
407+
@Override
408+
public Expression rewriteSymbolReference(SymbolReference node, Void context, ExpressionTreeRewriter<Void> treeRewriter)
409+
{
410+
return new Identifier(node.getName());
411+
}
412+
}, expression);
413+
}
330414
}

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ public class FeaturesConfig
226226
private boolean materializedViewPartitionFilteringEnabled = true;
227227
private boolean queryOptimizationWithMaterializedViewEnabled;
228228
private boolean legacyMaterializedViewRefresh = true;
229+
private boolean materializedViewAllowFullRefreshEnabled;
229230

230231
private AggregationIfToFilterRewriteStrategy aggregationIfToFilterRewriteStrategy = AggregationIfToFilterRewriteStrategy.DISABLED;
231232
private String analyzerType = "BUILTIN";
@@ -2169,6 +2170,19 @@ public FeaturesConfig setLegacyMaterializedViews(boolean value)
21692170
return this;
21702171
}
21712172

2173+
public boolean isMaterializedViewAllowFullRefreshEnabled()
2174+
{
2175+
return materializedViewAllowFullRefreshEnabled;
2176+
}
2177+
2178+
@Config("materialized-view-allow-full-refresh-enabled")
2179+
@ConfigDescription("Allow full refresh of MV when it's empty - potentially high cost.")
2180+
public FeaturesConfig setMaterializedViewAllowFullRefreshEnabled(boolean value)
2181+
{
2182+
this.materializedViewAllowFullRefreshEnabled = value;
2183+
return this;
2184+
}
2185+
21722186
public boolean isVerboseRuntimeStatsEnabled()
21732187
{
21742188
return verboseRuntimeStatsEnabled;

0 commit comments

Comments
 (0)