Skip to content

Commit e515d00

Browse files
committed
Switch to gRPC implementation
1 parent b49021e commit e515d00

29 files changed

+3685
-25
lines changed

.netconfig

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,3 +169,70 @@
169169
weak
170170
sha = a1ec2c6746d96b4f6f140509aa68dcff09271146
171171
etag = 9e5c6908edc34eb661d647671f79153d8f3a54ebdc848c8765c78d2715f2f657
172+
[file "src/Extensions.Grok/proto/."]
173+
url = https://github.com/xai-org/xai-proto/tree/main/proto/xai/api/v1/
174+
[file "src/Extensions.Grok/proto/auth.proto"]
175+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/auth.proto
176+
sha = 626a3a3f22d8aa11c7a185bf8902e7d415df0462
177+
etag = ea074da4f67b67ebf0e8a0b2114a1720d6b3176a8324aa1cfd22740e78049d80
178+
weak
179+
[file "src/Extensions.Grok/proto/chat.proto"]
180+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/chat.proto
181+
sha = afc88be2698cf4fb5ad476734d02b931241c0624
182+
etag = b451f89cb67e77a021f258babf6b52118570afff3de53d2bf4463a1076031e69
183+
weak
184+
[file "src/Extensions.Grok/proto/deferred.proto"]
185+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/deferred.proto
186+
sha = 626a3a3f22d8aa11c7a185bf8902e7d415df0462
187+
etag = d47b84dddfc4252abbced302fde18ea8093933a3a8d5516350db596d3ae86595
188+
weak
189+
[file "src/Extensions.Grok/proto/documents.proto"]
190+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/documents.proto
191+
sha = 736b835b0c0dd93698664732daad49f87a2fbc6f
192+
etag = 3719cf7bc6280bc244ec25290be31fc925c95d0833f5fe282d9d0be805827ec6
193+
weak
194+
[file "src/Extensions.Grok/proto/embed.proto"]
195+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/embed.proto
196+
sha = 626a3a3f22d8aa11c7a185bf8902e7d415df0462
197+
etag = e39d176e278c4b31be375fe8bd7a21a17fceb61422b38fb0ba0341bdb58e6b36
198+
weak
199+
[file "src/Extensions.Grok/proto/image.proto"]
200+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/image.proto
201+
sha = 626a3a3f22d8aa11c7a185bf8902e7d415df0462
202+
etag = 72140789ccabd84bb51e0120c4b86f78ffa03de324410887034c8c45684004c6
203+
weak
204+
[file "src/Extensions.Grok/proto/models.proto"]
205+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/models.proto
206+
sha = 626a3a3f22d8aa11c7a185bf8902e7d415df0462
207+
etag = af6557257e396c857f85997e118358a689a5627303d0d4c89124aae0691813c5
208+
weak
209+
[file "src/Extensions.Grok/proto/sample.proto"]
210+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/sample.proto
211+
sha = 6c67dda8aed9c1c82cb75d4548c14785c43c654c
212+
etag = 0749bb07dcf0078c5cf5832b7c78abec3a332197d83781eacc57999af8846a65
213+
weak
214+
[file "src/Extensions.Grok/proto/tokenize.proto"]
215+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/tokenize.proto
216+
sha = 626a3a3f22d8aa11c7a185bf8902e7d415df0462
217+
etag = 000b345ae3d238d6be847eb8336b22f89d43a8bc1876d0bebfb81c1930260f2e
218+
weak
219+
[file "src/Extensions.Grok/proto/usage.proto"]
220+
url = https://github.com/xai-org/xai-proto/blob/main/proto/xai/api/v1/usage.proto
221+
sha = afc88be2698cf4fb5ad476734d02b931241c0624
222+
etag = e760ecb2f328565e57bbd0ad1fec6a62004088c8b217c0cb178653cd2c1bf432
223+
weak
224+
[file "src/Extensions.Grok/Extensions/Throw.cs"]
225+
url = https://github.com/devlooped/catbag/blob/main/System/Throw.cs
226+
sha = 3012d56be7554c483e5c5d277144c063969cada9
227+
etag = 43c81c6c6dcdf5baee40a9e3edc5e871e473e6c954c901b82bb87a3a48888ea0
228+
weak
229+
[file "src/Extensions.Grok/proto/google/protobuf/timestamp.proto"]
230+
url = https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/timestamp.proto
231+
sha = 71f247a9cf5ddcd310d8aa5e05cea2acc72f9a7f
232+
etag = ca7512680f5ac0d26b57c5cfebca26d906112e3cb02a9739bad342ad70a45ed5
233+
weak
234+
[file "src/Extensions.Grok/proto/google/protobuf/empty.proto"]
235+
url = https://github.com/protocolbuffers/protobuf/blob/main/src/google/protobuf/empty.proto
236+
sha = 407aa2d9319f5db12964540810b446fecc22d419
237+
etag = 0dca55f20a72d3279554837f4eba867a1de37fe0f4a7535c2d9bc43867361cc5
238+
weak

