Skip to content

Latest commit

 

History

History
248 lines (180 loc) · 4.95 KB

File metadata and controls

248 lines (180 loc) · 4.95 KB

🔄 SSE 实现更新说明

更新内容

将 SSE (Server-Sent Events) 实现从手动的 ReadableStream 改为使用 Nitro 内置的 createEventStream API

主要变化

之前(手动实现)

// 手动设置响应头
setResponseHeaders(event, {
  'Content-Type': 'text/event-stream',
  'Cache-Control': 'no-cache',
  Connection: 'keep-alive'
})

// 手动创建 ReadableStream
const stream = new ReadableStream({
  start(controller) {
    const message = formatSSEMessage(eventType, data)
    controller.enqueue(new TextEncoder().encode(message))
  }
})

之后(Nitro 内置)

// 使用 Nitro 的 createEventStream
const eventStream = createEventStream(event)

// 简洁的事件推送
await eventStream.push(JSON.stringify(data))

// 优雅的连接管理
eventStream.onClosed(async () => {
  await eventStream.close()
})

return eventStream.send()

优势

  1. 更简洁的代码

    • 无需手动管理响应头
    • 无需手动编码文本
    • 自动处理 SSE 格式
  2. 更好的连接管理

    • onClosed() 回调自动处理断开连接
    • 自动清理资源
    • 更可靠的错误处理
  3. 符合框架规范

    • 使用官方推荐的 API
    • 更好的维护性
    • 未来兼容性保证

事件格式变化

之前

event: status_update
data: {"status":"RUNNING","timestamp":"..."}

event: log
data: {"message":"Processing..."}

之后

data: {"eventType":"status_update","data":{"status":"RUNNING","timestamp":"..."}}

data: {"eventType":"log","data":{"message":"Processing..."}}

客户端使用变化

JavaScript/浏览器

之前

const eventSource = new EventSource('/api/v1/stream/jobId')

eventSource.addEventListener('status_update', (event) => {
  const data = JSON.parse(event.data)
  console.log(data)
})

eventSource.addEventListener('result', (event) => {
  const data = JSON.parse(event.data)
  console.log(data)
})

之后

const eventSource = new EventSource('/api/v1/stream/jobId')

eventSource.addEventListener('message', (event) => {
  const { eventType, data } = JSON.parse(event.data)

  switch (eventType) {
    case 'status_update':
      console.log('Status:', data)
      break
    case 'log':
      console.log('Log:', data.message)
      break
    case 'result':
      console.log('Result:', data.result)
      eventSource.close()
      break
    case 'error':
      console.error('Error:', data.error)
      eventSource.close()
      break
  }
})

cURL

之前和之后都一样

curl -N http://localhost:3005/api/v1/stream/{jobId}

只是输出格式略有不同。

迁移指南

1. 更新前端代码

如果你有现有的客户端代码,需要:

  1. 从监听具体事件类型改为监听 message 事件
  2. 解析 event.data 中的 eventTypedata 字段
  3. 根据 eventType 进行相应处理

2. 测试脚本已更新

test.mjs 已更新以适应新格式,运行:

node test.mjs

技术细节

Nitro createEventStream API

interface EventStream {
  push(data: string): Promise<void>
  close(): Promise<void>
  send(): EventStreamMessage
  onClosed(callback: () => void | Promise<void>): void
}

我们的使用模式

// 1. 创建流
const eventStream = createEventStream(event)

// 2. 推送数据(自动 JSON 编码)
await eventStream.push(
  JSON.stringify({
    eventType: 'status_update',
    data: { status: 'RUNNING' }
  })
)

// 3. 监听断开
eventStream.onClosed(async () => {
  // 清理资源
  jobManager.off(`job:${jobId}`, eventHandler)
  await eventStream.close()
})

// 4. 返回流
return eventStream.send()

兼容性

完全向后兼容 - 所有现有功能保持不变
相同的 API 接口 - 客户端只需调整事件解析方式
更好的性能 - 使用框架优化的实现
更可靠 - 更好的错误处理和连接管理

相关文件

已更新的文件:

  • server/api/v1/stream/[jobId].get.ts - SSE 实现
  • test.mjs - 测试脚本
  • README.md - JavaScript 示例
  • QUICK_START.md - 使用指南

测试验证

# 1. 启动服务器
yarn dev

# 2. 提交测试任务
curl -X POST http://localhost:3005/api/v1/schedule \
  -H "Content-Type: application/json" \
  -d '{
    "serviceId": "example-web-api",
    "payload": {"url": "https://httpbin.org/delay/2", "method": "GET"},
    "queue": "default"
  }'

# 3. 监听 SSE(替换 jobId)
curl -N http://localhost:3005/api/v1/stream/{jobId}

# 4. 运行自动化测试
node test.mjs

总结

这次更新使 SimpleScheduler 的 SSE 实现更加:

  • 🎯 标准化 - 遵循 Nitro 框架规范
  • 🔧 易维护 - 代码更简洁清晰
  • 🚀 高性能 - 使用框架优化
  • 🛡️ 更安全 - 更好的资源管理

更新时间: 2025-11-12
版本: 1.1.0
破坏性变更: 无(仅客户端事件解析方式调整)