Skip to content

Commit 5807173

Browse files
authored
fix(analyzer): Materialized view refresh improvement and fix (#26529)
Summary: 1. Fix an error when refreshing a materialized view (MV) after a base table insertion. 2. Use ExpressionUtils when composing the MV refresh query. 3. Fix an MV issue caused by copying catalog properties during session creation. For #3, we need a follow-up to understand why the check on setting the transaction id and catalog properties in the Session constructor is needed. I don't see a clear reason from the previous relevant commits, so I'll check with some additional people. If there is no clear objection, I'll remove that check, at least from the Session constructor, in following PRs and restore the copy of catalog properties in buildOwnerSession(). Differential Revision: D86223888 ## Release Notes ``` == NO RELEASE NOTE == ```
1 parent b6c6475 commit 5807173

File tree

3 files changed

+71
-23
lines changed

3 files changed

+71
-23
lines changed

presto-hive/src/test/java/com/facebook/presto/hive/TestHiveMaterializedViewLogicalPlanner.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2866,6 +2866,61 @@ public void testAutoRefreshMaterializedViewFullyRefreshed()
28662866
}
28672867
}
28682868

2869+
@Test
2870+
public void testAutoRefreshMaterializedViewAfterInsertion()
2871+
{
2872+
QueryRunner queryRunner = getQueryRunner();
2873+
2874+
String table = "test_auto_refresh";
2875+
String view = "test_auto_refresh_mv";
2876+
2877+
Session fullRefreshSession = Session.builder(getSession())
2878+
.setSystemProperty("materialized_view_allow_full_refresh_enabled", "true")
2879+
.setSystemProperty("materialized_view_data_consistency_enabled", "false")
2880+
.build();
2881+
Session ownerSession = getSession();
2882+
2883+
queryRunner.execute(
2884+
fullRefreshSession,
2885+
format("CREATE TABLE %s (col1 bigint, col2 varchar, part_key varchar) " +
2886+
"WITH (partitioned_by = ARRAY['part_key'])", table));
2887+
2888+
queryRunner.execute(
2889+
fullRefreshSession,
2890+
format("INSERT INTO %s VALUES (1, 'aaa', 'p1'), " +
2891+
"(2, 'bbb', 'p2'), (3, 'aaa', 'p1')", table));
2892+
2893+
queryRunner.execute(
2894+
fullRefreshSession,
2895+
format("CREATE MATERIALIZED VIEW %s " +
2896+
"WITH (partitioned_by = ARRAY['part_key']) " +
2897+
"AS SELECT col1, part_key FROM %s", view, table));
2898+
2899+
try {
2900+
queryRunner.execute(fullRefreshSession, format("REFRESH MATERIALIZED VIEW %s", view));
2901+
2902+
MaterializedResult result = queryRunner.execute(fullRefreshSession,
2903+
format("SELECT COUNT(DISTINCT part_key) FROM %s", view));
2904+
assertEquals((long) ((Long) result.getOnlyValue()), 2, "Materialized view should contain all data after refreshes");
2905+
2906+
queryRunner.execute(
2907+
fullRefreshSession,
2908+
format("INSERT INTO %s VALUES (1, 'aaa', 'p3'), " +
2909+
"(2, 'bbb', 'p4'), (3, 'aaa', 'p5')", table));
2910+
2911+
queryRunner.execute(fullRefreshSession,
2912+
format("REFRESH MATERIALIZED VIEW %s", view));
2913+
2914+
result = queryRunner.execute(fullRefreshSession,
2915+
format("SELECT COUNT(DISTINCT part_key) FROM %s", view));
2916+
assertEquals((long) ((Long) result.getOnlyValue()), 5, "Materialized view should contain all data after refreshes");
2917+
}
2918+
finally {
2919+
queryRunner.execute(ownerSession, format("DROP MATERIALIZED VIEW %s", view));
2920+
queryRunner.execute(ownerSession, format("DROP TABLE %s", table));
2921+
}
2922+
}
2923+
28692924
private void setReferencedMaterializedViews(DistributedQueryRunner queryRunner, String tableName, List<String> referencedMaterializedViews)
28702925
{
28712926
appendTableParameter(replicateHiveMetastore(queryRunner),

presto-main-base/src/main/java/com/facebook/presto/sql/MaterializedViewUtils.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.facebook.presto.common.predicate.TupleDomain;
2020
import com.facebook.presto.metadata.Metadata;
2121
import com.facebook.presto.metadata.SessionPropertyManager;
22-
import com.facebook.presto.spi.ConnectorId;
2322
import com.facebook.presto.spi.MaterializedViewStatus;
2423
import com.facebook.presto.spi.SchemaTableName;
2524
import com.facebook.presto.spi.relation.DomainTranslator;
@@ -60,6 +59,7 @@
6059
import static com.facebook.presto.common.predicate.TupleDomain.extractFixedValues;
6160
import static com.facebook.presto.common.type.StandardTypes.HYPER_LOG_LOG;
6261
import static com.facebook.presto.common.type.StandardTypes.VARBINARY;
62+
import static com.facebook.presto.sql.ExpressionUtils.combineDisjuncts;
6363
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.NOT_SUPPORTED;
6464
import static com.facebook.presto.sql.tree.ArithmeticBinaryExpression.Operator.DIVIDE;
6565
import static com.facebook.presto.sql.tree.BooleanLiteral.FALSE_LITERAL;
@@ -115,13 +115,6 @@ public static Session buildOwnerSession(Session session, Optional<String> owner,
115115
builder.setSystemProperty(property.getKey(), property.getValue());
116116
}
117117

118-
for (Map.Entry<ConnectorId, Map<String, String>> connectorEntry : session.getConnectorProperties().entrySet()) {
119-
String catalogName = connectorEntry.getKey().getCatalogName();
120-
for (Map.Entry<String, String> property : connectorEntry.getValue().entrySet()) {
121-
builder.setCatalogSessionProperty(catalogName, property.getKey(), property.getValue());
122-
}
123-
}
124-
125118
return builder.build();
126119
}
127120

@@ -393,10 +386,7 @@ public static Expression convertMaterializedDataPredicatesToExpression(
393386
return disjuncts.get(0);
394387
}
395388
else {
396-
return disjuncts.stream()
397-
.reduce((left, right) -> new LogicalBinaryExpression(
398-
LogicalBinaryExpression.Operator.OR, left, right))
399-
.get();
389+
return combineDisjuncts(disjuncts);
400390
}
401391
}
402392

presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import com.facebook.presto.spi.type.UnknownTypeException;
8181
import com.facebook.presto.sql.ExpressionUtils;
8282
import com.facebook.presto.sql.MaterializedViewUtils;
83-
import com.facebook.presto.sql.SqlFormatterUtil;
8483
import com.facebook.presto.sql.analyzer.Analysis.TableArgumentAnalysis;
8584
import com.facebook.presto.sql.analyzer.Analysis.TableFunctionInvocationAnalysis;
8685
import com.facebook.presto.sql.parser.ParsingException;
@@ -266,6 +265,8 @@
266265
import static com.facebook.presto.sql.NodeUtils.mapFromProperties;
267266
import static com.facebook.presto.sql.QueryUtil.selectList;
268267
import static com.facebook.presto.sql.QueryUtil.simpleQuery;
268+
import static com.facebook.presto.sql.SqlFormatter.formatSql;
269+
import static com.facebook.presto.sql.SqlFormatterUtil.getFormattedSql;
269270
import static com.facebook.presto.sql.analyzer.AggregationAnalyzer.verifyOrderByAggregations;
270271
import static com.facebook.presto.sql.analyzer.AggregationAnalyzer.verifySourceAggregations;
271272
import static com.facebook.presto.sql.analyzer.Analysis.MaterializedViewAnalysisState;
@@ -849,7 +850,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView node, Optio
849850

