Skip to content

Commit aba8483

Browse files
committed
feat: 添加首字时长 (TTFT) 字段用于流式响应延迟追踪
为 ProxyRequest 和 ProxyUpstreamAttempt 添加 TTFT (Time To First Token) 字段,记录流式 API 请求从开始到收到第一个 token 的时间间隔。

主要改动:
- domain: 添加 TTFT time.Duration 字段
- sqlite: 添加 TTFTMs 毫秒存储字段及转换逻辑
- adapters: custom/antigravity/kiro 适配器追踪首字时间
- executor: 处理 EventFirstToken 事件并同步到 proxyReq
- frontend: 同步添加 ttft 类型字段
1 parent 17eaf93 commit aba8483

File tree

16 files changed

+1039
-527
lines changed

16 files changed

+1039
-527
lines changed

internal/adapter/provider/antigravity/adapter.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,7 @@ func (a *AntigravityAdapter) handleStreamResponse(ctx context.Context, w http.Re
648648
// Read chunks and accumulate until we have complete lines
649649
var lineBuffer bytes.Buffer
650650
buf := make([]byte, 4096)
651+
firstChunkSent := false // Track TTFT
651652

652653
for {
653654
// Check context before reading
@@ -700,6 +701,12 @@ func (a *AntigravityAdapter) handleStreamResponse(ctx context.Context, w http.Re
700701
return domain.NewProxyErrorWithMessage(writeErr, false, "client disconnected")
701702
}
702703
flusher.Flush()
704+
705+
// Track TTFT: send first token time on first successful write
706+
if !firstChunkSent {
707+
firstChunkSent = true
708+
eventChan.SendFirstToken(time.Now().UnixMilli())
709+
}
703710
}
704711
}
705712
}

internal/adapter/provider/custom/adapter.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.Respons
316316
// Use buffer-based approach to handle incomplete lines properly
317317
var lineBuffer bytes.Buffer
318318
buf := make([]byte, 4096)
319+
firstChunkSent := false // Track TTFT
319320

320321
for {
321322
// Check context before reading
@@ -361,6 +362,12 @@ func (a *CustomAdapter) handleStreamResponse(ctx context.Context, w http.Respons
361362
return domain.NewProxyErrorWithMessage(writeErr, false, "client disconnected")
362363
}
363364
flusher.Flush()
365+
366+
// Track TTFT: send first token time on first successful write
367+
if !firstChunkSent {
368+
firstChunkSent = true
369+
eventChan.SendFirstToken(time.Now().UnixMilli())
370+
}
364371
}
365372
}
366373
}

internal/adapter/provider/kiro/adapter.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,42 +362,47 @@ func (a *KiroAdapter) handleStreamResponse(ctx context.Context, w http.ResponseW
362362

363363
if err := streamCtx.sendInitialEvents(); err != nil {
364364
inTok, outTok := streamCtx.GetTokenCounts()
365-
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel)
365+
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel, streamCtx.GetFirstTokenTimeMs())
366366
return domain.NewProxyErrorWithMessage(err, false, "failed to send initial events")
367367
}
368368

369369
err = streamCtx.processEventStream(ctx, resp.Body)
370370
if err != nil {
371371
if ctx.Err() != nil {
372372
inTok, outTok := streamCtx.GetTokenCounts()
373-
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel)
373+
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel, streamCtx.GetFirstTokenTimeMs())
374374
return domain.NewProxyErrorWithMessage(ctx.Err(), false, "client disconnected")
375375
}
376376

377377
_ = streamCtx.sendFinalEvents()
378378
inTok, outTok := streamCtx.GetTokenCounts()
379-
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel)
379+
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel, streamCtx.GetFirstTokenTimeMs())
380380
return nil
381381
}
382382

383383
if err := streamCtx.sendFinalEvents(); err != nil {
384384
inTok, outTok := streamCtx.GetTokenCounts()
385-
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel)
385+
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel, streamCtx.GetFirstTokenTimeMs())
386386
return domain.NewProxyErrorWithMessage(err, false, "failed to send final events")
387387
}
388388

389389
inTok, outTok := streamCtx.GetTokenCounts()
390-
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel)
390+
a.sendFinalEvents(ctx, sseBuffer.String(), inTok, outTok, requestModel, streamCtx.GetFirstTokenTimeMs())
391391
return nil
392392
}
393393

394394
// sendFinalEvents sends final events via EventChannel
395-
func (a *KiroAdapter) sendFinalEvents(ctx context.Context, body string, inputTokens, outputTokens int, requestModel string) {
395+
func (a *KiroAdapter) sendFinalEvents(ctx context.Context, body string, inputTokens, outputTokens int, requestModel string, firstTokenTimeMs int64) {
396396
eventChan := ctxutil.GetEventChan(ctx)
397397
if eventChan == nil {
398398
return
399399
}
400400

401+
// Send first token time if available (for TTFT tracking)
402+
if firstTokenTimeMs > 0 {
403+
eventChan.SendFirstToken(firstTokenTimeMs)
404+
}
405+
401406
// Send response info with body
402407
eventChan.SendResponseInfo(&domain.ResponseInfo{
403408
Status: 200, // streaming always returns 200 at this point

internal/adapter/provider/kiro/streaming.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type streamProcessorContext struct {
2525
toolUseIdByBlockIndex map[int]string
2626
completedToolUseIds map[string]bool
2727
jsonBytesByBlockIndex map[int]int
28+
firstTokenTimeMs int64 // Unix milliseconds of first token sent (for TTFT tracking)
2829
}
2930

3031
func newStreamProcessorContext(w http.ResponseWriter, model string, inputTokens int, writer io.Writer) (*streamProcessorContext, error) {
@@ -169,6 +170,12 @@ func (ctx *streamProcessorContext) processEvent(event SSEEvent) error {
169170
return err
170171
}
171172
ctx.flusher.Flush()
173+
174+
// Track TTFT: record first token time on first successful send
175+
if ctx.firstTokenTimeMs == 0 {
176+
ctx.firstTokenTimeMs = time.Now().UnixMilli()
177+
}
178+
172179
return nil
173180
}
174181

@@ -321,3 +328,8 @@ func (ctx *streamProcessorContext) GetTokenCounts() (inputTokens int, outputToke
321328
}
322329
return ctx.inputTokens, outputTokens
323330
}
331+
332+
// GetFirstTokenTimeMs returns the recorded first-token time in Unix milliseconds (for TTFT tracking); 0 means no time has been recorded yet
333+
func (ctx *streamProcessorContext) GetFirstTokenTimeMs() int64 {
334+
return ctx.firstTokenTimeMs
335+
}

internal/domain/adapter_event.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ const (
1212
EventMetrics
1313
// EventResponseModel is sent when response model is extracted
1414
EventResponseModel
15+
// EventFirstToken carries the time at which the first chunk was successfully written to the client (for TTFT tracking)
16+
EventFirstToken
1517
)
1618

1719
// AdapterMetrics contains token usage metrics (avoids import cycle with usage package)
@@ -26,11 +28,12 @@ type AdapterMetrics struct {
2628

2729
// AdapterEvent represents an event from adapter to executor
2830
type AdapterEvent struct {
29-
Type AdapterEventType
30-
RequestInfo *RequestInfo // for EventRequestInfo
31-
ResponseInfo *ResponseInfo // for EventResponseInfo
32-
Metrics *AdapterMetrics // for EventMetrics
33-
ResponseModel string // for EventResponseModel
31+
Type AdapterEventType
32+
RequestInfo *RequestInfo // for EventRequestInfo
33+
ResponseInfo *ResponseInfo // for EventResponseInfo
34+
Metrics *AdapterMetrics // for EventMetrics
35+
ResponseModel string // for EventResponseModel
36+
FirstTokenTime int64 // for EventFirstToken (Unix milliseconds)
3437
}
3538

3639
// AdapterEventChan is used by adapters to send events to executor
@@ -86,6 +89,17 @@ func (ch AdapterEventChan) SendResponseModel(model string) {
8689
}
8790
}
8891

92+
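// SendFirstToken sends a first-token event carrying timeMs (Unix milliseconds). It is a no-op for a nil channel or a zero timeMs, and the send is non-blocking: the event is dropped if the channel cannot accept it immediately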
93+
func (ch AdapterEventChan) SendFirstToken(timeMs int64) {
94+
if ch == nil || timeMs == 0 {
95+
return
96+
}
97+
select {
98+
case ch <- &AdapterEvent{Type: EventFirstToken, FirstTokenTime: timeMs}:
99+
default:
100+
}
101+
}
102+
89103
// Close closes the event channel
90104
func (ch AdapterEventChan) Close() {
91105
if ch != nil {

internal/domain/model.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ type ProxyRequest struct {
201201
EndTime time.Time `json:"endTime"`
202202
Duration time.Duration `json:"duration"`
203203

204+
// TTFT (Time To First Token): latency until the first piece of data is returned on a streaming endpoint
205+
TTFT time.Duration `json:"ttft"`
206+
204207
// Whether this is an SSE streaming request
205208
IsStream bool `json:"isStream"`
206209

@@ -256,6 +259,9 @@ type ProxyUpstreamAttempt struct {
256259
EndTime time.Time `json:"endTime"`
257260
Duration time.Duration `json:"duration"`
258261

262+
// TTFT (Time To First Token): latency until the first piece of data is returned on a streaming endpoint
263+
TTFT time.Duration `json:"ttft"`
264+
259265
// PENDING, IN_PROGRESS, COMPLETED, FAILED
260266
Status string `json:"status"`
261267

internal/executor/executor.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,7 @@ func (e *Executor) Execute(ctx context.Context, w http.ResponseWriter, req *http
450450
proxyReq.Cache1hWriteCount = metrics.Cache1hCreationCount
451451
}
452452
proxyReq.Cost = attemptRecord.Cost
453+
proxyReq.TTFT = attemptRecord.TTFT
453454

454455
_ = e.proxyRequestRepo.Update(proxyReq)
455456

@@ -520,6 +521,7 @@ func (e *Executor) Execute(ctx context.Context, w http.ResponseWriter, req *http
520521
}
521522
}
522523
proxyReq.Cost = attemptRecord.Cost
524+
proxyReq.TTFT = attemptRecord.TTFT
523525

524526
_ = e.proxyRequestRepo.Update(proxyReq)
525527
if e.broadcaster != nil {
@@ -812,6 +814,11 @@ func (e *Executor) processAdapterEvents(eventChan domain.AdapterEventChan, attem
812814
if event.ResponseModel != "" {
813815
attempt.ResponseModel = event.ResponseModel
814816
}
817+
case domain.EventFirstToken:
818+
if event.FirstTokenTime > 0 {
819+
firstTokenTime := time.UnixMilli(event.FirstTokenTime)
820+
attempt.TTFT = firstTokenTime.Sub(attempt.StartTime)
821+
}
815822
}
816823
default:
817824
// No more events
@@ -862,6 +869,13 @@ func (e *Executor) processAdapterEventsRealtime(eventChan domain.AdapterEventCha
862869
attempt.ResponseModel = event.ResponseModel
863870
needsBroadcast = true
864871
}
872+
case domain.EventFirstToken:
873+
if event.FirstTokenTime > 0 {
874+
// Calculate TTFT as duration from start time to first token time
875+
firstTokenTime := time.UnixMilli(event.FirstTokenTime)
876+
attempt.TTFT = firstTokenTime.Sub(attempt.StartTime)
877+
needsBroadcast = true
878+
}
865879
}
866880

867881
// Broadcast update immediately for real-time visibility

internal/repository/sqlite/models.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ type ProxyRequest struct {
190190
StartTime int64
191191
EndTime int64 `gorm:"index"`
192192
DurationMs int64
193+
TTFTMs int64
193194
Status string `gorm:"size:64"`
194195
RequestInfo LongText
195196
ResponseInfo LongText
@@ -233,6 +234,7 @@ type ProxyUpstreamAttempt struct {
233234
StartTime int64
234235
EndTime int64
235236
DurationMs int64
237+
TTFTMs int64
236238
RequestModel string `gorm:"size:128"`
237239
MappedModel string `gorm:"size:128"`
238240
ResponseModel string `gorm:"size:128"`

internal/repository/sqlite/proxy_request.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ func (r *ProxyRequestRepository) toModel(p *domain.ProxyRequest) *ProxyRequest {
392392
StartTime: toTimestamp(p.StartTime),
393393
EndTime: toTimestamp(p.EndTime),
394394
DurationMs: p.Duration.Milliseconds(),
395+
TTFTMs: p.TTFT.Milliseconds(),
395396
IsStream: boolToInt(p.IsStream),
396397
Status: p.Status,
397398
StatusCode: p.StatusCode,
@@ -428,6 +429,7 @@ func (r *ProxyRequestRepository) toDomain(m *ProxyRequest) *domain.ProxyRequest
428429
StartTime: fromTimestamp(m.StartTime),
429430
EndTime: fromTimestamp(m.EndTime),
430431
Duration: time.Duration(m.DurationMs) * time.Millisecond,
432+
TTFT: time.Duration(m.TTFTMs) * time.Millisecond,
431433
IsStream: m.IsStream == 1,
432434
Status: m.Status,
433435
StatusCode: m.StatusCode,

internal/repository/sqlite/proxy_upstream_attempt.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ func (r *ProxyUpstreamAttemptRepository) toModel(a *domain.ProxyUpstreamAttempt)
195195
StartTime: toTimestamp(a.StartTime),
196196
EndTime: toTimestamp(a.EndTime),
197197
DurationMs: a.Duration.Milliseconds(),
198+
TTFTMs: a.TTFT.Milliseconds(),
198199
Status: a.Status,
199200
ProxyRequestID: a.ProxyRequestID,
200201
IsStream: boolToInt(a.IsStream),
@@ -223,6 +224,7 @@ func (r *ProxyUpstreamAttemptRepository) toDomain(m *ProxyUpstreamAttempt) *doma
223224
StartTime: fromTimestamp(m.StartTime),
224225
EndTime: fromTimestamp(m.EndTime),
225226
Duration: time.Duration(m.DurationMs) * time.Millisecond,
227+
TTFT: time.Duration(m.TTFTMs) * time.Millisecond,
226228
Status: m.Status,
227229
ProxyRequestID: m.ProxyRequestID,
228230
IsStream: m.IsStream == 1,

0 commit comments

Comments
 (0)