
Commit 617eaa9

Add public API for observing the client pipeline
A helper `ClientOptions.Observable` provides a shortcut for doing `new OpenAIClientOptions().Observe(...)`.
1 parent 37e6533 commit 617eaa9
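
A minimal usage sketch based on the test changes in this commit (the API key lookup, model name, and the xunit output helper are illustrative placeholders, not part of this diff):

    // Long form: configure any ClientPipelineOptions-derived instance in place.
    var requests = new List<JsonNode>();
    var pipeline = new OpenAI.OpenAIClientOptions()
        .Observe(requests.Add)   // capture request JSON for assertions
        .WriteTo(output);        // echo request/response JSON to the test output

    // Shortcut: ClientOptions.Observable(...) is equivalent to
    // new OpenAIClientOptions().Observe(...).
    var chat = new OpenAIChatClient(Configuration["OPENAI_API_KEY"]!, "gpt-4.1-nano",
        ClientOptions.Observable(requests.Add).WriteTo(output));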

6 files changed: +134 -100 lines
Lines changed: 9 additions & 85 deletions
@@ -1,97 +1,21 @@
 using System.ClientModel.Primitives;
 using System.Text.Json;
-using System.Text.Json.Nodes;
 
 namespace Devlooped.Extensions.AI;
 
 public static class PipelineTestOutput
 {
-    /// <summary>
-    /// Sets a <see cref="ClientPipelineOptions.Transport"/> that renders HTTP messages to the
-    /// console using Spectre.Console rich JSON formatting, but only if the console is interactive.
-    /// </summary>
-    /// <typeparam name="TOptions">The options type to configure for HTTP logging.</typeparam>
-    /// <param name="pipelineOptions">The options instance to configure.</param>
-    /// <param name="output">The test output helper to write to.</param>
-    /// <param name="onRequest">A callback to process the <see cref="JsonNode"/> that was sent.</param>
-    /// <param name="onResponse">A callback to process the <see cref="JsonNode"/> that was received.</param>
-    /// <remarks>
-    /// NOTE: this is the lowst-level logging after all chat pipeline processing has been done.
-    /// <para>
-    /// If the options already provide a transport, it will be wrapped with the console
-    /// logging transport to minimize the impact on existing configurations.
-    /// </para>
-    /// </remarks>
-    public static TOptions WriteTo<TOptions>(this TOptions pipelineOptions, ITestOutputHelper? output = default, Action<JsonNode>? onRequest = default, Action<JsonNode>? onResponse = default)
-        where TOptions : ClientPipelineOptions
-    {
-        pipelineOptions.AddPolicy(new TestOutputPolicy(output ?? NullTestOutputHelper.Default, onRequest, onResponse), PipelinePosition.BeforeTransport);
-        return pipelineOptions;
-    }
-
-    class NullTestOutputHelper : ITestOutputHelper
+    static readonly JsonSerializerOptions options = new(JsonSerializerDefaults.General)
     {
-        public static ITestOutputHelper Default { get; } = new NullTestOutputHelper();
-        NullTestOutputHelper() { }
-        public void WriteLine(string message) { }
-        public void WriteLine(string format, params object[] args) { }
-    }
+        WriteIndented = true,
+    };
 
-    class TestOutputPolicy(ITestOutputHelper output, Action<JsonNode>? onRequest = default, Action<JsonNode>? onResponse = default) : PipelinePolicy
+    public static TOptions WriteTo<TOptions>(this TOptions pipelineOptions, ITestOutputHelper output = default)
+        where TOptions : ClientPipelineOptions
     {
-        static readonly JsonSerializerOptions options = new JsonSerializerOptions(JsonSerializerDefaults.General)
-        {
-            WriteIndented = true,
-        };
-
-        public override void Process(PipelineMessage message, IReadOnlyList<PipelinePolicy> pipeline, int currentIndex)
-        {
-            message.BufferResponse = true;
-            ProcessNext(message, pipeline, currentIndex);
-
-            if (message.Request.Content is not null)
-            {
-                using var memory = new MemoryStream();
-                message.Request.Content.WriteTo(memory);
-                memory.Position = 0;
-                using var reader = new StreamReader(memory);
-                var content = reader.ReadToEnd();
-                var node = JsonNode.Parse(content);
-                onRequest?.Invoke(node!);
-                output?.WriteLine(node!.ToJsonString(options));
-            }
-
-            if (message.Response != null)
-            {
-                var node = JsonNode.Parse(message.Response.Content.ToString());
-                onResponse?.Invoke(node!);
-                output?.WriteLine(node!.ToJsonString(options));
-            }
-        }
-
-        public override async ValueTask ProcessAsync(PipelineMessage message, IReadOnlyList<PipelinePolicy> pipeline, int currentIndex)
-        {
-            message.BufferResponse = true;
-            await ProcessNextAsync(message, pipeline, currentIndex);
-
-            if (message.Request.Content is not null)
-            {
-                using var memory = new MemoryStream();
-                message.Request.Content.WriteTo(memory);
-                memory.Position = 0;
-                using var reader = new StreamReader(memory);
-                var content = await reader.ReadToEndAsync();
-                var node = JsonNode.Parse(content);
-                onRequest?.Invoke(node!);
-                output?.WriteLine(node!.ToJsonString(options));
-            }
-
-            if (message.Response != null)
-            {
-                var node = JsonNode.Parse(message.Response.Content.ToString());
-                onResponse?.Invoke(node!);
-                output?.WriteLine(node!.ToJsonString(options));
-            }
-        }
+        return pipelineOptions.Observe(
+            request => output.WriteLine(request.ToJsonString(options)),
+            response => output.WriteLine(response.ToJsonString(options))
+        );
     }
 }

src/AI.Tests/GrokTests.cs

Lines changed: 5 additions & 3 deletions
@@ -52,7 +52,8 @@ public async Task GrokInvokesToolAndSearch()
         var responses = new List<JsonNode>();
 
         var grok = new GrokChatClient(Configuration["XAI_API_KEY"]!, "grok-3",
-            new OpenAI.OpenAIClientOptions().WriteTo(output, requests.Add, responses.Add))
+            ClientOptions.Observable(requests.Add, responses.Add)
+                .WriteTo(output))
             .AsBuilder()
             .UseFunctionInvocation()
             .Build();
@@ -105,7 +106,8 @@ public async Task GrokInvokesHostedSearchTool()
         var responses = new List<JsonNode>();
 
         var grok = new GrokChatClient(Configuration["XAI_API_KEY"]!, "grok-3",
-            new OpenAI.OpenAIClientOptions().WriteTo(output, requests.Add, responses.Add));
+            ClientOptions.Observable(requests.Add, responses.Add)
+                .WriteTo(output));
 
         var options = new ChatOptions
         {
@@ -169,4 +171,4 @@ public async Task GrokThinksHard()
         // different model and the grok client honors that choice.
         Assert.StartsWith("grok-3-mini", response.ModelId);
     }
-}
+}

src/AI.Tests/OpenAITests.cs

Lines changed: 4 additions & 3 deletions
@@ -14,7 +14,8 @@ public async Task OpenAISwitchesModel()
             { "user", "What products does Tesla make?" },
         };
 
-        var chat = new OpenAIChatClient(Configuration["OPENAI_API_KEY"]!, "gpt-4.1-nano", new OpenAI.OpenAIClientOptions().WriteTo(output));
+        var chat = new OpenAIChatClient(Configuration["OPENAI_API_KEY"]!, "gpt-4.1-nano",
+            new OpenAI.OpenAIClientOptions().WriteTo(output));
 
         var options = new ChatOptions
         {
@@ -39,8 +40,8 @@ public async Task OpenAIThinks()
 
         var requests = new List<JsonNode>();
 
-        var chat = new OpenAIChatClient(Configuration["OPENAI_API_KEY"]!, "o3-mini", new OpenAI.OpenAIClientOptions()
-            .WriteTo(output, requests.Add));
+        var chat = new OpenAIChatClient(Configuration["OPENAI_API_KEY"]!, "o3-mini",
+            ClientOptions.Observable(requests.Add).WriteTo(output));
 
         var options = new ChatOptions
         {

src/AI.Tests/RetrievalTests.cs

Lines changed: 4 additions & 9 deletions
@@ -1,22 +1,17 @@
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Microsoft.Extensions.AI;
+using Microsoft.Extensions.AI;
 using OpenAI.Responses;
 using static ConfigurationExtensions;
 
 namespace Devlooped.Extensions.AI;
 
 public class RetrievalTests(ITestOutputHelper output)
 {
-    [SecretsTheory("OpenAI:Key")]
+    [SecretsTheory("OPENAI_API_KEY")]
     [InlineData("gpt-4.1-nano", "Qué es la rebeldía en el Código Procesal Civil y Comercial Nacional?")]
    [InlineData("gpt-4.1-nano", "What's the battery life in an iPhone 15?", true)]
     public async Task CanRetrieveContent(string model, string question, bool empty = false)
     {
-        var client = new OpenAI.OpenAIClient(Configuration["OpenAI:Key"]);
+        var client = new OpenAI.OpenAIClient(Configuration["OPENAI_API_KEY"]);
         var store = client.GetVectorStoreClient().CreateVectorStore(true);
         try
         {
@@ -25,7 +20,7 @@ public async Task CanRetrieveContent(string model, string question, bool empty =
            {
                client.GetVectorStoreClient().AddFileToVectorStore(store.VectorStoreId, file.Value.Id, true);
 
-                var responses = new OpenAIResponseClient(model, Configuration["OpenAI:Key"]);
+                var responses = new OpenAIResponseClient(model, Configuration["OPENAI_API_KEY"]);
 
                var chat = responses.AsIChatClient(
                    ResponseTool.CreateFileSearchTool([store.VectorStoreId]))

src/AI/ClientOptions.cs

Lines changed: 31 additions & 0 deletions
@@ -0,0 +1,31 @@
+using System.ClientModel.Primitives;
+using System.Text.Json.Nodes;
+using OpenAI;
+
+namespace Devlooped.Extensions.AI;
+
+/// <summary>
+/// Shortcut factory methods for creating <see cref="ClientPipelineOptions"/> like
+/// <see cref="OpenAIClientOptions"/> that provide convenient initialization options.
+/// </summary>
+public static class ClientOptions
+{
+    /// <summary>
+    /// Creates an observable <see cref="OpenAIClientOptions"/> instance that can
+    /// be used to log requests and responses.
+    /// </summary>
+    /// <param name="onRequest">A callback to process the <see cref="JsonNode"/> that was sent.</param>
+    /// <param name="onResponse">A callback to process the <see cref="JsonNode"/> that was received.</param>
+    public static OpenAIClientOptions Observable(Action<JsonNode>? onRequest = default, Action<JsonNode>? onResponse = default)
+        => Observable<OpenAIClientOptions>(onRequest, onResponse);
+
+    /// <summary>
+    /// Creates an observable <see cref="ClientPipelineOptions"/>-derived instance
+    /// that can be used to log requests and responses.
+    /// </summary>
+    /// <param name="onRequest">A callback to process the <see cref="JsonNode"/> that was sent.</param>
+    /// <param name="onResponse">A callback to process the <see cref="JsonNode"/> that was received.</param>
+    public static TOptions Observable<TOptions>(Action<JsonNode>? onRequest = default, Action<JsonNode>? onResponse = default)
+        where TOptions : ClientPipelineOptions, new()
+        => new TOptions().Observe(onRequest, onResponse);
+}
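
As the implementation above shows, Observable is just sugar over Observe: the two lines below configure equivalent pipelines (a sketch; the callback body is illustrative):

    var viaHelper  = ClientOptions.Observable(onRequest: req => requests.Add(req));
    var viaObserve = new OpenAI.OpenAIClientOptions().Observe(onRequest: req => requests.Add(req));

The generic overload works for any options type with a parameterless constructor, e.g. ClientOptions.Observable<OpenAI.OpenAIClientOptions>(requests.Add).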

src/AI/ClientPipelineExtensions.cs

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+using System.ClientModel.Primitives;
+using System.Text.Json;
+using System.Text.Json.Nodes;
+
+namespace Devlooped.Extensions.AI;
+
+public static class ClientPipelineExtensions
+{
+    /// <summary>
+    /// Adds a <see cref="PipelinePolicy"/> that observes request and response
+    /// messages from the <see cref="ClientPipeline"/> and notifies the provided
+    /// callbacks with the JSON representation of the HTTP messages.
+    /// </summary>
+    /// <typeparam name="TOptions">The options type to configure for HTTP logging.</typeparam>
+    /// <param name="pipelineOptions">The options instance to configure.</param>
+    /// <param name="onRequest">A callback to process the <see cref="JsonNode"/> that was sent.</param>
+    /// <param name="onResponse">A callback to process the <see cref="JsonNode"/> that was received.</param>
+    /// <remarks>
+    /// This is the lowest-level logging after all chat pipeline processing has been done.
+    /// If no <see cref="JsonNode"/> can be parsed from the request or response,
+    /// the callbacks will not be invoked.
+    /// </remarks>
+    public static TOptions Observe<TOptions>(this TOptions pipelineOptions,
+        Action<JsonNode>? onRequest = default, Action<JsonNode>? onResponse = default)
+        where TOptions : ClientPipelineOptions
+    {
+        pipelineOptions.AddPolicy(new ObservePipelinePolicy(onRequest, onResponse), PipelinePosition.BeforeTransport);
+        return pipelineOptions;
+    }
+
+    class ObservePipelinePolicy(Action<JsonNode>? onRequest = default, Action<JsonNode>? onResponse = default) : PipelinePolicy
+    {
+        public override void Process(PipelineMessage message, IReadOnlyList<PipelinePolicy> pipeline, int currentIndex)
+        {
+            message.BufferResponse = true;
+            ProcessNext(message, pipeline, currentIndex);
+            NotifyObservers(message);
+        }
+
+        public override async ValueTask ProcessAsync(PipelineMessage message, IReadOnlyList<PipelinePolicy> pipeline, int currentIndex)
+        {
+            message.BufferResponse = true;
+            await ProcessNextAsync(message, pipeline, currentIndex);
+            NotifyObservers(message);
+        }
+
+        void NotifyObservers(PipelineMessage message)
+        {
+            if (onRequest != null && message.Request.Content != null)
+            {
+                using var memory = new MemoryStream();
+                message.Request.Content.WriteTo(memory);
+                memory.Position = 0;
+                using var reader = new StreamReader(memory);
+                var content = reader.ReadToEnd();
+                try
+                {
+                    if (JsonNode.Parse(content) is { } node)
+                        onRequest.Invoke(node!);
+                }
+                catch (JsonException)
+                {
+                    // We ignore invalid JSON
+                }
+            }
+
+            if (onResponse != null && message.Response != null)
+            {
+                try
+                {
+                    if (JsonNode.Parse(message.Response.Content.ToString()) is { } node)
+                        onResponse.Invoke(node!);
+                }
+                catch (JsonException)
+                {
+                    // We ignore invalid JSON
+                }
+            }
+        }
+    }
+}
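
Because the policy is added at PipelinePosition.BeforeTransport with BufferResponse enabled, the callbacks observe the final JSON as it goes over the wire, and non-JSON payloads are silently skipped. A hedged test-style sketch of what that enables (the chat call, the "model" property, and the assertion are illustrative and assume the Microsoft.Extensions.AI IChatClient surface; they are not part of this commit):

    var requests = new List<JsonNode>();
    var chat = new OpenAIChatClient(Configuration["OPENAI_API_KEY"]!, "o3-mini",
        ClientOptions.Observable(requests.Add).WriteTo(output));

    var response = await chat.GetResponseAsync("How many R's are in 'strawberry'?");

    // Inspect the captured request body after the call, e.g. to verify
    // which model was actually sent to the service:
    Assert.Equal("o3-mini", requests[0]?["model"]?.GetValue<string>());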

0 commit comments
