Skip to content

Commit c84d368

Browse files
force flush and 1 sec interval (#468)
* force flush * changeset * changes
1 parent bc14f9f commit c84d368

File tree

7 files changed

+22
-1
lines changed

7 files changed

+22
-1
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@inkeep/agents-run-api": patch
3+
---
4+
5+
force flush

agents-run-api/src/agents/Agent.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import { toolSessionManager } from './ToolSessionManager';
6060
import type { SystemPromptV1 } from './types';
6161
import { Phase1Config } from './versions/v1/Phase1Config';
6262
import { Phase2Config } from './versions/v1/Phase2Config';
63+
import { defaultBatchProcessor } from '../instrumentation';
6364

6465
/**
6566
* Creates a stopWhen condition that stops when any tool call name starts with the given prefix
@@ -749,6 +750,7 @@ export class Agent {
749750
requestContext: requestContext || {},
750751
tenantId: this.config.tenantId,
751752
});
753+
await defaultBatchProcessor.forceFlush();
752754

753755
// Add built-in variables to resolved context
754756
const contextWithBuiltins = {
@@ -1471,6 +1473,7 @@ export class Agent {
14711473
throw err;
14721474
} finally {
14731475
childSpan.end();
1476+
await defaultBatchProcessor.forceFlush();
14741477
}
14751478
}
14761479
);
@@ -2041,6 +2044,7 @@ ${output}${structureHintsFormatted}`;
20412044
// Mark span as successful
20422045
span.setStatus({ code: SpanStatusCode.OK });
20432046
span.end();
2047+
await defaultBatchProcessor.forceFlush();
20442048

20452049
// Format response - handle object vs text responses differently
20462050
// Only format if we don't already have formattedContent from streaming

agents-run-api/src/handlers/executionHandler.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { agentInitializingOp, completionOp, errorOp } from '../utils/agent-opera
2020
import type { StreamHelper } from '../utils/stream-helpers.js';
2121
import { MCPStreamHelper } from '../utils/stream-helpers.js';
2222
import { registerStreamHelper, unregisterStreamHelper } from '../utils/stream-registry.js';
23+
import { defaultBatchProcessor } from '../instrumentation';
2324

2425
const logger = getLogger('ExecutionHandler');
2526

@@ -453,6 +454,7 @@ export class ExecutionHandler {
453454
throw error;
454455
} finally {
455456
span.end();
457+
await defaultBatchProcessor.forceFlush();
456458
}
457459
});
458460
}

agents-run-api/src/instrumentation.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
1818

1919
const otlpExporter = new OTLPTraceExporter();
2020

21-
export const defaultBatchProcessor = new BatchSpanProcessor(otlpExporter);
21+
export const defaultBatchProcessor = new BatchSpanProcessor(otlpExporter, {
22+
scheduledDelayMillis: 1000,
23+
});
2224

2325
export const defaultResource = resourceFromAttributes({
2426
[ATTR_SERVICE_NAME]: 'inkeep-agents-run-api',

agents-run-api/src/routes/chatDataStream.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { ExecutionHandler } from '../handlers/executionHandler';
2222
import { getLogger } from '../logger';
2323
import { errorOp } from '../utils/agent-operations';
2424
import { createVercelStreamHelper } from '../utils/stream-helpers';
25+
import { defaultBatchProcessor } from '../instrumentation';
2526

2627
type AppVariables = {
2728
credentialStores: CredentialStoreRegistry;
@@ -169,6 +170,7 @@ app.openapi(chatDataStreamRoute, async (c) => {
169170
dbClient,
170171
credentialStores,
171172
});
173+
await defaultBatchProcessor.forceFlush();
172174

173175
// Store last user message
174176
const lastUserMessage = body.messages.filter((m: any) => m.role === 'user').slice(-1)[0];

agents-run-api/src/services/GraphSession.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { getStreamHelper } from '../utils/stream-registry';
1919
import { setSpanWithError, tracer } from '../utils/tracer';
2020
import { ArtifactParser } from './ArtifactParser';
2121
import { ArtifactService } from './ArtifactService';
22+
import { defaultBatchProcessor } from '../instrumentation';
2223

2324
const logger = getLogger('GraphSession');
2425

@@ -958,6 +959,7 @@ ${this.statusUpdateState?.config.prompt?.trim() || ''}`;
958959
return { summaries: [] };
959960
} finally {
960961
span.end();
962+
await defaultBatchProcessor.forceFlush();
961963
}
962964
}
963965
);
@@ -1472,6 +1474,7 @@ Make it specific and relevant.`;
14721474
} finally {
14731475
// Always end the main span
14741476
span.end();
1477+
await defaultBatchProcessor.forceFlush();
14751478
}
14761479
}
14771480
);

agents-run-api/src/services/ResponseFormatter.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { getLogger } from '../logger';
33
import { ArtifactParser, type StreamPart } from '../services/ArtifactParser';
44
import { tracer, setSpanWithError } from '../utils/tracer';
55
import { graphSessionManager } from '../services/GraphSession';
6+
import { defaultBatchProcessor } from '../instrumentation';
67

78
const logger = getLogger('ResponseFormatter');
89

@@ -83,6 +84,7 @@ export class ResponseFormatter {
8384
};
8485
} finally {
8586
span.end();
87+
await defaultBatchProcessor.forceFlush();
8688
}
8789
});
8890
}
@@ -149,6 +151,7 @@ export class ResponseFormatter {
149151
return { text: responseText };
150152
} finally {
151153
span.end();
154+
await defaultBatchProcessor.forceFlush();
152155
}
153156
});
154157
}

0 commit comments

Comments
 (0)