@@ -2204,8 +2204,9 @@ private void assertMetadataCalls(
});

actualMetadataCallsCount = actualMetadataCallsCount.stream()
// Every query involves beginQuery and cleanupQuery, so ignore them.
.filter(method -> !"ConnectorMetadata.beginQuery".equals(method) && !"ConnectorMetadata.cleanupQuery".equals(method))
// Every query involves beginQuery, cleanupQuery and getMetrics, so ignore them.
.filter(method -> !"ConnectorMetadata.beginQuery".equals(method) && !"ConnectorMetadata.cleanupQuery".equals(method)
&& !"ConnectorMetadata.getMetrics".equals(method))
.collect(toImmutableMultiset());

assertMultisetsEqual(actualMetadataCallsCount, expectedMetadataCallsCount);
@@ -336,6 +336,7 @@ private static QueryStats immediateFailureQueryStats()
DataSize.ofBytes(0),
ImmutableList.of(),
DynamicFiltersStats.EMPTY,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of());
}
@@ -29,13 +29,15 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.trino.NotInTransactionException;
import io.trino.Session;
import io.trino.client.NodeVersion;
import io.trino.exchange.ExchangeInput;
import io.trino.execution.QueryExecution.QueryOutputInfo;
import io.trino.execution.StateMachine.StateChangeListener;
import io.trino.execution.querystats.PlanOptimizersStatsCollector;
import io.trino.execution.warnings.WarningCollector;
import io.trino.metadata.CatalogInfo;
import io.trino.metadata.Metadata;
import io.trino.operator.BlockedReason;
import io.trino.operator.OperatorStats;
@@ -182,6 +184,7 @@ public class QueryStateMachine
private final AtomicReference<Optional<Output>> output = new AtomicReference<>(Optional.empty());
private final AtomicReference<List<TableInfo>> referencedTables = new AtomicReference<>(ImmutableList.of());
private final AtomicReference<List<RoutineInfo>> routines = new AtomicReference<>(ImmutableList.of());
private final AtomicReference<Map<String, Metrics>> catalogMetadataMetrics = new AtomicReference<>(ImmutableMap.of());
private final StateMachine<Optional<QueryInfo>> finalQueryInfo;

private final WarningCollector warningCollector;
@@ -386,6 +389,38 @@ static QueryStateMachine beginWithTicker(
return queryStateMachine;
}

private void collectCatalogMetadataMetrics()
{
try {
if (session.getTransactionId().filter(transactionManager::transactionExists).isEmpty()) {
// Metrics collection requires an active transaction, since the metrics are
// stored in the transactional ConnectorMetadata, but this method can be
// invoked after the query has already failed, e.g., via cancel.
return;
}

ImmutableMap.Builder<String, Metrics> catalogMetadataMetrics = ImmutableMap.builder();
List<CatalogInfo> activeCatalogs = metadata.listActiveCatalogs(session);
for (CatalogInfo activeCatalog : activeCatalogs) {
Metrics metrics = metadata.getMetrics(session, activeCatalog.catalogName());
if (!metrics.getMetrics().isEmpty()) {
catalogMetadataMetrics.put(activeCatalog.catalogName(), metrics);
}
}

this.catalogMetadataMetrics.set(catalogMetadataMetrics.buildOrThrow());
}
catch (NotInTransactionException e) {
// Ignore. Metrics collection requires an active transaction and should be skipped
// when none is running.
// The exception can still occur despite the check at the top of this method, because
// the transaction can be committed or aborted concurrently after that check.
}
catch (RuntimeException e) {
QUERY_STATE_LOG.error(e, "Error collecting query catalog metadata metrics: %s", queryId);
}
}

public QueryId getQueryId()
{
return queryId;
@@ -851,6 +886,10 @@ private QueryStats getQueryStats(Optional<StagesInfo> stages)
}
}

// Try to collect the catalog metadata metrics.
// If the query has already been committed or aborted and the metadata is no longer
// available, the collection is skipped and the metrics collected earlier are used.
collectCatalogMetadataMetrics();
return new QueryStats(
queryStateTimer.getCreateTime(),
getExecutionStartTime().orElse(null),
@@ -936,7 +975,7 @@ private QueryStats getQueryStats(Optional<StagesInfo> stages)
stageGcStatistics.build(),

getDynamicFiltersStats(),

catalogMetadataMetrics.get(),
operatorStatsSummary.build(),
planOptimizersStatsCollector.getTopRuleStats());
}
@@ -1167,6 +1206,7 @@ public boolean transitionToFinishing()
transitionToFailed(e);
return true;
}
collectCatalogMetadataMetrics();
Member: We make metadata calls during FINISHING state as well, so we should collect metrics at the end of that.

Member Author: We remove the transaction right after this call, so there is no way to collect the metrics anymore.
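For context, a minimal sketch of the ordering constraint discussed above, assuming that committing or aborting the transaction discards the transactional ConnectorMetadata (the commit call shown here is only illustrative of the flow, not the exact code path):

// Sketch only: metrics live in the transactional ConnectorMetadata, so they must be
// read before the transaction is committed or aborted.
collectCatalogMetadataMetrics();               // still inside the transaction
transactionManager.asyncCommit(transactionId); // after this, the catalog metadata (and its metrics) is gone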


Optional<TransactionInfo> transaction = session.getTransactionId().flatMap(transactionManager::getTransactionInfoIfExist);
if (transaction.isPresent() && transaction.get().isAutoCommitContext()) {
@@ -1244,6 +1284,7 @@ private boolean transitionToFailed(Throwable throwable, boolean log)
}
return false;
}
collectCatalogMetadataMetrics();

try {
if (log) {
@@ -1549,6 +1590,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getFailedPhysicalWrittenDataSize(),
queryStats.getStageGcStatistics(),
queryStats.getDynamicFiltersStats(),
ImmutableMap.of(),
ImmutableList.of(), // Remove the operator summaries as OperatorInfo (especially DirectExchangeClientStatus) can hold onto a large amount of memory
ImmutableList.of());
}
12 changes: 11 additions & 1 deletion core/trino-main/src/main/java/io/trino/execution/QueryStats.java
@@ -24,10 +24,12 @@
import io.trino.operator.TableWriterOperator;
import io.trino.spi.eventlistener.QueryPlanOptimizerStatistics;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.metrics.Metrics;
import jakarta.annotation.Nullable;

import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;

@@ -127,6 +129,7 @@ public class QueryStats

private final DynamicFiltersStats dynamicFiltersStats;

private final Map<String, Metrics> catalogMetadataMetrics;
private final List<OperatorStats> operatorSummaries;
private final List<QueryPlanOptimizerStatistics> optimizerRulesSummaries;

@@ -218,7 +221,7 @@ public QueryStats(
@JsonProperty("stageGcStatistics") List<StageGcStatistics> stageGcStatistics,

@JsonProperty("dynamicFiltersStats") DynamicFiltersStats dynamicFiltersStats,

@JsonProperty("catalogMetadataMetrics") Map<String, Metrics> catalogMetadataMetrics,
@JsonProperty("operatorSummaries") List<OperatorStats> operatorSummaries,
@JsonProperty("optimizerRulesSummaries") List<QueryPlanOptimizerStatistics> optimizerRulesSummaries)
{
@@ -324,6 +327,7 @@ public QueryStats(

this.dynamicFiltersStats = requireNonNull(dynamicFiltersStats, "dynamicFiltersStats is null");

this.catalogMetadataMetrics = requireNonNull(catalogMetadataMetrics, "catalogMetadataMetrics is null");
requireNonNull(operatorSummaries, "operatorSummaries is null");
this.operatorSummaries = operatorSummaries.stream().map(OperatorStats::pruneDigests).collect(toImmutableList());

@@ -770,6 +774,12 @@ public DynamicFiltersStats getDynamicFiltersStats()
return dynamicFiltersStats;
}

@JsonProperty
public Map<String, Metrics> getCatalogMetadataMetrics()
{
return catalogMetadataMetrics;
}

@JsonProperty
public List<OperatorStats> getOperatorSummaries()
{
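As a usage illustration, a hypothetical caller holding a QueryStats instance could read the new per-catalog metrics like this (the "hive" catalog name is made up; Metrics.EMPTY and Metrics.getMetrics() come from io.trino.spi.metrics):

// Hypothetical consumer of the new QueryStats field.
Map<String, Metrics> byCatalog = queryStats.getCatalogMetadataMetrics();
Metrics hiveMetrics = byCatalog.getOrDefault("hive", Metrics.EMPTY);
hiveMetrics.getMetrics().forEach((name, metric) ->
        System.out.printf("%s = %s%n", name, metric));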
@@ -29,6 +29,7 @@
import io.trino.operator.OperatorStats;
import io.trino.operator.PipelineStats;
import io.trino.operator.TaskStats;
import io.trino.plugin.base.metrics.DistributionSnapshot;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.eventlistener.StageGcStatistics;
import io.trino.spi.metrics.Metrics;
@@ -38,8 +38,8 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.trino.execution.DistributionSnapshot.pruneMetrics;
import static io.trino.execution.StageState.RUNNING;
import static io.trino.plugin.base.metrics.DistributionSnapshot.pruneMetrics;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -102,7 +102,7 @@ public class StageStats
private final Duration failedInputBlockedTime;

private final DataSize bufferedDataSize;
private final Optional<io.trino.execution.DistributionSnapshot> outputBufferUtilization;
private final Optional<io.trino.plugin.base.metrics.DistributionSnapshot> outputBufferUtilization;
private final DataSize outputDataSize;
private final DataSize failedOutputDataSize;
private final long outputPositions;
@@ -176,7 +176,7 @@ public StageStats(
@JsonProperty("failedInputBlockedTime") Duration failedInputBlockedTime,

@JsonProperty("bufferedDataSize") DataSize bufferedDataSize,
@JsonProperty("outputBufferUtilization") Optional<io.trino.execution.DistributionSnapshot> outputBufferUtilization,
@JsonProperty("outputBufferUtilization") Optional<io.trino.plugin.base.metrics.DistributionSnapshot> outputBufferUtilization,
@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("failedOutputDataSize") DataSize failedOutputDataSize,
@JsonProperty("outputPositions") long outputPositions,
@@ -547,7 +547,7 @@ public DataSize getBufferedDataSize()
}

@JsonProperty
public Optional<io.trino.execution.DistributionSnapshot> getOutputBufferUtilization()
public Optional<io.trino.plugin.base.metrics.DistributionSnapshot> getOutputBufferUtilization()
{
return outputBufferUtilization;
}
@@ -16,7 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import io.trino.execution.DistributionSnapshot;
import io.trino.plugin.base.metrics.DistributionSnapshot;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.metrics.Metrics;

8 changes: 8 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Metadata.java
@@ -67,6 +67,7 @@
import io.trino.spi.function.FunctionMetadata;
import io.trino.spi.function.LanguageFunction;
import io.trino.spi.function.OperatorType;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.FunctionAuthorization;
import io.trino.spi.security.GrantInfo;
@@ -147,6 +148,11 @@ Optional<TableExecuteHandle> getTableHandleForExecute(

Optional<Object> getInfo(Session session, TableHandle handle);

/**
* Returns connector-specific metrics for metadata operations performed against the given catalog in this session.
*/
Metrics getMetrics(Session session, String catalogName);

Member: connector -> catalog

Member Author: Well, the metrics are connector-specific, but we return them for the provided catalog.

CatalogSchemaTableName getTableName(Session session, TableHandle tableHandle);

/**
@@ -483,6 +489,8 @@ Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
*/
List<CatalogInfo> listCatalogs(Session session);

/**
* Get the catalogs that are active in the session's transaction (never null).
*/
List<CatalogInfo> listActiveCatalogs(Session session);

/**
* Get the names that match the specified table prefix (never null).
*/
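To make the connector vs. catalog distinction above concrete, here is a hypothetical connector-side ConnectorMetadata that counts listTables calls and reports the count through the new getMetrics() hook. The class and the "listTables.calls" metric name are made up, and LongCount is assumed to be the counter implementation from io.trino.plugin.base.metrics:

import com.google.common.collect.ImmutableMap;
import io.trino.plugin.base.metrics.LongCount;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.metrics.Metric;
import io.trino.spi.metrics.Metrics;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical connector metadata that tracks how often listTables is called
// and exposes the count as a catalog metadata metric.
public class CountingMetadata
        implements ConnectorMetadata
{
    private final AtomicLong listTablesCalls = new AtomicLong();

    @Override
    public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
    {
        listTablesCalls.incrementAndGet();
        // ... real listing logic elided ...
        return List.of();
    }

    @Override
    public Metrics getMetrics(ConnectorSession session)
    {
        // Collected per catalog by QueryStateMachine.collectCatalogMetadataMetrics()
        // and surfaced in QueryStats.getCatalogMetadataMetrics().
        return new Metrics(ImmutableMap.<String, Metric<?>>of(
                "listTables.calls", new LongCount(listTablesCalls.get())));
    }
}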
@@ -104,6 +104,7 @@
import io.trino.spi.function.OperatorType;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.function.Signature;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.FunctionAuthorization;
import io.trino.spi.security.GrantInfo;
@@ -456,6 +457,14 @@ public Optional<Object> getInfo(Session session, TableHandle handle)
return metadata.getInfo(connectorSession, handle.connectorHandle());
}

@Override
public Metrics getMetrics(Session session, String catalogName)
{
return transactionManager.getRequiredCatalogMetadata(session.getRequiredTransactionId(), catalogName)
.getMetadata(session)
.getMetrics(session.toConnectorSession());
}

@Override
public CatalogSchemaTableName getTableName(Session session, TableHandle tableHandle)
{
@@ -1386,6 +1395,12 @@ public List<CatalogInfo> listCatalogs(Session session)
return transactionManager.getCatalogs(session.getRequiredTransactionId());
}

@Override
public List<CatalogInfo> listActiveCatalogs(Session session)
{
return transactionManager.getActiveCatalogs(session.getRequiredTransactionId());
}

@Override
public List<QualifiedObjectName> listViews(Session session, QualifiedTablePrefix prefix)
{
@@ -29,7 +29,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static io.trino.execution.DistributionSnapshot.pruneMetrics;
import static io.trino.plugin.base.metrics.DistributionSnapshot.pruneMetrics;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -31,7 +31,6 @@
import io.trino.cost.PlanNodeStatsAndCostSummary;
import io.trino.cost.PlanNodeStatsEstimate;
import io.trino.cost.StatsAndCosts;
import io.trino.execution.DistributionSnapshot;
import io.trino.execution.QueryStats;
import io.trino.execution.StageInfo;
import io.trino.execution.StageStats;
@@ -42,6 +41,7 @@
import io.trino.metadata.Metadata;
import io.trino.metadata.ResolvedFunction;
import io.trino.metadata.TableHandle;
import io.trino.plugin.base.metrics.DistributionSnapshot;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.expression.FunctionName;
import io.trino.spi.function.CatalogSchemaFunctionName;
@@ -79,6 +79,7 @@
import io.trino.spi.function.LanguageFunction;
import io.trino.spi.function.SchemaFunctionName;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.GrantInfo;
import io.trino.spi.security.Privilege;
@@ -252,6 +253,14 @@ public Optional<Object> getInfo(ConnectorSession session, ConnectorTableHandle t
}
}

@Override
public Metrics getMetrics(ConnectorSession session)
{
// Do not trace getMetrics, as it is expected to be a quick local JVM call,
// and adding a span would only obfuscate the trace.
return delegate.getMetrics(session);
}

@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName)
{
@@ -97,6 +97,7 @@
import io.trino.spi.function.FunctionMetadata;
import io.trino.spi.function.LanguageFunction;
import io.trino.spi.function.OperatorType;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.FunctionAuthorization;
import io.trino.spi.security.GrantInfo;
@@ -284,6 +285,14 @@ public Optional<Object> getInfo(Session session, TableHandle handle)
}
}

@Override
public Metrics getMetrics(Session session, String catalogName)
{
// Do not trace getMetrics, as it is expected to be a quick local JVM call,
// and adding a span would only obfuscate the trace.
return delegate.getMetrics(session, catalogName);
}

@Override
public CatalogSchemaTableName getTableName(Session session, TableHandle tableHandle)
{
@@ -868,6 +877,15 @@ public List<CatalogInfo> listCatalogs(Session session)
}
}

@Override
public List<CatalogInfo> listActiveCatalogs(Session session)
{
Span span = startSpan("listActiveCatalogs");
try (var _ = scopedSpan(span)) {
return delegate.listActiveCatalogs(session);
}
}

@Override
public List<QualifiedObjectName> listViews(Session session, QualifiedTablePrefix prefix)
{
@@ -266,6 +266,7 @@ public QueryInfo getFullQueryInfo()

ImmutableList.of(),
DynamicFiltersStats.EMPTY,
ImmutableMap.of(),
ImmutableList.of(),
ImmutableList.of()),
Optional.empty(),
@@ -14,6 +14,7 @@
package io.trino.execution;

import io.airlift.stats.TDigest;
import io.trino.plugin.base.metrics.DistributionSnapshot;
import io.trino.plugin.base.metrics.TDigestHistogram;
import org.junit.jupiter.api.Test;
