Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions internal/conversation_msg/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (c *Conversation) GetConversationIDBySessionType(_ context.Context, sourceI
return c.getConversationIDBySessionType(sourceID, sessionType)
}

func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct, recvID, groupID string, p *sdkws.OfflinePushInfo, isOnlineOnly bool) (*sdk_struct.MsgStruct, error) {
func (c *Conversation) sendMessage(ctx context.Context, s *sdk_struct.MsgStruct, recvID, groupID string, p *sdkws.OfflinePushInfo, isOnlineOnly bool) (*sdk_struct.MsgStruct, error) {
filepathExt := func(name ...string) string {
for _, path := range name {
if ext := filepath.Ext(path); ext != "" {
Expand Down Expand Up @@ -538,7 +538,7 @@ func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct,
return c.sendMessageToServer(ctx, s, lc, callback, delFile, p, options, isOnlineOnly)
}

func (c *Conversation) SendMessageNotOss(ctx context.Context, s *sdk_struct.MsgStruct, recvID, groupID string,
func (c *Conversation) sendMessageNotOss(ctx context.Context, s *sdk_struct.MsgStruct, recvID, groupID string,
p *sdkws.OfflinePushInfo, isOnlineOnly bool) (*sdk_struct.MsgStruct, error) {
options := make(map[string]bool, 2)
lc, err := c.checkID(ctx, s, recvID, groupID, options)
Expand Down Expand Up @@ -623,6 +623,35 @@ func (c *Conversation) SendMessageNotOss(ctx context.Context, s *sdk_struct.MsgS
return c.sendMessageToServer(ctx, s, lc, callback, delFile, p, options, isOnlineOnly)
}

func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct, recvID, groupID string, p *sdkws.OfflinePushInfo, isOnlineOnly bool) (*sdk_struct.MsgStruct, error) {
task := &sendTask{
ctx: ctx,
msg: s,
exec: func(taskCtx context.Context) (*sdk_struct.MsgStruct, error) {
return c.sendMessage(taskCtx, s, recvID, groupID, p, isOnlineOnly)
},
}
if err := c.getSender().submit(task); err != nil {
return nil, err
}
return nil, nil
}

func (c *Conversation) SendMessageNotOss(ctx context.Context, s *sdk_struct.MsgStruct, recvID, groupID string,
p *sdkws.OfflinePushInfo, isOnlineOnly bool) (*sdk_struct.MsgStruct, error) {
task := &sendTask{
ctx: ctx,
msg: s,
exec: func(taskCtx context.Context) (*sdk_struct.MsgStruct, error) {
return c.sendMessageNotOss(taskCtx, s, recvID, groupID, p, isOnlineOnly)
},
}
if err := c.getSender().submit(task); err != nil {
return nil, err
}
return nil, nil
}

func (c *Conversation) sendMessageToServer(ctx context.Context, s *sdk_struct.MsgStruct, lc *model_struct.LocalConversation, callback open_im_sdk_callback.SendMsgCallBack,
delFiles []string, offlinePushInfo *sdkws.OfflinePushInfo, options map[string]bool, isOnlineOnly bool) (*sdk_struct.MsgStruct, error) {
if isOnlineOnly {
Expand Down
10 changes: 10 additions & 0 deletions internal/conversation_msg/conversation_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type Conversation struct {
startTime time.Time

typing *typing

sender *messageSender
senderOnce sync.Once
}

func (c *Conversation) ConversationEventQueue() chan common.Cmd2Value {
Expand Down Expand Up @@ -134,6 +137,13 @@ func NewConversation(
return n
}

func (c *Conversation) getSender() *messageSender {
c.senderOnce.Do(func() {
c.sender = newMessageSender(c)
})
return c.sender
}

func (c *Conversation) initSyncer() {
c.conversationSyncer = syncer.New2[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string](
syncer.WithInsert[*model_struct.LocalConversation, pbConversation.GetOwnerConversationResp, string](func(ctx context.Context, value *model_struct.LocalConversation) error {
Expand Down
239 changes: 239 additions & 0 deletions internal/conversation_msg/send_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package conversation_msg

import (
"context"
"encoding/json"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/ccontext"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
"github.com/openimsdk/tools/errs"
)

const (
sendTaskQueueSize = 256
sendChainMaxWait = 3 * time.Second
defaultMediaOrderedBytes = 16 * 1024
minMediaOrderedBytes = 4 * 1024
maxMediaOrderedBytes = 8 * 1024 * 1024
maxSendEnqueueRetry = 100
sendEnqueueRetryInterval = time.Millisecond * 5
)

type sendTask struct {
ctx context.Context
msg *sdk_struct.MsgStruct
exec func(context.Context) (*sdk_struct.MsgStruct, error)
enqueueAt time.Time
ordered bool
lane ccontext.SendOrderLane
seq int64
mediaSize int64
deadline time.Time
}

type messageSender struct {
conversation *Conversation
queue chan *sendTask
wg sync.WaitGroup

textSeq atomic.Int64
mediaSeq atomic.Int64

estimator *thresholdEstimator
}

func newMessageSender(conversation *Conversation) *messageSender {
workers := runtime.NumCPU()
if workers < 4 {
workers = 4
}
ms := &messageSender{
conversation: conversation,
queue: make(chan *sendTask, sendTaskQueueSize),
estimator: newThresholdEstimator(),
}
for i := 0; i < workers; i++ {
ms.wg.Add(1)
go ms.worker()
}
return ms
}

func (m *messageSender) submit(task *sendTask) error {
task.enqueueAt = time.Now()
m.decorate(task)
for i := 0; i < maxSendEnqueueRetry; i++ {
select {
case m.queue <- task:
return nil
default:
time.Sleep(sendEnqueueRetryInterval)
}
}
return errs.New("send task queue full").Wrap()
}

func (m *messageSender) decorate(task *sendTask) {
task.ordered = true
if isMediaContentType(task.msg.ContentType) {
task.lane = ccontext.SendOrderLaneMedia
task.mediaSize = estimateMediaSize(task.msg)
task.ordered = m.shouldKeepMediaOrdered(task.mediaSize)
} else {
task.lane = ccontext.SendOrderLaneText
}

if task.ordered {
if task.lane == ccontext.SendOrderLaneText {
task.seq = m.textSeq.Add(1)
} else {
task.seq = m.mediaSeq.Add(1)
}
task.deadline = task.enqueueAt.Add(sendChainMaxWait)
task.ctx = ccontext.WithSendOrderInfo(task.ctx, &ccontext.SendOrderInfo{
Lane: task.lane,
Ordered: true,
Seq: task.seq,
Deadline: task.deadline,
})
m.conversation.LongConnMgr.RegisterSendOrder(task.lane, task.seq, task.deadline)
} else {
task.seq = 0
task.deadline = time.Time{}
}
}

func (m *messageSender) shouldKeepMediaOrdered(size int64) bool {
if size <= 0 {
return true
}
return float64(size) <= m.estimator.Current()
}

func (m *messageSender) worker() {
defer m.wg.Done()
for task := range m.queue {
m.runTask(task)
}
}

func (m *messageSender) runTask(task *sendTask) {
msg, err := task.exec(task.ctx)
if task.lane == ccontext.SendOrderLaneMedia && task.mediaSize > 0 && err == nil && task.ordered {
m.estimator.Update(task.mediaSize, time.Since(task.enqueueAt))
}
if err != nil {
notifySendError(task.ctx, err)
return
}
notifySendSuccess(task.ctx, msg)
}

func notifySendSuccess(ctx context.Context, msg *sdk_struct.MsgStruct) {
callback, _ := ctx.Value(ccontext.CtxCallback).(open_im_sdk_callback.SendMsgCallBack)
if callback == nil {
return
}
data, err := json.Marshal(msg)
if err != nil {
callback.OnError(sdkerrs.UnknownCode, err.Error())
return
}
callback.OnSuccess(string(data))
}

func notifySendError(ctx context.Context, err error) {
callback, _ := ctx.Value(ccontext.CtxCallback).(open_im_sdk_callback.SendMsgCallBack)
if callback == nil {
return
}
if code, ok := err.(errs.CodeError); ok {
callback.OnError(int32(code.Code()), code.Msg())
return
}
callback.OnError(sdkerrs.UnknownCode, err.Error())
}

type thresholdEstimator struct {
value float64
}

func newThresholdEstimator() *thresholdEstimator {
return &thresholdEstimator{value: defaultMediaOrderedBytes}
}

func (t *thresholdEstimator) Current() float64 {
if t.value <= 0 {
return defaultMediaOrderedBytes
}
if t.value > maxMediaOrderedBytes {
return maxMediaOrderedBytes
}
if t.value < minMediaOrderedBytes {
return minMediaOrderedBytes
}
return t.value
}

func (t *thresholdEstimator) Update(size int64, elapsed time.Duration) {
if size <= 0 || elapsed <= 0 {
return
}
bytesPerSec := float64(size) / elapsed.Seconds()
if bytesPerSec <= 0 {
return
}
target := bytesPerSec * sendChainMaxWait.Seconds()
if target > maxMediaOrderedBytes {
target = maxMediaOrderedBytes
}
if target < minMediaOrderedBytes {
target = minMediaOrderedBytes
}
if t.value <= 0 {
t.value = target
return
}
t.value = 0.6*target + 0.4*t.value
}

func isMediaContentType(contentType int32) bool {
switch contentType {
case constant.Picture, constant.Sound, constant.Video, constant.File:
return true
default:
return false
}
}

func estimateMediaSize(msg *sdk_struct.MsgStruct) int64 {
switch msg.ContentType {
case constant.Picture:
if msg.PictureElem != nil && msg.PictureElem.SourcePicture != nil {
return msg.PictureElem.SourcePicture.Size
}
case constant.Sound:
if msg.SoundElem != nil {
return msg.SoundElem.DataSize
}
case constant.Video:
if msg.VideoElem != nil {
if msg.VideoElem.VideoSize > 0 {
return msg.VideoElem.VideoSize
}
return msg.VideoElem.SnapshotSize
}
case constant.File:
if msg.FileElem != nil {
return msg.FileElem.FileSize
}
}
return 0
}
Loading
Loading