Skip to content

整体总结#359

Merged
Matrix-X merged 7 commits intodev/michaelhufrom
004-eventbus-message-fabric
Feb 27, 2026
Merged

整体总结#359
Matrix-X merged 7 commits intodev/michaelhufrom
004-eventbus-message-fabric

Conversation

@Matrix-X
Copy link
Contributor

  • 这次把异步链路从“分散功能”收敛成统一运行时:Event Topic + Task + WebSocket + Scheduler(Cron),并按同一命名与联调方式对齐。
  • 你当前已实测通过:WS 实时订阅/多 topic 复用、Replay 状态推送、Pipeline 通知推送、Cron run-now/pause/resume。

核心实现调整

  • 统一 WS 入口为 /api/ws(不再走 /ws 兼容思路),并修复前端连接构造避免再打到 /ws。
  • 联调入口统一为两类任务:replay 与 pipeline,并通过 Task 队列落地和历史可追溯。
  • Queue 语义明确为分片模型:tenant_key + subscriber_id;运行态与历史态分离,历史可持续追溯。
  • 系统级 topic 与业务/插件/第三方 topic 的治理逻辑已按 event_topics 统一管理路径推进。

脚本与回归体系

  • Event 现有:scripts/event_fabric/integration_playbook.sh
  • 新增 WebSocket:scripts/websocket/integration_playbook.sh:1
  • 新增 Cron:scripts/cron/integration_playbook.sh:1
  • 旧 smoke 已改为转发壳:scripts/event_fabric/smoke_task_event.sh:1
  • Layer1 回归已对齐并通过:scripts/ci/event_fabric_layer1.sh:26

文档体系重构

  • 总入口已收敛到 docs/guides/async_runtime/README.md
  • Task 手册(已补成可操作步骤):docs/guides/async_runtime/task/README.md:6
  • Scheduler 手册(已补成可操作步骤):docs/guides/async_runtime/scheduler/README.md:6
  • Event 联调总手册:docs/guides/async_runtime/event_fabric/integration_playbook.md:91
  • WS 规范文档已对齐 /api/ws:docs/plan/wx/WS-NOTIFY.md:191

当前结论

  • 这条链路已经不是“概念设计”,而是“可执行、可联调、可回归”的状态。
  • 你现在继续做手工验收是正确阶段。需要的话我下一步可以给你一份“验收打勾清单(按页面+接口+日志)”。

  - 这次把异步链路从“分散功能”收敛成统一运行时:Event Topic + Task + WebSocket + Scheduler(Cron),并按同一命名与联调方式对齐。
  - 你当前已实测通过:WS 实时订阅/多 topic 复用、Replay 状态推送、Pipeline 通知推送、Cron run-now/pause/resume。

  核心实现调整

  - 统一 WS 入口为 /api/ws(不再走 /ws 兼容思路),并修复前端连接构造避免再打到 /ws。
  - 联调入口统一为两类任务:replay 与 pipeline,并通过 Task 队列落地和历史可追溯。
  - Queue 语义明确为分片模型:tenant_key + subscriber_id;运行态与历史态分离,历史可持续追溯。
  - 系统级 topic 与业务/插件/第三方 topic 的治理逻辑已按 event_topics 统一管理路径推进。

  脚本与回归体系

  - Event 现有:scripts/event_fabric/integration_playbook.sh
  - 新增 WebSocket:scripts/websocket/integration_playbook.sh:1
  - 新增 Cron:scripts/cron/integration_playbook.sh:1
  - 旧 smoke 已改为转发壳:scripts/event_fabric/smoke_task_event.sh:1
  - Layer1 回归已对齐并通过:scripts/ci/event_fabric_layer1.sh:26

  文档体系重构

  - 总入口已收敛到 docs/guides/async_runtime/README.md
  - Task 手册(已补成可操作步骤):docs/guides/async_runtime/task/README.md:6
  - Scheduler 手册(已补成可操作步骤):docs/guides/async_runtime/scheduler/README.md:6
  - Event 联调总手册:docs/guides/async_runtime/event_fabric/integration_playbook.md:91
  - WS 规范文档已对齐 /api/ws:docs/plan/wx/WS-NOTIFY.md:191

  当前结论

  - 这条链路已经不是“概念设计”,而是“可执行、可联调、可回归”的状态。
  - 你现在继续做手工验收是正确阶段。需要的话我下一步可以给你一份“验收打勾清单(按页面+接口+日志)”。
Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ff5b94576b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

})
pxlog.WarnF(context.Background(), "[event_fabric.task_driver] queue.driver=kafka enabled; using kafka adapter with redis fallback")
retryWorkerDriverName = string(taskDriver.Type())
retryWorkerFallbackEnabled = false

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge 保持非 Redis 驱动下重试轮询开启

这里将 retryWorkerFallbackEnabled 置为 false 后,EventFabricRetryWorker.Run 会直接返回并停止调用 delivery.PollRetry,但当前 Kafka/RabbitMQ/NATS 驱动的 Enqueue/Dequeue/Nack 仍全部委托给 FallbackDriver(即 Redis 任务驱动),重试链路依然依赖现有 Redis scheduler;在 queue.driver 配置为这些驱动时,失败投递会堆积在 retry 队列里而不会被再次分发。

Useful? React with 👍 / 👎.

@@ -1,22 +0,0 @@
package eventbus

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge 为旧 eventbus 包路径保留兼容导出

