Skip to content

Commit b88e9b1

Browse files
Forceflush fix (#474)
* fix * fix * changeset * fix * fix
1 parent dd59fd7 commit b88e9b1

File tree

9 files changed

+23
-20
lines changed

9 files changed

+23
-20
lines changed

.changeset/quick-moments-juggle.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@inkeep/agents-run-api": patch
3+
---
4+
5+
force flush fix

agents-run-api/src/agents/Agent.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ import { toolSessionManager } from './ToolSessionManager';
6060
import type { SystemPromptV1 } from './types';
6161
import { Phase1Config } from './versions/v1/Phase1Config';
6262
import { Phase2Config } from './versions/v1/Phase2Config';
63-
import { defaultBatchProcessor } from '../instrumentation';
6463

6564
/**
6665
* Creates a stopWhen condition that stops when any tool call name starts with the given prefix
@@ -750,7 +749,6 @@ export class Agent {
750749
requestContext: requestContext || {},
751750
tenantId: this.config.tenantId,
752751
});
753-
await defaultBatchProcessor.forceFlush();
754752

755753
// Add built-in variables to resolved context
756754
const contextWithBuiltins = {
@@ -1473,7 +1471,6 @@ export class Agent {
14731471
throw err;
14741472
} finally {
14751473
childSpan.end();
1476-
await defaultBatchProcessor.forceFlush();
14771474
}
14781475
}
14791476
);
@@ -2044,7 +2041,6 @@ ${output}${structureHintsFormatted}`;
20442041
// Mark span as successful
20452042
span.setStatus({ code: SpanStatusCode.OK });
20462043
span.end();
2047-
await defaultBatchProcessor.forceFlush();
20482044

20492045
// Format response - handle object vs text responses differently
20502046
// Only format if we don't already have formattedContent from streaming

agents-run-api/src/app.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { cors } from 'hono/cors';
1212
import { HTTPException } from 'hono/http-exception';
1313
import { requestId } from 'hono/request-id';
1414
import type { StatusCode } from 'hono/utils/http-status';
15-
import { defaultBatchProcessor } from './instrumentation';
15+
import { flushBatchProcessor } from './instrumentation';
1616
import { getLogger } from './logger';
1717
import { apiKeyAuth } from './middleware/api-key-auth';
1818
import { setupOpenAPIRoutes } from './openapi';
@@ -263,19 +263,19 @@ function createExecutionHono(
263263

264264
app.use('/tenants/*', async (_c, next) => {
265265
await next();
266-
await defaultBatchProcessor.forceFlush();
266+
await flushBatchProcessor();
267267
});
268268
app.use('/agents/*', async (_c, next) => {
269269
await next();
270-
await defaultBatchProcessor.forceFlush();
270+
await flushBatchProcessor();
271271
});
272272
app.use('/v1/*', async (_c, next) => {
273273
await next();
274-
await defaultBatchProcessor.forceFlush();
274+
await flushBatchProcessor();
275275
});
276276
app.use('/api/*', async (_c, next) => {
277277
await next();
278-
await defaultBatchProcessor.forceFlush();
278+
await flushBatchProcessor();
279279
});
280280

281281
const baseApp = new Hono();

agents-run-api/src/env.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ const envSchema = z.object({
1919
OPENAI_API_KEY: z.string().optional(),
2020
ANTHROPIC_API_KEY: z.string(),
2121
INKEEP_AGENTS_RUN_API_BYPASS_SECRET: z.string().optional(),
22+
OTEL_BSP_SCHEDULE_DELAY: z.coerce.number().optional().default(500),
23+
OTEL_BSP_MAX_EXPORT_BATCH_SIZE: z.coerce.number().optional().default(64),
2224
});
2325

2426
const parseEnv = () => {

agents-run-api/src/handlers/executionHandler.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import { agentInitializingOp, completionOp, errorOp } from '../utils/agent-opera
2020
import type { StreamHelper } from '../utils/stream-helpers.js';
2121
import { MCPStreamHelper } from '../utils/stream-helpers.js';
2222
import { registerStreamHelper, unregisterStreamHelper } from '../utils/stream-registry.js';
23-
import { defaultBatchProcessor } from '../instrumentation';
2423

2524
const logger = getLogger('ExecutionHandler');
2625

@@ -454,7 +453,6 @@ export class ExecutionHandler {
454453
throw error;
455454
} finally {
456455
span.end();
457-
await defaultBatchProcessor.forceFlush();
458456
}
459457
});
460458
}

agents-run-api/src/instrumentation.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
NoopSpanProcessor,
2020
} from '@opentelemetry/sdk-trace-base';
2121
import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
22+
import { env } from './env';
2223
import { getLogger } from './logger';
2324

2425
const otlpExporter = new OTLPTraceExporter();
@@ -29,7 +30,8 @@ const logger = getLogger('instrumentation');
2930
function createSafeBatchProcessor(): SpanProcessor {
3031
try {
3132
return new BatchSpanProcessor(otlpExporter, {
32-
scheduledDelayMillis: 1000,
33+
scheduledDelayMillis: env.OTEL_BSP_SCHEDULE_DELAY,
34+
maxExportBatchSize: env.OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
3335
});
3436
} catch (error) {
3537
logger.warn({ error }, 'Failed to create batch processor');
@@ -84,3 +86,11 @@ export const defaultSDK = new NodeSDK({
8486
spanProcessors: defaultSpanProcessors,
8587
instrumentations: defaultInstrumentations,
8688
});
89+
90+
export async function flushBatchProcessor(): Promise<void> {
91+
try {
92+
await defaultBatchProcessor.forceFlush();
93+
} catch (error) {
94+
logger.warn({ error }, 'Failed to flush batch processor');
95+
}
96+
}

agents-run-api/src/routes/chatDataStream.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { ExecutionHandler } from '../handlers/executionHandler';
2222
import { getLogger } from '../logger';
2323
import { errorOp } from '../utils/agent-operations';
2424
import { createVercelStreamHelper } from '../utils/stream-helpers';
25-
import { defaultBatchProcessor } from '../instrumentation';
2625

2726
type AppVariables = {
2827
credentialStores: CredentialStoreRegistry;
@@ -170,7 +169,6 @@ app.openapi(chatDataStreamRoute, async (c) => {
170169
dbClient,
171170
credentialStores,
172171
});
173-
await defaultBatchProcessor.forceFlush();
174172

175173
// Store last user message
176174
const lastUserMessage = body.messages.filter((m: any) => m.role === 'user').slice(-1)[0];

agents-run-api/src/services/GraphSession.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import { getStreamHelper } from '../utils/stream-registry';
1919
import { setSpanWithError, tracer } from '../utils/tracer';
2020
import { ArtifactParser } from './ArtifactParser';
2121
import { ArtifactService } from './ArtifactService';
22-
import { defaultBatchProcessor } from '../instrumentation';
2322

2423
const logger = getLogger('GraphSession');
2524

@@ -959,7 +958,6 @@ ${this.statusUpdateState?.config.prompt?.trim() || ''}`;
959958
return { summaries: [] };
960959
} finally {
961960
span.end();
962-
await defaultBatchProcessor.forceFlush();
963961
}
964962
}
965963
);
@@ -1474,7 +1472,6 @@ Make it specific and relevant.`;
14741472
} finally {
14751473
// Always end the main span
14761474
span.end();
1477-
await defaultBatchProcessor.forceFlush();
14781475
}
14791476
}
14801477
);

agents-run-api/src/services/ResponseFormatter.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { getLogger } from '../logger';
33
import { ArtifactParser, type StreamPart } from '../services/ArtifactParser';
44
import { tracer, setSpanWithError } from '../utils/tracer';
55
import { graphSessionManager } from '../services/GraphSession';
6-
import { defaultBatchProcessor } from '../instrumentation';
76

87
const logger = getLogger('ResponseFormatter');
98

@@ -84,7 +83,6 @@ export class ResponseFormatter {
8483
};
8584
} finally {
8685
span.end();
87-
await defaultBatchProcessor.forceFlush();
8886
}
8987
});
9088
}
@@ -151,7 +149,6 @@ export class ResponseFormatter {
151149
return { text: responseText };
152150
} finally {
153151
span.end();
154-
await defaultBatchProcessor.forceFlush();
155152
}
156153
});
157154
}

0 commit comments

Comments
 (0)