diff --git a/csharp/Apache.Arrow.Adbc.sln b/csharp/Apache.Arrow.Adbc.sln index e79981db6b..a0ab271afc 100644 --- a/csharp/Apache.Arrow.Adbc.sln +++ b/csharp/Apache.Arrow.Adbc.sln @@ -58,6 +58,14 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Exporters", "Exporters", "{ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters", "test\Telemetry\Traces\Exporters\Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.csproj", "{1558BC4B-6E76-434B-8877-6C49B1460544}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Listeners", "Listeners", "{53C45FD3-7277-49FA-AEEB-DF8F2386ECAA}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Listeners", "Listeners", "{1C530561-1008-4F39-B437-15B2FD59EAC9}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners", "test\Telemetry\Traces\Listeners\Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners.csproj", "{9CE9106B-ACBB-54C1-DE57-370E5CF09363}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Adbc.Telemetry.Traces.Listeners", "src\Telemetry\Traces\Listeners\Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj", "{4D5ADA1A-2DEE-5860-2351-221090CF4442}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -136,6 +144,14 @@ Global {1558BC4B-6E76-434B-8877-6C49B1460544}.Debug|Any CPU.Build.0 = Debug|Any CPU {1558BC4B-6E76-434B-8877-6C49B1460544}.Release|Any CPU.ActiveCfg = Release|Any CPU {1558BC4B-6E76-434B-8877-6C49B1460544}.Release|Any CPU.Build.0 = Release|Any CPU + {9CE9106B-ACBB-54C1-DE57-370E5CF09363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9CE9106B-ACBB-54C1-DE57-370E5CF09363}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9CE9106B-ACBB-54C1-DE57-370E5CF09363}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9CE9106B-ACBB-54C1-DE57-370E5CF09363}.Release|Any CPU.Build.0 = Release|Any CPU + {4D5ADA1A-2DEE-5860-2351-221090CF4442}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4D5ADA1A-2DEE-5860-2351-221090CF4442}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4D5ADA1A-2DEE-5860-2351-221090CF4442}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4D5ADA1A-2DEE-5860-2351-221090CF4442}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -164,6 +180,10 @@ Global {B74532A7-8A78-4AD9-9B2E-584765491E48} = {9FE39661-2A39-4E9F-A5F2-11FB0D54CB42} {43910445-FEC4-4AEC-A698-6A48327600C3} = {B74532A7-8A78-4AD9-9B2E-584765491E48} {1558BC4B-6E76-434B-8877-6C49B1460544} = {43910445-FEC4-4AEC-A698-6A48327600C3} + {53C45FD3-7277-49FA-AEEB-DF8F2386ECAA} = {B74532A7-8A78-4AD9-9B2E-584765491E48} + {1C530561-1008-4F39-B437-15B2FD59EAC9} = {22EF23A3-1566-446F-B696-9323F3B6F56C} + {9CE9106B-ACBB-54C1-DE57-370E5CF09363} = {53C45FD3-7277-49FA-AEEB-DF8F2386ECAA} + {4D5ADA1A-2DEE-5860-2351-221090CF4442} = {1C530561-1008-4F39-B437-15B2FD59EAC9} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {4795CF16-0FDB-4BE0-9768-5CF31564DC03} diff --git a/csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj b/csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj index f4c11907d2..8e68c63f3a 100644 --- a/csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj +++ b/csharp/src/Drivers/Apache/Apache.Arrow.Adbc.Drivers.Apache.csproj @@ -14,6 +14,7 @@ + diff --git a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs index 766bec961b..193cec7227 100644 --- a/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs +++ b/csharp/src/Drivers/Apache/Hive2/HiveServer2Connection.cs @@ -27,6 +27,8 @@ using System.Threading.Tasks; using Apache.Arrow.Adbc.Drivers.Apache.Thrift; using Apache.Arrow.Adbc.Extensions; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener; using Apache.Arrow.Adbc.Tracing; using Apache.Arrow.Ipc; using Apache.Arrow.Types; @@ -49,6 +51,9 @@ internal abstract class HiveServer2Connection : TracingConnection private readonly Lazy _vendorVersion; private readonly Lazy _vendorName; private bool _isDisposed; + // Note: this needs to be set before the constructor runs + private readonly string _traceInstanceId = Guid.NewGuid().ToString("N"); + private readonly FileActivityListener? _fileActivityListener; readonly AdbcInfoCode[] infoSupportedCodes = [ AdbcInfoCode.DriverName, @@ -278,6 +283,8 @@ internal HiveServer2Connection(IReadOnlyDictionary properties) { Properties = properties; + TryInitTracerProvider(out _fileActivityListener); + // Note: "LazyThreadSafetyMode.PublicationOnly" is thread-safe initialization where // the first successful thread sets the value. If an exception is thrown, initialization // will retry until it successfully returns a value without an exception. @@ -294,6 +301,31 @@ internal HiveServer2Connection(IReadOnlyDictionary properties) } } + private bool TryInitTracerProvider(out FileActivityListener? fileActivityListener) + { + Properties.TryGetValue(ListenersOptions.Exporter, out string? exporterOption); + // This listener will only listen for activity from this specific connection instance. + bool shouldListenTo(ActivitySource source) => source.Tags?.Any(t => ReferenceEquals(t.Key, _traceInstanceId)) == true; + return FileActivityListener.TryActivateFileListener(AssemblyName, exporterOption, out fileActivityListener, shouldListenTo: shouldListenTo); + } + + public override IEnumerable>? GetActivitySourceTags(IReadOnlyDictionary properties) + { + IEnumerable>? tags = base.GetActivitySourceTags(properties); + tags ??= []; + tags = tags.Concat([new(_traceInstanceId, null)]); + return tags; + } + + /// + /// Conditional used to determines if it is safe to trace + /// + /// + /// It is safe to write to some output types (ie, files) but not others (ie, a shared resource). + /// + /// + internal bool IsSafeToTrace => _fileActivityListener != null; + internal TCLIService.IAsync Client { get { return _client ?? throw new InvalidOperationException("connection not open"); } @@ -732,6 +764,7 @@ protected override void Dispose(bool disposing) if (!_isDisposed && disposing) { DisposeClient(); + _fileActivityListener?.Dispose(); _isDisposed = true; } base.Dispose(disposing); @@ -1540,7 +1573,7 @@ public override IArrowArrayStream GetInfo(IReadOnlyList codes) nullCount++; break; } - ActivityExtensions.AddTag(activity, tagKey, tagValue); + Tracing.ActivityExtensions.AddTag(activity, tagKey, tagValue); } StructType entryType = new StructType( diff --git a/csharp/src/Drivers/Apache/Hive2/README.md b/csharp/src/Drivers/Apache/Hive2/README.md index cde81ecd69..9404cdf88b 100644 --- a/csharp/src/Drivers/Apache/Hive2/README.md +++ b/csharp/src/Drivers/Apache/Hive2/README.md @@ -149,3 +149,34 @@ The Collector can be configure to receive trace messages from the driver and exp Ensure to set the [environment variable](https://opentelemetry.io/docs/specs/otel/protocol/exporter/) `OTEL_EXPORTER_OTLP_INSECURE` to `true`, in this scenario. Ensure to follow [Collector configuration best practices](https://opentelemetry.io/docs/security/config-best-practices/). + +## Tracing + +### Tracing Exporters + +To enable tracing messages to be observed, a tracing exporter needs to be activated. +Use either the environment variable `OTEL_TRACES_EXPORTER` or the parameter `adbc.traces.exporter` to select one of the +supported exporters. The parameter has precedence over the environment variable. The parameter must be set before +the connection is initialized. + +The following exporters are supported: + +| Exporter | Description | +| --- | --- | +| `adbcfile` | Exports traces to rotating files in a folder. | + +#### File Exporter (adbcfile) + +Rotating trace files are written to a folder. The file names are created with the following pattern: +`apache.arrow.adbc.drivers.bigquery--.log`. + +The folder used depends on the platform. + +| Platform | Folder | +| --- | --- | +| Windows | `%LOCALAPPDATA%/Apache.Arrow.Adbc/Traces` | +| macOS | `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces` | +| Linux | `$HOME/.local/share/Apache.Arrow.Adbc/Traces` | + +By default, up to 999 files of maximum size 1024 KB are written to +the trace folder. diff --git a/csharp/src/Drivers/Apache/Impala/README.md b/csharp/src/Drivers/Apache/Impala/README.md index 6313c83b33..f2567f56f1 100644 --- a/csharp/src/Drivers/Apache/Impala/README.md +++ b/csharp/src/Drivers/Apache/Impala/README.md @@ -139,3 +139,34 @@ The Collector can be configure to receive trace messages from the driver and exp Ensure to set the [environment variable](https://opentelemetry.io/docs/specs/otel/protocol/exporter/) `OTEL_EXPORTER_OTLP_INSECURE` to `true`, in this scenario. Ensure to follow [Collector configuration best practices](https://opentelemetry.io/docs/security/config-best-practices/). + +## Tracing + +### Tracing Exporters + +To enable tracing messages to be observed, a tracing exporter needs to be activated. +Use either the environment variable `OTEL_TRACES_EXPORTER` or the parameter `adbc.traces.exporter` to select one of the +supported exporters. The parameter has precedence over the environment variable. The parameter must be set before +the connection is initialized. + +The following exporters are supported: + +| Exporter | Description | +| --- | --- | +| `adbcfile` | Exports traces to rotating files in a folder. | + +#### File Exporter (adbcfile) + +Rotating trace files are written to a folder. The file names are created with the following pattern: +`apache.arrow.adbc.drivers.bigquery--.log`. + +The folder used depends on the platform. + +| Platform | Folder | +| --- | --- | +| Windows | `%LOCALAPPDATA%/Apache.Arrow.Adbc/Traces` | +| macOS | `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces` | +| Linux | `$HOME/.local/share/Apache.Arrow.Adbc/Traces` | + +By default, up to 999 files of maximum size 1024 KB are written to +the trace folder. diff --git a/csharp/src/Drivers/Apache/Spark/README.md b/csharp/src/Drivers/Apache/Spark/README.md index 6c44c85238..bb38720244 100644 --- a/csharp/src/Drivers/Apache/Spark/README.md +++ b/csharp/src/Drivers/Apache/Spark/README.md @@ -149,3 +149,34 @@ The Collector can be configure to receive trace messages from the driver and exp Ensure to set the [environment variable](https://opentelemetry.io/docs/specs/otel/protocol/exporter/) `OTEL_EXPORTER_OTLP_INSECURE` to `true`, in this scenario. Ensure to follow [Collector configuration best practices](https://opentelemetry.io/docs/security/config-best-practices/). + +## Tracing + +### Tracing Exporters + +To enable tracing messages to be observed, a tracing exporter needs to be activated. +Use either the environment variable `OTEL_TRACES_EXPORTER` or the parameter `adbc.traces.exporter` to select one of the +supported exporters. The parameter has precedence over the environment variable. The parameter must be set before +the connection is initialized. + +The following exporters are supported: + +| Exporter | Description | +| --- | --- | +| `adbcfile` | Exports traces to rotating files in a folder. | + +#### File Exporter (adbcfile) + +Rotating trace files are written to a folder. The file names are created with the following pattern: +`apache.arrow.adbc.drivers.bigquery--.log`. + +The folder used depends on the platform. + +| Platform | Folder | +| --- | --- | +| Windows | `%LOCALAPPDATA%/Apache.Arrow.Adbc/Traces` | +| macOS | `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces` | +| Linux | `$HOME/.local/share/Apache.Arrow.Adbc/Traces` | + +By default, up to 999 files of maximum size 1024 KB are written to +the trace folder. diff --git a/csharp/src/Drivers/BigQuery/Apache.Arrow.Adbc.Drivers.BigQuery.csproj b/csharp/src/Drivers/BigQuery/Apache.Arrow.Adbc.Drivers.BigQuery.csproj index b20d938f3e..16316d3010 100644 --- a/csharp/src/Drivers/BigQuery/Apache.Arrow.Adbc.Drivers.BigQuery.csproj +++ b/csharp/src/Drivers/BigQuery/Apache.Arrow.Adbc.Drivers.BigQuery.csproj @@ -13,6 +13,7 @@ + diff --git a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs index 9855e76be1..ffb562d789 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryConnection.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryConnection.cs @@ -25,6 +25,8 @@ using System.Text.RegularExpressions; using System.Threading.Tasks; using Apache.Arrow.Adbc.Extensions; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener; using Apache.Arrow.Adbc.Tracing; using Apache.Arrow.Ipc; using Apache.Arrow.Types; @@ -45,6 +47,9 @@ public class BigQueryConnection : TracingConnection, ITokenProtectedResource bool includePublicProjectIds = false; const string infoDriverName = "ADBC BigQuery Driver"; const string infoVendorName = "BigQuery"; + // Note: this needs to be set before the constructor runs + private readonly string _traceInstanceId = Guid.NewGuid().ToString("N"); + private readonly FileActivityListener? _fileActivityListener; private readonly string infoDriverArrowVersion = BigQueryUtils.GetAssemblyVersion(typeof(IArrowArray)); @@ -66,6 +71,8 @@ public BigQueryConnection(IReadOnlyDictionary properties) : base this.properties = properties.ToDictionary(k => k.Key, v => v.Value); } + TryInitTracerProvider(out _fileActivityListener); + // add the default value for now and set to true until C# has a BigDecimal this.properties[BigQueryParameters.LargeDecimalsAsString] = BigQueryConstants.TreatLargeDecimalAsString; this.httpClient = new HttpClient(); @@ -85,6 +92,31 @@ public BigQueryConnection(IReadOnlyDictionary properties) : base } } + private bool TryInitTracerProvider(out FileActivityListener? fileActivityListener) + { + properties.TryGetValue(ListenersOptions.Exporter, out string? exporterOption); + // This listener will only listen for activity from this specific connection instance. + bool shouldListenTo(ActivitySource source) => source.Tags?.Any(t => ReferenceEquals(t.Key, _traceInstanceId)) == true; + return FileActivityListener.TryActivateFileListener(AssemblyName, exporterOption, out fileActivityListener, shouldListenTo: shouldListenTo); + } + + public override IEnumerable>? GetActivitySourceTags(IReadOnlyDictionary properties) + { + IEnumerable>? tags = base.GetActivitySourceTags(properties); + tags ??= []; + tags = tags.Concat([new(_traceInstanceId, null)]); + return tags; + } + + /// + /// Conditional used to determines if it is safe to trace + /// + /// + /// It is safe to write to some output types (ie, files) but not others (ie, a shared resource). + /// + /// + internal bool IsSafeToTrace => _fileActivityListener != null; + /// /// The function to call when updating the token. /// @@ -470,7 +502,7 @@ internal void UpdateClientToken() return this.TraceActivity(activity => { - activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, sql, BigQueryUtils.IsSafeToTrace()); + activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, sql, IsSafeToTrace); Func> func = () => Client.ExecuteQueryAsync(sql, parameters ?? Enumerable.Empty(), queryOptions, resultsOptions); BigQueryResults? result = ExecuteWithRetriesAsync(func, activity).GetAwaiter().GetResult(); @@ -1273,6 +1305,7 @@ public override void Dispose() Client?.Dispose(); Client = null; this.httpClient?.Dispose(); + this._fileActivityListener?.Dispose(); } private static Regex sanitizedInputRegex = new Regex("^[a-zA-Z0-9_-]+"); diff --git a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs index e8b89f18ab..eabdcc4da8 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryStatement.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryStatement.cs @@ -91,7 +91,7 @@ private async Task ExecuteQueryInternalAsync() { QueryOptions queryOptions = ValidateOptions(activity); - activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, BigQueryUtils.IsSafeToTrace()); + activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, this.bigQueryConnection.IsSafeToTrace); BigQueryJob job = await Client.CreateQueryJobAsync(SqlQuery, null, queryOptions); @@ -216,7 +216,7 @@ private async Task> GetArrowReaders( ReadSession rrs = await bigQueryReadClient.CreateReadSessionAsync("projects/" + projectId, rs, maxStreamCount); var readers = rrs.Streams - .Select(s => ReadChunk(bigQueryReadClient, s.Name, activity)) + .Select(s => ReadChunk(bigQueryReadClient, s.Name, activity, this.bigQueryConnection.IsSafeToTrace)) .Where(chunk => chunk != null) .Cast(); @@ -242,7 +242,7 @@ private async Task ExecuteUpdateInternalAsync() activity?.AddBigQueryParameterTag(BigQueryParameters.GetQueryResultsOptionsTimeout, seconds); } - activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, BigQueryUtils.IsSafeToTrace()); + activity?.AddConditionalTag(SemanticConventions.Db.Query.Text, SqlQuery, this.bigQueryConnection.IsSafeToTrace); // Cannot set destination table in jobs with DDL statements, otherwise an error will be prompted Func> func = () => Client.ExecuteQueryAsync(SqlQuery, null, null, getQueryResultsOptions); @@ -340,11 +340,11 @@ private IArrowType GetType(TableFieldSchema field, IArrowType type) return type; } - private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName, Activity? activity) + private static IArrowReader? ReadChunk(BigQueryReadClient client, string streamName, Activity? activity, bool isSafeToTrace) { // Ideally we wouldn't need to indirect through a stream, but the necessary APIs in Arrow // are internal. (TODO: consider changing Arrow). - activity?.AddConditionalBigQueryTag("read_stream", streamName, BigQueryUtils.IsSafeToTrace()); + activity?.AddConditionalBigQueryTag("read_stream", streamName, isSafeToTrace); BigQueryReadClient.ReadRowsStream readRowsStream = client.ReadRows(new ReadRowsRequest { ReadStream = streamName }); IAsyncEnumerator enumerator = readRowsStream.GetResponseStream().GetAsyncEnumerator(); diff --git a/csharp/src/Drivers/BigQuery/BigQueryUtils.cs b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs index b8fca804f0..956486d2a4 100644 --- a/csharp/src/Drivers/BigQuery/BigQueryUtils.cs +++ b/csharp/src/Drivers/BigQuery/BigQueryUtils.cs @@ -42,18 +42,5 @@ public static bool TokenRequiresUpdate(Exception ex) internal static string GetAssemblyName(Type type) => type.Assembly.GetName().Name!; internal static string GetAssemblyVersion(Type type) => FileVersionInfo.GetVersionInfo(type.Assembly.Location).ProductVersion ?? string.Empty; - - /// - /// Conditional used to determines if it is safe to trace - /// - /// - /// It is safe to write to some output types (ie, files) but not others (ie, a shared resource). - /// - /// - internal static bool IsSafeToTrace() - { - // TODO: Add logic to determine if a file writer is listening - return false; - } } } diff --git a/csharp/src/Drivers/BigQuery/readme.md b/csharp/src/Drivers/BigQuery/readme.md index 96605a9c29..1d98d055ea 100644 --- a/csharp/src/Drivers/BigQuery/readme.md +++ b/csharp/src/Drivers/BigQuery/readme.md @@ -204,3 +204,34 @@ Some environments may also require: - [Running jobs programmatically | BigQuery | Google Cloud](https://cloud.google.com/bigquery/docs/running-jobs) - [Create datasets | BigQuery | Google Cloud](https://cloud.google.com/bigquery/docs/datasets#required_permissions) - [Use the BigQuery Storage Read API to read table data | Google Cloud](https://cloud.google.com/bigquery/docs/reference/storage/#permissions) + +## Tracing + +### Tracing Exporters + +To enable tracing messages to be observed, a tracing exporter needs to be activated. +Use either the environment variable `OTEL_TRACES_EXPORTER` or the parameter `adbc.traces.exporter` to select one of the +supported exporters. The parameter has precedence over the environment variable. The parameter must be set before +the connection is initialized. + +The following exporters are supported: + +| Exporter | Description | +| --- | --- | +| `adbcfile` | Exports traces to rotating files in a folder. | + +#### File Exporter (adbcfile) + +Rotating trace files are written to a folder. The file names are created with the following pattern: +`apache.arrow.adbc.drivers.bigquery--.log`. + +The folder used depends on the platform. + +| Platform | Folder | +| --- | --- | +| Windows | `%LOCALAPPDATA%/Apache.Arrow.Adbc/Traces` | +| macOS | `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces` | +| Linux | `$HOME/.local/share/Apache.Arrow.Adbc/Traces` | + +By default, up to 999 files of maximum size 1024 KB are written to +the trace folder. diff --git a/csharp/src/Drivers/Databricks/readme.md b/csharp/src/Drivers/Databricks/readme.md index 49bf2d9d5d..3250ef8e4e 100644 --- a/csharp/src/Drivers/Databricks/readme.md +++ b/csharp/src/Drivers/Databricks/readme.md @@ -178,3 +178,34 @@ The following table depicts how the Databricks ADBC driver converts a Databricks | UNION | String | string | | USER_DEFINED | String | string | | VARCHAR | String | string | + +## Tracing + +### Tracing Exporters + +To enable tracing messages to be observed, a tracing exporter needs to be activated. +Use either the environment variable `OTEL_TRACES_EXPORTER` or the parameter `adbc.traces.exporter` to select one of the +supported exporters. The parameter has precedence over the environment variable. The parameter must be set before +the connection is initialized. + +The following exporters are supported: + +| Exporter | Description | +| --- | --- | +| `adbcfile` | Exports traces to rotating files in a folder. | + +#### File Exporter (adbcfile) + +Rotating trace files are written to a folder. The file names are created with the following pattern: +`apache.arrow.adbc.drivers.bigquery--.log`. + +The folder used depends on the platform. + +| Platform | Folder | +| --- | --- | +| Windows | `%LOCALAPPDATA%/Apache.Arrow.Adbc/Traces` | +| macOS | `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces` | +| Linux | `$HOME/.local/share/Apache.Arrow.Adbc/Traces` | + +By default, up to 999 files of maximum size 1024 KB are written to +the trace folder. diff --git a/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj b/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj index 4dec891d12..51deb03a62 100644 --- a/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj +++ b/csharp/src/Telemetry/Traces/Exporters/Apache.Arrow.Adbc.Telemetry.Traces.Exporters.csproj @@ -21,6 +21,7 @@ + diff --git a/csharp/src/Telemetry/Traces/Exporters/ExportersBuilder.cs b/csharp/src/Telemetry/Traces/Exporters/ExportersBuilder.cs index 1c232448c2..3544cf0e84 100644 --- a/csharp/src/Telemetry/Traces/Exporters/ExportersBuilder.cs +++ b/csharp/src/Telemetry/Traces/Exporters/ExportersBuilder.cs @@ -94,32 +94,48 @@ public static Builder Build(string sourceName, string? sourceVersion = default, out string? exporterName, string environmentName = ExportersOptions.Environment.Exporter) { - TracerProvider? tracerProvider = null; - exporterName = null; - - if (string.IsNullOrWhiteSpace(exporterOption)) + if (TryActivate(exporterOption, out exporterName, out TracerProvider? tracerProvider, environmentName)) { - // Fall back to check the environment variable - exporterOption = Environment.GetEnvironmentVariable(environmentName); + return tracerProvider; } - if (string.IsNullOrWhiteSpace(exporterOption)) + if (!string.IsNullOrEmpty(exporterName) && exporterName != ExportersOptions.Exporters.None) { - // Neither option or environment variable is set - no tracer provider will be activated. - return null; + // Requested option has not been added via the builder + throw AdbcException.NotImplemented($"Exporter option '{exporterName}' is not implemented."); } + return null; + } + + /// + /// Tries to activate an exporter based on the dictionary of factories. + /// + /// The value (name) of the exporter option, typically passed as option . + /// The actual exporter name when successfully activated. + /// A non-null when successfully activated. Returns null if not successful. Note: this object must be explicitly disposed when no longer necessary. + /// The (optional) name of the environment variable to test for the exporter name. Default: + /// Returns true if the exporter was successfully activated. Returns false, otherwise. + public bool TryActivate( + string? exporterOption, + out string? exporterName, + out TracerProvider? tracerProvider, + string environmentName = ExportersOptions.Environment.Exporter) + { + tracerProvider = null; + exporterName = null; - if (!_tracerProviderFactories.TryGetValue(exporterOption!, out Func? factory)) + if (!TryGetExporterName(exporterOption, environmentName, out exporterName) + || !_tracerProviderFactories.TryGetValue(exporterName!, out Func? factory)) { - // Requested option has not been added via the builder - throw AdbcException.NotImplemented($"Exporter option '{exporterOption}' is not implemented."); + return false; } tracerProvider = factory.Invoke(_sourceName, _sourceVersion); - if (tracerProvider != null) + if (tracerProvider == null) { - exporterName = exporterOption; + return false; } - return tracerProvider; + + return true; } public static TracerProvider NewAdbcFileTracerProvider(string sourceName, string? sourceVersion) => @@ -155,6 +171,23 @@ public static TracerProvider NewOtlpTracerProvider(string sourceName, string? so public static TracerProvider? NewNoopTracerProvider(string sourceName, string? sourceVersion) => null; + private static bool TryGetExporterName(string? exporterOption, string environmentName, out string? exporterName) + { + if (string.IsNullOrWhiteSpace(exporterOption)) + { + // Fall back to check the environment variable + exporterOption = Environment.GetEnvironmentVariable(environmentName); + } + if (string.IsNullOrWhiteSpace(exporterOption)) + { + // Neither option or environment variable is set - no tracer provider will be activated. + exporterName = null; + return false; + } + exporterName = exporterOption!; + return true; + } + public class Builder { private readonly string _sourceName; diff --git a/csharp/src/Telemetry/Traces/Exporters/ExportersOptions.cs b/csharp/src/Telemetry/Traces/Exporters/ExportersOptions.cs index 8cd631ac24..40c2199cda 100644 --- a/csharp/src/Telemetry/Traces/Exporters/ExportersOptions.cs +++ b/csharp/src/Telemetry/Traces/Exporters/ExportersOptions.cs @@ -15,23 +15,25 @@ * limitations under the License. */ +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners; + namespace Apache.Arrow.Adbc.Telemetry.Traces.Exporters { public class ExportersOptions { - public const string Exporter = "adbc.traces.exporter"; + public const string Exporter = ListenersOptions.Exporter; public static class Environment { - public const string Exporter = "OTEL_TRACES_EXPORTER"; + public const string Exporter = ListenersOptions.Environment.Exporter; } public static class Exporters { - public const string None = "none"; + public const string None = ListenersOptions.Exporters.None; public const string Otlp = "otlp"; public const string Console = "console"; - public const string AdbcFile = "adbcfile"; + public const string AdbcFile = ListenersOptions.Exporters.AdbcFile; } } } diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs index a1251a8d7b..08d4336ab1 100644 --- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs +++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporter.cs @@ -24,6 +24,7 @@ using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener; using OpenTelemetry; namespace Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs index b35ca43f33..2f5a495ebc 100644 --- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs +++ b/csharp/src/Telemetry/Traces/Exporters/FileExporter/FileExporterExtensions.cs @@ -129,7 +129,7 @@ public static OpenTelemetry.Trace.TracerProviderBuilder AddAdbcFileExporter( if (FileExporter.TryCreate(fileBaseName, traceLocation, maxTraceFileSizeKb.Value, maxTraceFiles.Value, out FileExporter? fileExporter)) { // Only add a new processor if there isn't already one listening for the source/location. - return builder.AddProcessor(_ => new BatchActivityExportProcessor(fileExporter!)); + return builder.AddProcessor(_ => new SimpleActivityExportProcessor(fileExporter!)); } return builder; } diff --git a/csharp/src/Telemetry/Traces/Exporters/readme.md b/csharp/src/Telemetry/Traces/Exporters/readme.md index 8b743c7e71..6bd2e9444c 100644 --- a/csharp/src/Telemetry/Traces/Exporters/readme.md +++ b/csharp/src/Telemetry/Traces/Exporters/readme.md @@ -33,7 +33,7 @@ The default folder used is: | macOS | `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces` | | Linux | `$HOME/.local/share/Apache.Arrow.Adbc/Traces` | -By default, up to 100 files of maximum size 1024 KB are written to +By default, up to 999 files of maximum size 1024 KB are written to the trace folder. ## ExportersBuilder @@ -49,6 +49,6 @@ The following exporters are supported: | Exporter | Description | | --- | --- | | `otlp` | Exports traces to an OpenTelemetry Collector or directly to an Open Telemetry Line Protocol (OTLP) endpoint. | -| `file` | Exports traces to rotating files in a folder. | +| `adbcfile` | Exports traces to rotating files in a folder. | | `console` | Exports traces to the console output. | | `none` | Disables trace exporting. | diff --git a/csharp/src/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj b/csharp/src/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj new file mode 100644 index 0000000000..a688ba3fab --- /dev/null +++ b/csharp/src/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Telemetry.Traces.Listeners.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.0;net8.0 + enable + + + + + + + + + + + + + diff --git a/csharp/src/Telemetry/Traces/Listeners/FileListener/ActivityProcessor.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/ActivityProcessor.cs new file mode 100644 index 0000000000..20fff58408 --- /dev/null +++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/ActivityProcessor.cs @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Diagnostics; +using System.IO; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener +{ + internal sealed class ActivityProcessor : IDisposable + { + private static readonly byte[] s_newLine = Encoding.UTF8.GetBytes(Environment.NewLine); + private Task? _processingTask; + private readonly Channel _channel; + private readonly Func _streamWriterFunc; + private CancellationTokenSource? _cancellationTokenSource; + + public ActivityProcessor(Func streamWriterFunc) + { + _channel = Channel.CreateUnbounded(); + _streamWriterFunc = streamWriterFunc; + } + + public bool TryWrite(Activity activity) => _channel.Writer.TryWrite(activity); + + public async Task TryStartAsync() + { + if (_processingTask != null) + { + if (!_processingTask.IsCompleted) + { + // Already running + return false; + } + await StopAsync().ConfigureAwait(false); + } + _cancellationTokenSource = new CancellationTokenSource(); + _processingTask = Task.Run(() => ProcessActivitiesAsync(_cancellationTokenSource.Token)); + return true; + } + + public async Task StopAsync(int timeout = 5000) + { + // Try to gracefully stop to allow processing of all queued items. + _channel.Writer.TryComplete(); + if (_processingTask != null) + { + if (await Task.WhenAny(_processingTask, Task.Delay(timeout)).ConfigureAwait(false) != _processingTask) + { + // Timeout - cancel + _cancellationTokenSource?.Cancel(); + // Assume it will NOT throw any exceptions + await _processingTask.ConfigureAwait(false); + } + _processingTask.Dispose(); + } + _processingTask = null; + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + } + + private async Task ProcessActivitiesAsync(CancellationToken cancellationToken) + { + try + { + using MemoryStream stream = new(); + await foreach (Activity activity in _channel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false)) + { + if (cancellationToken.IsCancellationRequested) break; + + stream.SetLength(0); + SerializableActivity serializableActivity = new(activity); + await JsonSerializer.SerializeAsync( + stream, + serializableActivity, cancellationToken: cancellationToken).ConfigureAwait(false); + stream.Write(s_newLine, 0, s_newLine.Length); + stream.Position = 0; + + await _streamWriterFunc(stream, cancellationToken).ConfigureAwait(false); + } + } + catch (OperationCanceledException ex) + { + // Expected when cancellationToken is cancelled. + Trace.TraceError(ex.ToString()); + } + catch (Exception ex) + { + // Since this will be called on an independent thread, we need to avoid uncaught exceptions. + Trace.TraceError(ex.ToString()); + } + } + + public void Dispose() + { + StopAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + } +} diff --git a/csharp/src/Telemetry/Traces/Listeners/FileListener/FileActivityListener.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/FileActivityListener.cs new file mode 100644 index 0000000000..c170bf6362 --- /dev/null +++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/FileActivityListener.cs @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System; +using System.Diagnostics; +using System.IO; + +namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener +{ + public sealed class FileActivityListener : IDisposable + { + public const long MaxFileSizeKbDefault = 1024; + public const int MaxTraceFilesDefault = 999; + internal const string ApacheArrowAdbcNamespace = "Apache.Arrow.Adbc"; + private const string TracesFolderName = "Traces"; + + private readonly ActivityListener _listener; + private readonly TracingFile _tracingFile; + private readonly ActivityProcessor _activityProcessor; + + public static bool TryActivateFileListener( + string activitySourceName, + string? exporterOption, + out FileActivityListener? listener, + Func? shouldListenTo = default, + string environmentName = ListenersOptions.Environment.Exporter, + string? tracesLocation = null, + long maxTraceFileSizeKb = MaxFileSizeKbDefault, + int maxTraceFiles = MaxTraceFilesDefault) + { + listener = null; + if (string.IsNullOrWhiteSpace(exporterOption)) + { + // Fall back to check the environment variable + exporterOption = Environment.GetEnvironmentVariable(environmentName); + } + if (string.IsNullOrWhiteSpace(exporterOption)) + { + // Neither option or environment variable is set - no tracer provider will be activated. + return false; + } + + if (string.Equals(exporterOption, ListenersOptions.Exporters.AdbcFile, StringComparison.OrdinalIgnoreCase)) + { + try + { + listener = new FileActivityListener( + activitySourceName, + tracesLocation, + maxTraceFileSizeKb, + maxTraceFiles, + shouldListenTo); + return true; + } + catch (Exception ex) + { + // Swallow any exceptions to avoid impacting application behavior + Trace.WriteLine(ex.Message); + listener = null; + return false; + } + } + return false; + } + + public FileActivityListener(string fileBaseName, string? tracesLocation = null, long maxTraceFileSizeKb = MaxFileSizeKbDefault, int maxTraceFiles = MaxTraceFilesDefault, Func? shouldListenTo = default) + { + tracesLocation = string.IsNullOrWhiteSpace(tracesLocation) ? TracingLocationDefault : tracesLocation; + ValidateParameters(fileBaseName, tracesLocation!, maxTraceFileSizeKb, maxTraceFiles); + DirectoryInfo tracesDirectory = new(tracesLocation); // Ensured to be valid by ValidateParameters + Func shouldListenToAll = (source) => source.Name == fileBaseName; + _listener = new ActivityListener() + { + ShouldListenTo = shouldListenTo ?? shouldListenToAll, + Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, + }; + + _tracingFile = new TracingFile(fileBaseName, tracesDirectory.FullName, maxTraceFileSizeKb, maxTraceFiles); + _activityProcessor = new ActivityProcessor(_tracingFile.WriteLineAsync); + _listener.ActivityStopped = OnActivityStopped; + ActivitySource.AddActivityListener(_listener); + + _activityProcessor.TryStartAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + } + + public void Dispose() + { + _listener.Dispose(); + _activityProcessor.Dispose(); + _tracingFile.Dispose(); + } + + private void OnActivityStopped(Activity activity) + { + // Write activity to file or other storage + _activityProcessor.TryWrite(activity); + } + + internal static void ValidateParameters(string fileBaseName, string traceLocation, long maxTraceFileSizeKb, int maxTraceFiles) + { + if (string.IsNullOrWhiteSpace(fileBaseName)) + throw new ArgumentNullException(nameof(fileBaseName)); + if (fileBaseName.IndexOfAny(Path.GetInvalidFileNameChars()) >= 0) + throw new ArgumentException("Invalid or unsupported file name", nameof(fileBaseName)); + if (string.IsNullOrWhiteSpace(traceLocation) || traceLocation.IndexOfAny(Path.GetInvalidPathChars()) >= 0) + throw new ArgumentException("Invalid or unsupported folder name", nameof(traceLocation)); + if (maxTraceFileSizeKb < 1) + throw new ArgumentException("maxTraceFileSizeKb must be greater than zero", nameof(maxTraceFileSizeKb)); + if (maxTraceFiles < 1) + throw new ArgumentException("maxTraceFiles must be greater than zero.", nameof(maxTraceFiles)); + + IsDirectoryWritable(traceLocation, throwIfFails: true); + } + + private static bool IsDirectoryWritable(string traceLocation, bool throwIfFails = false) + { + try + { + if (!Directory.Exists(traceLocation)) + { + Directory.CreateDirectory(traceLocation); + } + string tempFilePath = Path.Combine(traceLocation, Path.GetRandomFileName()); + using FileStream fs = File.Create(tempFilePath, 1, FileOptions.DeleteOnClose); + return true; + } + catch when (!throwIfFails) + { + return false; + } + } + + internal static string TracingLocationDefault { get; } = + new DirectoryInfo( + Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + ApacheArrowAdbcNamespace, + TracesFolderName)).FullName; + } +} diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/SerializableActivity.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivity.cs similarity index 99% rename from csharp/src/Telemetry/Traces/Exporters/FileExporter/SerializableActivity.cs rename to csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivity.cs index aff03a868e..2af071a681 100644 --- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/SerializableActivity.cs +++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/SerializableActivity.cs @@ -21,7 +21,7 @@ using System.Linq; using System.Text.Json.Serialization; -namespace Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter +namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener { /// /// Simplified version of that excludes some properties, etc. diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs b/csharp/src/Telemetry/Traces/Listeners/FileListener/TracingFile.cs similarity index 92% rename from csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs rename to csharp/src/Telemetry/Traces/Listeners/FileListener/TracingFile.cs index 3bb0a4cfc8..a6679bf4da 100644 --- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/TracingFile.cs +++ b/csharp/src/Telemetry/Traces/Listeners/FileListener/TracingFile.cs @@ -23,7 +23,7 @@ using System.Threading; using System.Threading.Tasks; -namespace Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter +namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener { /// /// Provides access to writing trace files, limiting the @@ -32,10 +32,14 @@ namespace Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter internal class TracingFile : IDisposable { private const int KbInByes = 1024; - private static readonly string s_defaultTracePath = FileExporter.TracingLocationDefault; + private static readonly string s_defaultTracePath = FileActivityListener.TracingLocationDefault; private static readonly Random s_globalRandom = new(); private static readonly ThreadLocal s_threadLocalRandom = new(NewRandom); - private static readonly Lazy s_processId = new(() => Process.GetCurrentProcess().Id.ToString(), isThreadSafe: true); +#if NET5_0_OR_GREATER + private static readonly Lazy s_processId = new(static () => Environment.ProcessId.ToString(), isThreadSafe: true); +#else + private static readonly Lazy s_processId = new(static () => Process.GetCurrentProcess().Id.ToString(), isThreadSafe: true); +#endif private readonly string _fileBaseName; private readonly DirectoryInfo _tracingDirectory; private FileInfo? _currentTraceFileInfo; @@ -43,7 +47,7 @@ internal class TracingFile : IDisposable private readonly long _maxFileSizeKb; private readonly int _maxTraceFiles; - internal TracingFile(string fileBaseName, string? traceDirectoryPath = default, long maxFileSizeKb = FileExporter.MaxFileSizeKbDefault, int maxTraceFiles = FileExporter.MaxTraceFilesDefault) : + internal TracingFile(string fileBaseName, string? traceDirectoryPath = default, long maxFileSizeKb = FileActivityListener.MaxFileSizeKbDefault, int maxTraceFiles = FileActivityListener.MaxTraceFilesDefault) : this(fileBaseName, ResolveTraceDirectory(traceDirectoryPath), maxFileSizeKb, maxTraceFiles) { } @@ -79,9 +83,9 @@ await ActionWithRetryAsync(async () => } catch (Exception ex) { - this._currentTraceFileInfo = null; - this._currentFileStream?.Dispose(); - this._currentFileStream = null; + _currentTraceFileInfo = null; + _currentFileStream?.Dispose(); + _currentFileStream = null; Trace.TraceError(ex.ToString()); } @@ -96,7 +100,7 @@ private async Task TryRemoveOlderFiles() string deleteSearchPattern = _fileBaseName + $"-trace-*.log"; IOrderedEnumerable orderedFiles = await GetTracingFilesAsync(_tracingDirectory, deleteSearchPattern).ConfigureAwait(false); // Avoid accidentally trying to delete the current file. - FileInfo[] tracingFiles = orderedFiles.Where(f => !f.FullName.Equals(_currentTraceFileInfo?.FullName))?.ToArray() ?? new FileInfo[0]; + FileInfo[] tracingFiles = orderedFiles.Where(f => !f.FullName.Equals(_currentTraceFileInfo?.FullName))?.ToArray() ?? []; if (tracingFiles.Length >= _maxTraceFiles) { int lastIndex = Math.Max(0, _maxTraceFiles - 1); @@ -122,6 +126,8 @@ private async Task WriteSingleLineAsync(Stream stream) await OpenNewTracingFileAsync().ConfigureAwait(false); } await stream.CopyToAsync(_currentFileStream).ConfigureAwait(false); + // Flush for robustness to crashing + await stream.FlushAsync().ConfigureAwait(false); } private async Task OpenNewTracingFileAsync() diff --git a/csharp/src/Telemetry/Traces/Exporters/FileExporter/SerializableActivityJsonContext.cs b/csharp/src/Telemetry/Traces/Listeners/ListenersOptions.cs similarity index 56% rename from csharp/src/Telemetry/Traces/Exporters/FileExporter/SerializableActivityJsonContext.cs rename to csharp/src/Telemetry/Traces/Listeners/ListenersOptions.cs index b5cec25f6f..ad59d74994 100644 --- a/csharp/src/Telemetry/Traces/Exporters/FileExporter/SerializableActivityJsonContext.cs +++ b/csharp/src/Telemetry/Traces/Listeners/ListenersOptions.cs @@ -15,18 +15,21 @@ * limitations under the License. */ -using System.Text.Json.Serialization; - -namespace Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter +namespace Apache.Arrow.Adbc.Telemetry.Traces.Listeners { - /// - /// Provides a source-generated JSON serialization context for the type. - /// - /// This context is used to optimize JSON serialization and deserialization of objects by leveraging source generation. It is intended for internal use within - /// the application. - [JsonSerializable(typeof(SerializableActivity))] - internal partial class SerializableActivityJsonContext : JsonSerializerContext + public class ListenersOptions { + public const string Exporter = "adbc.traces.exporter"; + + public static class Environment + { + public const string Exporter = "OTEL_TRACES_EXPORTER"; + } + + public static class Exporters + { + public const string None = "none"; + public const string AdbcFile = "adbcfile"; + } } } diff --git a/csharp/src/Telemetry/Traces/Listeners/Properties/AssemblyInfo.cs b/csharp/src/Telemetry/Traces/Listeners/Properties/AssemblyInfo.cs new file mode 100644 index 0000000000..bfb6f13536 --- /dev/null +++ b/csharp/src/Telemetry/Traces/Listeners/Properties/AssemblyInfo.cs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Telemetry.Traces.Exporters, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")] +[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners, PublicKey=0024000004800000940000000602000000240000525341310004000001000100e504183f6d470d6b67b6d19212be3e1f598f70c246a120194bc38130101d0c1853e4a0f2232cb12e37a7a90e707aabd38511dac4f25fcb0d691b2aa265900bf42de7f70468fc997551a40e1e0679b605aa2088a4a69e07c117e988f5b1738c570ee66997fba02485e7856a49eca5fd0706d09899b8312577cbb9034599fc92d4")] diff --git a/csharp/src/Telemetry/Traces/Listeners/readme.md b/csharp/src/Telemetry/Traces/Listeners/readme.md new file mode 100644 index 0000000000..1328d87bdb --- /dev/null +++ b/csharp/src/Telemetry/Traces/Listeners/readme.md @@ -0,0 +1,49 @@ + + +# Traces Listeners + +## FileActivityListener + +Provides an ActivityListener to write telemetry traces to +rotating files in folder. File names are created with the following pattern: +`--.log`. + +For example: `apache.arrow.adbc.drivers.databricks-2025-08-15-10-35-56-012345-99999.log`. + +The default folder used is: + +| Platform | Folder | +| --- | --- | +| Windows | `%LOCALAPPDATA%/Apache.Arrow.Adbc/Traces` | +| macOS | `$HOME/Library/Application Support/Apache.Arrow.Adbc/Traces` | +| Linux | `$HOME/.local/share/Apache.Arrow.Adbc/Traces` | + +By default, up to 999 files of maximum size 1024 KB are written to +the trace folder. + +The environment variable `OTEL_TRACES_EXPORTER` can be used to select one of the +available exporters. Or the database parameter `adbc.traces.exporter` can be used, +which has precedence over the environment variable. + +The following listeners are supported: + +| Listener | Description | +| --- | --- | +| `adbcfile` | Exports traces to rotating files in a folder. | diff --git a/csharp/test/Drivers/Apache/Common/TelemetryTests.cs b/csharp/test/Drivers/Apache/Common/TelemetryTests.cs new file mode 100644 index 0000000000..12be91cf92 --- /dev/null +++ b/csharp/test/Drivers/Apache/Common/TelemetryTests.cs @@ -0,0 +1,110 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners; +using Apache.Arrow.Adbc.Tracing; +using Xunit; +using Xunit.Abstractions; + +namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Common +{ + public abstract class TelemetryTests : TestBase + where TConfig : TestConfiguration + where TEnv : CommonTestEnvironment + { + public TelemetryTests(ITestOutputHelper output, TestEnvironment.Factory testEnvFactory) + : base(output, testEnvFactory) { } + + [SkippableTheory] + [InlineData(ListenersOptions.Exporters.AdbcFile)] + [InlineData(null)] + [InlineData(ListenersOptions.Exporters.None)] + public void CanEnableFileTracingExporterViaEnvVariable(string? exporterName) + { + Environment.SetEnvironmentVariable(ListenersOptions.Environment.Exporter, exporterName); + + DirectoryInfo directoryInfo = GetTracesDirectoryInfo(); + ResetTraceDirectory(directoryInfo); + + directoryInfo.Refresh(); + IEnumerable files = directoryInfo.EnumerateFiles(); + Assert.Empty(files); + string activitySourceName = string.Empty; + + try + { + Dictionary parameters = GetDriverParameters(TestConfiguration); + using (AdbcDatabase database = NewDriver.Open(parameters)) + { + try + { + using AdbcConnection connection = database.Connect(new Dictionary()); + TracingConnection? tc = connection as TracingConnection; + Assert.NotNull(tc); + Assert.True(string.IsNullOrEmpty(tc.ActivitySourceName), "expecting non-empty ActivitySourceName"); + activitySourceName = tc.ActivitySourceName; + } + catch (Exception ex) + { + // We don't really need the connection to succeed for this test, + OutputHelper?.WriteLine(ex.Message); + } + } + + directoryInfo.Refresh(); + files = directoryInfo.EnumerateFiles(); + switch (exporterName) + { + case ListenersOptions.Exporters.AdbcFile: + Assert.NotEmpty(files); + Assert.NotEqual(0, files.First().Length); + Assert.StartsWith(activitySourceName, files.First().Name); + break; + default: + Assert.Empty(files); + break; + } + } + finally + { + ResetTraceDirectory(directoryInfo, create: false); + } + } + + private static void ResetTraceDirectory(DirectoryInfo directoryInfo, bool create = true) + { + if (directoryInfo.Exists) + { + directoryInfo.Delete(recursive: true); + } + if (create) + { + directoryInfo.Create(); + } + } + + private static DirectoryInfo GetTracesDirectoryInfo() => + new DirectoryInfo(Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + "Apache.Arrow.Adbc", + "Traces")); + } +} diff --git a/csharp/test/Drivers/Apache/Hive2/TelemetryTests.cs b/csharp/test/Drivers/Apache/Hive2/TelemetryTests.cs new file mode 100644 index 0000000000..6082a49bfc --- /dev/null +++ b/csharp/test/Drivers/Apache/Hive2/TelemetryTests.cs @@ -0,0 +1,29 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common; +using Xunit.Abstractions; + +namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Hive2; + +public class TelemetryTests : TelemetryTests +{ + public TelemetryTests(ITestOutputHelper outputHelper) + : base(outputHelper, new HiveServer2TestEnvironment.Factory()) + { + } +} diff --git a/csharp/test/Drivers/Apache/Impala/TelemetryTests.cs b/csharp/test/Drivers/Apache/Impala/TelemetryTests.cs new file mode 100644 index 0000000000..94ab33cdda --- /dev/null +++ b/csharp/test/Drivers/Apache/Impala/TelemetryTests.cs @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common; +using Xunit.Abstractions; + +namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Impala +{ + public class TelemetryTests : TelemetryTests + { + public TelemetryTests(ITestOutputHelper outputHelper) + : base(outputHelper, new ImpalaTestEnvironment.Factory()) + { + } + } +} diff --git a/csharp/test/Drivers/Apache/Spark/TelemetryTests.cs b/csharp/test/Drivers/Apache/Spark/TelemetryTests.cs new file mode 100644 index 0000000000..aacdb1f3d9 --- /dev/null +++ b/csharp/test/Drivers/Apache/Spark/TelemetryTests.cs @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common; +using Xunit.Abstractions; + +namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark +{ + public class TelemetryTests : TelemetryTests + { + public TelemetryTests(ITestOutputHelper outputHelper) + : base(outputHelper, new SparkTestEnvironment.Factory()) + { + } + } +} diff --git a/csharp/test/Drivers/BigQuery/TelemetryTests.cs b/csharp/test/Drivers/BigQuery/TelemetryTests.cs new file mode 100644 index 0000000000..ba30eddad3 --- /dev/null +++ b/csharp/test/Drivers/BigQuery/TelemetryTests.cs @@ -0,0 +1,120 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using Apache.Arrow.Adbc.Drivers.BigQuery; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners; +using Apache.Arrow.Adbc.Tracing; +using Xunit; +using Xunit.Abstractions; + +namespace Apache.Arrow.Adbc.Tests.Drivers.BigQuery +{ + public class TelemetryTests + { + readonly BigQueryTestConfiguration _testConfiguration; + readonly List _environments; + readonly ITestOutputHelper _outputHelper; + + public TelemetryTests(ITestOutputHelper outputHelper) + { + _testConfiguration = MultiEnvironmentTestUtils.LoadMultiEnvironmentTestConfiguration(BigQueryTestingUtils.BIGQUERY_TEST_CONFIG_VARIABLE); + _environments = MultiEnvironmentTestUtils.GetTestEnvironments(_testConfiguration); + _outputHelper = outputHelper; + } + + [SkippableTheory] + [InlineData(ListenersOptions.Exporters.AdbcFile)] + [InlineData(null)] + [InlineData(ListenersOptions.Exporters.None)] + public void CanEnableFileTracingExporterViaEnvVariable(string? exporterName) + { + Environment.SetEnvironmentVariable(ListenersOptions.Environment.Exporter, exporterName); + + foreach (BigQueryTestEnvironment environment in _environments) + { + DirectoryInfo directoryInfo = GetTracesDirectoryInfo(); + ResetTraceDirectory(directoryInfo); + + directoryInfo.Refresh(); + IEnumerable files = directoryInfo.EnumerateFiles(); + Assert.Empty(files); + string activitySourceName = string.Empty; + + try + { + Dictionary parameters = BigQueryTestingUtils.GetBigQueryParameters(environment); + using (AdbcDatabase database = new BigQueryDriver().Open(parameters)) + { + try + { + using AdbcConnection connection = database.Connect(new Dictionary()); + TracingConnection? tc = connection as TracingConnection; + Assert.NotNull(tc); + Assert.True(tc.ActivitySourceName.Length > 0, "Activity source name should not be empty"); + activitySourceName = tc.ActivitySourceName; + } + catch (Exception ex) + { + // We don't really need the connection to succeed for this test, + _outputHelper?.WriteLine(ex.Message); + } + } + + directoryInfo.Refresh(); + files = directoryInfo.EnumerateFiles(); + switch (exporterName) + { + case ListenersOptions.Exporters.AdbcFile: + Assert.NotEmpty(files); + Assert.NotEqual(0, files.First().Length); + Assert.StartsWith(activitySourceName, files.First().Name, StringComparison.OrdinalIgnoreCase); + break; + default: + Assert.Empty(files); + break; + } + } + finally + { + ResetTraceDirectory(directoryInfo, create: false); + } + } + } + + private static void ResetTraceDirectory(DirectoryInfo directoryInfo, bool create = true) + { + if (directoryInfo.Exists) + { + directoryInfo.Delete(recursive: true); + } + if (create) + { + directoryInfo.Create(); + } + } + + private static DirectoryInfo GetTracesDirectoryInfo() => + new DirectoryInfo(Path.Combine( + Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), + "Apache.Arrow.Adbc", + "Traces")); + } +} diff --git a/csharp/test/Drivers/Databricks/E2E/TelemetryTests.cs b/csharp/test/Drivers/Databricks/E2E/TelemetryTests.cs new file mode 100644 index 0000000000..0b257b3eec --- /dev/null +++ b/csharp/test/Drivers/Databricks/E2E/TelemetryTests.cs @@ -0,0 +1,30 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using Apache.Arrow.Adbc.Tests.Drivers.Apache.Common; +using Xunit.Abstractions; + +namespace Apache.Arrow.Adbc.Tests.Drivers.Databricks +{ + public class TelemetryTests : TelemetryTests + { + public TelemetryTests(ITestOutputHelper outputHelper) + : base(outputHelper, new DatabricksTestEnvironment.Factory()) + { + } + } +} diff --git a/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs b/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs index 2eaf7dc72d..4e62ea67b3 100644 --- a/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs +++ b/csharp/test/Telemetry/Traces/Exporters/FileExporter/FileExporterTests.cs @@ -193,7 +193,6 @@ internal async Task CanSetSingleMaxFiles() { const long maxTraceFileSizeKb = 5; const int maxTraceFiles = 1; - var delay = TimeSpan.FromSeconds(8); string customFolderName = ExportersBuilderTests.NewName(); string traceFolder = Path.Combine(s_localApplicationDataFolderPath, customFolderName); @@ -265,8 +264,8 @@ internal async Task CanTraceMultipleConcurrentWriters() .Build(); var tasks = new Task[] { - Task.Run(async () => await TraceActivities(traceFolder, "activity1", writeCount, provider1)), - Task.Run(async () => await TraceActivities(traceFolder, "activity2", writeCount, provider2)), + Task.Run(async () => await TraceActivities("activity1", writeCount, provider1)), + Task.Run(async () => await TraceActivities("activity2", writeCount, provider2)), }; await Task.WhenAll(tasks); await Task.Delay(500); @@ -301,7 +300,7 @@ internal async Task CanTraceMultipleConcurrentWriters() } } - private async Task TraceActivities(string traceFolder, string activityName, int writeCount, TracerProvider provider) + private async Task TraceActivities(string activityName, int writeCount, TracerProvider provider) { for (int i = 0; i < writeCount; i++) { diff --git a/csharp/test/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners.csproj b/csharp/test/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners.csproj new file mode 100644 index 0000000000..935cbfb7af --- /dev/null +++ b/csharp/test/Telemetry/Traces/Listeners/Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners.csproj @@ -0,0 +1,29 @@ + + + + net8.0;net472 + net8.0 + enable + enable + + false + true + + + + + + + + + + + + + + + + + + + diff --git a/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs b/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs new file mode 100644 index 0000000000..ad33ee809a --- /dev/null +++ b/csharp/test/Telemetry/Traces/Listeners/FileListener/FileActivityListenerTests.cs @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +using System.Diagnostics; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener; +using Apache.Arrow.Adbc.Tracing; +using Apache.Arrow.Ipc; + +namespace Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners.FileListener +{ + public class FileActivityListenerTests + { + private const string TraceLocation = "adbc.trace.location"; + + [Theory] + [InlineData(null, false)] + [InlineData("", false)] + [InlineData(" ", false)] + [InlineData(ListenersOptions.Exporters.None, false)] + [InlineData(ListenersOptions.Exporters.AdbcFile, true)] + public void TestTryActivateFileListener(string? exporterOption, bool expected) + { + Assert.Equal(expected, FileActivityListener.TryActivateFileListener("TestSource", exporterOption, out FileActivityListener? listener)); + Assert.True(expected == (listener != null)); + listener?.Dispose(); + } + + [Fact] + public async Task CanTraceConcurrentConnections() + { + const int numConnections = 5; + const int numActivitiesPerConnection = 1000; + string folderLocation = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.LocalApplicationData), TracingFileTests.NewName()); + + try + { + TestConnection[] connections = new TestConnection[numConnections]; + for (int i = 0; i < numConnections; i++) + { + connections[i] = new TestConnection(new Dictionary + { + { ListenersOptions.Exporter, ListenersOptions.Exporters.AdbcFile }, + { TraceLocation, folderLocation }, + }); + } + + Task[] tasks = new Task[numConnections]; + for (int i = 0; i < numConnections; i++) + { + TestConnection testConnection = connections[i]; + int connectionId = i; + tasks[i] = Task.Run(async () => + { + for (int j = 0; j < numActivitiesPerConnection; j++) + { + await testConnection.EmulateWorkAsync("Key", $"Value-{connectionId}-{j}"); + } + }); + } + for (int i = 0; i < numConnections; i++) + { + await tasks[i]; + tasks[i].Dispose(); + } + for (int i = 0; i < numConnections; i++) + { + var testConnection = (TestConnection)connections[i]; + testConnection.Dispose(); + } + + Assert.True(Directory.Exists(folderLocation)); + DirectoryInfo dirInfo = new(folderLocation); + FileInfo[] files = dirInfo.GetFiles(); + Assert.True(files.Length > 0, "No trace files were created."); + int totalLines = 0; + foreach (FileInfo file in files) + { + totalLines += File.ReadAllLines(file.FullName).Length; + } + Assert.Equal(numConnections * numActivitiesPerConnection, totalLines); + } + finally + { + if (Directory.Exists(folderLocation)) + { + Directory.Delete(folderLocation, recursive: true); + } + } + } + + private class TestConnection : TracingConnection + { + private static readonly string s_assemblyName = typeof(TestConnection).Assembly.GetName().Name!; + private static readonly string s_assemblyVersion = typeof(TestConnection).Assembly.GetName().Version!.ToString(); + + private readonly string _traceId = Guid.NewGuid().ToString("N"); + private readonly FileActivityListener? _fileListener; + + public TestConnection(IReadOnlyDictionary properties) + : base(properties) + { + properties.TryGetValue(TraceLocation, out string? tracesLocation); + properties.TryGetValue(ListenersOptions.Exporter, out string? exporterOption); + bool shouldListenTo(ActivitySource source) => source.Tags?.Any(t => ReferenceEquals(t.Key, _traceId)) == true; + FileActivityListener.TryActivateFileListener(ActivitySourceName, exporterOption, out _fileListener, shouldListenTo, tracesLocation: tracesLocation); + } + + public async Task EmulateWorkAsync(string key, string value) + { + await this.TraceActivityAsync(async (activity) => + { + activity?.SetTag(key, value); + // Simulate some work + await Task.Yield(); + }); + } + + public override IEnumerable>? GetActivitySourceTags(IReadOnlyDictionary properties) + { + IEnumerable>? tags = base.GetActivitySourceTags(properties); + tags ??= []; + tags = tags.Concat([new(_traceId, null)]); + return tags; + } + + public override string AssemblyVersion => s_assemblyVersion; + + public override string AssemblyName => s_assemblyName; + + public override AdbcStatement CreateStatement() => throw new NotImplementedException(); + public override IArrowArrayStream GetObjects(GetObjectsDepth depth, string? catalogPattern, string? dbSchemaPattern, string? tableNamePattern, IReadOnlyList? tableTypes, string? columnNamePattern) => throw new NotImplementedException(); + public override Schema GetTableSchema(string? catalog, string? dbSchema, string tableName) => throw new NotImplementedException(); + public override IArrowArrayStream GetTableTypes() => throw new NotImplementedException(); + + protected override void Dispose(bool disposing) + { + _fileListener?.Dispose(); + base.Dispose(disposing); + } + } + } +} diff --git a/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs b/csharp/test/Telemetry/Traces/Listeners/TracingFileTests.cs similarity index 82% rename from csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs rename to csharp/test/Telemetry/Traces/Listeners/TracingFileTests.cs index 3392ea031f..54504a729a 100644 --- a/csharp/test/Telemetry/Traces/Exporters/FileExporter/TracingFileTests.cs +++ b/csharp/test/Telemetry/Traces/Listeners/TracingFileTests.cs @@ -15,15 +15,10 @@ * limitations under the License. */ -using System; -using System.Collections.Generic; -using System.IO; using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; -using Apache.Arrow.Adbc.Telemetry.Traces.Exporters.FileExporter; +using Apache.Arrow.Adbc.Telemetry.Traces.Listeners.FileListener; -namespace Apache.Arrow.Adbc.Tests.Telemetry.Traces.Exporters.FileExporter +namespace Apache.Arrow.Adbc.Tests.Telemetry.Traces.Listeners.FileListener { public class TracingFileTests { @@ -40,12 +35,12 @@ public class TracingFileTests [Fact] internal async Task TestMultipleConcurrentTracingFiles() { - CancellationTokenSource tokenSource = new CancellationTokenSource(); + CancellationTokenSource tokenSource = new(); int concurrentCount = 50; Task[] tasks = new Task[concurrentCount]; int[] lineCounts = new int[concurrentCount]; - string sourceName = ExportersBuilderTests.NewName(); - string customFolderName = ExportersBuilderTests.NewName(); + string sourceName = NewName(); + string customFolderName = NewName(); string traceFolder = Path.Combine(s_localApplicationDataFolderPath, customFolderName); if (Directory.Exists(traceFolder)) Directory.Delete(traceFolder, true); try @@ -56,7 +51,7 @@ internal async Task TestMultipleConcurrentTracingFiles() } await Task.WhenAll(tasks); - foreach (var file in Directory.GetFiles(traceFolder)) + foreach (string file in Directory.GetFiles(traceFolder)) { foreach (string line in File.ReadLines(file)) { @@ -80,8 +75,8 @@ internal async Task TestMultipleConcurrentTracingFiles() private async Task Run(string sourceName, string traceFolder, CancellationToken cancellationToken) { int instanceNumber = Interlocked.Increment(ref _testInstance) - 1; - using TracingFile tracingFile = new TracingFile(sourceName, traceFolder); - await foreach (var stream in GetLinesAsync(instanceNumber, 100, cancellationToken)) + using TracingFile tracingFile = new(sourceName, traceFolder); + await foreach (Stream stream in GetLinesAsync(instanceNumber, 100, cancellationToken)) { await tracingFile.WriteLineAsync(stream, cancellationToken); } @@ -93,8 +88,10 @@ private static async IAsyncEnumerable GetLinesAsync(int instanceNumber, { if (cancellationToken.IsCancellationRequested) yield break; yield return new MemoryStream(System.Text.Encoding.UTF8.GetBytes($"line{instanceNumber}" + Environment.NewLine)); - await Task.Delay(10); // Simulate some delay + await Task.Delay(10, cancellationToken); // Simulate some delay } } + + internal static string NewName() => Guid.NewGuid().ToString("N"); } }