-
Notifications
You must be signed in to change notification settings - Fork 0
Reliability
cuihairu edited this page Oct 20, 2025
·
1 revision
Module: reliability/
Retry / Dead Letter Queue / Deduplication / Rate Limiting
- 提供处理可靠性保障:瞬时失败重试、异常送入 DLQ、重复数据抑制、速率限制等能力
- 支持纯内存与 Redis 两种实现(去重/限流)
- 重试:
RetryExecutor,RetryPolicy - DLQ:
dlq.DeadLetterService,DeadLetterAdmin,RedisDeadLetterService,RedisDeadLetterAdmin,RedisDeadLetterConsumer,DeadLetterRecord,DeadLetterEntry - 去重:
deduplication.BloomFilterDeduplicator,SetDeduplicator,WindowedDeduplicator,DeduplicatorFactory - 限流:
ratelimit.InMemorySlidingWindowRateLimiter,InMemoryTokenBucketRateLimiter,RedisSlidingWindowRateLimiter,RedisTokenBucketRateLimiter,RateLimitingSink,NamedRateLimiter - 指标:
reliability.metrics.ReliabilityMetrics*,RateLimitMetrics*
RetryPolicy p = RetryPolicy.builder()
.maxAttempts(3)
.initialDelay(Duration.ofMillis(100))
.build();
RetryExecutor ex = new RetryExecutor(p);
String r = ex.execute(x -> callExternal((String)x), "input");DeadLetterService svc = new RedisDeadLetterService(redisson);
DeadLetterRecord rec = new DeadLetterRecord();
rec.originalTopic = "topicA"; rec.payload = "oops"; rec.maxRetries = 3;
svc.send(rec);
// admin 回放/清理
DeadLetterAdmin admin = new RedisDeadLetterAdmin(redisson, svc);
admin.replayAll("topicA", 100);Deduplicator<String> d = DeduplicatorFactory.createSet(redisson, "dedup:orders", x -> x);
boolean dup = d.checkAndMark("order-1");RateLimiter rl = new RedisSlidingWindowRateLimiter(redisson, "streaming:rl", 1000, 100);
StreamSink<String> wrapped = RateLimitingSink.drop(rl, k -> k, originalSink);- Reliability:
dlq_replay_total/dlq_replay_duration_nanos、dlq_delete_total、dlq_clear_total - RateLimit:
ratelimit_allowed_total、ratelimit_denied_total
- 组消费 ack 语义:仅回放成功才 ack;失败/异常不 ack
- headers 为字符串 JSON:已在
DeadLetterCodec中兼容解析 - 测试模式钩子:
reliability.dlq.test.*系统属性仅用于 IT 稳定,不影响生产
- Design.md