Skip to content

Commit 5155cdf

Browse files
samikshya-dbclaude
andcommitted
[PECOBLR-1145] Implement telemetry core infrastructure (config & tags)
Implement Phase 1 of telemetry infrastructure for the Go driver: - Add Config struct with all telemetry configuration fields - Implement DefaultConfig() with telemetry disabled by default - Add ParseTelemetryConfig() for DSN parameter parsing - Define tag constants for connection, statement, and error metrics - Implement tag export scope filtering (local vs Databricks) - Add comprehensive unit tests for config and tag filtering Note: Telemetry is disabled by default and will be enabled after full testing and validation is complete. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 45843b7 commit 5155cdf

File tree

5 files changed

+546
-6
lines changed

5 files changed

+546
-6
lines changed

telemetry/DESIGN.md

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
This document outlines a **telemetry design** for the Databricks SQL Go driver that collects usage metrics and exports them to the Databricks telemetry service. The design leverages Go's `context.Context` and middleware patterns to instrument driver operations without impacting performance.
66

7+
**Important Note:** Telemetry is **disabled by default** and will be enabled only after full testing and validation is complete.
8+
79
**Key Objectives:**
810
- Collect driver usage metrics and performance data
911
- Export aggregated metrics to Databricks telemetry service
@@ -1364,9 +1366,10 @@ type Config struct {
13641366
}
13651367