该提交删除了 internal/eventbus 包后,仓库里仍有代码继续导入旧路径(例如 backend/tests/integration/capability_registry/load/load_test.gobackend/tests/integration/capability_registry/event_latency_test.go),会在编译测试时直接触发无法解析导入路径的问题;迁移包路径时需要同步更新剩余导入,或暂时保留一个兼容层避免测试套件被整体打断。

Useful? React with 👍 / 👎.

  - 把旧导入改为新包:
      - backend/tests/integration/capability_registry/load/load_test.go:21
      - backend/tests/integration/capability_registry/event_latency_test.go:10
  - 同步修正事件常量引用:
      - backend/tests/integration/capability_registry/event_latency_test.go:87
  - 删除我刚才加的兼容文件:
      - backend/internal/eventbus/topics_compat.go
      - backend/internal/eventbus/subscribers_compat.go
  - 在 backend/internal/app/shared/deps.go:942 保持 retryWorkerFallbackEnabled = true。
  - 删除了 kafka/rabbitmq/nats 分支里把它改成 false 的逻辑(以及对应 degrade 日志)。
  - 结果是:只要配置了 Redis fallback driver,EventFabricRetryWorker.Run 就不会提前退出,delivery.PollRetry 会继续跑。

  关键位置

  - backend/internal/app/shared/deps.go:942
  - backend/internal/app/shared/deps.go:943
  - backend/internal/app/shared/deps.go:954
  - backend/internal/app/shared/deps.go:966

  本地校验

  - 已跑最小编译检查:go test ./internal/app/shared -run TestTaskHistory -count=1(通过)。

  这次改动后,非 Redis 主驱动 + Redis fallback 的场景下,retry 队列不会因 worker 关闭而堆积不分发。
  - 重构为“按步骤操作”结构,先做什么后做什么一眼可见。
    web-admin/app/pages/settings/monitor.vue
  - 状态区语义化:
      - 连接状态:已连接/连接中/未连接/错误
      - 订阅状态:已订阅(绿色)、未订阅(黄色)
  - 连接控制交互优化:
      - 当前只显示可执行主动作(连接或断开)
      - 断开按钮改为红色语义
  - topic 选择改为下拉(USelectMenu),不再要求手输记忆。
  - topic 默认值改为 _topic.system.notification(最符合 Replay/Pipeline 联调观察)。
  - 进入 WebSocket 页签会加载 topic 列表(合并默认 + 后端 topics)。
  - 文案明确了 Replay/Pipeline 推荐订阅 topic 和预期结果。
  - 实时消息预览补了 type/topic/trace_id/payload,便于现场判读。

  你刚才实测结果判断

  - Replay 3 条(pending/running/completed)= 正常
  - Pipeline 1 条通知实体事件 = 正常
  - JWT 主体验证:cache-first + DB-fallback + 回填缓存
  - 主动失效:user/member/tenant 更新或删除后,立即清理对应认证缓存 key

  本次改动

  - 新增:backend/internal/http/auth_subject_validator.go:1
      - 校验 tenant/user/member 状态与归属一致性
      - 缓存 key:
          - auth:user:{user_id}
          - auth:member:{member_id}
          - auth:tenant_uuid:{tenant_uuid}
      - TTL:60s
      - 注册 GORM 回调(update/delete)做缓存主动失效
  - 接入路由:backend/internal/http/router.go:18
      - 在 SetupRouter 注册缓存失效回调
      - 把 JWT 主体验证回调挂入 APIKeyOrJwtMiddleware
      - wscat 成功连接 /api/ws
      - subscribe _topic.template.update 收到 ack
      - 能收到实际事件 payload(含 trace_id)
  - 鉴权与权限模型已对齐
      - ws-bus/register 全面改为 ws-bus/grant
      - grant 只做授权绑定,不创建 topic
      - topic 必须先在 event_topics 存在
      - API Key 权限改为固定动作级(publish/subscribe/replay),不按 topic 自动生 permission
  - 关键行为已修正
      - Profile 权限保存后,自动同步该 Profile 下 active key 的快照(不再强制轮换)
      - 新增同步日志与返回字段:synced_keys/synced_perms
      - API Key 支持“删除”(软删除并从列表隐藏),保留“吊销”
  - 文档已对齐
      - 插件 topic 双层约束已明确:plugin.yaml 声明层 + config/event_fabric.yaml 执行层
      - 明确 topics 创建 与 grant 授权边界
      - wscat 成功连接 /api/ws
      - subscribe _topic.template.update 收到 ack
      - 能收到实际事件 payload(含 trace_id)
  - 鉴权与权限模型已对齐
      - ws-bus/register 全面改为 ws-bus/grant
      - grant 只做授权绑定,不创建 topic
      - topic 必须先在 event_topics 存在
      - API Key 权限改为固定动作级(publish/subscribe/replay),不按 topic 自动生 permission
  - 关键行为已修正
      - Profile 权限保存后,自动同步该 Profile 下 active key 的快照(不再强制轮换)
      - 新增同步日志与返回字段:synced_keys/synced_perms
      - API Key 支持“删除”(软删除并从列表隐藏),保留“吊销”
  - 文档已对齐
      - 插件 topic 双层约束已明确:plugin.yaml 声明层 + config/event_fabric.yaml 执行层
      - 明确 topics 创建 与 grant 授权边界
@Matrix-X Matrix-X merged commit 3f03b3a into dev/michaelhu Feb 27, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant