Skip to content

Commit b0f1ed5

Browse files
committed
Tag streaming enforce calls with stable streamId (0.13.1)
Streaming adapters generate one streamId per stream and inject it, plus a slice index, into metadata on every per-chunk enforce call. Lets cloud dashboards collapse N per-chunk audit rows into one logical operation for reviewers. Additive — pure metadata, no public API change. Applies to pre-post-stream.ts (used by Anthropic, LangChain, Vercel AI, etc.) and mastra-processor-stream.ts (Mastra's per-chunk API).
1 parent a1a545a commit b0f1ed5

3 files changed

Lines changed: 114 additions & 12 deletions

File tree

packages/governance/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "governance-sdk",
3-
"version": "0.13.0",
3+
"version": "0.13.1",
44
"description": "AI Agent Governance for TypeScript — policy enforcement, scoring, compliance, and audit for AI agents",
55
"type": "module",
66
"main": "./dist/index.js",

packages/governance/src/plugins/mastra-processor-stream.ts

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,45 @@ interface StreamState {
4545
slidingBuffer?: string;
4646
/** Chunk count in the rolling window (sliding mode). */
4747
slidingChunks?: number;
48+
/**
49+
* Stable id for this stream. Every per-chunk enforce call carries it
50+
* in metadata so the cloud dashboard can collapse N audit rows into
51+
* one logical operation. Lazily generated — first chunk seen.
52+
*/
53+
streamId?: string;
54+
/** Monotonically increasing slice number for per-chunk audit tags. */
55+
streamSlice?: number;
56+
}
57+
58+
/** Zero-dep UUID generator, same helper shape as supply-chain-cyclonedx. */
59+
function generateStreamId(): string {
60+
if (typeof globalThis.crypto !== "undefined" && globalThis.crypto.randomUUID) {
61+
return `str_${globalThis.crypto.randomUUID()}`;
62+
}
63+
const bytes = new Uint8Array(16);
64+
if (typeof globalThis.crypto !== "undefined" && globalThis.crypto.getRandomValues) {
65+
globalThis.crypto.getRandomValues(bytes);
66+
} else {
67+
for (let i = 0; i < 16; i++) bytes[i] = Math.floor(Math.random() * 256);
68+
}
69+
const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join("");
70+
return `str_${hex}`;
71+
}
72+
73+
/**
74+
* Read (and lazily init) the streamId + slice index for this Mastra
75+
* stream. Stored on args.state so subsequent chunks share the same id
76+
* across the whole stream lifecycle. Caller mutates state.streamSlice++
77+
* after consuming.
78+
*/
79+
function getOrCreateStreamMeta(
80+
state: StreamState,
81+
): { streamId: string; slice: number } {
82+
if (!state.streamId) state.streamId = generateStreamId();
83+
if (state.streamSlice == null) state.streamSlice = 0;
84+
const slice = state.streamSlice;
85+
state.streamSlice = slice + 1;
86+
return { streamId: state.streamId, slice };
4887
}
4988

5089
// ─── Entry point ──────────────────────────────────────────────
@@ -100,9 +139,13 @@ async function handlePerChunk(
100139
agentLevel: number,
101140
callbacks: OutcomeCallbacks,
102141
): Promise<MastraStreamChunk | null | undefined> {
142+
const state = (args.state ?? {}) as StreamState;
143+
const streamMeta = getOrCreateStreamMeta(state);
144+
if (args.state && args.state !== state) Object.assign(args.state, state);
145+
103146
const decision = await runScan(
104147
governance, chunkText, config, agentId, agentLevel, callbacks,
105-
"mastra.stream:per-chunk",
148+
"mastra.stream:per-chunk", streamMeta,
106149
);
107150
return applyDecisionToChunk(args, decision, chunkText, config);
108151
}
@@ -124,6 +167,7 @@ async function handleSliding(
124167
callbacks: OutcomeCallbacks,
125168
): Promise<MastraStreamChunk | null | undefined> {
126169
const state = (args.state ?? {}) as StreamState;
170+
const streamMeta = getOrCreateStreamMeta(state);
127171
const lookback = config.streamLookbackChunks ?? 2;
128172
const lookbackChars = config.streamLookbackChars;
129173

@@ -132,7 +176,7 @@ async function handleSliding(
132176

133177
const decision = await runScan(
134178
governance, nextBuffer, config, agentId, agentLevel, callbacks,
135-
"mastra.stream:sliding",
179+
"mastra.stream:sliding", streamMeta,
136180
);
137181

138182
// Trim the buffer once it grows past the lookback budget so it doesn't
@@ -170,6 +214,7 @@ async function runScan(
170214
agentLevel: number,
171215
callbacks: OutcomeCallbacks,
172216
toolName: string,
217+
streamMeta?: { streamId: string; slice: number },
173218
): Promise<EnforcementDecision> {
174219
// enforcePostprocess throws on block / require_approval (via handleOutcome).
175220
// In Mastra's per-chunk API we can't throw — we must call args.abort().
@@ -180,7 +225,12 @@ async function runScan(
180225
agentId,
181226
agentName: config.agentName,
182227
agentLevel,
183-
metadata: config.metadata,
228+
metadata: {
229+
...(config.metadata ?? {}),
230+
...(streamMeta
231+
? { streamId: streamMeta.streamId, streamSlice: streamMeta.slice }
232+
: {}),
233+
},
184234
callbacks,
185235
toolName,
186236
});

packages/governance/src/plugins/pre-post-stream.ts

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,46 @@ import type { OutcomeCallbacks } from "./outcome-handler.js";
4747
import { enforcePostprocess } from "./pre-post-enforce.js";
4848
import type { PrePostEnforceOptions } from "./pre-post-enforce.js";
4949

50+
/**
51+
* Generate a stable stream id. Every enforce() call for a single LLM
52+
* stream carries the same id in metadata so the dashboard can collapse
53+
* N per-chunk rows into one logical "operation" for reviewers. Zero-dep
54+
* UUIDv4 fallback mirrors the one in supply-chain-cyclonedx.ts so the
55+
* SDK's "no runtime deps" rule stays intact.
56+
*/
57+
function generateStreamId(): string {
58+
if (typeof globalThis.crypto !== "undefined" && globalThis.crypto.randomUUID) {
59+
return `str_${globalThis.crypto.randomUUID()}`;
60+
}
61+
const bytes = new Uint8Array(16);
62+
if (typeof globalThis.crypto !== "undefined" && globalThis.crypto.getRandomValues) {
63+
globalThis.crypto.getRandomValues(bytes);
64+
} else {
65+
for (let i = 0; i < 16; i++) bytes[i] = Math.floor(Math.random() * 256);
66+
}
67+
const hex = Array.from(bytes, (b) => b.toString(16).padStart(2, "0")).join("");
68+
return `str_${hex}`;
69+
}
70+
71+
/**
72+
* Merge a streamId + slice index into the options.metadata for each
73+
* per-chunk enforce call. Preserves any caller-supplied metadata.
74+
*/
75+
function withStreamMeta<O extends PrePostEnforceOptions>(
76+
options: O,
77+
streamId: string,
78+
sliceIndex: number,
79+
): O {
80+
return {
81+
...options,
82+
metadata: {
83+
...(options.metadata ?? {}),
84+
streamId,
85+
streamSlice: sliceIndex,
86+
},
87+
};
88+
}
89+
5090
// ─── Types ────────────────────────────────────────────────────
5191

5292
export type StreamMode = "buffered" | "sliding" | "per-chunk";
@@ -88,16 +128,21 @@ export async function* enforcePostprocessStream<ChunkT>(
88128
options: StreamEnforceOptions<ChunkT>,
89129
): AsyncIterable<ChunkT> {
90130
const mode: StreamMode = options.streamMode ?? "buffered";
131+
// One id per stream — every enforce call this helper triggers carries
132+
// it in metadata so the cloud dashboard can collapse repeated rows
133+
// into a single logical operation. Buffered mode also tags its single
134+
// call so buffered-vs-per-chunk audit shape is consistent.
135+
const streamId = generateStreamId();
91136

92137
if (mode === "buffered") {
93-
yield* runBuffered(governance, source, options);
138+
yield* runBuffered(governance, source, options, streamId);
94139
return;
95140
}
96141
if (mode === "per-chunk") {
97-
yield* runPerChunk(governance, source, options);
142+
yield* runPerChunk(governance, source, options, streamId);
98143
return;
99144
}
100-
yield* runSliding(governance, source, options);
145+
yield* runSliding(governance, source, options, streamId);
101146
}
102147

103148
// ─── Buffered mode ────────────────────────────────────────────
@@ -106,6 +151,7 @@ async function* runBuffered<ChunkT>(
106151
governance: GovernanceInstance,
107152
source: AsyncIterable<ChunkT>,
108153
options: StreamEnforceOptions<ChunkT>,
154+
streamId: string,
109155
): AsyncIterable<ChunkT> {
110156
const chunks: ChunkT[] = [];
111157
const texts: string[] = [];
@@ -121,7 +167,7 @@ async function* runBuffered<ChunkT>(
121167
}
122168

123169
const result = await enforcePostprocess(governance, combined, {
124-
...options,
170+
...withStreamMeta(options, streamId, 0),
125171
toolName: options.toolName ?? "stream:buffered",
126172
});
127173

@@ -147,7 +193,9 @@ async function* runPerChunk<ChunkT>(
147193
governance: GovernanceInstance,
148194
source: AsyncIterable<ChunkT>,
149195
options: StreamEnforceOptions<ChunkT>,
196+
streamId: string,
150197
): AsyncIterable<ChunkT> {
198+
let sliceIndex = 0;
151199
for await (const chunk of source) {
152200
const text = options.extractText(chunk);
153201
if (!text) {
@@ -156,7 +204,7 @@ async function* runPerChunk<ChunkT>(
156204
}
157205

158206
const result = await enforcePostprocess(governance, text, {
159-
...options,
207+
...withStreamMeta(options, streamId, sliceIndex++),
160208
toolName: options.toolName ?? "stream:per-chunk",
161209
});
162210

@@ -176,18 +224,20 @@ async function* runSliding<ChunkT>(
176224
governance: GovernanceInstance,
177225
source: AsyncIterable<ChunkT>,
178226
options: StreamEnforceOptions<ChunkT>,
227+
streamId: string,
179228
): AsyncIterable<ChunkT> {
180229
const window: ChunkT[] = [];
181230
const windowTexts: string[] = [];
182231
const lookbackChunks = options.streamLookbackChunks ?? 2;
183232
const lookbackChars = options.streamLookbackChars;
233+
const sliceCounter = { value: 0 };
184234

185235
for await (const chunk of source) {
186236
window.push(chunk);
187237
windowTexts.push(options.extractText(chunk));
188238

189239
while (shouldFlush(window.length, windowTexts, lookbackChunks, lookbackChars)) {
190-
yield* flushOldest(governance, window, windowTexts, options);
240+
yield* flushOldest(governance, window, windowTexts, options, streamId, sliceCounter);
191241
}
192242
}
193243

@@ -200,7 +250,7 @@ async function* runSliding<ChunkT>(
200250
return;
201251
}
202252
const result = await enforcePostprocess(governance, tailText, {
203-
...options,
253+
...withStreamMeta(options, streamId, sliceCounter.value++),
204254
toolName: options.toolName ?? "stream:sliding-tail",
205255
});
206256
if (result.text === tailText) {
@@ -234,6 +284,8 @@ async function* flushOldest<ChunkT>(
234284
window: ChunkT[],
235285
windowTexts: string[],
236286
options: StreamEnforceOptions<ChunkT>,
287+
streamId: string,
288+
sliceCounter: { value: number },
237289
): AsyncIterable<ChunkT> {
238290
// Scan the full lookback window (oldest + lookback tail) before flushing
239291
// the oldest chunk. This gives the scanner context straddling boundaries.
@@ -246,7 +298,7 @@ async function* flushOldest<ChunkT>(
246298
}
247299

248300
const result = await enforcePostprocess(governance, scanText, {
249-
...options,
301+
...withStreamMeta(options, streamId, sliceCounter.value++),
250302
toolName: options.toolName ?? "stream:sliding",
251303
});
252304

0 commit comments

Comments
 (0)