13661368
// DefaultConfig returns default telemetry configuration.
1369+
// Note: Telemetry is disabled by default and will be enabled after full testing and validation.
13671370
func DefaultConfig() *Config {
13681371
return &Config{
1369-
Enabled: true,
1372+
Enabled: false, // Disabled by default until testing is complete
13701373
BatchSize: 100,
13711374
FlushInterval: 5 * time.Second,
13721375
MaxRetries: 3,
@@ -1733,11 +1736,11 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
17331736

17341737
## 11. Implementation Checklist
17351738

1736-
### Phase 1: Core Infrastructure
1737-
- [ ] Create `telemetry` package structure
1738-
- [ ] Implement `config.go` with configuration types
1739-
- [ ] Implement `tags.go` with tag definitions and filtering
1740-
- [ ] Add unit tests for configuration and tags
1739+
### Phase 1: Core Infrastructure ✅ COMPLETED (PECOBLR-1145)
1740+
- [x] Create `telemetry` package structure
1741+
- [x] Implement `config.go` with configuration types
1742+
- [x] Implement `tags.go` with tag definitions and filtering
1743+
- [x] Add unit tests for configuration and tags
17411744

17421745
### Phase 2: Per-Host Management
17431746
- [ ] Implement `featureflag.go` with caching and reference counting

telemetry/config.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package telemetry
2+
3+
import (
4+
"strconv"
5+
"time"
6+
)
7+
8+
// Config holds telemetry configuration.
9+
type Config struct {
10+
// Enabled controls whether telemetry is active
11+
Enabled bool
12+
13+
// BatchSize is the number of metrics to batch before flushing
14+
BatchSize int
15+
16+
// FlushInterval is how often to flush metrics
17+
FlushInterval time.Duration
18+
19+
// MaxRetries is the maximum number of retry attempts
20+
MaxRetries int
21+
22+
// RetryDelay is the base delay between retries
23+
RetryDelay time.Duration
24+
25+
// CircuitBreakerEnabled enables circuit breaker protection
26+
CircuitBreakerEnabled bool
27+
28+
// CircuitBreakerThreshold is failures before opening circuit
29+
CircuitBreakerThreshold int
30+
31+
// CircuitBreakerTimeout is time before retrying after open
32+
CircuitBreakerTimeout time.Duration
33+
}
34+
35+
// DefaultConfig returns default telemetry configuration.
36+
// Note: Telemetry is disabled by default and will be enabled after full testing and validation.
37+
func DefaultConfig() *Config {
38+
return &Config{
39+
Enabled: false, // Disabled by default until testing is complete
40+
BatchSize: 100,
41+
FlushInterval: 5 * time.Second,
42+
MaxRetries: 3,
43+
RetryDelay: 100 * time.Millisecond,
44+
CircuitBreakerEnabled: true,
45+
CircuitBreakerThreshold: 5,
46+
CircuitBreakerTimeout: 1 * time.Minute,
47+
}
48+
}
49+
50+
// ParseTelemetryConfig extracts telemetry config from DSN query parameters.
51+
func ParseTelemetryConfig(params map[string]string) *Config {
52+
cfg := DefaultConfig()
53+
54+
if v, ok := params["telemetry"]; ok {
55+
cfg.Enabled = v == "true" || v == "1"
56+
}
57+
58+
if v, ok := params["telemetry_batch_size"]; ok {
59+
if size, err := strconv.Atoi(v); err == nil && size > 0 {
60+
cfg.BatchSize = size
61+
}
62+
}
63+
64+
if v, ok := params["telemetry_flush_interval"]; ok {
65+
if duration, err := time.ParseDuration(v); err == nil {
66+
cfg.FlushInterval = duration
67+
}
68+
}
69+
70+
return cfg
71+
}

telemetry/config_test.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
package telemetry
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestDefaultConfig(t *testing.T) {
9+
cfg := DefaultConfig()
10+
11+
// Verify telemetry is disabled by default
12+
if cfg.Enabled {
13+
t.Error("Expected telemetry to be disabled by default, got enabled")
14+
}
15+
16+
// Verify other defaults
17+
if cfg.BatchSize != 100 {
18+
t.Errorf("Expected BatchSize 100, got %d", cfg.BatchSize)
19+
}
20+
21+
if cfg.FlushInterval != 5*time.Second {
22+
t.Errorf("Expected FlushInterval 5s, got %v", cfg.FlushInterval)
23+
}
24+
25+
if cfg.MaxRetries != 3 {
26+
t.Errorf("Expected MaxRetries 3, got %d", cfg.MaxRetries)
27+
}
28+
29+
if cfg.RetryDelay != 100*time.Millisecond {
30+
t.Errorf("Expected RetryDelay 100ms, got %v", cfg.RetryDelay)
31+
}
32+
33+
if !cfg.CircuitBreakerEnabled {
34+
t.Error("Expected CircuitBreakerEnabled true, got false")
35+
}
36+
37+
if cfg.CircuitBreakerThreshold != 5 {
38+
t.Errorf("Expected CircuitBreakerThreshold 5, got %d", cfg.CircuitBreakerThreshold)
39+
}
40+
41+
if cfg.CircuitBreakerTimeout != 1*time.Minute {
42+
t.Errorf("Expected CircuitBreakerTimeout 1m, got %v", cfg.CircuitBreakerTimeout)
43+
}
44+
}
45+
46+
func TestParseTelemetryConfig_EmptyParams(t *testing.T) {
47+
params := map[string]string{}
48+
cfg := ParseTelemetryConfig(params)
49+
50+
// Should return defaults
51+
if cfg.Enabled {
52+
t.Error("Expected telemetry to be disabled by default")
53+
}
54+
55+
if cfg.BatchSize != 100 {
56+
t.Errorf("Expected BatchSize 100, got %d", cfg.BatchSize)
57+
}
58+
}
59+
60+
func TestParseTelemetryConfig_EnabledTrue(t *testing.T) {
61+
params := map[string]string{
62+
"telemetry": "true",
63+
}
64+
cfg := ParseTelemetryConfig(params)
65+
66+
if !cfg.Enabled {
67+
t.Error("Expected telemetry to be enabled when set to 'true'")
68+
}
69+
}
70+
71+
func TestParseTelemetryConfig_Enabled1(t *testing.T) {
72+
params := map[string]string{
73+
"telemetry": "1",
74+
}
75+
cfg := ParseTelemetryConfig(params)
76+
77+
if !cfg.Enabled {
78+
t.Error("Expected telemetry to be enabled when set to '1'")
79+
}
80+
}
81+
82+
func TestParseTelemetryConfig_EnabledFalse(t *testing.T) {
83+
params := map[string]string{
84+
"telemetry": "false",
85+
}
86+
cfg := ParseTelemetryConfig(params)
87+
88+
if cfg.Enabled {
89+
t.Error("Expected telemetry to be disabled when set to 'false'")
90+
}
91+
}
92+
93+
func TestParseTelemetryConfig_BatchSize(t *testing.T) {
94+
params := map[string]string{
95+
"telemetry_batch_size": "50",
96+
}
97+
cfg := ParseTelemetryConfig(params)
98+
99+
if cfg.BatchSize != 50 {
100+
t.Errorf("Expected BatchSize 50, got %d", cfg.BatchSize)
101+
}
102+
}
103+
104+
func TestParseTelemetryConfig_BatchSizeInvalid(t *testing.T) {
105+
params := map[string]string{
106+
"telemetry_batch_size": "invalid",
107+
}
108+
cfg := ParseTelemetryConfig(params)
109+
110+
// Should fall back to default
111+
if cfg.BatchSize != 100 {
112+
t.Errorf("Expected BatchSize to fallback to 100, got %d", cfg.BatchSize)
113+
}
114+
}
115+
116+
func TestParseTelemetryConfig_BatchSizeZero(t *testing.T) {
117+
params := map[string]string{
118+
"telemetry_batch_size": "0",
119+
}
120+
cfg := ParseTelemetryConfig(params)
121+
122+
// Should ignore zero and use default
123+
if cfg.BatchSize != 100 {
124+
t.Errorf("Expected BatchSize to fallback to 100 when zero, got %d", cfg.BatchSize)
125+
}
126+
}
127+
128+
func TestParseTelemetryConfig_BatchSizeNegative(t *testing.T) {
129+
params := map[string]string{
130+
"telemetry_batch_size": "-10",
131+
}
132+
cfg := ParseTelemetryConfig(params)
133+
134+
// Should ignore negative and use default
135+
if cfg.BatchSize != 100 {
136+
t.Errorf("Expected BatchSize to fallback to 100 when negative, got %d", cfg.BatchSize)
137+
}
138+
}
139+
140+
func TestParseTelemetryConfig_FlushInterval(t *testing.T) {
141+
params := map[string]string{
142+
"telemetry_flush_interval": "10s",
143+
}
144+
cfg := ParseTelemetryConfig(params)
145+
146+
if cfg.FlushInterval != 10*time.Second {
147+
t.Errorf("Expected FlushInterval 10s, got %v", cfg.FlushInterval)
148+
}
149+
}
150+
151+
func TestParseTelemetryConfig_FlushIntervalInvalid(t *testing.T) {
152+
params := map[string]string{
153+
"telemetry_flush_interval": "invalid",
154+
}
155+
cfg := ParseTelemetryConfig(params)
156+
157+
// Should fall back to default
158+
if cfg.FlushInterval != 5*time.Second {
159+
t.Errorf("Expected FlushInterval to fallback to 5s, got %v", cfg.FlushInterval)
160+
}
161+
}
162+
163+
func TestParseTelemetryConfig_MultipleParams(t *testing.T) {
164+
params := map[string]string{
165+
"telemetry": "true",
166+
"telemetry_batch_size": "200",
167+
"telemetry_flush_interval": "30s",
168+
}
169+
cfg := ParseTelemetryConfig(params)
170+
171+
if !cfg.Enabled {
172+
t.Error("Expected telemetry to be enabled")
173+
}
174+
175+
if cfg.BatchSize != 200 {
176+
t.Errorf("Expected BatchSize 200, got %d", cfg.BatchSize)
177+
}
178+
179+
if cfg.FlushInterval != 30*time.Second {
180+
t.Errorf("Expected FlushInterval 30s, got %v", cfg.FlushInterval)
181+
}
182+
183+
// Other fields should still have defaults
184+
if cfg.MaxRetries != 3 {
185+
t.Errorf("Expected MaxRetries to remain default 3, got %d", cfg.MaxRetries)
186+
}
187+
}

telemetry/tags.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package telemetry
2+
3+
// Tag names for connection metrics
4+
const (
5+
TagWorkspaceID = "workspace.id"
6+
TagSessionID = "session.id"
7+
TagDriverVersion = "driver.version"
8+
TagDriverOS = "driver.os"
9+
TagDriverRuntime = "driver.runtime"
10+
TagServerAddress = "server.address" // Not exported to Databricks
11+
)
12+
13+
// Tag names for statement metrics
14+
const (
15+
TagStatementID = "statement.id"
16+
TagResultFormat = "result.format"
17+
TagResultChunkCount = "result.chunk_count"
18+
TagResultBytesDownloaded = "result.bytes_downloaded"
19+
TagCompressionEnabled = "result.compression_enabled"
20+
TagPollCount = "poll.count"
21+
TagPollLatency = "poll.latency_ms"
22+
)
23+
24+
// Tag names for error metrics
25+
const (
26+
TagErrorType = "error.type"
27+
TagErrorCode = "error.code"
28+
)
29+
30+
// Feature flag tags
31+
const (
32+
TagFeatureCloudFetch = "feature.cloudfetch"
33+
TagFeatureLZ4 = "feature.lz4"
34+
TagFeatureDirectResults = "feature.direct_results"
35+
)
36+
37+
// tagExportScope defines where a tag can be exported.
38+
type tagExportScope int
39+
40+
const (
41+
exportNone tagExportScope = 0
42+
exportLocal = 1 << iota
43+
exportDatabricks
44+
exportAll = exportLocal | exportDatabricks
45+
)
46+
47+
// tagDefinition defines a metric tag and its export scope.
48+
type tagDefinition struct {
49+
name string
50+
exportScope tagExportScope
51+
description string
52+
required bool
53+
}
54+
55+
// connectionTags returns tags allowed for connection events.
56+
func connectionTags() []tagDefinition {
57+
return []tagDefinition{
58+
{TagWorkspaceID, exportDatabricks, "Databricks workspace ID", true},
59+
{TagSessionID, exportDatabricks, "Connection session ID", true},
60+
{TagDriverVersion, exportAll, "Driver version", false},
61+
{TagDriverOS, exportAll, "Operating system", false},
62+
{TagDriverRuntime, exportAll, "Go runtime version", false},
63+
{TagFeatureCloudFetch, exportDatabricks, "CloudFetch enabled", false},
64+
{TagFeatureLZ4, exportDatabricks, "LZ4 compression enabled", false},
65+
{TagServerAddress, exportLocal, "Server address (local only)", false},
66+
}
67+
}
68+
69+
// statementTags returns tags allowed for statement events.
70+
func statementTags() []tagDefinition {
71+
return []tagDefinition{
72+
{TagStatementID, exportDatabricks, "Statement ID", true},
73+
{TagSessionID, exportDatabricks, "Session ID", true},
74+
{TagResultFormat, exportDatabricks, "Result format", false},
75+
{TagResultChunkCount, exportDatabricks, "Chunk count", false},
76+
{TagResultBytesDownloaded, exportDatabricks, "Bytes downloaded", false},
77+
{TagCompressionEnabled, exportDatabricks, "Compression enabled", false},
78+
{TagPollCount, exportDatabricks, "Poll count", false},
79+
{TagPollLatency, exportDatabricks, "Poll latency", false},
80+
}
81+
}
82+
83+
// shouldExportToDatabricks returns true if tag should be exported to Databricks.
84+
func shouldExportToDatabricks(metricType, tagName string) bool {
85+
var tags []tagDefinition
86+
switch metricType {
87+
case "connection":
88+
tags = connectionTags()
89+
case "statement":
90+
tags = statementTags()
91+
default:
92+
return false
93+
}
94+
95+
for _, tag := range tags {
96+
if tag.name == tagName {
97+
return tag.exportScope&exportDatabricks != 0
98+
}
99+
}
100+
return false
101+
}

0 commit comments

Comments
 (0)