AI.slnx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
</Folder>
1212
<Project Path="src/Agents/Agents.csproj" Id="90827430-b415-47d6-aac9-2dbe4911b348" />
1313
<Project Path="src/Extensions.CodeAnalysis/Extensions.CodeAnalysis.csproj" />
14+
<Project Path="src/Extensions.Grok/Extensions.Grok.csproj" Id="3590dc05-72f0-4ada-823b-60cb7b5ea828" />
1415
<Project Path="src/Extensions/Extensions.csproj" />
1516
<Project Path="src/Tests/Tests.csproj" />
1617
</Solution>
Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
using System.ClientModel;
2+
using System.ClientModel.Primitives;
3+
using System.Diagnostics.CodeAnalysis;
4+
using System.Text;
5+
using Google.Protobuf;
6+
using Grpc.Core;
7+
using System.Buffers;
8+
using System.Buffers.Binary;
9+
10+
namespace Devlooped.Extensions.AI.Grok;
11+
12+
class ClientPipelineCallInvoker(ClientPipeline pipeline, Uri endpoint) : CallInvoker
13+
{
14+
public override TResponse BlockingUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
15+
{
16+
using var message = CreateMessage(method, options, request);
17+
pipeline.Send(message);
18+
19+
var response = message.Response;
20+
EnsureSuccess(response);
21+
22+
using var stream = response.ContentStream;
23+
if (stream == null) throw new RpcException(new Status(StatusCode.Internal, "Response content stream is null"));
24+
return ReadSingleResponse<TResponse>(stream);
25+
}
26+
27+
public override AsyncUnaryCall<TResponse> AsyncUnaryCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
28+
{
29+
var task = SendAsync(method, options, request);
30+
return new AsyncUnaryCall<TResponse>(task, Task.FromResult(new Metadata()), static () => Status.DefaultSuccess, static () => [], static () => { });
31+
}
32+
33+
async Task<TResponse> SendAsync<TRequest, TResponse>(Method<TRequest, TResponse> method, CallOptions options, TRequest request)
34+
where TRequest : class
35+
where TResponse : class
36+
{
37+
using var message = CreateMessage(method, options, request);
38+
await pipeline.SendAsync(message).ConfigureAwait(false);
39+
40+
var response = message.Response;
41+
EnsureSuccess(response);
42+
43+
using var stream = response.ContentStream;
44+
if (stream == null) throw new RpcException(new Status(StatusCode.Internal, "Response content stream is null"));
45+
return await ReadSingleResponseAsync<TResponse>(stream).ConfigureAwait(false);
46+
}
47+
48+
public override AsyncServerStreamingCall<TResponse> AsyncServerStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options, TRequest request)
49+
{
50+
var responseStream = new ClientPipelineResponseStream<TResponse>(pipeline, CreateMessage(method, options, request));
51+
// We need to start the request.
52+
53+
return new AsyncServerStreamingCall<TResponse>(
54+
responseStream,
55+
Task.FromResult(new Metadata()),
56+
() => Status.DefaultSuccess,
57+
() => [],
58+
() => { }
59+
);
60+
}
61+
62+
PipelineMessage CreateMessage<TRequest, TResponse>(Method<TRequest, TResponse> method, CallOptions options, TRequest request)
63+
where TRequest : class
64+
where TResponse : class
65+
{
66+
var message = pipeline.CreateMessage();
67+
message.ResponseClassifier = PipelineMessageClassifier.Create([200]);
68+
var req = message.Request;
69+
req.Method = "POST";
70+
req.Uri = new Uri(endpoint, method.FullName);
71+
req.Headers.Add("Content-Type", "application/grpc");
72+
req.Headers.Add("TE", "trailers");
73+
74+
if (request is IMessage msg)
75+
{
76+
var length = msg.CalculateSize();
77+
var frame = new byte[length + 5];
78+
frame[0] = 0; // No compression
79+
BinaryPrimitives.WriteUInt32BigEndian(frame.AsSpan(1), (uint)length);
80+
msg.WriteTo(frame.AsSpan(5));
81+
req.Content = BinaryContent.Create(BinaryData.FromBytes(frame));
82+
}
83+
84+
if (options.CancellationToken != default)
85+
{
86+
message.Apply(new RequestOptions { CancellationToken = options.CancellationToken });
87+
}
88+
89+
return message;
90+
}
91+
92+
static void EnsureSuccess([NotNull] PipelineResponse? response)
93+
{
94+
if (response == null || response.IsError)
95+
{
96+
throw new RpcException(new Status(StatusCode.Internal, response?.ReasonPhrase ?? "Unknown error"));
97+
}
98+
if (response.Headers.TryGetValue("grpc-status", out var statusStr) && int.TryParse(statusStr, out var status) && status != 0)
99+
{
100+
response.Headers.TryGetValue("grpc-message", out var grpcMessage);
101+
throw new RpcException(new Status((StatusCode)status, grpcMessage ?? "Unknown gRPC error"));
102+
}
103+
}
104+
105+
static TResponse ReadSingleResponse<TResponse>(Stream stream)
106+
{
107+
Span<byte> header = stackalloc byte[5];
108+
var read = 0;
109+
while (read < 5)
110+
{
111+
var r = stream.Read(header.Slice(read));
112+
if (r == 0) throw new IOException("Unexpected end of stream reading gRPC header");
113+
read += r;
114+
}
115+
116+
var length = BinaryPrimitives.ReadUInt32BigEndian(header.Slice(1));
117+
var data = ArrayPool<byte>.Shared.Rent((int)length);
118+
try
119+
{
120+
read = 0;
121+
while (read < length)
122+
{
123+
var r = stream.Read(data, read, (int)(length - read));
124+
if (r == 0) throw new IOException("Unexpected end of stream reading gRPC payload");
125+
read += r;
126+
}
127+
128+
var instance = Activator.CreateInstance<TResponse>();
129+
if (instance is IMessage message)
130+
{
131+
message.MergeFrom(data.AsSpan(0, (int)length));
132+
return (TResponse)message;
133+
}
134+
throw new InvalidOperationException($"Type {typeof(TResponse)} is not a Protobuf Message.");
135+
}
136+
finally
137+
{
138+
ArrayPool<byte>.Shared.Return(data);
139+
}
140+
}
141+
142+
static async Task<TResponse> ReadSingleResponseAsync<TResponse>(Stream stream)
143+
{
144+
var header = new byte[5];
145+
var read = 0;
146+
while (read < 5)
147+
{
148+
var r = await stream.ReadAsync(header.AsMemory(read, 5 - read)).ConfigureAwait(false);
149+
if (r == 0) throw new IOException("Unexpected end of stream reading gRPC header");
150+
read += r;
151+
}
152+
153+
var length = BinaryPrimitives.ReadUInt32BigEndian(header.AsSpan(1));
154+
var data = ArrayPool<byte>.Shared.Rent((int)length);
155+
try
156+
{
157+
read = 0;
158+
while (read < length)
159+
{
160+
var r = await stream.ReadAsync(data.AsMemory(read, (int)(length - read))).ConfigureAwait(false);
161+
if (r == 0) throw new IOException("Unexpected end of stream reading gRPC payload");
162+
read += r;
163+
}
164+
165+
var instance = Activator.CreateInstance<TResponse>();
166+
if (instance is IMessage message)
167+
{
168+
message.MergeFrom(data.AsSpan(0, (int)length));
169+
return (TResponse)message;
170+
}
171+
throw new InvalidOperationException($"Type {typeof(TResponse)} is not a Protobuf Message.");
172+
}
173+
finally
174+
{
175+
ArrayPool<byte>.Shared.Return(data);
176+
}
177+
}
178+
179+
public override AsyncClientStreamingCall<TRequest, TResponse> AsyncClientStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
180+
{
181+
throw new NotSupportedException("Client streaming is not supported over this adapter.");
182+
}
183+
184+
public override AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>(Method<TRequest, TResponse> method, string? host, CallOptions options)
185+
{
186+
throw new NotSupportedException("Duplex streaming is not supported over this adapter.");
187+
}
188+
}
189+
190+
class ClientPipelineResponseStream<TResponse>(ClientPipeline pipeline, PipelineMessage message) : IAsyncStreamReader<TResponse> where TResponse : class
191+
{
192+
IAsyncEnumerator<TResponse>? enumerator;
193+
TResponse? current;
194+
195+
public Task StartAsync() => Task.CompletedTask;
196+
197+
public TResponse Current => current ?? throw new InvalidOperationException("No current element");
198+
199+
public async Task<bool> MoveNext(CancellationToken cancellationToken)
200+
{
201+
if (enumerator == null)
202+
{
203+
await pipeline.SendAsync(message).ConfigureAwait(false);
204+
var response = message.Response;
205+
206+
if (response == null || response.IsError)
207+
{
208+
throw new RpcException(new Status(StatusCode.Internal, response?.ReasonPhrase ?? "Unknown error"));
209+
}
210+
if (response.Headers.TryGetValue("grpc-status", out var statusStr) && int.TryParse(statusStr, out var status) && status != 0)
211+
{
212+
response.Headers.TryGetValue("grpc-message", out var grpcMessage);
213+
throw new RpcException(new Status((StatusCode)status, grpcMessage ?? "Unknown gRPC error"));
214+
}
215+
216+
enumerator = ReadStream(response.ContentStream!, cancellationToken).GetAsyncEnumerator(cancellationToken);
217+
}
218+
219+
if (await enumerator.MoveNextAsync().ConfigureAwait(false))
220+
{
221+
current = enumerator.Current;
222+
return true;
223+
}
224+
225+
return false;
226+
}
227+
228+
async IAsyncEnumerable<TResponse> ReadStream(Stream stream, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken)
229+
{
230+
var header = new byte[5];
231+
while (true)
232+
{
233+
var read = 0;
234+
while (read < 5)
235+
{
236+
var r = await stream.ReadAsync(header, read, 5 - read, cancellationToken).ConfigureAwait(false);
237+
if (r == 0)
238+
{
239+
if (read == 0) yield break; // End of stream
240+
throw new IOException("Unexpected end of stream reading gRPC header");
241+
}
242+
read += r;
243+
}
244+
245+
var length = BinaryPrimitives.ReadUInt32BigEndian(header.AsSpan(1));
246+
var data = ArrayPool<byte>.Shared.Rent((int)length);
247+
try
248+
{
249+
read = 0;
250+
while (read < length)
251+
{
252+
var r = await stream.ReadAsync(data, read, (int)(length - read), cancellationToken).ConfigureAwait(false);
253+
if (r == 0) throw new IOException("Unexpected end of stream reading gRPC payload");
254+
read += r;
255+
}
256+
257+
var instance = Activator.CreateInstance<TResponse>();
258+
if (instance is IMessage message)
259+
{
260+
message.MergeFrom(data.AsSpan(0, (int)length));
261+
yield return (TResponse)message;
262+
}
263+
}
264+
finally
265+
{
266+
ArrayPool<byte>.Shared.Return(data);
267+
}
268+
}
269+
}
270+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net8.0</TargetFramework>
5+
<!--<TargetFramework>net10.0</TargetFramework>-->
6+
<!--<TargetFrameworks>net8.0;net9.0;net10.0</TargetFrameworks>-->
7+
<AssemblyName>Devlooped.Extensions.AI.Grok</AssemblyName>
8+
<RootNamespace>$(AssemblyName)</RootNamespace>
9+
<PackageId>$(AssemblyName)</PackageId>
10+
<Description>Grok implementation for Microsoft.Extensions.AI</Description>
11+
<PackageLicenseExpression></PackageLicenseExpression>
12+
<PackageLicenseFile>OSMFEULA.txt</PackageLicenseFile>
13+
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
14+
<EmitCompilerGeneratedFiles>true</EmitCompilerGeneratedFiles>
15+
</PropertyGroup>
16+
17+
<ItemGroup>
18+
<PackageReference Include="Google.Protobuf" Version="3.33.1" />
19+
<PackageReference Include="Grpc.Net.Client" Version="2.71.0" />
20+
<PackageReference Include="Grpc.Tools" Version="2.76.0" PrivateAssets="all" />
21+
<PackageReference Include="NuGetizer" Version="1.4.5" PrivateAssets="all" />
22+
<PackageReference Include="Microsoft.Extensions.AI.Abstractions" Version="10.0.0" />
23+
<PackageReference Include="System.ClientModel" Version="1.8.1" />
24+
</ItemGroup>
25+
26+
<ItemGroup>
27+
<ProjectReference Include="..\Extensions\Extensions.csproj" />
28+
</ItemGroup>
29+
30+
<ItemGroup>
31+
<Protobuf Include="proto\*.proto" GrpcServices="Client" ProtoRoot="proto" />
32+
</ItemGroup>
33+
34+
</Project>

0 commit comments

Comments
 (0)