850851
// the original refresh statement will always be one line
851852
analysis.setExpandedQuery(format("-- Expanded Query: %s%nINSERT INTO %s %s",
852-
SqlFormatterUtil.getFormattedSql(node, sqlParser, Optional.empty()),
853+
getFormattedSql(node, sqlParser, Optional.empty()),
853854
viewName.getObjectName(),
854855
view.getOriginalSql()));
855856
analysis.addAccessControlCheckForTable(
@@ -870,7 +871,7 @@ protected Scope visitRefreshMaterializedView(RefreshMaterializedView node, Optio
870871

871872
Query viewQuery = parseView(view.getOriginalSql(), viewName, node);
872873
Query refreshQuery = tablePredicates.containsKey(toSchemaTableName(viewName)) ?
873-
buildQueryWithPredicate(viewQuery, tablePredicates.get(toSchemaTableName(viewName)))
874+
buildSubqueryWithPredicate(viewQuery, tablePredicates.get(toSchemaTableName(viewName)))
874875
: viewQuery;
875876
// Check if the owner has SELECT permission on the base tables
876877
StatementAnalyzer queryAnalyzer = new StatementAnalyzer(
@@ -938,7 +939,7 @@ private Optional<RelationType> analyzeBaseTableForRefreshMaterializedView(Table
938939

939940
SchemaTableName baseTableName = toSchemaTableName(createQualifiedObjectName(session, baseTable, baseTable.getName(), metadata));
940941
if (tablePredicates.containsKey(baseTableName)) {
941-
Query tableSubquery = buildQueryWithPredicate(baseTable, tablePredicates.get(baseTableName));
942+
Query tableSubquery = buildTableQueryWithPredicate(baseTable, tablePredicates.get(baseTableName));
942943
analysis.registerNamedQuery(baseTable, tableSubquery, true);
943944

944945
Scope subqueryScope = process(tableSubquery, scope);
@@ -975,17 +976,19 @@ private Map<SchemaTableName, Expression> getTablePredicatesForMaterializedViewRe
975976
}
976977
}
977978

978-
private Query buildQueryWithPredicate(Table table, Expression predicate)
979+
private Query buildTableQueryWithPredicate(Table table, Expression predicate)
979980
{
980981
Query query = simpleQuery(selectList(new AllColumns()), table, predicate);
981-
return (Query) sqlParser.createStatement(
982-
SqlFormatterUtil.getFormattedSql(query, sqlParser, Optional.empty()),
983-
createParsingOptions(session, warningCollector));
982+
String formattedSql = formatSql(query, Optional.empty());
983+
return (Query) sqlParser.createStatement(formattedSql, createParsingOptions(session, warningCollector));
984984
}
985985

986-
private Query buildQueryWithPredicate(Query originalQuery, Expression predicate)
986+
private Query buildSubqueryWithPredicate(Query originalQuery, Expression predicate)
987987
{
988-
return simpleQuery(selectList(new AllColumns()), new TableSubquery(originalQuery), predicate);
988+
Query query = simpleQuery(selectList(new AllColumns()), new TableSubquery(originalQuery), predicate);
989+
return (Query) sqlParser.createStatement(
990+
getFormattedSql(query, sqlParser, Optional.empty()),
991+
createParsingOptions(session, warningCollector));
989992
}
990993

991994
@Override
@@ -2402,7 +2405,7 @@ else if (materializedViewStatus.isPartiallyMaterialized()) {
24022405
Query unionQuery = new Query(predicateStitchedQuery.getWith(), union, predicateStitchedQuery.getOrderBy(), predicateStitchedQuery.getOffset(), predicateStitchedQuery.getLimit());
24032406
// can we return the above query object, instead of building a query string?
24042407
// in case of returning the query object, make sure to clone the original query object.
2405-
return SqlFormatterUtil.getFormattedSql(unionQuery, sqlParser, Optional.empty());
2408+
return getFormattedSql(unionQuery, sqlParser, Optional.empty());
24062409
}
24072410

24082411
/**

0 commit comments

Comments
 (0)