Draft
284 changes: 283 additions & 1 deletion csharp/doc/telemetry-design.md
@@ -1150,6 +1150,17 @@ namespace AdbcDrivers.Databricks.Telemetry.TagDefinitions
            Description = "Total polling latency")]
        public const string PollLatencyMs = "poll.latency_ms";

        // Chunk latency metrics (from CloudFetch download summary)
        [TelemetryTag("chunk.initial_latency_ms",
            ExportScope = TagExportScope.ExportDatabricks,
            Description = "Latency of first chunk download in milliseconds")]
        public const string ChunkInitialLatencyMs = "chunk.initial_latency_ms";

        [TelemetryTag("chunk.slowest_latency_ms",
            ExportScope = TagExportScope.ExportDatabricks,
            Description = "Latency of slowest chunk download in milliseconds")]
        public const string ChunkSlowestLatencyMs = "chunk.slowest_latency_ms";

        // Sensitive tags - NOT exported to Databricks
        [TelemetryTag("db.statement",
            ExportScope = TagExportScope.ExportLocal,
@@ -1170,7 +1181,9 @@ namespace AdbcDrivers.Databricks.Telemetry.TagDefinitions
            ResultBytesDownloaded,
            ResultCompressionEnabled,
            PollCount,
            PollLatencyMs,
            ChunkInitialLatencyMs,
            ChunkSlowestLatencyMs
        };
    }
}
@@ -1337,6 +1350,275 @@ graph LR

**Key Point**: No new instrumentation code! Just add tags to existing activities.

### 4.4 Activity Tag to Proto Schema Mapping

This section defines how Activity tags are mapped to the proto-generated `OssSqlDriverTelemetryLog` message structure. The `MetricsAggregator` is responsible for this conversion.

#### 4.4.1 Proto Schema Overview

The telemetry data is serialized using the proto schema defined in `Telemetry/Proto/sql_driver_telemetry.proto`. The main message is `OssSqlDriverTelemetryLog`:

```protobuf
message OssSqlDriverTelemetryLog {
  string session_id = 1;
  string sql_statement_id = 2;
  DriverSystemConfiguration system_configuration = 3;
  DriverConnectionParameters driver_connection_params = 4;
  string auth_type = 5;
  VolumeOperationEvent vol_operation = 6;
  SqlExecutionEvent sql_operation = 7;
  DriverErrorInfo error_info = 8;
  int64 operation_latency_ms = 9;
}
```

#### 4.4.2 Common Field Mappings

These fields are mapped for all event types:

| Activity Property | Proto Field | Notes |
|-------------------|-------------|-------|
| `activity.Duration` | `operation_latency_ms` | Converted to milliseconds (int64) |
| Tag: `session.id` | `session_id` | Connection session identifier |
| Tag: `statement.id` | `sql_statement_id` | Statement execution identifier |
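
As an illustration of the common mappings above, here is a minimal sketch of reading a tag and converting the duration. `CommonFieldSketch` and `ToLatencyMs` are hypothetical names, and the body of `GetTagValue` is an assumption (the design only shows its call sites); `Activity.GetTagItem` and `Activity.Duration` are the standard `System.Diagnostics` APIs.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper class; only Activity, GetTagItem, and Duration are real .NET APIs.
public static class CommonFieldSketch
{
    // Returns the tag's value as a string, or null when the tag is absent.
    public static string? GetTagValue(Activity activity, string key) =>
        activity.GetTagItem(key)?.ToString();

    // Activity.Duration is a TimeSpan; the proto field is an int64 of whole
    // milliseconds, so the fractional part is truncated by the cast.
    public static long ToLatencyMs(Activity activity) =>
        (long)activity.Duration.TotalMilliseconds;
}
```

Returning `null` for a missing tag lets callers choose their own default, as the `?? string.Empty` pattern used elsewhere in this section does.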

#### 4.4.3 ConnectionOpen Event Mapping

**Activity Operation Names**: `Connection.Open`, `Connection.OpenAsync`, `OpenAsync`, `OpenConnection`

| Activity Tag | Proto Location | Proto Field | Type/Notes |
|--------------|----------------|-------------|------------|
| `driver.version` | `system_configuration` | `driver_version` | string |
| `driver.os` | `system_configuration` | `os_name` | string |
| `driver.runtime` | `system_configuration` | `runtime_version` | string |
| _(hardcoded)_ | `system_configuration` | `driver_name` | "Databricks ADBC Driver" |
| _(hardcoded)_ | `system_configuration` | `runtime_name` | ".NET" |
| `feature.direct_results` | `driver_connection_params` | `enable_direct_results` | bool |
| `feature.arrow` | `driver_connection_params` | `enable_arrow` | bool |

**MetricsAggregator Implementation**:
```csharp
private static DriverSystemConfiguration? ExtractSystemConfiguration(Activity activity)
{
    return new DriverSystemConfiguration
    {
        DriverName = "Databricks ADBC Driver",
        DriverVersion = GetTagValue(activity, "driver.version") ?? string.Empty,
        OsName = GetTagValue(activity, "driver.os") ?? string.Empty,
        RuntimeName = ".NET",
        RuntimeVersion = GetTagValue(activity, "driver.runtime") ?? string.Empty
    };
}
```

#### 4.4.4 StatementExecution Event Mapping

**Activity Operation Names**: `Statement.Execute`, `Statement.ExecuteQuery`, `Statement.ExecuteUpdate`, or any activity with a `statement.id` tag

| Activity Tag | Proto Location | Proto Field | Type/Transformation |
|--------------|----------------|-------------|---------------------|
| `result.format` | `sql_operation` | `execution_result` | See enum mapping below |
| `result.compression_enabled` | `sql_operation` | `is_compressed` | bool |
| `result.chunk_count` | `sql_operation.chunk_details` | `total_chunks_present` | int32 |
| `result.bytes_downloaded` | `sql_operation.chunk_details` | `sum_chunks_download_time_millis` | int64 (field repurposed to carry bytes downloaded, not a duration) |
| `chunk.initial_latency_ms` | `sql_operation.chunk_details` | `initial_chunk_latency_millis` | int64 (see note below) |
| `chunk.slowest_latency_ms` | `sql_operation.chunk_details` | `slowest_chunk_latency_millis` | int64 (see note below) |
| `poll.count` | `sql_operation.operation_detail` | `n_operation_status_calls` | int32 |

**Chunk Latency Collection** (CloudFetchDownloader):

The `chunk.initial_latency_ms` and `chunk.slowest_latency_ms` values are tracked in `CloudFetchDownloader` during the download process:

```csharp
// In CloudFetchDownloader - track latency for each chunk download
private readonly object _latencyLock = new object(); // guards the two fields below
private long _initialChunkLatencyMs = -1; // -1 until the first chunk completes
private long _slowestChunkLatencyMs = 0;

// After each successful chunk download:
lock (_latencyLock)
{
    long latencyMs = stopwatch.ElapsedMilliseconds;

    // The first chunk to complete sets the initial latency
    if (_initialChunkLatencyMs < 0)
    {
        _initialChunkLatencyMs = latencyMs;
    }

    // Track max latency for slowest chunk
    if (latencyMs > _slowestChunkLatencyMs)
    {
        _slowestChunkLatencyMs = latencyMs;
    }
}

// Emitted in cloudfetch.download_summary Activity event:
activity?.AddEvent("cloudfetch.download_summary", [
    // ... other metrics ...
    new("initial_chunk_latency_ms", _initialChunkLatencyMs),
    new("slowest_chunk_latency_ms", _slowestChunkLatencyMs),
]);
```
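
The tracking logic above can also be isolated into a small class, which makes the locking behavior easier to test. This is only a sketch: `ChunkLatencyTracker`, `Record`, and `Snapshot` are hypothetical names, and the driver itself keeps this state inside `CloudFetchDownloader`.

```csharp
using System;

// Hypothetical standalone tracker; not part of the driver's actual API.
public sealed class ChunkLatencyTracker
{
    private readonly object _latencyLock = new object();
    private long _initialChunkLatencyMs = -1; // -1 means "no chunk recorded yet"
    private long _slowestChunkLatencyMs = 0;

    // Call once per successfully downloaded chunk; safe under parallel downloads.
    public void Record(long latencyMs)
    {
        lock (_latencyLock)
        {
            // The first recorded chunk sets the initial latency.
            if (_initialChunkLatencyMs < 0)
            {
                _initialChunkLatencyMs = latencyMs;
            }

            // Keep the running maximum as the slowest chunk latency.
            if (latencyMs > _slowestChunkLatencyMs)
            {
                _slowestChunkLatencyMs = latencyMs;
            }
        }
    }

    // Reads both values under the lock so they form a consistent pair.
    public (long InitialMs, long SlowestMs) Snapshot()
    {
        lock (_latencyLock)
        {
            return (_initialChunkLatencyMs, _slowestChunkLatencyMs);
        }
    }
}
```

Note that with parallel downloads the "initial" value is the latency of whichever chunk finishes first, which is not necessarily the first chunk in result order. A statement that downloads no chunks reports `-1` and `0`.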

The `MetricsAggregator` processes these Activity events and aggregates them into the `StatementTelemetryContext` for the statement.

The remaining statement-level tags are mapped as follows:

| Activity Tag | Proto Location | Proto Field | Type/Transformation |
|--------------|----------------|-------------|---------------------|
| `poll.latency_ms` | `sql_operation.operation_detail` | `operation_status_latency_millis` | int64 |
| `statement.type` | `sql_operation` | `statement_type` | See enum mapping below |

**ExecutionResultFormat Enum Mapping**:

| Tag Value | Proto Enum Value |
|-----------|------------------|
| `"cloudfetch"` | `EXECUTION_RESULT_EXTERNAL_LINKS` |
| `"external_links"` | `EXECUTION_RESULT_EXTERNAL_LINKS` |
| `"arrow"` | `EXECUTION_RESULT_INLINE_ARROW` |
| `"inline_arrow"` | `EXECUTION_RESULT_INLINE_ARROW` |
| `"json"` | `EXECUTION_RESULT_INLINE_JSON` |
| `"inline_json"` | `EXECUTION_RESULT_INLINE_JSON` |
| _(other)_ | `EXECUTION_RESULT_FORMAT_UNSPECIFIED` |

**StatementType Enum Mapping**:

| Tag Value | Proto Enum Value |
|-----------|------------------|
| `"query"` | `STATEMENT_QUERY` |
| `"sql"` | `STATEMENT_SQL` |
| `"update"` | `STATEMENT_UPDATE` |
| `"metadata"` | `STATEMENT_METADATA` |
| _(other)_ | `STATEMENT_TYPE_UNSPECIFIED` |

**MetricsAggregator Implementation**:
```csharp
private OssSqlDriverTelemetryLog CreateTelemetryEvent(StatementTelemetryContext context)
{
    var telemetryLog = new OssSqlDriverTelemetryLog
    {
        SessionId = context.SessionId ?? string.Empty,
        SqlStatementId = context.StatementId,
        OperationLatencyMs = context.TotalLatencyMs,
        SqlOperation = new SqlExecutionEvent
        {
            IsCompressed = context.CompressionEnabled ?? false,
            ChunkDetails = new ChunkDetails
            {
                TotalChunksPresent = context.ChunkCount ?? 0,
                SumChunksDownloadTimeMillis = context.BytesDownloaded ?? 0,
                InitialChunkLatencyMillis = context.InitialChunkLatencyMs ?? 0,
                SlowestChunkLatencyMillis = context.SlowestChunkLatencyMs ?? 0
            },
            OperationDetail = new OperationDetail
            {
                NOperationStatusCalls = context.PollCount ?? 0,
                OperationStatusLatencyMillis = context.PollLatencyMs ?? 0
            }
        }
    };

    // Map result.format tag to ExecutionResultFormat enum
    if (!string.IsNullOrEmpty(context.ResultFormat))
    {
        telemetryLog.SqlOperation.ExecutionResult = context.ResultFormat.ToLowerInvariant() switch
        {
            "cloudfetch" or "external_links" => ExecutionResultFormat.ExecutionResultExternalLinks,
            "arrow" or "inline_arrow" => ExecutionResultFormat.ExecutionResultInlineArrow,
            "json" or "inline_json" => ExecutionResultFormat.ExecutionResultInlineJson,
            _ => ExecutionResultFormat.Unspecified
        };
    }

    return telemetryLog;
}
```
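
The implementation above shows only the `result.format` mapping. A sketch of the `statement.type` mapping from the StatementType table, written in the same style, might look like the following. `StatementTypeSketch` is a local stand-in for the proto-generated enum (whose generated member names may differ), `StatementTypeMapper` is a hypothetical name, and the case-insensitive comparison is an assumption carried over from the `result.format` handling.

```csharp
using System;

// Local stand-in for the proto-generated enum; real generated names may differ.
public enum StatementTypeSketch
{
    Unspecified = 0,
    Query,
    Sql,
    Update,
    Metadata
}

// Hypothetical mapper; mirrors the StatementType Enum Mapping table.
public static class StatementTypeMapper
{
    // Unknown or missing tag values fall through to Unspecified.
    public static StatementTypeSketch FromTag(string? tagValue) =>
        tagValue?.ToLowerInvariant() switch
        {
            "query" => StatementTypeSketch.Query,
            "sql" => StatementTypeSketch.Sql,
            "update" => StatementTypeSketch.Update,
            "metadata" => StatementTypeSketch.Metadata,
            _ => StatementTypeSketch.Unspecified
        };
}
```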

#### 4.4.5 Error Event Mapping

**Trigger**: Any activity with `error.type` tag present, or exceptions recorded via `RecordException()`

| Activity Tag / Property | Proto Location | Proto Field | Notes |
|-------------------------|----------------|-------------|-------|
| `error.type` | `error_info` | `error_name` | Exception type name |
| `Exception.Message` | `error_info` | `stack_trace` | Truncated to 200 chars |
| Tag: `session.id` | _(root)_ | `session_id` | Connection identifier |
| Tag: `statement.id` | _(root)_ | `sql_statement_id` | Statement identifier |

**Important Proto Schema Limitation**:
The proto `DriverErrorInfo` message only supports two fields:
- `error_name`: Maps to exception type (e.g., "HttpExceptionWithStatusCode")
- `stack_trace`: Used for truncated error message (proto field `error_message` is pending LPP review)

The following Activity tags are **NOT** mapped to proto (proto schema doesn't support them):
- `error.message` - No direct proto field (use `stack_trace` for truncated message)
- `error.http_status` - No `http_status_code` field in proto
- `error.code` - No generic error code field in proto

**MetricsAggregator Implementation**:
```csharp
private OssSqlDriverTelemetryLog CreateErrorTelemetryEvent(
    string? sessionId,
    string statementId,
    Exception exception)
{
    return new OssSqlDriverTelemetryLog
    {
        SessionId = sessionId ?? string.Empty,
        SqlStatementId = statementId,
        ErrorInfo = new DriverErrorInfo
        {
            ErrorName = exception.GetType().Name,
            StackTrace = TruncateErrorMessage(exception.Message) // Max 200 chars
        }
    };
}
```
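
`TruncateErrorMessage` is referenced above but not shown. One possible implementation, assuming a plain character-count cut at 200 with no ellipsis, is sketched below; `ErrorMessageSketch` and `MaxErrorMessageLength` are hypothetical names, and the actual helper may behave differently.

```csharp
using System;

// Hypothetical container for the truncation helper; the 200-char limit comes from
// the mapping table, the rest is an assumption.
public static class ErrorMessageSketch
{
    public const int MaxErrorMessageLength = 200;

    // Returns the message unchanged when it fits, otherwise its first 200 characters.
    // Null or empty input yields an empty string, since proto string fields reject null.
    public static string TruncateErrorMessage(string? message)
    {
        if (string.IsNullOrEmpty(message))
        {
            return string.Empty;
        }

        return message.Length <= MaxErrorMessageLength
            ? message
            : message.Substring(0, MaxErrorMessageLength);
    }
}
```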

#### 4.4.6 Nested Proto Message Reference

**ChunkDetails** (part of SqlExecutionEvent):
```protobuf
message ChunkDetails {
  int64 initial_chunk_latency_millis = 1;    // ← Maps from chunk.initial_latency_ms
  int64 slowest_chunk_latency_millis = 2;    // ← Maps from chunk.slowest_latency_ms
  int32 total_chunks_present = 3;            // ← Maps from result.chunk_count
  int32 total_chunks_iterated = 4;
  int64 sum_chunks_download_time_millis = 5; // ← Maps from result.bytes_downloaded (repurposed)
}
```

**OperationDetail** (part of SqlExecutionEvent):
```protobuf
message OperationDetail {
  int32 n_operation_status_calls = 1;        // ← Maps from poll.count
  int64 operation_status_latency_millis = 2; // ← Maps from poll.latency_ms
  OperationType operation_type = 3;
  bool is_internal_call = 4;
}
```

**ResultLatency** (part of SqlExecutionEvent, currently not mapped):
```protobuf
message ResultLatency {
  int64 result_set_ready_latency_millis = 1;
  int64 result_set_consumption_latency_millis = 2;
}
```

#### 4.4.7 Future Tag Extensions

When adding new Activity tags, ensure they map to existing proto fields. A tag with no corresponding proto field is silently dropped from the Databricks export.

**Proto Fields Not Currently Mapped** (available for future use):
- `OssSqlDriverTelemetryLog.auth_type` - Could map from `auth.type` tag
- `DriverSystemConfiguration.runtime_vendor` - Could map from `driver.runtime_vendor` tag
- `DriverSystemConfiguration.os_version` - Could map from `driver.os_version` tag
- `DriverSystemConfiguration.os_arch` - Could map from `driver.os_arch` tag
- `DriverSystemConfiguration.client_app_name` - Could map from `client.app_name` tag
- `DriverConnectionParameters.*` - Many connection parameters available
- `SqlExecutionEvent.retry_count` - Could map from `retry.count` tag
- `ChunkDetails.total_chunks_iterated` - Could map from `chunk.iterated_count` tag
- `ResultLatency.*` - Could map from `result.ready_latency_ms` and `result.consumption_latency_ms` tags

---

## 5. Export Mechanism