Skip to content

Commit d6cd025

Browse files
authored
fix: correctly aggregate read seqs by conversation and user before DB update. (#3442)
* build: add some comments to the docker-compose file. * fix: correctly aggregate read seqs by conversation and user before DB update.
1 parent becc999 commit d6cd025

File tree

2 files changed

+51
-34
lines changed

2 files changed

+51
-34
lines changed

docker-compose.yml

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,17 +176,38 @@ services:
176176
environment:
177177
#KAFKA_HEAP_OPTS: "-Xms128m -Xmx256m"
178178
TZ: Asia/Shanghai
179+
# Unique identifier for the Kafka node (required in controller mode)
179180
KAFKA_CFG_NODE_ID: 0
181+
# Defines the roles this Kafka node plays: broker, controller, or both
180182
KAFKA_CFG_PROCESS_ROLES: controller,broker
183+
# Specifies which nodes are controller nodes for quorum voting.
184+
# The syntax follows the KRaft mode (no ZooKeeper): node.id@host:port
185+
# The controller listener endpoint here is kafka:9093
181186
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka:9093
187+
# Specifies which listener is used for controller-to-controller communication
182188
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
189+
# Default number of partitions for new topics
183190
KAFKA_NUM_PARTITIONS: 8
191+
# Whether to enable automatic topic creation
184192
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
193+
# Kafka internal listeners; Kafka supports multiple ports with different protocols
194+
# Each port is used for a specific purpose: INTERNAL for internal broker communication,
195+
# CONTROLLER for controller communication, EXTERNAL for external client connections.
196+
# These logical listener names are mapped to actual protocols via KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
197+
# In short, Kafka is listening on three logical ports: 9092 for internal communication,
198+
# 9093 for controller traffic, and 9094 for external access.
199+
KAFKA_CFG_LISTENERS: "INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
200+
# Addresses advertised to clients. INTERNAL://kafka:9092 uses the internal Docker service name 'kafka',
201+
# so other containers can access Kafka via kafka:9092.
202+
# EXTERNAL://localhost:19094 is the address external clients (e.g., in the LAN) should use to connect.
203+
# If Kafka is deployed on a different machine than IM, 'localhost' should be replaced with the LAN IP.
204+
KAFKA_CFG_ADVERTISED_LISTENERS: "INTERNAL://kafka:9092,EXTERNAL://localhost:19094"
205+
# Maps logical listener names to actual protocols.
206+
# Supported protocols include: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
207+
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT"
208+
# Defines which listener is used for inter-broker communication within the Kafka cluster
209+
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "INTERNAL"
185210

186-
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
187-
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:19094"
188-
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
189-
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
190211

191212
# Authentication configuration variables - comment out to disable auth
192213
# KAFKA_USERNAME: "openIM"

internal/msgtransfer/online_history_msg_handler.go

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"github.com/openimsdk/tools/discovery"
2727

2828
"github.com/go-redis/redis"
29+
"google.golang.org/protobuf/proto"
30+
2931
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
3032
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
3133
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
@@ -37,7 +39,6 @@ import (
3739
"github.com/openimsdk/tools/log"
3840
"github.com/openimsdk/tools/mcontext"
3941
"github.com/openimsdk/tools/utils/stringutil"
40-
"google.golang.org/protobuf/proto"
4142
)
4243

4344
const (
@@ -134,53 +135,48 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
134135

135136
func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
136137

137-
var conversationID string
138-
var userSeqMap map[string]int64
138+
// Outer map: conversationID -> (userID -> maxHasReadSeq)
139+
conversationUserSeq := make(map[string]map[string]int64)
140+
139141
for _, msg := range msgs {
140142
if msg.message.ContentType != constant.HasReadReceipt {
141143
continue
142144
}
143145
var elem sdkws.NotificationElem
144146
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
145-
log.ZWarn(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
147+
log.ZWarn(ctx, "Unmarshal NotificationElem error", err, "msg", msg)
146148
continue
147149
}
148150
var tips sdkws.MarkAsReadTips
149151
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
150-
log.ZWarn(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
152+
log.ZWarn(ctx, "Unmarshal MarkAsReadTips error", err, "msg", msg)
151153
continue
152154
}
153-
//The conversation ID for each batch of messages processed by the batcher is the same.
154-
conversationID = tips.ConversationID
155-
if len(tips.Seqs) > 0 {
156-
for _, seq := range tips.Seqs {
157-
if tips.HasReadSeq < seq {
158-
tips.HasReadSeq = seq
159-
}
160-
}
161-
clear(tips.Seqs)
162-
tips.Seqs = nil
163-
}
164-
if tips.HasReadSeq < 0 {
155+
if len(tips.ConversationID) == 0 || tips.HasReadSeq < 0 {
165156
continue
166157
}
167-
if userSeqMap == nil {
168-
userSeqMap = make(map[string]int64)
158+
159+
// Calculate the max seq from tips.Seqs
160+
for _, seq := range tips.Seqs {
161+
if tips.HasReadSeq < seq {
162+
tips.HasReadSeq = seq
163+
}
169164
}
170165

171-
if userSeqMap[tips.MarkAsReadUserID] > tips.HasReadSeq {
172-
continue
166+
if _, ok := conversationUserSeq[tips.ConversationID]; !ok {
167+
conversationUserSeq[tips.ConversationID] = make(map[string]int64)
168+
}
169+
if conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] < tips.HasReadSeq {
170+
conversationUserSeq[tips.ConversationID][tips.MarkAsReadUserID] = tips.HasReadSeq
173171
}
174-
userSeqMap[tips.MarkAsReadUserID] = tips.HasReadSeq
175-
}
176-
if userSeqMap == nil {
177-
return
178-
}
179-
if len(conversationID) == 0 {
180-
log.ZWarn(ctx, "conversation err", nil, "conversationID", conversationID)
181172
}
182-
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, conversationID, userSeqMap); err != nil {
183-
log.ZWarn(ctx, "set read seq to db error", err, "conversationID", conversationID, "userSeqMap", userSeqMap)
173+
log.ZInfo(ctx, "doSetReadSeq", "conversationUserSeq", conversationUserSeq)
174+
175+
// persist to db
176+
for convID, userSeqMap := range conversationUserSeq {
177+
if err := och.msgTransferDatabase.SetHasReadSeqToDB(ctx, convID, userSeqMap); err != nil {
178+
log.ZWarn(ctx, "SetHasReadSeqToDB error", err, "conversationID", convID, "userSeqMap", userSeqMap)
179+
}
184180
}
185181

186182
}

0 commit comments

Comments
 (0)