Skip to content

Commit 9642434

Browse files
author
Lessley Dennington
committed
trace2: add collectorwriter
Add the Trace2CollectorWriter class to accept Trace2 messages in the event target format and write them to the OTel Collector/Telemetry Service.
1 parent 4adb60e commit 9642434

File tree

2 files changed

+132
-0
lines changed

2 files changed

+132
-0
lines changed

src/shared/Core/Trace2.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,4 +106,6 @@ public abstract class Trace2Message
106106

107107
[JsonProperty("line", Order = 6)]
108108
public int Line { get; set; }
109+
110+
public abstract string ToJson();
109111
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Diagnostics;
4+
using System.IO.Pipes;
5+
using System.Text;
6+
using System.Threading;
7+
using KnownGitCfg = GitCredentialManager.Constants.GitConfiguration;
8+
9+
namespace GitCredentialManager
10+
{
11+
/// <summary>
12+
/// Accepts string messages from multiple threads and dispatches them over a named pipe from a
13+
/// background thread.
14+
/// </summary>
15+
public class Trace2CollectorWriter : DisposableObject, ITrace2Writer
16+
{
17+
private const int DefaultMaxQueueSize = 256;
18+
19+
private readonly Func<NamedPipeClientStream> _createPipeFunc;
20+
private readonly BlockingCollection<string> _queue;
21+
22+
private Thread _writerThread;
23+
private NamedPipeClientStream _pipeClient;
24+
25+
public bool Failed { get; private set; }
26+
27+
public Trace2CollectorWriter(Func<NamedPipeClientStream> createPipeFunc,
28+
int maxQueueSize = DefaultMaxQueueSize)
29+
{
30+
EnsureArgument.NotNull(createPipeFunc, nameof(createPipeFunc));
31+
EnsureArgument.Positive(maxQueueSize, nameof(maxQueueSize));
32+
33+
_createPipeFunc = createPipeFunc;
34+
_queue = new BlockingCollection<string>(new ConcurrentQueue<string>(), boundedCapacity: maxQueueSize);
35+
36+
Start();
37+
}
38+
39+
public void Write(Trace2Message message)
40+
{
41+
_queue.TryAdd(message.ToJson());
42+
}
43+
44+
protected override void ReleaseManagedResources()
45+
{
46+
Stop();
47+
48+
_pipeClient.Dispose();
49+
_queue.Dispose();
50+
base.ReleaseManagedResources();
51+
52+
_pipeClient = null;
53+
_writerThread = null;
54+
}
55+
56+
private void Start()
57+
{
58+
try
59+
{
60+
_writerThread = new Thread(BackgroundWriterThreadProc)
61+
{
62+
Name = nameof(Trace2CollectorWriter),
63+
IsBackground = true
64+
};
65+
66+
_writerThread.Start();
67+
// Create a new pipe stream instance using the provided factory
68+
_pipeClient = _createPipeFunc();
69+
70+
// Specify an instantaneous timeout because we don't want to hold up the
71+
// background thread loop if the pipe is not available.
72+
_pipeClient.Connect(timeout: 0);
73+
}
74+
catch
75+
{
76+
// Start failed. Disable this writer for this run.
77+
Failed = true;
78+
}
79+
}
80+
81+
private void Stop()
82+
{
83+
if (_queue.IsAddingCompleted)
84+
{
85+
return;
86+
}
87+
88+
// Signal to the queue draining thread that it should drain once more and then terminate.
89+
_queue.CompleteAdding();
90+
_writerThread.Join();
91+
ReleaseManagedResources();
92+
}
93+
94+
private void BackgroundWriterThreadProc()
95+
{
96+
// Drain the queue of all messages currently in the queue.
97+
// TryTake() using an infinite timeout will block until either a message is available (returns true)
98+
// or the queue has been marked as completed _and_ is empty (returns false).
99+
while (_queue.TryTake(out string message, Timeout.Infinite))
100+
{
101+
if (message != null)
102+
{
103+
WriteMessage(message);
104+
}
105+
}
106+
}
107+
108+
private void WriteMessage(string message)
109+
{
110+
try
111+
{
112+
// We should signal the end of each message with a line-feed (LF) character.
113+
if (!message.EndsWith("\n"))
114+
{
115+
message += '\n';
116+
}
117+
118+
byte[] data = Encoding.UTF8.GetBytes(message);
119+
_pipeClient.Write(data, 0, data.Length);
120+
_pipeClient.Flush();
121+
}
122+
catch
123+
{
124+
// We can't send this message for some reason (e.g., broken pipe); we attempt no recovery or retry
125+
// mechanism but rather disable the writer for the rest of this run.
126+
Failed = true;
127+
}
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)