diff --git a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDatabaseMetaData.java b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDatabaseMetaData.java index 629b6a446795..ea4c04340eb0 100644 --- a/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDatabaseMetaData.java +++ b/client/trino-jdbc/src/test/java/io/trino/jdbc/TestTrinoDatabaseMetaData.java @@ -2204,8 +2204,9 @@ private void assertMetadataCalls( }); actualMetadataCallsCount = actualMetadataCallsCount.stream() - // Every query involves beginQuery and cleanupQuery, so ignore them. - .filter(method -> !"ConnectorMetadata.beginQuery".equals(method) && !"ConnectorMetadata.cleanupQuery".equals(method)) + // Every query involves beginQuery, cleanupQuery and getMetrics, so ignore them. + .filter(method -> !"ConnectorMetadata.beginQuery".equals(method) && !"ConnectorMetadata.cleanupQuery".equals(method) + && !"ConnectorMetadata.getMetrics".equals(method)) .collect(toImmutableMultiset()); assertMultisetsEqual(actualMetadataCallsCount, expectedMetadataCallsCount); diff --git a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java index 3c92cf4cd00a..e0b78689f27c 100644 --- a/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java +++ b/core/trino-main/src/main/java/io/trino/dispatcher/FailedDispatchQuery.java @@ -336,6 +336,7 @@ private static QueryStats immediateFailureQueryStats() DataSize.ofBytes(0), ImmutableList.of(), DynamicFiltersStats.EMPTY, + ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()); } diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java index 4b0accd72a28..634f4c39de92 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStateMachine.java @@ -29,6 +29,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; +import io.trino.NotInTransactionException; import io.trino.Session; import io.trino.client.NodeVersion; import io.trino.exchange.ExchangeInput; @@ -36,6 +37,7 @@ import io.trino.execution.StateMachine.StateChangeListener; import io.trino.execution.querystats.PlanOptimizersStatsCollector; import io.trino.execution.warnings.WarningCollector; +import io.trino.metadata.CatalogInfo; import io.trino.metadata.Metadata; import io.trino.operator.BlockedReason; import io.trino.operator.OperatorStats; @@ -182,6 +184,7 @@ public class QueryStateMachine private final AtomicReference<Optional<Output>> output = new AtomicReference<>(Optional.empty()); private final AtomicReference<List<TableInfo>> referencedTables = new AtomicReference<>(ImmutableList.of()); private final AtomicReference<List<RoutineInfo>> routines = new AtomicReference<>(ImmutableList.of()); + private final AtomicReference<Map<String, Metrics>> catalogMetadataMetrics = new AtomicReference<>(ImmutableMap.of()); private final StateMachine<Optional<QueryInfo>> finalQueryInfo; private final WarningCollector warningCollector; @@ -386,6 +389,38 @@ static QueryStateMachine beginWithTicker( return queryStateMachine; } + private void collectCatalogMetadataMetrics() + { + try { + if (session.getTransactionId().filter(transactionManager::transactionExists).isEmpty()) { + // The metrics collection depends on an active transaction as the metrics + // are stored in the transactional ConnectorMetadata, but the collection can be + // run after the query
has failed, e.g., via cancel. + return; + } + + ImmutableMap.Builder<String, Metrics> catalogMetadataMetrics = ImmutableMap.builder(); + List<CatalogInfo> activeCatalogs = metadata.listActiveCatalogs(session); + for (CatalogInfo activeCatalog : activeCatalogs) { + Metrics metrics = metadata.getMetrics(session, activeCatalog.catalogName()); + if (!metrics.getMetrics().isEmpty()) { + catalogMetadataMetrics.put(activeCatalog.catalogName(), metrics); + } + } + + this.catalogMetadataMetrics.set(catalogMetadataMetrics.buildOrThrow()); + } + catch (NotInTransactionException e) { + // Ignore. The metrics collection depends on an active transaction and should be skipped + // if there is none running. + // The exception can be thrown even though there is a check for an active transaction at the top of the method, + // because the transaction can be committed or aborted concurrently, after the check is done. + } + catch (RuntimeException e) { + QUERY_STATE_LOG.error(e, "Error collecting query catalog metadata metrics: %s", queryId); + } + } + public QueryId getQueryId() { return queryId; @@ -863,6 +898,10 @@ private QueryStats getQueryStats(Optional stages) } } + // Try to collect the catalog metadata metrics. + // If the query is already committed or aborted and the metadata is no longer available, + // the collection fails and the metrics collected earlier are used instead. + collectCatalogMetadataMetrics(); return new QueryStats( queryStateTimer.getCreateTime(), getExecutionStartTime().orElse(null), @@ -948,7 +987,7 @@ private QueryStats getQueryStats(Optional stages) stageGcStatistics.build(), getDynamicFiltersStats(), - + catalogMetadataMetrics.get(), operatorStatsSummary.build(), planOptimizersStatsCollector.getTopRuleStats()); } @@ -1179,6 +1218,7 @@ public boolean transitionToFinishing() transitionToFailed(e); return true; } + collectCatalogMetadataMetrics(); Optional<TransactionInfo> transaction = session.getTransactionId().flatMap(transactionManager::getTransactionInfoIfExist); if (transaction.isPresent() && transaction.get().isAutoCommitContext()) { @@ -1256,6 +1296,7 @@ private boolean transitionToFailed(Throwable throwable, boolean log) } return false; } + collectCatalogMetadataMetrics(); try { if (log) { @@ -1561,6 +1602,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats) queryStats.getFailedPhysicalWrittenDataSize(), queryStats.getStageGcStatistics(), queryStats.getDynamicFiltersStats(), + ImmutableMap.of(), ImmutableList.of(), // Remove the operator summaries as OperatorInfo (especially DirectExchangeClientStatus) can hold onto a large amount of memory ImmutableList.of()); } diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java index e0827bb8dbe0..370e35978c1d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryStats.java @@ -24,10 +24,12 @@ import io.trino.operator.TableWriterOperator; import io.trino.spi.eventlistener.QueryPlanOptimizerStatistics; import io.trino.spi.eventlistener.StageGcStatistics; +import io.trino.spi.metrics.Metrics; import jakarta.annotation.Nullable; import java.time.Instant; import java.util.List; +import java.util.Map; import java.util.OptionalDouble; import java.util.Set; @@ -126,6 +128,7 @@ public class QueryStats private final DynamicFiltersStats dynamicFiltersStats; + private final Map<String, Metrics> catalogMetadataMetrics; private final List<OperatorStats> operatorSummaries; private final List<QueryPlanOptimizerStatistics> optimizerRulesSummaries; @@ -217,7 +220,7 @@ public QueryStats(
@JsonProperty("stageGcStatistics") List stageGcStatistics, @JsonProperty("dynamicFiltersStats") DynamicFiltersStats dynamicFiltersStats, - + @JsonProperty("catalogMetadataMetrics") Map catalogMetadataMetrics, @JsonProperty("operatorSummaries") List operatorSummaries, @JsonProperty("optimizerRulesSummaries") List optimizerRulesSummaries) { @@ -322,6 +325,7 @@ public QueryStats( this.stageGcStatistics = ImmutableList.copyOf(requireNonNull(stageGcStatistics, "stageGcStatistics is null")); this.dynamicFiltersStats = requireNonNull(dynamicFiltersStats, "dynamicFiltersStats is null"); + this.catalogMetadataMetrics = requireNonNull(catalogMetadataMetrics, "catalogMetadataMetrics is null"); this.operatorSummaries = ImmutableList.copyOf(operatorSummaries); this.optimizerRulesSummaries = ImmutableList.copyOf(requireNonNull(optimizerRulesSummaries, "optimizerRulesSummaries is null")); } @@ -766,6 +770,12 @@ public DynamicFiltersStats getDynamicFiltersStats() return dynamicFiltersStats; } + @JsonProperty + public Map getCatalogMetadataMetrics() + { + return catalogMetadataMetrics; + } + @JsonProperty public List getOperatorSummaries() { diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java index 0d3b9d8d187e..4210e13c8095 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStateMachine.java @@ -29,6 +29,7 @@ import io.trino.operator.OperatorStats; import io.trino.operator.PipelineStats; import io.trino.operator.TaskStats; +import io.trino.plugin.base.metrics.DistributionSnapshot; import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.spi.eventlistener.StageGcStatistics; import io.trino.spi.metrics.Metrics; @@ -647,7 +648,7 @@ public StageInfo getStageInfo(Supplier> taskInfosSupplier) succinctDuration(inputBlockedTime, NANOSECONDS), succinctDuration(failedInputBlockedTime, NANOSECONDS), succinctBytes(bufferedDataSize), - TDigestHistogram.merge(bufferUtilizationHistograms.build()).map(DistributionSnapshot::new), + TDigestHistogram.merge(bufferUtilizationHistograms.build()).map(DistributionSnapshot::fromDistribution), succinctBytes(outputDataSize), succinctBytes(failedOutputDataSize), outputPositions, diff --git a/core/trino-main/src/main/java/io/trino/execution/StageStats.java b/core/trino-main/src/main/java/io/trino/execution/StageStats.java index e8e3db5441ac..50030b0a301f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/StageStats.java +++ b/core/trino-main/src/main/java/io/trino/execution/StageStats.java @@ -100,7 +100,7 @@ public class StageStats private final Duration failedInputBlockedTime; private final DataSize bufferedDataSize; - private final Optional outputBufferUtilization; + private final Optional outputBufferUtilization; private final DataSize outputDataSize; private final DataSize failedOutputDataSize; private final long outputPositions; @@ -174,7 +174,7 @@ public StageStats( @JsonProperty("failedInputBlockedTime") Duration failedInputBlockedTime, @JsonProperty("bufferedDataSize") DataSize bufferedDataSize, - @JsonProperty("outputBufferUtilization") Optional outputBufferUtilization, + @JsonProperty("outputBufferUtilization") Optional outputBufferUtilization, @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize, @JsonProperty("outputPositions") long outputPositions, @@ -543,7 +543,7 @@ public 
DataSize getBufferedDataSize() } @JsonProperty - public Optional<io.trino.execution.DistributionSnapshot> getOutputBufferUtilization() + public Optional<io.trino.plugin.base.metrics.DistributionSnapshot> getOutputBufferUtilization() { return outputBufferUtilization; } diff --git a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java index ca54fa865132..7ea41323bf08 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/Metadata.java +++ b/core/trino-main/src/main/java/io/trino/metadata/Metadata.java @@ -67,6 +67,7 @@ import io.trino.spi.function.FunctionMetadata; import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.OperatorType; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.FunctionAuthorization; import io.trino.spi.security.GrantInfo; @@ -147,6 +148,11 @@ Optional<TableExecuteHandle> getTableHandleForExecute( Optional<Object> getInfo(Session session, TableHandle handle); + /** + * Return connector-specific metrics for metadata operations in the given session. + */ + Metrics getMetrics(Session session, String catalogName); + CatalogSchemaTableName getTableName(Session session, TableHandle tableHandle); /** @@ -483,6 +489,8 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView( */ List<CatalogInfo> listCatalogs(Session session); + List<CatalogInfo> listActiveCatalogs(Session session); + /** * Get the names that match the specified table prefix (never null). */ diff --git a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java index 5e8145780cdf..4f5f5c19a5e1 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/MetadataManager.java @@ -104,6 +104,7 @@ import io.trino.spi.function.OperatorType; import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.function.Signature; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.FunctionAuthorization; import io.trino.spi.security.GrantInfo; @@ -457,6 +458,14 @@ public Optional<Object> getInfo(Session session, TableHandle handle) return metadata.getInfo(connectorSession, handle.connectorHandle()); } + @Override + public Metrics getMetrics(Session session, String catalogName) + { + return transactionManager.getRequiredCatalogMetadata(session.getRequiredTransactionId(), catalogName) + .getMetadata(session) + .getMetrics(session.toConnectorSession()); + } + @Override public CatalogSchemaTableName getTableName(Session session, TableHandle tableHandle) { @@ -1389,6 +1398,12 @@ public List<CatalogInfo> listCatalogs(Session session) return transactionManager.getCatalogs(session.getRequiredTransactionId()); } + @Override + public List<CatalogInfo> listActiveCatalogs(Session session) + { + return transactionManager.getActiveCatalogs(session.getRequiredTransactionId()); + } + @Override public List<QualifiedObjectName> listViews(Session session, QualifiedTablePrefix prefix) { diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java index b87e0662f76f..84b86204ac96 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java @@ -31,7 +31,6 @@ import io.trino.cost.PlanNodeStatsAndCostSummary; import io.trino.cost.PlanNodeStatsEstimate; import io.trino.cost.StatsAndCosts; -import io.trino.execution.DistributionSnapshot; import
io.trino.execution.QueryStats; import io.trino.execution.StageInfo; import io.trino.execution.StageStats; @@ -42,6 +41,7 @@ import io.trino.metadata.Metadata; import io.trino.metadata.ResolvedFunction; import io.trino.metadata.TableHandle; +import io.trino.plugin.base.metrics.DistributionSnapshot; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.expression.FunctionName; import io.trino.spi.function.CatalogSchemaFunctionName; diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java index 74b515d3d696..b28f0f3a6064 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingConnectorMetadata.java @@ -79,6 +79,7 @@ import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.function.table.ConnectorTableFunctionHandle; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.GrantInfo; import io.trino.spi.security.Privilege; @@ -252,6 +253,14 @@ public Optional<Object> getInfo(ConnectorSession session, ConnectorTableHandle t } } + @Override + public Metrics getMetrics(ConnectorSession session) + { + // Do not trace getMetrics, as this is expected to be a quick local JVM call; + // adding a span would only obfuscate the trace + return delegate.getMetrics(session); + } + @Override public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) { diff --git a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java index 89b1c7defdca..98a764ed5f62 100644 --- a/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java +++ b/core/trino-main/src/main/java/io/trino/tracing/TracingMetadata.java @@ -97,6 +97,7 @@ import io.trino.spi.function.FunctionMetadata; import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.OperatorType; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.FunctionAuthorization; import io.trino.spi.security.GrantInfo; @@ -284,6 +285,14 @@ public Optional<Object> getInfo(Session session, TableHandle handle) } } + @Override + public Metrics getMetrics(Session session, String catalogName) + { + // Do not trace getMetrics, as this is expected to be a quick local JVM call; + // adding a span would only obfuscate the trace + return delegate.getMetrics(session, catalogName); + } + @Override public CatalogSchemaTableName getTableName(Session session, TableHandle tableHandle) { @@ -868,6 +877,15 @@ public List<CatalogInfo> listCatalogs(Session session) } } + @Override + public List<CatalogInfo> listActiveCatalogs(Session session) + { + Span span = startSpan("listActiveCatalogs"); + try (var _ = scopedSpan(span)) { + return delegate.listActiveCatalogs(session); + } + } + @Override public List<QualifiedObjectName> listViews(Session session, QualifiedTablePrefix prefix) { diff --git a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java index d385309479a5..cf25e47851f6 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockManagedQueryExecution.java @@ -266,6 +266,7 @@ public QueryInfo getFullQueryInfo() ImmutableList.of(),
DynamicFiltersStats.EMPTY, + ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()), Optional.empty(), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestDistributionSnapshot.java b/core/trino-main/src/test/java/io/trino/execution/TestDistributionSnapshot.java index e96d85a03076..579649e11a68 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestDistributionSnapshot.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestDistributionSnapshot.java @@ -14,6 +14,7 @@ package io.trino.execution; import io.airlift.stats.TDigest; +import io.trino.plugin.base.metrics.DistributionSnapshot; import io.trino.plugin.base.metrics.TDigestHistogram; import org.junit.jupiter.api.Test; @@ -36,7 +37,7 @@ void testConvertFromTDigestHistogram() digest.add(7.0); TDigestHistogram histogram = new TDigestHistogram(digest); - DistributionSnapshot snapshot = new DistributionSnapshot(histogram); + DistributionSnapshot snapshot = DistributionSnapshot.fromDistribution(histogram); assertThat(snapshot.total()).isEqualTo(9); assertThat(snapshot.min()).isEqualTo(1.0); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java index 79d2c17f943d..7558f25e5f39 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryInfo.java @@ -28,6 +28,7 @@ import io.opentelemetry.api.trace.Span; import io.trino.client.NodeVersion; import io.trino.operator.RetryPolicy; +import io.trino.plugin.base.metrics.DistributionSnapshot; import io.trino.plugin.base.metrics.LongCount; import io.trino.plugin.base.metrics.TDigestHistogram; import io.trino.server.BasicQueryStats; @@ -318,7 +319,7 @@ private static StageStats createStageStats(int value) Duration.succinctDuration(value, SECONDS), Duration.succinctDuration(value, SECONDS), succinctBytes(value), - Optional.of(new DistributionSnapshot(new TDigestHistogram(new TDigest()))), + Optional.of(DistributionSnapshot.fromDistribution(new TDigestHistogram(new TDigest()))), succinctBytes(value), succinctBytes(value), value, diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java index 7392fa0a2a5b..9b11c043def4 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStateMachine.java @@ -30,6 +30,7 @@ import io.trino.execution.warnings.WarningCollector; import io.trino.execution.warnings.WarningCollectorConfig; import io.trino.metadata.Metadata; +import io.trino.metadata.TestMetadataManager; import io.trino.plugin.base.security.AllowAllSystemAccessControl; import io.trino.plugin.base.security.DefaultSystemAccessControl; import io.trino.security.AccessControlConfig; @@ -89,7 +90,6 @@ import static io.trino.execution.QueryState.STARTING; import static io.trino.execution.QueryState.WAITING_FOR_RESOURCES; import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector; -import static io.trino.metadata.TestMetadataManager.createTestMetadataManager; import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.TYPE_MISMATCH; import static io.trino.spi.StandardErrorCode.USER_CANCELED; @@ -442,15 +442,7 @@ public void testPreserveFirstFailure() { CountDownLatch cleanup = new CountDownLatch(1); 
QueryStateMachine queryStateMachine = queryStateMachine() - .withMetadata(new TracingMetadata(noopTracer(), createTestMetadataManager()) - { - @Override - public void cleanupQuery(Session session) - { - cleanup.countDown(); - super.cleanupQuery(session); - } - }) + .beforeQueryCleanup(cleanup::countDown) .build(); Future<?> anotherThread = executor.submit(() -> { @@ -479,15 +471,7 @@ public void testPreserveCancellation() { CountDownLatch cleanup = new CountDownLatch(1); QueryStateMachine queryStateMachine = queryStateMachine() - .withMetadata(new TracingMetadata(noopTracer(), createTestMetadataManager()) - { - @Override - public void cleanupQuery(Session session) - { - cleanup.countDown(); - super.cleanupQuery(session); - } - }) + .beforeQueryCleanup(cleanup::countDown) .build(); Future<?> anotherThread = executor.submit(() -> { @@ -761,7 +745,7 @@ private QueryStateMachineBuilder queryStateMachine() private class QueryStateMachineBuilder { private Ticker ticker = Ticker.systemTicker(); - private Metadata metadata; + private Optional<Runnable> beforeQueryCleanup = Optional.empty(); private WarningCollector warningCollector = WarningCollector.NOOP; private String setCatalog; private String setPath; @@ -780,9 +764,9 @@ public QueryStateMachineBuilder withTicker(Ticker ticker) } @CanIgnoreReturnValue - public QueryStateMachineBuilder withMetadata(Metadata metadata) + public QueryStateMachineBuilder beforeQueryCleanup(Runnable runnable) { - this.metadata = metadata; + this.beforeQueryCleanup = Optional.of(runnable); return this; } @@ -842,10 +826,24 @@ public QueryStateMachineBuilder withAddPreparedStatements(Map<String, String> pr public QueryStateMachine build() { - if (metadata == null) { - metadata = createTestMetadataManager(); - } TransactionManager transactionManager = createTestTransactionManager(); + Metadata metadata = TestMetadataManager.builder() + .withTransactionManager(transactionManager) + .build(); + if (beforeQueryCleanup.isPresent()) { + Runnable beforeQueryCleanupAction = beforeQueryCleanup.get(); + // Using TracingMetadata in lieu of "Forwarding Metadata" which currently does not exist + metadata = new TracingMetadata(noopTracer(), metadata) + { + @Override + public void cleanupQuery(Session session) + { + beforeQueryCleanupAction.run(); + super.cleanupQuery(session); + } + }; + } + AccessControlManager accessControl = new AccessControlManager( NodeVersion.UNKNOWN, transactionManager, diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java index ed2587797ffb..7dd7d1c6b479 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java @@ -14,6 +14,7 @@ package io.trino.execution; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.json.JsonCodec; import io.airlift.units.DataSize; @@ -270,7 +271,7 @@ public class TestQueryStats 107)), DynamicFiltersStats.EMPTY, - + ImmutableMap.of(), operatorSummaries, optimizerRulesSummaries); diff --git a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java index 85c2999db29b..b31ad64e2b39 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestStageStats.java @@ -92,7 +92,7 @@ public class TestStageStats
new Duration(202, NANOSECONDS), DataSize.ofBytes(34), - Optional.of(new io.trino.execution.DistributionSnapshot(getTDigestHistogram(10))), + Optional.of(io.trino.plugin.base.metrics.DistributionSnapshot.fromDistribution(getTDigestHistogram(10))), DataSize.ofBytes(35), DataSize.ofBytes(36), 37, diff --git a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java index 485122c764d8..81dad3933d3e 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java +++ b/core/trino-main/src/test/java/io/trino/metadata/AbstractMockMetadata.java @@ -73,6 +73,7 @@ import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.OperatorType; import io.trino.spi.function.Signature; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.FunctionAuthorization; import io.trino.spi.security.GrantInfo; @@ -207,6 +208,12 @@ public Optional<Object> getInfo(Session session, TableHandle handle) throw new UnsupportedOperationException(); } + @Override + public Metrics getMetrics(Session session, String catalogName) + { + throw new UnsupportedOperationException(); + } + @Override public CatalogSchemaTableName getTableName(Session session, TableHandle tableHandle) { @@ -580,6 +587,12 @@ public List<CatalogInfo> listCatalogs(Session session) throw new UnsupportedOperationException(); } + @Override + public List<CatalogInfo> listActiveCatalogs(Session session) + { + throw new UnsupportedOperationException(); + } + @Override public List<QualifiedObjectName> listViews(Session session, QualifiedTablePrefix prefix) { diff --git a/core/trino-main/src/test/java/io/trino/operator/scalar/BenchmarkHistogram.java b/core/trino-main/src/test/java/io/trino/operator/scalar/BenchmarkHistogram.java new file mode 100644 index 000000000000..e90b0eacb1d7 --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/operator/scalar/BenchmarkHistogram.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package io.trino.operator.scalar; + +import io.airlift.stats.TDigest; +import io.trino.jmh.Benchmarks; +import io.trino.plugin.base.metrics.DistributionSnapshot; +import io.trino.plugin.base.metrics.TDigestHistogram; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.security.SecureRandom; +import java.util.concurrent.TimeUnit; + +@SuppressWarnings("MethodMayBeStatic") +@State(Scope.Thread) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(1) +@Warmup(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) +@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS) +@BenchmarkMode(Mode.AverageTime) +public class BenchmarkHistogram +{ + private static final int HISTOGRAM_VALUES_SETS = 100; + + @Benchmark + @OperationsPerInvocation(HISTOGRAM_VALUES_SETS) + public void benchmarkTDigestSnapshot(BenchmarkData data, Blackhole bh) + { + for (TDigest digest : data.tDigests()) { + bh.consume(DistributionSnapshot.fromDistribution(new TDigestHistogram(digest))); + } + } + + @SuppressWarnings("FieldMayBeFinal") + @State(Scope.Thread) + public static class BenchmarkData + { + @Param({"10", "100", "1000"}) + private int valuesPerHistogram = 100; + + TDigest[] tDigests; + + @Setup + public void setup() + { + SecureRandom random = new SecureRandom(); + tDigests = new TDigest[HISTOGRAM_VALUES_SETS]; + for (int i = 0; i < HISTOGRAM_VALUES_SETS; i++) { + TDigest digest = new TDigest(); + for (int j = 0; j < valuesPerHistogram; j++) { + long value = random.nextLong(10_000_000, 10_000_000_000L); + digest.add(value); + } + tDigests[i] = digest; + } + } + + public TDigest[] tDigests() + { + return tDigests; + } + } + + public static void main(String[] args) + throws Exception + { + // assure the benchmarks are valid before running + BenchmarkData data = new BenchmarkData(); + data.setup(); + Blackhole bh = new Blackhole("Today's password is swordfish. 
I understand instantiating Blackholes directly is dangerous."); + new BenchmarkHistogram().benchmarkTDigestSnapshot(data, bh); + + Benchmarks.benchmark(BenchmarkHistogram.class).run(); + } +} diff --git a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java index c605174338eb..76367e1fc91e 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestBasicQueryInfo.java @@ -134,6 +134,7 @@ public void testConstructor() 106, 107)), DynamicFiltersStats.EMPTY, + ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()), Optional.empty(), diff --git a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java index 4e7c5bbfe0b1..061293dd9ee9 100644 --- a/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java +++ b/core/trino-main/src/test/java/io/trino/server/TestQueryStateInfo.java @@ -179,6 +179,7 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query DataSize.valueOf("41GB"), ImmutableList.of(), DynamicFiltersStats.EMPTY, + ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()), Optional.empty(), diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 56a634fc7bf6..267cbcd9b23b 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -319,6 +319,10 @@ method long[] io.trino.spi.PageSorter::sort(java.util.List<io.trino.spi.type.Type>, java.util.List<io.trino.spi.Page>, java.util.List<java.lang.Integer>, java.util.List<io.trino.spi.connector.SortOrder>, int) method java.util.Iterator<io.trino.spi.Page> io.trino.spi.PageSorter::sort(java.util.List<io.trino.spi.type.Type>, java.util.List<io.trino.spi.Page>, java.util.List<java.lang.Integer>, java.util.List<io.trino.spi.connector.SortOrder>, int) + <item> + <code>java.method.addedToInterface</code> + <new>method double[] io.trino.spi.metrics.Distribution<T>::getPercentiles(double[])</new> + </item> diff --git a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java index 912a1cc80273..d253801c19b0 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java +++ b/core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorMetadata.java @@ -28,6 +28,7 @@ import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.function.table.ConnectorTableFunctionHandle; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.GrantInfo; import io.trino.spi.security.Privilege; @@ -249,6 +250,14 @@ default Optional<Object> getInfo(ConnectorSession session, ConnectorTableHandle return Optional.empty(); } + /** + * Return connector-specific metrics for metadata operations in the given session. + */ + default Metrics getMetrics(ConnectorSession session) + { + return Metrics.EMPTY; + } + /** * List table, view and materialized view names, possibly filtered by schema. An empty list is returned if none match. * An empty list is returned also when schema name does not refer to an existing schema.
diff --git a/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java b/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java index 3c7a6ae3f670..1cc9798bc708 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java +++ b/core/trino-spi/src/main/java/io/trino/spi/metrics/Distribution.java @@ -23,4 +23,6 @@ public interface Distribution<T> double getMax(); double getPercentile(double percentile); + + double[] getPercentiles(double... percentiles); } diff --git a/lib/trino-metastore/pom.xml b/lib/trino-metastore/pom.xml index 3ae2c759c2cb..81f4e486564d 100644 --- a/lib/trino-metastore/pom.xml +++ b/lib/trino-metastore/pom.xml @@ -55,6 +55,11 @@ slice + <dependency> + <groupId>io.airlift</groupId> + <artifactId>stats</artifactId> + </dependency> + io.airlift units @@ -75,6 +80,11 @@ trino-cache + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-plugin-toolkit</artifactId> + </dependency> + io.trino trino-spi @@ -107,6 +117,13 @@ test + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-spi</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + io.trino.hadoop hadoop-apache diff --git a/lib/trino-metastore/src/main/java/io/trino/metastore/HiveMetastore.java b/lib/trino-metastore/src/main/java/io/trino/metastore/HiveMetastore.java index f1d0e95ce3c0..56e2e02701a4 100644 --- a/lib/trino-metastore/src/main/java/io/trino/metastore/HiveMetastore.java +++ b/lib/trino-metastore/src/main/java/io/trino/metastore/HiveMetastore.java @@ -17,6 +17,7 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.function.LanguageFunction; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.RoleGrant; @@ -236,4 +237,9 @@ default void alterTransactionalTable(Table table, long transactionId, long write void replaceFunction(String databaseName, String functionName, LanguageFunction function); void dropFunction(String databaseName, String functionName, String signatureToken); + + default Metrics getMetrics() + { + return Metrics.EMPTY; + } } diff --git a/lib/trino-metastore/src/main/java/io/trino/metastore/MeasuredHiveMetastore.java b/lib/trino-metastore/src/main/java/io/trino/metastore/MeasuredHiveMetastore.java new file mode 100644 index 000000000000..f748e7a26113 --- /dev/null +++ b/lib/trino-metastore/src/main/java/io/trino/metastore/MeasuredHiveMetastore.java @@ -0,0 +1,507 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package io.trino.metastore; + +import com.google.common.base.Throwables; +import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableMap; +import com.google.errorprone.annotations.ThreadSafe; +import io.airlift.stats.TDigest; +import io.trino.plugin.base.metrics.DistributionSnapshot; +import io.trino.plugin.base.metrics.LongCount; +import io.trino.plugin.base.metrics.TDigestHistogram; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.function.LanguageFunction; +import io.trino.spi.metrics.Metric; +import io.trino.spi.metrics.Metrics; +import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.security.ConnectorIdentity; +import io.trino.spi.security.RoleGrant; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; + +import static java.util.Objects.requireNonNull; + +public class MeasuredHiveMetastore + implements HiveMetastore +{ + private final HiveMetastore delegate; + private final MetastoreApiCallStats allApiCallsStats = new MetastoreApiCallStats(); + private final Map<String, MetastoreApiCallStats> apiCallStats = new ConcurrentHashMap<>(); + private final Ticker ticker = Ticker.systemTicker(); + + public MeasuredHiveMetastore(HiveMetastore delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + public static HiveMetastoreFactory factory(HiveMetastoreFactory metastoreFactory) + { + return new MeasuredMetastoreFactory(metastoreFactory); + } + + @Override + public Metrics getMetrics() + { + ImmutableMap.Builder<String, Metric<?>> metrics = ImmutableMap.builder(); + allApiCallsStats.storeTo(metrics, "metastore.all"); + for (Map.Entry<String, MetastoreApiCallStats> callStats : apiCallStats.entrySet()) { + callStats.getValue().storeTo(metrics, "metastore.%s".formatted(callStats.getKey())); + } + return new Metrics(metrics.buildOrThrow()); + } + + @Override + public Optional<Database> getDatabase(String databaseName) + { + return wrap("getDatabase", () -> delegate.getDatabase(databaseName)); + } + + @Override + public List<String> getAllDatabases() + { + return wrap("getAllDatabases", delegate::getAllDatabases); + } + + @Override + public Optional<Table> getTable(String databaseName, String tableName) + { + return wrap("getTable", () -> delegate.getTable(databaseName, tableName)); + } + + @Override + public Map<String, HiveColumnStatistics> getTableColumnStatistics(String databaseName, String tableName, Set<String> columnNames) + { + return wrap("getTableColumnStatistics", () -> delegate.getTableColumnStatistics(databaseName, tableName, columnNames)); + } + + @Override + public Map<String, Map<String, HiveColumnStatistics>> getPartitionColumnStatistics(String databaseName, String tableName, Set<String> partitionNames, Set<String> columnNames) + { + return wrap("getPartitionColumnStatistics", () -> delegate.getPartitionColumnStatistics(databaseName, tableName, partitionNames, columnNames)); + } + + @Override + public boolean useSparkTableStatistics() + { + return wrap("useSparkTableStatistics", delegate::useSparkTableStatistics); + } + + @Override + public void updateTableStatistics(String databaseName, String tableName, OptionalLong acidWriteId, StatisticsUpdateMode mode, PartitionStatistics statisticsUpdate) + { + wrap("updateTableStatistics", () -> delegate.updateTableStatistics(databaseName, tableName, acidWriteId, mode, statisticsUpdate)); + } + + @Override + public void updatePartitionStatistics(Table table, StatisticsUpdateMode mode, Map<String, PartitionStatistics> partitionUpdates) + { + wrap("updatePartitionStatistics", () ->
delegate.updatePartitionStatistics(table, mode, partitionUpdates)); + } + + @Override + public List<TableInfo> getTables(String databaseName) + { + return wrap("getTables", () -> delegate.getTables(databaseName)); + } + + @Override + public List<String> getTableNamesWithParameters(String databaseName, String parameterKey, Set<String> parameterValues) + { + return wrap("getTableNamesWithParameters", () -> delegate.getTableNamesWithParameters(databaseName, parameterKey, parameterValues)); + } + + @Override + public void createDatabase(Database database) + { + wrap("createDatabase", () -> delegate.createDatabase(database)); + } + + @Override + public void dropDatabase(String databaseName, boolean deleteData) + { + wrap("dropDatabase", () -> delegate.dropDatabase(databaseName, deleteData)); + } + + @Override + public void renameDatabase(String databaseName, String newDatabaseName) + { + wrap("renameDatabase", () -> delegate.renameDatabase(databaseName, newDatabaseName)); + } + + @Override + public void setDatabaseOwner(String databaseName, HivePrincipal principal) + { + wrap("setDatabaseOwner", () -> delegate.setDatabaseOwner(databaseName, principal)); + } + + @Override + public void createTable(Table table, PrincipalPrivileges principalPrivileges) + { + wrap("createTable", () -> delegate.createTable(table, principalPrivileges)); + } + + @Override + public void dropTable(String databaseName, String tableName, boolean deleteData) + { + wrap("dropTable", () -> delegate.dropTable(databaseName, tableName, deleteData)); + } + + @Override + public void replaceTable(String databaseName, String tableName, Table newTable, PrincipalPrivileges principalPrivileges, Map<String, String> environmentContext) + { + wrap("replaceTable", () -> delegate.replaceTable(databaseName, tableName, newTable, principalPrivileges, environmentContext)); + } + + @Override + public void renameTable(String databaseName, String tableName, String newDatabaseName, String newTableName) + { + wrap("renameTable", () -> delegate.renameTable(databaseName, tableName, newDatabaseName, newTableName)); + } + + @Override + public void commentTable(String databaseName, String tableName, Optional<String> comment) + { + wrap("commentTable", () -> delegate.commentTable(databaseName, tableName, comment)); + } + + @Override + public void setTableOwner(String databaseName, String tableName, HivePrincipal principal) + { + wrap("setTableOwner", () -> delegate.setTableOwner(databaseName, tableName, principal)); + } + + @Override + public void commentColumn(String databaseName, String tableName, String columnName, Optional<String> comment) + { + wrap("commentColumn", () -> delegate.commentColumn(databaseName, tableName, columnName, comment)); + } + + @Override + public void addColumn(String databaseName, String tableName, String columnName, HiveType columnType, String columnComment) + { + wrap("addColumn", () -> delegate.addColumn(databaseName, tableName, columnName, columnType, columnComment)); + } + + @Override + public void renameColumn(String databaseName, String tableName, String oldColumnName, String newColumnName) + { + wrap("renameColumn", () -> delegate.renameColumn(databaseName, tableName, oldColumnName, newColumnName)); + } + + @Override + public void dropColumn(String databaseName, String tableName, String columnName) + { + wrap("dropColumn", () -> delegate.dropColumn(databaseName, tableName, columnName)); + } + + @Override + public Optional<Partition> getPartition(Table table, List<String> partitionValues) + { + return wrap("getPartition", () -> delegate.getPartition(table, partitionValues)); + } + + @Override +
public Optional<List<String>> getPartitionNamesByFilter(String databaseName, String tableName, List<String> columnNames, TupleDomain<String> partitionKeysFilter) + { + return wrap("getPartitionNamesByFilter", () -> delegate.getPartitionNamesByFilter(databaseName, tableName, columnNames, partitionKeysFilter)); + } + + @Override + public Map<String, Optional<Partition>> getPartitionsByNames(Table table, List<String> partitionNames) + { + return wrap("getPartitionsByNames", () -> delegate.getPartitionsByNames(table, partitionNames)); + } + + @Override + public void addPartitions(String databaseName, String tableName, List<PartitionWithStatistics> partitions) + { + wrap("addPartitions", () -> delegate.addPartitions(databaseName, tableName, partitions)); + } + + @Override + public void dropPartition(String databaseName, String tableName, List<String> parts, boolean deleteData) + { + wrap("dropPartition", () -> delegate.dropPartition(databaseName, tableName, parts, deleteData)); + } + + @Override + public void alterPartition(String databaseName, String tableName, PartitionWithStatistics partition) + { + wrap("alterPartition", () -> delegate.alterPartition(databaseName, tableName, partition)); + } + + @Override + public void createRole(String role, String grantor) + { + wrap("createRole", () -> delegate.createRole(role, grantor)); + } + + @Override + public void dropRole(String role) + { + wrap("dropRole", () -> delegate.dropRole(role)); + } + + @Override + public Set<String> listRoles() + { + return wrap("listRoles", delegate::listRoles); + } + + @Override + public void grantRoles(Set<String> roles, Set<HivePrincipal> grantees, boolean adminOption, HivePrincipal grantor) + { + wrap("grantRoles", () -> delegate.grantRoles(roles, grantees, adminOption, grantor)); + } + + @Override + public void revokeRoles(Set<String> roles, Set<HivePrincipal> grantees, boolean adminOption, HivePrincipal grantor) + { + wrap("revokeRoles", () -> delegate.revokeRoles(roles, grantees, adminOption, grantor)); + } + + @Override + public Set<RoleGrant> listRoleGrants(HivePrincipal principal) + { + return wrap("listRoleGrants", () -> delegate.listRoleGrants(principal)); + } + + @Override + public void grantTablePrivileges(String databaseName, String tableName, String tableOwner, HivePrincipal grantee, HivePrincipal grantor, Set<HivePrivilegeInfo> privileges, boolean grantOption) + { + wrap("grantTablePrivileges", () -> delegate.grantTablePrivileges(databaseName, tableName, tableOwner, grantee, grantor, privileges, grantOption)); + } + + @Override + public void revokeTablePrivileges(String databaseName, String tableName, String tableOwner, HivePrincipal grantee, HivePrincipal grantor, Set<HivePrivilegeInfo> privileges, boolean grantOption) + { + wrap("revokeTablePrivileges", () -> delegate.revokeTablePrivileges(databaseName, tableName, tableOwner, grantee, grantor, privileges, grantOption)); + } + + @Override + public Set<HivePrivilegeInfo> listTablePrivileges(String databaseName, String tableName, Optional<String> tableOwner, Optional<HivePrincipal> principal) + { + return wrap("listTablePrivileges", () -> delegate.listTablePrivileges(databaseName, tableName, tableOwner, principal)); + } + + @Override + public void checkSupportsTransactions() + { + wrap("checkSupportsTransactions", () -> delegate.checkSupportsTransactions()); + } + + @Override + public long openTransaction(AcidTransactionOwner transactionOwner) + { + return wrap("openTransaction", () -> delegate.openTransaction(transactionOwner)); + } + + @Override + public void commitTransaction(long transactionId) + { + wrap("commitTransaction", () -> delegate.commitTransaction(transactionId)); + } + + @Override + public void abortTransaction(long transactionId) + { + wrap("abortTransaction", () ->
delegate.abortTransaction(transactionId)); + } + + @Override + public void sendTransactionHeartbeat(long transactionId) + { + wrap("sendTransactionHeartbeat", () -> delegate.sendTransactionHeartbeat(transactionId)); + } + + @Override + public void acquireSharedReadLock(AcidTransactionOwner transactionOwner, String queryId, long transactionId, List<SchemaTableName> fullTables, List<HivePartition> partitions) + { + wrap("acquireSharedReadLock", () -> delegate.acquireSharedReadLock(transactionOwner, queryId, transactionId, fullTables, partitions)); + } + + @Override + public String getValidWriteIds(List<SchemaTableName> tables, long currentTransactionId) + { + return wrap("getValidWriteIds", () -> delegate.getValidWriteIds(tables, currentTransactionId)); + } + + @Override + public Optional<String> getConfigValue(String name) + { + return wrap("getConfigValue", () -> delegate.getConfigValue(name)); + } + + @Override + public long allocateWriteId(String dbName, String tableName, long transactionId) + { + return wrap("allocateWriteId", () -> delegate.allocateWriteId(dbName, tableName, transactionId)); + } + + @Override + public void acquireTableWriteLock(AcidTransactionOwner transactionOwner, String queryId, long transactionId, String dbName, String tableName, AcidOperation operation, boolean isDynamicPartitionWrite) + { + wrap("acquireTableWriteLock", () -> delegate.acquireTableWriteLock(transactionOwner, queryId, transactionId, dbName, tableName, operation, isDynamicPartitionWrite)); + } + + @Override + public void updateTableWriteId(String dbName, String tableName, long transactionId, long writeId, OptionalLong rowCountChange) + { + wrap("updateTableWriteId", () -> delegate.updateTableWriteId(dbName, tableName, transactionId, writeId, rowCountChange)); + } + + @Override + public void addDynamicPartitions(String dbName, String tableName, List<String> partitionNames, long transactionId, long writeId, AcidOperation operation) + { + wrap("addDynamicPartitions", () -> delegate.addDynamicPartitions(dbName, tableName, partitionNames, transactionId, writeId, operation)); + } + + @Override + public void alterTransactionalTable(Table table, long transactionId, long writeId, PrincipalPrivileges principalPrivileges) + { + wrap("alterTransactionalTable", () -> delegate.alterTransactionalTable(table, transactionId, writeId, principalPrivileges)); + } + + @Override + public boolean functionExists(String databaseName, String functionName, String signatureToken) + { + return wrap("functionExists", () -> delegate.functionExists(databaseName, functionName, signatureToken)); + } + + @Override + public Collection<LanguageFunction> getAllFunctions(String databaseName) + { + return wrap("getAllFunctions", () -> delegate.getAllFunctions(databaseName)); + } + + @Override + public Collection<LanguageFunction> getFunctions(String databaseName, String functionName) + { + return wrap("getFunctions", () -> delegate.getFunctions(databaseName, functionName)); + } + + @Override + public void createFunction(String databaseName, String functionName, LanguageFunction function) + { + wrap("createFunction", () -> delegate.createFunction(databaseName, functionName, function)); + } + + @Override + public void replaceFunction(String databaseName, String functionName, LanguageFunction function) + { + wrap("replaceFunction", () -> delegate.replaceFunction(databaseName, functionName, function)); + } + + @Override + public void dropFunction(String databaseName, String functionName, String signatureToken) + { + wrap("dropFunction", () -> delegate.dropFunction(databaseName, functionName, signatureToken)); + } + + private void wrap(String
callName, Runnable call) + { + wrap(callName, () -> { + call.run(); + return null; + }); + } + + private <T> T wrap(String callName, Callable<T> call) + { + MetastoreApiCallStats callStats = apiCallStats.computeIfAbsent(callName, _ -> new MetastoreApiCallStats()); + long start = ticker.read(); + try { + return call.call(); + } + catch (Exception e) { + callStats.recordFailure(); + allApiCallsStats.recordFailure(); + Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + finally { + long timeNanos = ticker.read() - start; + callStats.record(timeNanos); + allApiCallsStats.record(timeNanos); + } + } + + @ThreadSafe + static class MetastoreApiCallStats + { + private final TDigest timeNanosDistribution = new TDigest(); + private long totalTimeNanos; + private long totalFailures; + + public synchronized void record(long nanos) + { + timeNanosDistribution.add(nanos); + totalTimeNanos += nanos; + } + + public synchronized void recordFailure() + { + totalFailures++; + } + + public synchronized void storeTo(ImmutableMap.Builder<String, Metric<?>> metrics, String prefix) + { + // DistributionSnapshot does not retain a reference to the histogram + metrics.put(prefix + ".time.distribution", DistributionSnapshot.fromDistribution(new TDigestHistogram(timeNanosDistribution))); + metrics.put(prefix + ".time.total", new LongCount(totalTimeNanos)); + + // do not add a redundant zero failure count, to keep the metrics more concise + if (totalFailures > 0) { + metrics.put(prefix + ".failures", new LongCount(totalFailures)); + } + } + } + + public static class MeasuredMetastoreFactory + implements HiveMetastoreFactory + { + private final HiveMetastoreFactory metastoreFactory; + + public MeasuredMetastoreFactory(HiveMetastoreFactory metastoreFactory) + { + this.metastoreFactory = requireNonNull(metastoreFactory, "metastoreFactory is null"); + } + + @Override + public boolean isImpersonationEnabled() + { + return metastoreFactory.isImpersonationEnabled(); + } + + @Override + public boolean hasBuiltInCaching() + { + return metastoreFactory.hasBuiltInCaching(); + } + + @Override + public HiveMetastore createMetastore(Optional<ConnectorIdentity> identity) + { + return new MeasuredHiveMetastore(metastoreFactory.createMetastore(identity)); + } + } +} diff --git a/lib/trino-metastore/src/main/java/io/trino/metastore/cache/CachingHiveMetastore.java b/lib/trino-metastore/src/main/java/io/trino/metastore/cache/CachingHiveMetastore.java index b19ed01277b9..2b3929530c15 100644 --- a/lib/trino-metastore/src/main/java/io/trino/metastore/cache/CachingHiveMetastore.java +++ b/lib/trino-metastore/src/main/java/io/trino/metastore/cache/CachingHiveMetastore.java @@ -47,6 +47,7 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.function.LanguageFunction; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.RoleGrant; import org.weakref.jmx.Managed; @@ -1120,6 +1121,12 @@ public void dropFunction(String databaseName, String functionName, String signat delegate.dropFunction(databaseName, functionName, signatureToken); } + @Override + public Metrics getMetrics() + { + return delegate.getMetrics(); + } + private static <K, V> LoadingCache<K, V> buildCache( OptionalLong expiresAfterWriteMillis, OptionalLong refreshMillis, diff --git a/lib/trino-metastore/src/main/java/io/trino/metastore/tracing/TracingHiveMetastore.java b/lib/trino-metastore/src/main/java/io/trino/metastore/tracing/TracingHiveMetastore.java index 39c89d29eee9..3d5c109b1aaa 100644 ---
a/lib/trino-metastore/src/main/java/io/trino/metastore/tracing/TracingHiveMetastore.java +++ b/lib/trino-metastore/src/main/java/io/trino/metastore/tracing/TracingHiveMetastore.java @@ -33,6 +33,7 @@ import io.trino.metastore.TableInfo; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.function.LanguageFunction; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.RoleGrant; @@ -676,4 +677,10 @@ public void dropFunction(String databaseName, String functionName, String signat .startSpan(); withTracing(span, () -> delegate.dropFunction(databaseName, functionName, signatureToken)); } + + @Override + public Metrics getMetrics() + { + return delegate.getMetrics(); + } } diff --git a/lib/trino-metastore/src/test/java/io/trino/metastore/TestMeasuredHiveMetastore.java b/lib/trino-metastore/src/test/java/io/trino/metastore/TestMeasuredHiveMetastore.java new file mode 100644 index 000000000000..7bef542a6639 --- /dev/null +++ b/lib/trino-metastore/src/test/java/io/trino/metastore/TestMeasuredHiveMetastore.java @@ -0,0 +1,34 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.metastore; + +import io.trino.metastore.MeasuredHiveMetastore.MeasuredMetastoreFactory; +import org.junit.jupiter.api.Test; + +import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; + +class TestMeasuredHiveMetastore +{ + @Test + public void testAllMethodsImplemented() + { + assertAllMethodsOverridden(HiveMetastore.class, MeasuredHiveMetastore.class); + } + + @Test + public void testAllFactoryMethodsImplemented() + { + assertAllMethodsOverridden(HiveMetastoreFactory.class, MeasuredMetastoreFactory.class); + } } diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java index a4ea5ec77d76..099d4ac62282 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorMetadata.java @@ -79,6 +79,7 @@ import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.SchemaFunctionName; import io.trino.spi.function.table.ConnectorTableFunctionHandle; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.GrantInfo; import io.trino.spi.security.Privilege; @@ -282,6 +283,14 @@ public Optional<Object> getInfo(ConnectorSession session, ConnectorTableHandle t } } + @Override + public Metrics getMetrics(ConnectorSession session) + { + try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) { + return delegate.getMetrics(session); + } + } + @Override public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) { diff --git a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java
diff --git a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/DistributionSnapshot.java
similarity index 81%
rename from core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java
rename to lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/DistributionSnapshot.java
index a992325eaa82..5e79918fe58e 100644
--- a/core/trino-main/src/main/java/io/trino/execution/DistributionSnapshot.java
+++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/DistributionSnapshot.java
@@ -11,7 +11,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.trino.execution;
+package io.trino.plugin.base.metrics;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
@@ -29,21 +29,22 @@ public record DistributionSnapshot(long total, double min, double max, double p01, double p05, double p10, double p25, double p50, double p75, double p90, double p95, double p99)
         implements Metric<DistributionSnapshot>
 {
-    public DistributionSnapshot(Distribution distribution)
+    public static DistributionSnapshot fromDistribution(Distribution distribution)
     {
-        this(
+        double[] percentiles = distribution.getPercentiles(1, 5, 10, 25, 50, 75, 90, 95, 99);
+        return new DistributionSnapshot(
             distribution.getTotal(),
             distribution.getMin(),
             distribution.getMax(),
-            distribution.getPercentile(1),
-            distribution.getPercentile(5),
-            distribution.getPercentile(10),
-            distribution.getPercentile(25),
-            distribution.getPercentile(50),
-            distribution.getPercentile(75),
-            distribution.getPercentile(90),
-            distribution.getPercentile(95),
-            distribution.getPercentile(99));
+            percentiles[0],
+            percentiles[1],
+            percentiles[2],
+            percentiles[3],
+            percentiles[4],
+            percentiles[5],
+            percentiles[6],
+            percentiles[7],
+            percentiles[8]);
     }
 
     @Override
diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java
index e9fc606d597a..36354004bde6 100644
--- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java
+++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/metrics/TDigestHistogram.java
@@ -184,6 +184,16 @@ public synchronized double getPercentile(double percentile)
         return digest.valueAt(percentile / 100.0);
     }
 
+    @Override
+    public synchronized double[] getPercentiles(double...
percentiles) + { + double[] digestPercentiles = new double[percentiles.length]; + for (int i = 0; i < percentiles.length; i++) { + digestPercentiles[i] = percentiles[i] / 100.0; + } + return digest.valuesAt(digestPercentiles); + } + @Override public String toString() { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java index 3e77c533aed3..dd3a1f168b65 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveMetadata.java @@ -117,6 +117,7 @@ import io.trino.spi.expression.Variable; import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.SchemaFunctionName; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; @@ -830,6 +831,13 @@ public Optional getInfo(ConnectorSession session, ConnectorTableHandle t tableDefaultFileFormat)); } + @Override + public Metrics getMetrics(ConnectorSession session) + { + // HiveMetadata and metastore are created per session + return metastore.getMetrics(); + } + @Override public List listTables(ConnectorSession session, Optional optionalSchemaName) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/CachingHiveMetastoreModule.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/CachingHiveMetastoreModule.java index 63e94f7d6860..570dc5ad7389 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/CachingHiveMetastoreModule.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/CachingHiveMetastoreModule.java @@ -19,6 +19,7 @@ import com.google.inject.Singleton; import io.airlift.configuration.AbstractConfigurationAwareModule; import io.trino.metastore.HiveMetastoreFactory; +import io.trino.metastore.MeasuredHiveMetastore.MeasuredMetastoreFactory; import io.trino.metastore.RawHiveMetastoreFactory; import io.trino.metastore.cache.CachingHiveMetastore; import io.trino.metastore.cache.CachingHiveMetastoreConfig; @@ -65,7 +66,7 @@ public static HiveMetastoreFactory createHiveMetastore( SharedHiveMetastoreCache sharedHiveMetastoreCache) { // cross TX metastore cache is enabled wrapper with caching metastore - return sharedHiveMetastoreCache.createCachingHiveMetastoreFactory(metastoreFactory); + return sharedHiveMetastoreCache.createCachingHiveMetastoreFactory(new MeasuredMetastoreFactory(metastoreFactory)); } @Provides diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java index f8f29e900ae9..31ef18e7c9a8 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java @@ -68,6 +68,7 @@ import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.function.LanguageFunction; import io.trino.spi.function.SchemaFunctionName; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.security.PrincipalType; @@ -1282,6 +1283,11 @@ public void checkSupportsHiveAcidTransactions() delegate.checkSupportsTransactions(); } + public Metrics getMetrics() + { + return 
delegate.getMetrics(); + } + public void beginQuery(ConnectorSession session) { String queryId = session.getQueryId(); diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index 7bc854803792..aeef1d2e3e14 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -22,6 +22,8 @@ import io.airlift.json.ObjectMapperProvider; import io.airlift.units.DataSize; import io.trino.Session; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.MockConnectorPlugin; import io.trino.execution.QueryInfo; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; @@ -40,9 +42,11 @@ import io.trino.spi.connector.CatalogSchemaTableName; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.Constraint; +import io.trino.spi.metrics.Metrics; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.security.Identity; import io.trino.spi.security.SelectedRole; +import io.trino.spi.statistics.TableStatistics; import io.trino.spi.type.DateType; import io.trino.spi.type.TimestampType; import io.trino.spi.type.Type; @@ -64,11 +68,13 @@ import io.trino.testing.QueryRunner; import io.trino.testing.QueryRunner.MaterializedResultWithPlan; import io.trino.testing.TestingConnectorBehavior; +import io.trino.testing.TestingSession; import io.trino.testing.sql.TestTable; import io.trino.testing.sql.TrinoSqlExecutor; import io.trino.type.TypeDeserializer; import org.assertj.core.api.AbstractLongAssert; import org.intellij.lang.annotations.Language; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -119,6 +125,7 @@ import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MIN; import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE; import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE; +import static io.trino.SystemSessionProperties.ITERATIVE_OPTIMIZER_TIMEOUT; import static io.trino.SystemSessionProperties.MAX_WRITER_TASK_COUNT; import static io.trino.SystemSessionProperties.QUERY_MAX_MEMORY_PER_NODE; import static io.trino.SystemSessionProperties.REDISTRIBUTE_WRITES; @@ -197,6 +204,7 @@ import static java.util.stream.Collectors.toSet; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Fail.fail; import static org.junit.jupiter.api.Assumptions.abort; public abstract class BaseHiveConnectorTest @@ -245,6 +253,29 @@ protected static QueryRunner createHiveQueryRunner(HiveQueryRunner.Builder bu return queryRunner; } + @BeforeAll + public void initMockMetricsCatalog() + { + QueryRunner queryRunner = getQueryRunner(); + String mockConnector = "mock_metrics"; + queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder() + .withName(mockConnector) + .withListSchemaNames(_ -> ImmutableList.of("default")) + .withGetTableStatistics(_ -> { + try { + Thread.sleep(110); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return TableStatistics.empty(); + }) + .build())); + + 
queryRunner.createCatalog("mock_metrics", mockConnector); + } + @Override protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { @@ -9398,6 +9429,39 @@ public void testFlushMetadataDisabled() assertQuerySucceeds("CALL system.flush_metadata_cache()"); } + @Test + public void testCatalogMetadataMetrics() + { + MaterializedResultWithPlan result = getQueryRunner().executeWithPlan( + getSession(), + "SELECT count(*) FROM region r, nation n WHERE r.regionkey = n.regionkey"); + Map metrics = getCatalogMetadataMetrics(result.queryId()); + assertCountMetricExists(metrics, "hive", "metastore.all.time.total"); + assertDistributionMetricExists(metrics, "hive", "metastore.all.time.distribution"); + assertCountMetricExists(metrics, "hive", "metastore.getTable.time.total"); + assertDistributionMetricExists(metrics, "hive", "metastore.getTable.time.distribution"); + assertCountMetricExists(metrics, "hive", "metastore.getTableColumnStatistics.time.total"); + } + + @Test + public void testCatalogMetadataMetricsWithOptimizerTimeoutExceeded() + { + String query = "SELECT count(*) FROM region r, nation n, mock_metrics.default.mock_table m WHERE r.regionkey = n.regionkey"; + try { + Session smallOptimizerTimeout = TestingSession.testSessionBuilder(getSession()) + .setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "100ms") + .build(); + MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(smallOptimizerTimeout, query); + fail(format("Expected query to fail: %s [QueryId: %s]", query, result.queryId())); + } + catch (QueryFailedException e) { + assertThat(e.getMessage()).contains("The optimizer exhausted the time limit"); + Map metrics = getCatalogMetadataMetrics(e.getQueryId()); + assertCountMetricExists(metrics, "hive", "metastore.all.time.total"); + assertCountMetricExists(metrics, "hive", "metastore.getTable.time.total"); + } + } + private static final Set NAMED_COLUMN_ONLY_FORMATS = ImmutableSet.of(HiveStorageFormat.AVRO, HiveStorageFormat.JSON); private Session getParallelWriteSession(Session baseSession) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index d69655cc4611..af3470e5253c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -2345,6 +2345,12 @@ public Optional getInfo(ConnectorSession session, ConnectorTableHandle t totalDeleteFiles)); } + @Override + public io.trino.spi.metrics.Metrics getMetrics(ConnectorSession session) + { + return catalog.getMetrics(); + } + @Override public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java index 12d8419cbac3..823c8b8f9d79 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/TrinoCatalog.java @@ -24,6 +24,7 @@ import io.trino.spi.connector.RelationColumnsMetadata; import io.trino.spi.connector.RelationCommentMetadata; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.metrics.Metrics; import io.trino.spi.security.TrinoPrincipal; import jakarta.annotation.Nullable; import org.apache.iceberg.BaseTable; @@ -200,4 +201,9 @@ void 
createMaterializedView( void updateColumnComment(ConnectorSession session, SchemaTableName schemaTableName, ColumnIdentity columnIdentity, Optional comment); Optional redirectTable(ConnectorSession session, SchemaTableName tableName, String hiveCatalogName); + + default Metrics getMetrics() + { + return Metrics.EMPTY; + } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java index 79e20e863f90..06460d638b7e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/TrinoHiveCatalog.java @@ -53,6 +53,7 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; import io.trino.spi.connector.ViewNotFoundException; +import io.trino.spi.metrics.Metrics; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.type.TypeManager; import org.apache.iceberg.BaseTable; @@ -880,6 +881,12 @@ public Optional redirectTable(ConnectorSession session, return Optional.empty(); } + @Override + public Metrics getMetrics() + { + return metastore.getMetrics(); + } + @Override protected void invalidateTableCache(SchemaTableName schemaTableName) { diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 4c818a7177f1..f4f9ddcbb827 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -23,6 +23,8 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.Session; +import io.trino.connector.MockConnectorFactory; +import io.trino.connector.MockConnectorPlugin; import io.trino.execution.StageId; import io.trino.execution.StageInfo; import io.trino.execution.StagesInfo; @@ -42,8 +44,10 @@ import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; +import io.trino.spi.metrics.Metrics; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.statistics.TableStatistics; import io.trino.sql.planner.Plan; import io.trino.sql.planner.optimizations.PlanNodeSearcher; import io.trino.sql.planner.plan.ExchangeNode; @@ -56,9 +60,11 @@ import io.trino.testing.DistributedQueryRunner; import io.trino.testing.MaterializedResult; import io.trino.testing.MaterializedRow; +import io.trino.testing.QueryFailedException; import io.trino.testing.QueryRunner; import io.trino.testing.QueryRunner.MaterializedResultWithPlan; import io.trino.testing.TestingConnectorBehavior; +import io.trino.testing.TestingSession; import io.trino.testing.sql.TestTable; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; @@ -113,6 +119,7 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static io.trino.SystemSessionProperties.DETERMINE_PARTITION_COUNT_FOR_WRITE_ENABLED; import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; +import static io.trino.SystemSessionProperties.ITERATIVE_OPTIMIZER_TIMEOUT; import static io.trino.SystemSessionProperties.MAX_HASH_PARTITION_COUNT; import static 
io.trino.SystemSessionProperties.MAX_WRITER_TASK_COUNT; import static io.trino.SystemSessionProperties.SCALE_WRITERS; @@ -179,6 +186,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.offset; +import static org.assertj.core.api.Fail.fail; import static org.junit.jupiter.api.Assumptions.abort; public abstract class BaseIcebergConnectorTest @@ -217,6 +225,29 @@ protected IcebergQueryRunner.Builder createQueryRunnerBuilder() .setInitialTables(REQUIRED_TPCH_TABLES); } + @BeforeAll + public void initMockMetricsCatalog() + { + QueryRunner queryRunner = getQueryRunner(); + String mockConnector = "mock_metrics"; + queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder() + .withName(mockConnector) + .withListSchemaNames(_ -> ImmutableList.of("default")) + .withGetTableStatistics(_ -> { + try { + Thread.sleep(110); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return TableStatistics.empty(); + }) + .build())); + + queryRunner.createCatalog("mock_metrics", mockConnector); + } + @BeforeAll public void initFileSystem() { @@ -9217,6 +9248,38 @@ public void testCreateTableWithDataLocationButObjectStoreLayoutDisabled() "Data location can only be set when object store layout is enabled"); } + @Test + public void testCatalogMetadataMetrics() + { + MaterializedResultWithPlan result = getQueryRunner().executeWithPlan( + getSession(), + "SELECT count(*) FROM region r, nation n WHERE r.regionkey = n.regionkey"); + Map metrics = getCatalogMetadataMetrics(result.queryId()); + assertCountMetricExists(metrics, "iceberg", "metastore.all.time.total"); + assertDistributionMetricExists(metrics, "iceberg", "metastore.all.time.distribution"); + assertCountMetricExists(metrics, "iceberg", "metastore.getTable.time.total"); + assertDistributionMetricExists(metrics, "iceberg", "metastore.getTable.time.distribution"); + } + + @Test + public void testCatalogMetadataMetricsWithOptimizerTimeoutExceeded() + { + String query = "SELECT count(*) FROM region r, nation n, mock_metrics.default.mock_table m WHERE r.regionkey = n.regionkey"; + try { + Session smallOptimizerTimeout = TestingSession.testSessionBuilder(getSession()) + .setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "100ms") + .build(); + MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(smallOptimizerTimeout, query); + fail(format("Expected query to fail: %s [QueryId: %s]", query, result.queryId())); + } + catch (QueryFailedException e) { + assertThat(e.getMessage()).contains("The optimizer exhausted the time limit"); + Map metrics = getCatalogMetadataMetrics(e.getQueryId()); + assertCountMetricExists(metrics, "iceberg", "metastore.all.time.total"); + assertCountMetricExists(metrics, "iceberg", "metastore.getTable.time.total"); + } + } + @Test @Override public void testSetFieldMapKeyType() diff --git a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java index 8aade6701804..9a1bd7afa600 100644 --- a/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java +++ b/plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseMetadata.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.lakehouse; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; 
import io.airlift.slice.Slice;
 import io.trino.metastore.Table;
@@ -87,6 +88,8 @@
 import io.trino.spi.expression.Constant;
 import io.trino.spi.function.LanguageFunction;
 import io.trino.spi.function.SchemaFunctionName;
+import io.trino.spi.metrics.Metric;
+import io.trino.spi.metrics.Metrics;
 import io.trino.spi.security.GrantInfo;
 import io.trino.spi.security.Privilege;
 import io.trino.spi.security.RoleGrant;
@@ -251,6 +254,25 @@ public Optional<Object> getInfo(ConnectorSession session, ConnectorTableHandle t
         return forHandle(table).getInfo(session, table);
     }
 
+    @Override
+    public Metrics getMetrics(ConnectorSession session)
+    {
+        ImmutableMap.Builder<String, Metric<?>> metrics = ImmutableMap.<String, Metric<?>>builder();
+        hiveMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
+            metrics.put("hive." + metricName, metric);
+        });
+        icebergMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
+            metrics.put("iceberg." + metricName, metric);
+        });
+        deltaMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
+            metrics.put("delta." + metricName, metric);
+        });
+        hudiMetadata.getMetrics(session).getMetrics().forEach((metricName, metric) -> {
+            metrics.put("hudi." + metricName, metric);
+        });
+        return new Metrics(metrics.buildOrThrow());
+    }
+
     @Override
     public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
     {
diff --git a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java
index 0e4bc7787ff4..494be1052613 100644
--- a/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java
+++ b/plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseConnectorTest.java
@@ -13,21 +13,30 @@
  */
 package io.trino.plugin.lakehouse;
 
+import com.google.common.collect.ImmutableList;
 import io.trino.Session;
+import io.trino.connector.MockConnectorFactory;
+import io.trino.connector.MockConnectorPlugin;
 import io.trino.plugin.hive.containers.Hive3MinioDataLake;
+import io.trino.spi.metrics.Metrics;
+import io.trino.spi.statistics.TableStatistics;
 import io.trino.testing.BaseConnectorTest;
 import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryFailedException;
 import io.trino.testing.QueryRunner;
 import io.trino.testing.TestingConnectorBehavior;
+import io.trino.testing.TestingSession;
 import io.trino.testing.sql.TestTable;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 
+import static io.trino.SystemSessionProperties.ITERATIVE_OPTIMIZER_TIMEOUT;
 import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static io.trino.testing.MaterializedResult.resultBuilder;
@@ -36,8 +45,10 @@
 import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
 import static io.trino.testing.containers.Minio.MINIO_REGION;
 import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
 import static org.junit.jupiter.api.Assumptions.abort;
 import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@@ -81,6 +92,29 @@
public void setUp() copyTpchTables(getQueryRunner(), "tpch", TINY_SCHEMA_NAME, REQUIRED_TPCH_TABLES); } + @BeforeAll + public void initMockMetricsCatalog() + { + QueryRunner queryRunner = getQueryRunner(); + String mockConnector = "mock_metrics"; + queryRunner.installPlugin(new MockConnectorPlugin(MockConnectorFactory.builder() + .withName(mockConnector) + .withListSchemaNames(_ -> ImmutableList.of("default")) + .withGetTableStatistics(_ -> { + try { + Thread.sleep(110); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + return TableStatistics.empty(); + }) + .build())); + + queryRunner.createCatalog("mock_metrics", mockConnector); + } + @Override protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { @@ -365,4 +399,36 @@ public void testShowCreateTable() type = 'ICEBERG' )\\E"""); } + + @Test + public void testCatalogMetadataMetrics() + { + QueryRunner.MaterializedResultWithPlan result = getQueryRunner().executeWithPlan( + getSession(), + "SELECT count(*) FROM region r, nation n WHERE r.regionkey = n.regionkey"); + Map metrics = getCatalogMetadataMetrics(result.queryId()); + assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.all.time.total"); + assertDistributionMetricExists(metrics, "lakehouse", "iceberg.metastore.all.time.distribution"); + assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.getTable.time.total"); + assertDistributionMetricExists(metrics, "lakehouse", "iceberg.metastore.getTable.time.distribution"); + } + + @Test + public void testCatalogMetadataMetricsWithOptimizerTimeoutExceeded() + { + String query = "SELECT count(*) FROM region r, nation n, mock_metrics.default.mock_table m WHERE r.regionkey = n.regionkey"; + try { + Session smallOptimizerTimeout = TestingSession.testSessionBuilder(getSession()) + .setSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, "100ms") + .build(); + QueryRunner.MaterializedResultWithPlan result = getQueryRunner().executeWithPlan(smallOptimizerTimeout, query); + fail(format("Expected query to fail: %s [QueryId: %s]", query, result.queryId())); + } + catch (QueryFailedException e) { + assertThat(e.getMessage()).contains("The optimizer exhausted the time limit"); + Map metrics = getCatalogMetadataMetrics(e.getQueryId()); + assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.all.time.total"); + assertCountMetricExists(metrics, "lakehouse", "iceberg.metastore.getTable.time.total"); + } + } } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java index bad470fd331c..a898fd7d434f 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/BaseConnectorTest.java @@ -30,9 +30,13 @@ import io.trino.metadata.FunctionManager; import io.trino.metadata.Metadata; import io.trino.metadata.QualifiedObjectName; +import io.trino.plugin.base.metrics.DistributionSnapshot; +import io.trino.plugin.base.metrics.LongCount; import io.trino.server.BasicQueryInfo; +import io.trino.spi.QueryId; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.MaterializedViewFreshness; +import io.trino.spi.metrics.Metrics; import io.trino.spi.security.Identity; import io.trino.sql.planner.OptimizerConfig.JoinDistributionType; import io.trino.sql.planner.Plan; @@ -61,6 +65,7 @@ import java.util.ArrayList; import java.util.List; import 
java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.ConcurrentHashMap;
@@ -7474,6 +7479,36 @@ protected Consumer<Plan> assertPartialLimitWithPreSortedInputsCount(Session sess
         };
     }
 
+    protected Map<String, Metrics> getCatalogMetadataMetrics(QueryId queryId)
+    {
+        try {
+            return getDistributedQueryRunner().getCoordinator()
+                    .getQueryManager()
+                    .getFullQueryInfo(queryId)
+                    .getQueryStats()
+                    .getCatalogMetadataMetrics();
+        }
+        catch (NoSuchElementException e) {
+            throw new RuntimeException("Couldn't find query info for catalog metadata metrics, probably due to a query statistics collection error", e);
+        }
+    }
+
+    protected static void assertDistributionMetricExists(Map<String, Metrics> metrics, String catalog, String metricKey)
+    {
+        assertThat(metrics).containsKey(catalog);
+        assertThat(metrics.get(catalog).getMetrics()).isNotEmpty();
+        assertThat(metrics.get(catalog).getMetrics()).containsKey(metricKey);
+        assertThat(((DistributionSnapshot) metrics.get(catalog).getMetrics().get(metricKey)).total()).isGreaterThan(0);
+    }
+
+    protected static void assertCountMetricExists(Map<String, Metrics> metrics, String catalog, String metricKey)
+    {
+        assertThat(metrics).containsKey(catalog);
+        assertThat(metrics.get(catalog).getMetrics()).isNotEmpty();
+        assertThat(metrics.get(catalog).getMetrics()).containsKey(metricKey);
+        assertThat(((LongCount) metrics.get(catalog).getMetrics().get(metricKey)).getTotal()).isGreaterThan(0);
+    }
+
     protected void withMockTableListing(String forSchema, Function<ConnectorSession, List<String>> listing, Runnable closure)
     {
         requireNonNull(forSchema, "forSchema is null");
diff --git a/testing/trino-tests/src/test/java/io/trino/connector/informationschema/TestInformationSchemaConnector.java b/testing/trino-tests/src/test/java/io/trino/connector/informationschema/TestInformationSchemaConnector.java
index 7e8ae06e5f33..bc082347993f 100644
--- a/testing/trino-tests/src/test/java/io/trino/connector/informationschema/TestInformationSchemaConnector.java
+++ b/testing/trino-tests/src/test/java/io/trino/connector/informationschema/TestInformationSchemaConnector.java
@@ -16,6 +16,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultiset;
 import com.google.common.collect.Multiset;
+import com.google.common.collect.Multisets;
 import io.trino.Session;
 import io.trino.plugin.tpch.TpchPlugin;
 import io.trino.testing.AbstractTestQueryFramework;
@@ -30,6 +31,7 @@
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.parallel.Execution;
 
+import java.util.Objects;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
@@ -410,6 +412,8 @@ private void assertMetadataCalls(@Language("SQL") String actualSql, @Language("S
             assertQuery(actualSql, expectedSql);
         });
 
+        // Every query involves some number of ConnectorMetadata.getMetrics calls, so ignore them.
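+        // (The count is nondeterministic: the engine collects catalog metadata metrics
+        // whenever query stats are snapshotted, so the calls are filtered out rather
+        // than asserted on.)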
+ actualMetadataCallsCount = Multisets.filter(actualMetadataCallsCount, e -> !Objects.equals(e, "ConnectorMetadata.getMetrics")); assertMultisetsEqual(actualMetadataCallsCount, expectedMetadataCallsCount); } } diff --git a/testing/trino-tests/src/test/java/io/trino/connector/system/metadata/TestSystemMetadataConnector.java b/testing/trino-tests/src/test/java/io/trino/connector/system/metadata/TestSystemMetadataConnector.java index a2a0de6fb36f..61e9f2bff27f 100644 --- a/testing/trino-tests/src/test/java/io/trino/connector/system/metadata/TestSystemMetadataConnector.java +++ b/testing/trino-tests/src/test/java/io/trino/connector/system/metadata/TestSystemMetadataConnector.java @@ -415,8 +415,9 @@ private void assertMetadataCalls(@Language("SQL") String actualSql, @Language("S }); actualMetadataCallsCount = actualMetadataCallsCount.stream() - // Every query involves beginQuery and cleanupQuery, so ignore them. - .filter(method -> !"ConnectorMetadata.beginQuery".equals(method) && !"ConnectorMetadata.cleanupQuery".equals(method)) + // Every query involves beginQuery, cleanupQuery and getMetrics, so ignore them. + .filter(method -> !"ConnectorMetadata.beginQuery".equals(method) && !"ConnectorMetadata.cleanupQuery".equals(method) + && !"ConnectorMetadata.getMetrics".equals(method)) .collect(toImmutableMultiset()); assertMultisetsEqual(actualMetadataCallsCount, expectedMetadataCallsCount);
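MeasuredHiveMetastore itself is referenced throughout this diff (its tests, and the MeasuredMetastoreFactory wiring in CachingHiveMetastoreModule) but its body is not part of it. The following is a hypothetical sketch of the pattern the tests imply: time every metastore call, keep a per-method total plus an aggregated "all" bucket, and expose the result in the Metrics shape that ConnectorMetadata.getMetrics surfaces. The class and field names below are invented; only LongCount, Metric, Metrics, and the "metastore.<method>.time.total" key pattern come from the hunks above.

    import com.google.common.collect.ImmutableMap;
    import io.trino.plugin.base.metrics.LongCount;
    import io.trino.spi.metrics.Metric;
    import io.trino.spi.metrics.Metrics;

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;
    import java.util.function.Supplier;

    final class MeteredCalls
    {
        private final Map<String, LongAdder> totalNanosByMethod = new ConcurrentHashMap<>();

        // Wrap a single delegate call, attributing its wall time both to the named
        // method and to the aggregated "all" bucket that the tests assert on.
        <T> T time(String method, Supplier<T> call)
        {
            long start = System.nanoTime();
            try {
                return call.get();
            }
            finally {
                long elapsedNanos = System.nanoTime() - start;
                totalNanosByMethod.computeIfAbsent(method, name -> new LongAdder()).add(elapsedNanos);
                totalNanosByMethod.computeIfAbsent("all", name -> new LongAdder()).add(elapsedNanos);
            }
        }

        // Expose the counters in the shape ConnectorMetadata.getMetrics returns.
        // The real wrapper also records per-method time distributions (the
        // "*.time.distribution" keys built via DistributionSnapshot.fromDistribution);
        // that part is omitted here for brevity.
        Metrics getMetrics()
        {
            ImmutableMap.Builder<String, Metric<?>> metrics = ImmutableMap.builder();
            totalNanosByMethod.forEach((method, nanos) ->
                    metrics.put("metastore." + method + ".time.total", new LongCount(nanos.sum())));
            return new Metrics(metrics.buildOrThrow());
        }
    }

A wrapper built this way would route each delegate call through time(...), for example time("getTable", () -> delegate.getTable(...)). That matches the wiring shown earlier: MeasuredMetastoreFactory wraps the raw factory before the caching layer, and SemiTransactionalHiveMetastore and HiveMetadata simply forward getMetrics() up to the engine.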