diff --git a/packages/cli/src/config/config.ts b/packages/cli/src/config/config.ts index 53814697da3..3825a884a15 100755 --- a/packages/cli/src/config/config.ts +++ b/packages/cli/src/config/config.ts @@ -242,7 +242,7 @@ export async function parseArguments(settings: Settings): Promise { .option('output-format', { type: 'string', description: 'The format of the CLI output.', - choices: ['text', 'json'], + choices: ['text', 'json', 'stream-json'], }) .option('resume', { alias: 'r', diff --git a/packages/cli/src/nonInteractiveCli.ts b/packages/cli/src/nonInteractiveCli.ts index d8948d8a5ce..0edcbe7c1f5 100644 --- a/packages/cli/src/nonInteractiveCli.ts +++ b/packages/cli/src/nonInteractiveCli.ts @@ -14,7 +14,9 @@ import { promptIdContext, OutputFormat, JsonFormatter, + StreamJsonFormatter, uiTelemetryService, + streamingTelemetryService, } from '@blocksuser/gemini-cli-core'; import type { Content, Part } from '@google/genai'; @@ -41,6 +43,18 @@ export async function runNonInteractive( try { consolePatcher.patch(); + + // Set up streaming telemetry for stream-json format + const isStreamJsonFormat = config.getOutputFormat() === OutputFormat.STREAM_JSON; + let streamJsonFormatter: StreamJsonFormatter | undefined; + + if (isStreamJsonFormat) { + streamJsonFormatter = new StreamJsonFormatter(); + streamingTelemetryService.enable(); + streamingTelemetryService.addTelemetryListener((event) => { + process.stdout.write(streamJsonFormatter!.formatTelemetryBlock(event) + '\n'); + }); + } // Handle EPIPE errors when the output is piped to a command that closes early. process.stdout.on('error', (err: NodeJS.ErrnoException) => { if (err.code === 'EPIPE') { @@ -123,6 +137,11 @@ export async function runNonInteractive( if (event.type === GeminiEventType.Content) { if (config.getOutputFormat() === OutputFormat.JSON) { responseText += event.value; + } else if (config.getOutputFormat() === OutputFormat.STREAM_JSON) { + responseText += event.value; + if (streamJsonFormatter) { + process.stdout.write(streamJsonFormatter.formatContentBlock(event.value) + '\n'); + } } else { process.stdout.write(event.value); } @@ -162,6 +181,11 @@ export async function runNonInteractive( const formatter = new JsonFormatter(); const stats = uiTelemetryService.getMetrics(); process.stdout.write(formatter.format(responseText, stats)); + } else if (config.getOutputFormat() === OutputFormat.STREAM_JSON) { + if (streamJsonFormatter) { + const stats = uiTelemetryService.getMetrics(); + process.stdout.write(streamJsonFormatter.formatFinalBlock(responseText, stats) + '\n'); + } } else { process.stdout.write('\n'); // Ensure a final newline } @@ -172,6 +196,9 @@ export async function runNonInteractive( handleError(error, config); } finally { consolePatcher.cleanup(); + if (config.getOutputFormat() === OutputFormat.STREAM_JSON) { + streamingTelemetryService.disable(); + } if (isTelemetrySdkInitialized()) { await shutdownTelemetry(config); } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index e076d090c99..3060c074818 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -8,6 +8,7 @@ export * from './config/config.js'; export * from './output/types.js'; export * from './output/json-formatter.js'; +export * from './output/stream-json-formatter.js'; // Export Core Logic export * from './core/client.js'; diff --git a/packages/core/src/output/stream-json-formatter.ts b/packages/core/src/output/stream-json-formatter.ts new file mode 100644 index 00000000000..25256912ab7 --- /dev/null +++ b/packages/core/src/output/stream-json-formatter.ts @@ -0,0 +1,77 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import stripAnsi from 'strip-ansi'; +import type { SessionMetrics } from '../telemetry/uiTelemetry.js'; +import type { JsonError } from './types.js'; +import type { TelemetryEvent } from '../telemetry/types.js'; + +export interface StreamJsonTelemetryBlock { + type: 'telemetry'; + event: TelemetryEvent; +} + +export interface StreamJsonContentBlock { + type: 'content'; + content: string; +} + +export interface StreamJsonFinalBlock { + type: 'final'; + response?: string; + stats?: SessionMetrics; + error?: JsonError; +} + +export type StreamJsonBlock = StreamJsonTelemetryBlock | StreamJsonContentBlock | StreamJsonFinalBlock; + +export class StreamJsonFormatter { + formatTelemetryBlock(event: TelemetryEvent): string { + const block: StreamJsonTelemetryBlock = { + type: 'telemetry', + event, + }; + return JSON.stringify(block); + } + + formatContentBlock(content: string): string { + const block: StreamJsonContentBlock = { + type: 'content', + content: stripAnsi(content), + }; + return JSON.stringify(block); + } + + formatFinalBlock(response?: string, stats?: SessionMetrics, error?: JsonError): string { + const block: StreamJsonFinalBlock = { + type: 'final', + }; + + if (response !== undefined) { + block.response = stripAnsi(response); + } + + if (stats) { + block.stats = stats; + } + + if (error) { + block.error = error; + } + + return JSON.stringify(block); + } + + formatError(error: Error, code?: string | number): string { + const jsonError: JsonError = { + type: error.constructor.name, + message: stripAnsi(error.message), + ...(code && { code }), + }; + + return this.formatFinalBlock(undefined, undefined, jsonError); + } +} \ No newline at end of file diff --git a/packages/core/src/output/types.ts b/packages/core/src/output/types.ts index 08477d21ed5..0c7593dd4ba 100644 --- a/packages/core/src/output/types.ts +++ b/packages/core/src/output/types.ts @@ -9,6 +9,7 @@ import type { SessionMetrics } from '../telemetry/uiTelemetry.js'; export enum OutputFormat { TEXT = 'text', JSON = 'json', + STREAM_JSON = 'stream-json', } export interface JsonError { diff --git a/packages/core/src/telemetry/index.ts b/packages/core/src/telemetry/index.ts index 01317afcce2..2a22f684cf4 100644 --- a/packages/core/src/telemetry/index.ts +++ b/packages/core/src/telemetry/index.ts @@ -54,3 +54,5 @@ export { SemanticAttributes } from '@opentelemetry/semantic-conventions'; export * from './uiTelemetry.js'; export { HighWaterMarkTracker } from './high-water-mark-tracker.js'; export { RateLimiter } from './rate-limiter.js'; +export { streamingTelemetryService } from './streamingTelemetry.js'; +export type { TelemetryStreamListener } from './streamingTelemetry.js'; diff --git a/packages/core/src/telemetry/loggers.ts b/packages/core/src/telemetry/loggers.ts index aecd331d857..5f556a53ecf 100644 --- a/packages/core/src/telemetry/loggers.ts +++ b/packages/core/src/telemetry/loggers.ts @@ -66,6 +66,7 @@ import { import { isTelemetrySdkInitialized } from './sdk.js'; import type { UiEvent } from './uiTelemetry.js'; import { uiTelemetryService } from './uiTelemetry.js'; +import { streamingTelemetryService } from './streamingTelemetry.js'; import { ClearcutLogger } from './clearcut-logger/clearcut-logger.js'; import { safeJsonStringify } from '../utils/safeJsonStringify.js'; import { UserAccountManager } from '../utils/userAccountManager.js'; @@ -119,6 +120,7 @@ export function logCliConfiguration( } export function logUserPrompt(config: Config, event: UserPromptEvent): void { + streamingTelemetryService.emitEvent(event); ClearcutLogger.getInstance(config)?.logNewPromptEvent(event); if (!isTelemetrySdkInitialized()) return; @@ -147,6 +149,7 @@ export function logUserPrompt(config: Config, event: UserPromptEvent): void { } export function logToolCall(config: Config, event: ToolCallEvent): void { + streamingTelemetryService.emitEvent(event); const uiEvent = { ...event, 'event.name': EVENT_TOOL_CALL, @@ -359,6 +362,7 @@ export function logApiError(config: Config, event: ApiErrorEvent): void { } export function logApiResponse(config: Config, event: ApiResponseEvent): void { + streamingTelemetryService.emitEvent(event); const uiEvent = { ...event, 'event.name': EVENT_API_RESPONSE, diff --git a/packages/core/src/telemetry/streamingTelemetry.ts b/packages/core/src/telemetry/streamingTelemetry.ts new file mode 100644 index 00000000000..eab21041425 --- /dev/null +++ b/packages/core/src/telemetry/streamingTelemetry.ts @@ -0,0 +1,44 @@ +/** + * @license + * Copyright 2025 Google LLC + * SPDX-License-Identifier: Apache-2.0 + */ + +import { EventEmitter } from 'node:events'; +import type { TelemetryEvent } from './types.js'; + +export interface TelemetryStreamListener { + (event: TelemetryEvent): void; +} + +class StreamingTelemetryService extends EventEmitter { + private enabled = false; + + enable(): void { + this.enabled = true; + } + + disable(): void { + this.enabled = false; + } + + isEnabled(): boolean { + return this.enabled; + } + + addTelemetryListener(listener: TelemetryStreamListener): void { + this.on('telemetry', listener); + } + + removeTelemetryListener(listener: TelemetryStreamListener): void { + this.off('telemetry', listener); + } + + emitEvent(event: TelemetryEvent): void { + if (this.enabled) { + this.emit('telemetry', event); + } + } +} + +export const streamingTelemetryService = new StreamingTelemetryService(); \ No newline at end of file