Skip to content

Commit 2667559

Browse files
authored
fix: disable COUNT(*) row estimation in query enrichment by default (#92)
GetTableAvailability() unconditionally ran SELECT COUNT(*) for every URN during DataHub search enrichment, causing full table scans and timeouts on large tables. Add EstimateRowCounts config flag (default: false) so row counts are skipped unless explicitly enabled via injection.estimate_row_counts: true.
1 parent 0bbb888 commit 2667559

File tree

6 files changed

+94
-37
lines changed

6 files changed

+94
-37
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,10 @@ ext-apps/
5858
# Admin UI
5959
admin-ui/node_modules/
6060
admin-ui/dist/
61+
admin-ui/.vite/
62+
admin-ui/test-results/
63+
64+
# Root node_modules (if any)
65+
node_modules/
6166
internal/adminui/dist/*
6267
!internal/adminui/dist/.gitkeep

pkg/platform/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,7 @@ type InjectionConfig struct {
295295
DataHubQueryEnrichment bool `yaml:"datahub_query_enrichment"`
296296
S3SemanticEnrichment bool `yaml:"s3_semantic_enrichment"`
297297
DataHubStorageEnrichment bool `yaml:"datahub_storage_enrichment"`
298+
EstimateRowCounts bool `yaml:"estimate_row_counts"`
298299
SessionDedup SessionDedupConfig `yaml:"session_dedup"`
299300
}
300301

pkg/platform/config_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,9 @@ func TestApplyDefaults(t *testing.T) {
230230
if cfg.Server.Shutdown.PreShutdownDelay != cfgTestDefaultPreDelay {
231231
t.Errorf("Server.Shutdown.PreShutdownDelay = %v, want %v", cfg.Server.Shutdown.PreShutdownDelay, cfgTestDefaultPreDelay)
232232
}
233+
if cfg.Injection.EstimateRowCounts {
234+
t.Error("Injection.EstimateRowCounts should default to false")
235+
}
233236
}
234237

235238
func TestApplyDefaults_PreservesExisting(t *testing.T) {

pkg/platform/platform.go

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -895,20 +895,21 @@ func (p *Platform) createQueryProvider() (query.Provider, error) {
895895
}
896896

897897
adapter, err := trinoquery.New(trinoquery.Config{
898-
Host: trinoCfg.Host,
899-
Port: trinoCfg.Port,
900-
User: trinoCfg.User,
901-
Password: trinoCfg.Password,
902-
Catalog: trinoCfg.Catalog,
903-
Schema: trinoCfg.Schema,
904-
SSL: trinoCfg.SSL,
905-
SSLVerify: trinoCfg.SSLVerify,
906-
Timeout: trinoCfg.Timeout,
907-
DefaultLimit: trinoCfg.DefaultLimit,
908-
MaxLimit: trinoCfg.MaxLimit,
909-
ReadOnly: trinoCfg.ReadOnly,
910-
ConnectionName: trinoCfg.ConnectionName,
911-
CatalogMapping: p.config.Query.URNMapping.CatalogMapping,
898+
Host: trinoCfg.Host,
899+
Port: trinoCfg.Port,
900+
User: trinoCfg.User,
901+
Password: trinoCfg.Password,
902+
Catalog: trinoCfg.Catalog,
903+
Schema: trinoCfg.Schema,
904+
SSL: trinoCfg.SSL,
905+
SSLVerify: trinoCfg.SSLVerify,
906+
Timeout: trinoCfg.Timeout,
907+
DefaultLimit: trinoCfg.DefaultLimit,
908+
MaxLimit: trinoCfg.MaxLimit,
909+
ReadOnly: trinoCfg.ReadOnly,
910+
ConnectionName: trinoCfg.ConnectionName,
911+
CatalogMapping: p.config.Query.URNMapping.CatalogMapping,
912+
EstimateRowCounts: p.config.Injection.EstimateRowCounts,
912913
})
913914
if err != nil {
914915
return nil, fmt.Errorf("creating trino query provider: %w", err)

pkg/query/trino/adapter.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ type Config struct {
4141
// This is the reverse of the semantic layer's catalog mapping.
4242
// For example: {"warehouse": "rdbms"} means DataHub "warehouse" → Trino "rdbms"
4343
CatalogMapping map[string]string
44+
45+
// EstimateRowCounts controls whether GetTableAvailability runs
46+
// SELECT COUNT(*) to estimate row counts. Disabled by default because
47+
// COUNT(*) can cause full table scans on large tables, making DataHub
48+
// search enrichment very slow.
49+
EstimateRowCounts bool
4450
}
4551

4652
// Client defines the interface for Trino operations.
@@ -194,27 +200,11 @@ func (a *Adapter) GetTableAvailability(ctx context.Context, urn string) (*query.
194200
}, nil
195201
}
196202

197-
// Try to get an estimated row count by running a quick COUNT query
203+
// Optionally estimate row count via COUNT(*). Disabled by default
204+
// because COUNT(*) can trigger full table scans on large tables.
198205
var estimatedRows *int64
199-
countSQL := fmt.Sprintf("SELECT COUNT(*) FROM %s.%s.%s",
200-
trinoclient.QuoteIdentifier(table.Catalog),
201-
trinoclient.QuoteIdentifier(table.Schema),
202-
trinoclient.QuoteIdentifier(table.Table),
203-
)
204-
205-
countResult, err := a.client.Query(ctx, countSQL, trinoclient.QueryOptions{Limit: 1})
206-
if err == nil && len(countResult.Rows) > 0 {
207-
for _, v := range countResult.Rows[0] {
208-
if count, ok := v.(int64); ok {
209-
estimatedRows = &count
210-
break
211-
}
212-
if count, ok := v.(float64); ok {
213-
c := int64(count)
214-
estimatedRows = &c
215-
break
216-
}
217-
}
206+
if a.cfg.EstimateRowCounts {
207+
estimatedRows = a.estimateRowCount(ctx, table)
218208
}
219209

220210
return &query.TableAvailability{
@@ -225,6 +215,31 @@ func (a *Adapter) GetTableAvailability(ctx context.Context, urn string) (*query.
225215
}, nil
226216
}
227217

218+
// estimateRowCount runs SELECT COUNT(*) and returns the result, or nil on error.
219+
func (a *Adapter) estimateRowCount(ctx context.Context, table *query.TableIdentifier) *int64 {
220+
countSQL := fmt.Sprintf("SELECT COUNT(*) FROM %s.%s.%s",
221+
trinoclient.QuoteIdentifier(table.Catalog),
222+
trinoclient.QuoteIdentifier(table.Schema),
223+
trinoclient.QuoteIdentifier(table.Table),
224+
)
225+
226+
result, err := a.client.Query(ctx, countSQL, trinoclient.QueryOptions{Limit: 1})
227+
if err != nil || len(result.Rows) == 0 {
228+
return nil
229+
}
230+
231+
for _, v := range result.Rows[0] {
232+
if count, ok := v.(int64); ok {
233+
return &count
234+
}
235+
if count, ok := v.(float64); ok {
236+
c := int64(count)
237+
return &c
238+
}
239+
}
240+
return nil
241+
}
242+
228243
// GetQueryExamples returns sample queries for a table.
229244
func (a *Adapter) GetQueryExamples(ctx context.Context, urn string) ([]query.Example, error) {
230245
table, err := a.ResolveTable(ctx, urn)

pkg/query/trino/adapter_test.go

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ func TestGetTableAvailability_TableExists(t *testing.T) {
210210
}, nil
211211
},
212212
}
213-
adapter, _ := NewWithClient(Config{Catalog: "hive", ConnectionName: "test"}, mock)
213+
adapter, _ := NewWithClient(Config{Catalog: "hive", ConnectionName: "test", EstimateRowCounts: true}, mock)
214214

215215
result, err := adapter.GetTableAvailability(ctx, "urn:li:dataset:(urn:li:dataPlatform:trino,schema.table,PROD)")
216216
if err != nil {
@@ -236,7 +236,7 @@ func TestGetTableAvailability_FloatCount(t *testing.T) {
236236
}, nil
237237
},
238238
}
239-
adapter, _ := NewWithClient(Config{Catalog: "hive", ConnectionName: "test"}, mock)
239+
adapter, _ := NewWithClient(Config{Catalog: "hive", ConnectionName: "test", EstimateRowCounts: true}, mock)
240240

241241
result, err := adapter.GetTableAvailability(ctx, "urn:li:dataset:(urn:li:dataPlatform:trino,schema.table,PROD)")
242242
if err != nil {
@@ -247,6 +247,38 @@ func TestGetTableAvailability_FloatCount(t *testing.T) {
247247
}
248248
}
249249

250+
func TestGetTableAvailability_RowCountsDisabled(t *testing.T) {
251+
ctx := context.Background()
252+
queryCalled := false
253+
mock := &mockTrinoClient{
254+
describeTableFunc: func(_ context.Context, _, _, _ string) (*trinoclient.TableInfo, error) {
255+
return &trinoclient.TableInfo{Name: "test_table"}, nil
256+
},
257+
queryFunc: func(_ context.Context, _ string, _ trinoclient.QueryOptions) (*trinoclient.QueryResult, error) {
258+
queryCalled = true
259+
return &trinoclient.QueryResult{
260+
Rows: []map[string]any{{"_col0": int64(adapterTestRowCount100)}},
261+
}, nil
262+
},
263+
}
264+
// EstimateRowCounts defaults to false (zero value)
265+
adapter, _ := NewWithClient(Config{Catalog: "hive", ConnectionName: "test"}, mock)
266+
267+
result, err := adapter.GetTableAvailability(ctx, "urn:li:dataset:(urn:li:dataPlatform:trino,schema.table,PROD)")
268+
if err != nil {
269+
t.Fatalf(adapterTestUnexpectedErr, err)
270+
}
271+
if !result.Available {
272+
t.Error("expected Available to be true")
273+
}
274+
if result.EstimatedRows != nil {
275+
t.Errorf("expected EstimatedRows to be nil when row counts disabled, got %d", *result.EstimatedRows)
276+
}
277+
if queryCalled {
278+
t.Error("expected Query to NOT be called when EstimateRowCounts is false")
279+
}
280+
}
281+
250282
func TestGetTableAvailability_NotExists(t *testing.T) {
251283
ctx := context.Background()
252284
mock := &mockTrinoClient{
@@ -316,7 +348,7 @@ func TestGetExecutionContext(t *testing.T) {
316348
}, nil
317349
},
318350
}
319-
adapter, _ := NewWithClient(Config{Catalog: "hive", ConnectionName: "main"}, mock)
351+
adapter, _ := NewWithClient(Config{Catalog: "hive", ConnectionName: "main", EstimateRowCounts: true}, mock)
320352
ctx := context.Background()
321353

322354
urns := []string{

0 commit comments

Comments
 (0)