Skip to content

Commit 4bc398f

Browse files
[otlp] Deserialize GrpcStatusDetailsHeader to Retrieve Grpc Retry Delay (open-telemetry#5980)
Co-authored-by: Mikel Blanchard <[email protected]>
1 parent 68c79a3 commit 4bc398f

File tree

2 files changed

+651
-0
lines changed

2 files changed

+651
-0
lines changed
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
using System.Text;
5+
6+
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient.Grpc;
7+
8+
internal static class GrpcStatusDeserializer
9+
{
10+
#pragma warning disable SA1310 // Field names should not contain underscore
11+
// Wire types in protocol buffers
12+
private const int WIRETYPE_VARINT = 0;
13+
private const int WIRETYPE_FIXED64 = 1;
14+
private const int WIRETYPE_LENGTH_DELIMITED = 2;
15+
private const int WIRETYPE_FIXED32 = 5;
16+
#pragma warning restore SA1310 // Field names should not contain underscore
17+
18+
internal static TimeSpan? TryGetGrpcRetryDelay(string? grpcStatusDetailsHeader)
19+
{
20+
try
21+
{
22+
var retryInfo = ExtractRetryInfo(grpcStatusDetailsHeader);
23+
if (retryInfo?.RetryDelay != null)
24+
{
25+
return TimeSpan.FromSeconds(retryInfo.Value.RetryDelay.Value.Seconds) +
26+
TimeSpan.FromTicks(retryInfo.Value.RetryDelay.Value.Nanos / 100); // Convert nanos to ticks
27+
}
28+
}
29+
catch
30+
{
31+
// TODO: Log exception to event source.
32+
return null;
33+
}
34+
35+
return null;
36+
}
37+
38+
// Marked as internal for test.
39+
internal static Status? DeserializeStatus(string? grpcStatusDetailsBin)
40+
{
41+
if (string.IsNullOrWhiteSpace(grpcStatusDetailsBin))
42+
{
43+
return null;
44+
}
45+
46+
var status = new Status();
47+
byte[] data = Convert.FromBase64String(grpcStatusDetailsBin);
48+
using (var stream = new MemoryStream(data))
49+
{
50+
while (stream.Position < stream.Length)
51+
{
52+
var tag = DecodeTag(stream);
53+
var fieldNumber = tag >> 3;
54+
var wireType = tag & 0x7;
55+
56+
switch (fieldNumber)
57+
{
58+
case 1: // code
59+
status.Code = DecodeInt32(stream);
60+
break;
61+
case 2: // message
62+
status.Message = DecodeString(stream);
63+
break;
64+
case 3: // details
65+
status.Details.Add(DecodeAny(stream));
66+
break;
67+
default:
68+
SkipField(stream, wireType);
69+
break;
70+
}
71+
}
72+
}
73+
74+
return status;
75+
}
76+
77+
// Marked as internal for test.
78+
internal static RetryInfo? ExtractRetryInfo(string? grpcStatusDetailsBin)
79+
{
80+
var status = DeserializeStatus(grpcStatusDetailsBin);
81+
if (status == null)
82+
{
83+
return null;
84+
}
85+
86+
foreach (var detail in status.Value.Details)
87+
{
88+
if (detail.TypeUrl != null && detail.TypeUrl.EndsWith("/google.rpc.RetryInfo"))
89+
{
90+
return DeserializeRetryInfo(detail.Value!);
91+
}
92+
}
93+
94+
return null;
95+
}
96+
97+
private static RetryInfo? DeserializeRetryInfo(byte[] data)
98+
{
99+
RetryInfo? retryInfo = null;
100+
using (var stream = new MemoryStream(data))
101+
{
102+
while (stream.Position < stream.Length)
103+
{
104+
var tag = DecodeTag(stream);
105+
var fieldNumber = tag >> 3;
106+
var wireType = tag & 0x7;
107+
108+
switch (fieldNumber)
109+
{
110+
case 1: // retry_delay
111+
retryInfo = new RetryInfo(DecodeDuration(stream));
112+
break;
113+
default:
114+
SkipField(stream, wireType);
115+
break;
116+
}
117+
}
118+
}
119+
120+
return retryInfo;
121+
}
122+
123+
private static Duration DecodeDuration(Stream stream)
124+
{
125+
var length = DecodeVarint(stream);
126+
var endPosition = stream.Position + length;
127+
long seconds = 0;
128+
int nanos = 0;
129+
130+
while (stream.Position < endPosition)
131+
{
132+
var tag = DecodeTag(stream);
133+
var fieldNumber = tag >> 3;
134+
var wireType = tag & 0x7;
135+
136+
switch (fieldNumber)
137+
{
138+
case 1: // seconds
139+
seconds = DecodeInt64(stream);
140+
break;
141+
case 2: // nanos
142+
nanos = DecodeInt32(stream);
143+
break;
144+
default:
145+
SkipField(stream, wireType);
146+
break;
147+
}
148+
}
149+
150+
return new Duration(seconds, nanos);
151+
}
152+
153+
private static Any DecodeAny(Stream stream)
154+
{
155+
var length = DecodeVarint(stream);
156+
var endPosition = stream.Position + length;
157+
158+
string? typeUrl = null;
159+
byte[]? value = null;
160+
161+
while (stream.Position < endPosition)
162+
{
163+
var tag = DecodeTag(stream);
164+
var fieldNumber = tag >> 3;
165+
var wireType = tag & 0x7;
166+
167+
switch (fieldNumber)
168+
{
169+
case 1: // type_url
170+
typeUrl = DecodeString(stream);
171+
break;
172+
case 2: // value
173+
value = DecodeBytes(stream);
174+
break;
175+
default:
176+
SkipField(stream, wireType);
177+
break;
178+
}
179+
}
180+
181+
return new Any(typeUrl, value);
182+
}
183+
184+
private static uint DecodeTag(Stream stream)
185+
{
186+
return (uint)DecodeVarint(stream);
187+
}
188+
189+
private static long DecodeVarint(Stream stream)
190+
{
191+
long result = 0;
192+
int shift = 0;
193+
194+
while (true)
195+
{
196+
int b = stream.ReadByte();
197+
if (b == -1)
198+
{
199+
throw new EndOfStreamException();
200+
}
201+
202+
result |= (long)(b & 0x7F) << shift;
203+
if ((b & 0x80) == 0)
204+
{
205+
return result;
206+
}
207+
208+
shift += 7;
209+
if (shift >= 64)
210+
{
211+
throw new InvalidDataException("Invalid varint");
212+
}
213+
}
214+
}
215+
216+
private static int DecodeInt32(Stream stream) => (int)DecodeVarint(stream);
217+
218+
private static long DecodeInt64(Stream stream) => DecodeVarint(stream);
219+
220+
private static string DecodeString(Stream stream)
221+
{
222+
var bytes = DecodeBytes(stream);
223+
return Encoding.UTF8.GetString(bytes);
224+
}
225+
226+
private static byte[] DecodeBytes(Stream stream)
227+
{
228+
var length = (int)DecodeVarint(stream);
229+
var buffer = new byte[length];
230+
int read = stream.Read(buffer, 0, length);
231+
if (read != length)
232+
{
233+
throw new EndOfStreamException();
234+
}
235+
236+
return buffer;
237+
}
238+
239+
private static void SkipField(Stream stream, uint wireType)
240+
{
241+
switch (wireType)
242+
{
243+
case WIRETYPE_VARINT:
244+
DecodeVarint(stream);
245+
break;
246+
case WIRETYPE_FIXED64:
247+
stream.Position += 8;
248+
break;
249+
case WIRETYPE_LENGTH_DELIMITED:
250+
var length = DecodeVarint(stream);
251+
stream.Position += length;
252+
break;
253+
case WIRETYPE_FIXED32:
254+
stream.Position += 4;
255+
break;
256+
default:
257+
throw new InvalidDataException($"Unknown wire type: {wireType}");
258+
}
259+
}
260+
261+
internal readonly struct Duration
262+
{
263+
internal Duration(long seconds, int nanos)
264+
{
265+
this.Seconds = seconds;
266+
this.Nanos = nanos;
267+
}
268+
269+
public long Seconds { get; }
270+
271+
public int Nanos { get; }
272+
}
273+
274+
internal readonly struct RetryInfo
275+
{
276+
public RetryInfo(Duration? retryDelay)
277+
{
278+
this.RetryDelay = retryDelay;
279+
}
280+
281+
public Duration? RetryDelay { get; }
282+
}
283+
284+
internal readonly struct Any
285+
{
286+
public Any(string? typeUrl, byte[]? value)
287+
{
288+
this.TypeUrl = typeUrl;
289+
this.Value = value;
290+
}
291+
292+
public string? TypeUrl { get; }
293+
294+
public byte[]? Value { get; }
295+
}
296+
297+
internal struct Status
298+
{
299+
public Status()
300+
{
301+
this.Code = 0;
302+
this.Message = null;
303+
this.Details = [];
304+
}
305+
306+
public int Code { get; set; }
307+
308+
public string? Message { get; set; }
309+
310+
public List<Any> Details { get; set; }
311+
}
312+
}

0 commit comments

Comments
 (0)