Changes from 20 of 23 commits
134d8b8
docs: add fix-telemetry-gaps design document
Mar 10, 2026
2867946
docs: move E2E tests to Phase 1 (test-first approach)
Mar 11, 2026
425a554
docs: consolidate implementation into 2 phases (Thrift gaps first, SE…
Mar 12, 2026
b498f06
Build E2E test infrastructure with CapturingTelemetryExporter (Task…)
Mar 13, 2026
3d80f8d
Populate runtime_vendor and client_app_name in DriverSystemConfigurat…
Mar 13, 2026
9fdd789
Populate auth_type on root telemetry log (Task ID: task-1.3-auth-ty…)
Mar 13, 2026
12b28b2
Populate WorkspaceId in TelemetrySessionContext (Task ID: task-1.4-…)
Mar 13, 2026
5a6d1ef
Expand DriverConnectionParameters with additional fields (Task ID: …)
Mar 13, 2026
128a824
Add ChunkMetrics aggregation to CloudFetchDownloader (Task ID: task…)
Mar 13, 2026
2e676d4
Expose GetChunkMetrics() on CloudFetchReader interface (Task ID: ta…)
Mar 13, 2026
54e94e1
Call SetChunkDetails() in DatabricksStatement.EmitTelemetry() (Task…)
Mar 13, 2026
ac0a512
Track retry_count in SqlExecutionEvent (Task ID: task-1.9-track-ret…)
Mar 13, 2026
532e529
Mark internal calls with is_internal_call flag (Task ID: task-1.10-…)
Mar 13, 2026
a6b030a
Add telemetry for metadata operations (GetObjects, GetTableTypes) (…)
Mar 13, 2026
80477df
Verify all Phase 1 E2E tests pass (Task ID: task-1.12-verify-phase1…)
Mar 13, 2026
a811006
fix(csharp): remove accidentally committed demo submodule breaking CI
Mar 13, 2026
e33bee4
fix(csharp): fix lint issues - trailing whitespace, license headers, …
Mar 13, 2026
e953996
fix(csharp): remove extra trailing newline in telemetry-design.md
Mar 13, 2026
d504b0a
fix(csharp): address PR review feedback - test isolation, assertions,…
Mar 13, 2026
6a928e7
address comments
Mar 17, 2026
490d943
address pr comments
Mar 17, 2026
19ae1c0
fix(csharp): address PR review - use existing constants, extract org …
Mar 18, 2026
8da5457
fix(csharp): emit correct metadata telemetry for statement-level meta…
Mar 18, 2026
3 changes: 3 additions & 0 deletions .gitignore
@@ -292,3 +292,6 @@ generated_task_specs.json

# Git worktrees
.worktrees/

# Demo directory (local only)
demo/
80 changes: 80 additions & 0 deletions csharp/doc/telemetry-design.md
@@ -2846,3 +2846,83 @@ This **direct object telemetry design (V3)** provides a simple approach to colle
4. **Deterministic emission**: Exactly one telemetry event per statement — on reader dispose (success) or catch block (error)
5. **Flush-before-close**: Connection dispose blocks until all pending telemetry is sent to Databricks
6. **JDBC-compatible**: snake_case JSON field names, same proto schema, same export endpoint
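As a purely hypothetical illustration of principle 6 (values and the exact field subset are invented here; only the snake_case naming convention and the frontend-log nesting come from the design), one exported log could serialize along these lines:

```json
{
  "workspace_id": 12345,
  "frontend_log_event_id": "00000000-0000-0000-0000-000000000000",
  "context": { "timestamp_millis": 1770000000000 },
  "entry": {
    "sql_driver_log": {
      "session_id": "00000000-0000-0000-0000-000000000001",
      "sql_statement_id": "00000000-0000-0000-0000-000000000002",
      "operation_latency_ms": 42,
      "auth_type": "pat"
    }
  }
}
```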

---

## Implementation Notes - E2E Test Infrastructure (2026-03-13)

### Files Implemented

1. **CapturingTelemetryExporter.cs** (`csharp/test/E2E/Telemetry/CapturingTelemetryExporter.cs`)
- Thread-safe telemetry event capture using `ConcurrentBag<TelemetryFrontendLog>`
- Export call counting for validation
- Reset capability for test cleanup

2. **TelemetryTestHelpers.cs** (`csharp/test/E2E/Telemetry/TelemetryTestHelpers.cs`)
- `CreateConnectionWithCapturingTelemetry()` - Uses `TelemetryClientManager.ExporterOverride` to inject test exporter
- `WaitForTelemetryEvents()` - Waits for expected telemetry events with timeout
- Proto field assertion helpers for session, system config, connection params, SQL operations, and errors

3. **TelemetryBaselineTests.cs** (`csharp/test/E2E/Telemetry/TelemetryBaselineTests.cs`)
- 10 baseline E2E tests validating all currently populated proto fields
- Tests run against a real Databricks workspace; the capturing exporter means no telemetry-backend connectivity is required
- All tests passing ✅
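
For reference, the polling helper described above might look roughly like the following sketch. The signature, the `CapturedLogs` property, and the default timeout are assumptions for illustration; the real implementation lives in `TelemetryTestHelpers.cs`.

```csharp
// Sketch only: member names beyond WaitForTelemetryEvents are assumed.
public static async Task<List<TelemetryFrontendLog>> WaitForTelemetryEvents(
    CapturingTelemetryExporter exporter,
    int expectedCount,
    TimeSpan? timeout = null)
{
    var deadline = DateTime.UtcNow + (timeout ?? TimeSpan.FromSeconds(30));
    while (DateTime.UtcNow < deadline)
    {
        // Snapshot the concurrent bag; capture order is not guaranteed.
        var logs = exporter.CapturedLogs.ToList();
        if (logs.Count >= expectedCount)
        {
            return logs;
        }
        await Task.Delay(100); // poll until enough events are captured
    }
    throw new TimeoutException(
        $"Expected {expectedCount} telemetry event(s) before the deadline.");
}
```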

### Test Coverage

Baseline tests validate:
- ✅ session_id population
- ✅ sql_statement_id population
- ✅ operation_latency_ms > 0
- ✅ system_configuration fields (driver_version, driver_name, os_name, runtime_name)
- ✅ driver_connection_params.mode is set
- ✅ sql_operation fields (statement_type, operation_type, result_latency)
- ✅ Multiple statements share session_id but have unique statement_ids
- ✅ Telemetry disabled when telemetry.enabled=false
- ✅ error_info populated on SQL errors
- ✅ UPDATE statement telemetry

### Implementation Patterns Discovered

1. **Exporter Override**: `TelemetryClientManager.ExporterOverride` provides global test exporter injection
2. **Proto Enums**: Use nested structure `Statement.Types.Type.Query`, `Operation.Types.Type.ExecuteStatement`, etc.
3. **Name Collision**: Proto `Statement` conflicts with `AdbcStatement` - resolved with type aliases:
```csharp
using ProtoStatement = AdbcDrivers.Databricks.Telemetry.Proto.Statement;
using ProtoOperation = AdbcDrivers.Databricks.Telemetry.Proto.Operation;
using ProtoDriverMode = AdbcDrivers.Databricks.Telemetry.Proto.DriverMode;
```
4. **QueryResult**: `ExecuteQuery()` returns `QueryResult` with `Stream` property (IDisposable)
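
Putting patterns 1 and 4 together, the connection factory described earlier could be sketched as below. The `NewConnection` test-base factory and the exact shape of `ExporterOverride` are assumptions; only the injection point itself is from the implementation notes above.

```csharp
// Sketch of CreateConnectionWithCapturingTelemetry(); not the actual helper.
public static (AdbcConnection, CapturingTelemetryExporter) CreateConnectionWithCapturingTelemetry(
    IReadOnlyDictionary<string, string> properties)
{
    var exporter = new CapturingTelemetryExporter();
    // Global injection point: telemetry clients created after this line
    // export through the capturing exporter instead of the real backend.
    TelemetryClientManager.ExporterOverride = exporter;
    var connection = NewConnection(properties); // assumed test-base factory
    return (connection, exporter);
}
```

Because the override is global, tests must clear it in a `finally` block (see the test pattern below's `ClearExporterOverride()` call) to stay isolated.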

### Test Pattern

```csharp
CapturingTelemetryExporter exporter = null!;
AdbcConnection? connection = null;

try
{
    var properties = TestEnvironment.GetDriverParameters(TestConfiguration);
    (connection, exporter) = TelemetryTestHelpers.CreateConnectionWithCapturingTelemetry(properties);

    // Execute operation
    using var statement = connection.CreateStatement();
    statement.SqlQuery = "SELECT 1";
    var result = statement.ExecuteQuery();
    using var reader = result.Stream;

    statement.Dispose();

    // Wait for and validate telemetry
    var logs = await TelemetryTestHelpers.WaitForTelemetryEvents(exporter, expectedCount: 1);
    var protoLog = TelemetryTestHelpers.GetProtoLog(logs[0]);

    Assert.False(string.IsNullOrEmpty(protoLog.SessionId));
    // ... more assertions
}
finally
{
    connection?.Dispose();
    TelemetryTestHelpers.ClearExporterOverride();
}
```
221 changes: 219 additions & 2 deletions csharp/src/DatabricksConnection.cs
@@ -112,6 +112,7 @@ internal class DatabricksConnection : SparkHttpConnection
        // Telemetry fields
        private ITelemetryClient? _telemetryClient;
        private string? _host;
        private TOpenSessionResp? _openSessionResp;
        internal TelemetrySessionContext? TelemetrySession { get; private set; }

        /// <summary>
@@ -430,6 +431,138 @@ protected override HttpMessageHandler CreateHttpHandler()

        protected override string DriverName => DatabricksDriverName;

        /// <summary>
        /// Overrides GetObjects to emit telemetry with the appropriate operation type based on depth.
        /// </summary>
        public override IArrowArrayStream GetObjects(
            GetObjectsDepth depth,
            string? catalogPattern,
            string? dbSchemaPattern,
            string? tableNamePattern,
            IReadOnlyList<string>? tableTypes,
            string? columnNamePattern)
        {
            var operationType = depth switch
            {
                GetObjectsDepth.Catalogs => Telemetry.Proto.Operation.Types.Type.ListCatalogs,
                GetObjectsDepth.DbSchemas => Telemetry.Proto.Operation.Types.Type.ListSchemas,
                GetObjectsDepth.Tables => Telemetry.Proto.Operation.Types.Type.ListTables,
                GetObjectsDepth.All => Telemetry.Proto.Operation.Types.Type.ListColumns,
                _ => Telemetry.Proto.Operation.Types.Type.Unspecified
            };

            return ExecuteWithMetadataTelemetry(
                operationType,
                () => base.GetObjects(depth, catalogPattern, dbSchemaPattern, tableNamePattern, tableTypes, columnNamePattern));
        }

        /// <summary>
        /// Overrides GetTableTypes to emit telemetry with the LIST_TABLE_TYPES operation type.
        /// </summary>
        public override IArrowArrayStream GetTableTypes()
        {
            return ExecuteWithMetadataTelemetry(
                Telemetry.Proto.Operation.Types.Type.ListTableTypes,
                () => base.GetTableTypes());
        }

        /// <summary>
        /// Executes a metadata operation with telemetry instrumentation.
        /// Metadata operations don't track batch/consumption timing since results are returned inline.
        /// </summary>
        private T ExecuteWithMetadataTelemetry<T>(Telemetry.Proto.Operation.Types.Type operationType, Func<T> operation)
        {
            return this.TraceActivity(activity =>
            {
                StatementTelemetryContext? telemetryContext = null;
                try
                {
                    if (TelemetrySession?.TelemetryClient != null)
                    {
                        telemetryContext = new StatementTelemetryContext(TelemetrySession)
                        {
                            StatementType = Telemetry.Proto.Statement.Types.Type.Metadata,
                            OperationType = operationType,
                            ResultFormat = Telemetry.Proto.ExecutionResult.Types.Format.InlineArrow,
                            IsCompressed = false
                        };

                        activity?.SetTag("telemetry.operation_type", operationType.ToString());
                        activity?.SetTag("telemetry.statement_type", "METADATA");
                    }
                }
                catch (Exception ex)
                {
                    activity?.AddEvent(new System.Diagnostics.ActivityEvent("telemetry.context_creation.error",
                        tags: new System.Diagnostics.ActivityTagsCollection
                        {
                            { "error.type", ex.GetType().Name },
                            { "error.message", ex.Message }
                        }));
                }

                T result;
                try
                {
                    result = operation();
                }
                catch (Exception ex)
                {
                    if (telemetryContext != null)
                    {
                        try
                        {
                            telemetryContext.HasError = true;
                            telemetryContext.ErrorName = ex.GetType().Name;
                            telemetryContext.ErrorMessage = ex.Message;
                        }
                        catch
                        {
                            // Swallow telemetry errors
                        }
                    }
                    throw;
                }
                finally
                {
                    if (telemetryContext != null)
                    {
                        try
                        {
                            var telemetryLog = telemetryContext.BuildTelemetryLog();

                            var frontendLog = new Telemetry.Models.TelemetryFrontendLog
                            {
                                WorkspaceId = telemetryContext.WorkspaceId,
                                FrontendLogEventId = Guid.NewGuid().ToString(),
                                Context = new Telemetry.Models.FrontendLogContext
                                {
                                    TimestampMillis = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                                },
                                Entry = new Telemetry.Models.FrontendLogEntry
                                {
                                    SqlDriverLog = telemetryLog
                                }
                            };

                            TelemetrySession?.TelemetryClient?.Enqueue(frontendLog);
                        }
                        catch (Exception ex)
                        {
                            activity?.AddEvent(new System.Diagnostics.ActivityEvent("telemetry.emit.error",
                                tags: new System.Diagnostics.ActivityTagsCollection
                                {
                                    { "error.type", ex.GetType().Name },
                                    { "error.message", ex.Message }
                                }));
                        }
                    }
                }

                return result;
            });
        }

        internal override IArrowArrayStream NewReader<T>(T statement, Schema schema, IResponse response, TGetResultSetMetadataResp? metadataResp = null)
        {
            bool isLz4Compressed = false;
@@ -533,6 +666,9 @@ protected override async Task HandleOpenSessionResponse(TOpenSessionResp? sessio
                return;
            }

            // Store session response for later use (e.g., extracting workspace ID)
            _openSessionResp = session;

            var version = session.ServerProtocolVersion;

            // Log server protocol version
@@ -651,15 +787,28 @@ private void InitializeTelemetry(Activity? activity = null)
                true, // unauthed failure will be reported separately
                telemetryConfig);

            // Extract workspace ID from the org ID in the HTTP path (e.g., ?o=12345)
            long workspaceId = 0;
            string? orgId = PropertyHelper.ParseOrgIdFromProperties(Properties);
            if (!string.IsNullOrEmpty(orgId) && long.TryParse(orgId, out long parsedOrgId))
            {
                workspaceId = parsedOrgId;
                activity?.AddEvent(new ActivityEvent("telemetry.workspace_id.from_org_id",
                    tags: new ActivityTagsCollection { { "workspace_id", workspaceId } }));
            }

            // Create session-level telemetry context for V3 direct-object pipeline
            TelemetrySession = new TelemetrySessionContext
            {
                SessionId = SessionHandle?.SessionId?.Guid != null
                    ? new Guid(SessionHandle.SessionId.Guid).ToString()
                    : null,
                WorkspaceId = workspaceId,
                TelemetryClient = _telemetryClient,
                SystemConfiguration = BuildSystemConfiguration(),
                DriverConnectionParams = BuildDriverConnectionParams(true),
                AuthType = DetermineAuthType()
            };

            activity?.AddEvent(new ActivityEvent("telemetry.initialization.success",
@@ -686,6 +835,7 @@ private void InitializeTelemetry(Activity? activity = null)
        private Telemetry.Proto.DriverSystemConfiguration BuildSystemConfiguration()
        {
            var osVersion = System.Environment.OSVersion;
            var processName = System.Diagnostics.Process.GetCurrentProcess().ProcessName;
            return new Telemetry.Proto.DriverSystemConfiguration
            {
                DriverVersion = s_assemblyVersion,
@@ -695,9 +845,11 @@ private Telemetry.Proto.DriverSystemConfiguration BuildSystemConfiguration()
                OsArch = System.Runtime.InteropServices.RuntimeInformation.OSArchitecture.ToString(),
                RuntimeName = System.Runtime.InteropServices.RuntimeInformation.FrameworkDescription,
                RuntimeVersion = System.Environment.Version.ToString(),
                RuntimeVendor = "Microsoft",
                LocaleName = System.Globalization.CultureInfo.CurrentCulture.Name,
                CharSetEncoding = System.Text.Encoding.Default.WebName,
                ProcessName = processName,
                ClientAppName = processName
            };
        }

@@ -735,15 +887,80 @@ private Telemetry.Proto.DriverConnectionParameters BuildDriverConnectionParams(b
                },
                AuthMech = authMech,
                AuthFlow = authFlow,
                EnableArrow = true, // Always true for ADBC driver
                RowsFetchedPerBlock = GetBatchSize(),
                SocketTimeout = GetSocketTimeout(),
                EnableDirectResults = _enableDirectResults,
                EnableComplexDatatypeSupport = _useDescTableExtended,
                AutoCommit = true, // ADBC always uses auto-commit (implicit commits)
            };
        }

        /// <summary>
        /// Gets the batch size from connection properties.
        /// </summary>
        /// <returns>The batch size value.</returns>
        private int GetBatchSize()
        {
            const int DefaultBatchSize = 2000000; // DatabricksStatement.DatabricksBatchSizeDefault
            if (Properties.TryGetValue(ApacheParameters.BatchSize, out string? batchSizeStr) &&
                int.TryParse(batchSizeStr, out int batchSize))
            {
                return batchSize;
            }
            return DefaultBatchSize;
        }

        /// <summary>
        /// Gets the socket timeout from connection properties.
        /// </summary>
        /// <returns>The socket timeout value in milliseconds.</returns>
        private int GetSocketTimeout()
        {
            const int DefaultConnectTimeoutMs = 30000; // Default from HiveServer2
            if (Properties.TryGetValue(SparkParameters.ConnectTimeoutMilliseconds, out string? timeoutStr) &&
                int.TryParse(timeoutStr, out int timeout))
            {
                return timeout;
            }
            return DefaultConnectTimeoutMs;
        }

        /// <summary>
        /// Determines the auth_type string based on connection properties.
        /// Format: auth_type or auth_type-grant_type (for OAuth).
        /// Mapping: PAT -> 'pat', OAuth -> 'oauth-{grant_type}', Other -> 'other'.
        /// </summary>
        /// <returns>The auth_type string value.</returns>
        private string DetermineAuthType()
        {
            // Format: auth_type or auth_type-grant_type (for OAuth)
            Properties.TryGetValue(DatabricksParameters.OAuthGrantType, out string? grantType);

            if (!string.IsNullOrEmpty(grantType))
            {
                // OAuth with grant type: oauth-{grant_type}
                return $"oauth-{grantType}";
            }

            // Check for PAT (Personal Access Token)
            Properties.TryGetValue(SparkParameters.Token, out string? token);
            if (!string.IsNullOrEmpty(token))
            {
                return "pat";
            }

            // Default to 'other' for unknown or unspecified auth types
            return "other";
        }

        // Since Databricks Namespace was introduced in newer versions, we fall back to USE SCHEMA to set the default schema
        // in case the server version is too old.
        private async Task SetSchema(string schemaName)
        {
            using var statement = new DatabricksStatement(this);
            statement.SqlQuery = $"USE {schemaName}";
            statement.IsInternalCall = true; // Mark as internal driver operation
            await statement.ExecuteUpdateAsync();
        }
