Skip to content

Commit 8b5acf3

Browse files
committed
fix: try to improve the fetch messages
1 parent 8f2e2e3 commit 8b5acf3

File tree

2 files changed

+59
-27
lines changed

2 files changed

+59
-27
lines changed

matrix/client.go

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package matrix
22

33
import (
44
"context"
5+
"encoding/json"
56
"errors"
67
"fmt"
78
"net/http"
@@ -94,21 +95,66 @@ func (mc *MatrixClient) SendMessage(ctx context.Context, userID id.UserID, roomI
9495
}
9596

9697
// Sync performs a sync for the specified user to fetch messages.
97-
func (mc *MatrixClient) Sync(ctx context.Context, userID id.UserID, since string) (*mautrix.RespSync, error) {
98+
// If filterAfterEventID is provided (non-empty), uses post-processing to filter events.
99+
func (mc *MatrixClient) Sync(ctx context.Context, userID id.UserID, since, filterAfterEventID string) (*mautrix.RespSync, error) {
98100
mc.mu.Lock()
99101
defer mc.mu.Unlock()
100102

101-
logger.Debug().Str("user_id", string(userID)).Str("since", since).Msg("matrix: performing sync")
103+
logger.Debug().Str("user_id", string(userID)).Str("since", since).Str("filter_after_event_id", filterAfterEventID).Msg("matrix: performing sync")
102104

103105
mc.cli.UserID = userID
104-
// The SyncRequest method takes filter parameters directly in this version.
105-
// Using empty filter to ensure we get all rooms and messages
106-
resp, err := mc.cli.SyncRequest(ctx, 30000, since, "", false, event.PresenceOnline)
106+
107+
// Create a filter that only returns message events to optimize bandwidth
108+
filter := &mautrix.Filter{
109+
Room: &mautrix.RoomFilter{
110+
Timeline: &mautrix.FilterPart{
111+
Types: []event.Type{event.EventMessage},
112+
Limit: 100, // Reasonable limit to get enough context
113+
},
114+
},
115+
}
116+
117+
// Marshal the filter to JSON string for the SyncRequest
118+
filterBytes, err := json.Marshal(filter)
119+
if err != nil {
120+
logger.Error().Str("user_id", string(userID)).Err(err).Msg("matrix: failed to marshal filter")
121+
return nil, fmt.Errorf("marshal filter: %w", err)
122+
}
123+
filterJSON := string(filterBytes)
124+
logger.Debug().Str("user_id", string(userID)).Str("filter", filterJSON).Msg("matrix: using message filter for sync")
125+
126+
// The SyncRequest method signature: SyncRequest(ctx, timeoutMS, since, filter, fullState, setPresence)
127+
resp, err := mc.cli.SyncRequest(ctx, 30000, since, filterJSON, false, event.PresenceOnline)
107128
if err != nil {
108129
logger.Error().Str("user_id", string(userID)).Err(err).Msg("matrix: sync failed")
109130
return nil, err
110131
}
111132

133+
// Post-process: If filterAfterEventID is provided, filter room timelines to only include events after it
134+
// This is necessary because the Matrix spec doesn't support server-side filtering by event ID
135+
if filterAfterEventID != "" {
136+
logger.Debug().Str("user_id", string(userID)).Str("filter_after_event_id", filterAfterEventID).Msg("matrix: filtering sync results by event ID")
137+
for roomID, room := range resp.Rooms.Join {
138+
foundEvent := false
139+
// Find the index of the target event
140+
for j, evt := range room.Timeline.Events {
141+
if string(evt.ID) == filterAfterEventID {
142+
// Keep only events after this one
143+
room.Timeline.Events = room.Timeline.Events[j+1:]
144+
resp.Rooms.Join[roomID] = room
145+
logger.Debug().Str("user_id", string(userID)).Str("room_id", roomID.String()).Int("events_removed", j+1).Msg("matrix: filtered room timeline")
146+
foundEvent = true
147+
break
148+
}
149+
}
150+
if !foundEvent && len(room.Timeline.Events) > 0 {
151+
// Event not found in this timeline window - the requested event is older than what we got
152+
// Keep all events since they're all newer than the requested event
153+
logger.Debug().Str("user_id", string(userID)).Str("room_id", roomID.String()).Msg("matrix: filter event not found in timeline, keeping all events")
154+
}
155+
}
156+
}
157+
112158
logger.Debug().Str("user_id", string(userID)).Int("rooms", len(resp.Rooms.Join)).Msg("matrix: sync completed")
113159
return resp, nil
114160
}

service/messages.go

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ func (s *MessageService) SendMessage(ctx context.Context, req *models.SendMessag
122122

123123
// FetchMessages translates Matrix /sync into the Acrobits fetch_messages response.
124124
func (s *MessageService) FetchMessages(ctx context.Context, req *models.FetchMessagesRequest) (*models.FetchMessagesResponse, error) {
125-
// Debug full request
126125
logger.Debug().Interface("request", req).Msg("fetch messages request received")
126+
127127
// The user to impersonate is taken from the 'Username' field.
128128
userID := s.resolveMatrixUser(strings.TrimSpace(req.Username))
129129
if userID == "" {
@@ -132,27 +132,27 @@ func (s *MessageService) FetchMessages(ctx context.Context, req *models.FetchMes
132132
}
133133

134134
since := req.LastID
135-
var filterAfterEventID string
135+
filterAfterEventID := ""
136136

137137
// Acrobits might send a Matrix Event ID (starts with $) as last_id.
138138
// Matrix Sync requires a stream token (usually starts with s).
139139
// If we get an Event ID, we must perform an initial sync (empty since)
140-
// and manually filter the results to return only messages after that event.
140+
// and let the Matrix client filter the results to return only messages after that event.
141141
if strings.HasPrefix(since, "$") {
142-
logger.Debug().Str("last_id", since).Msg("received event ID as last_id, performing initial sync and filtering")
142+
logger.Debug().Str("last_id", since).Msg("received event ID as last_id, performing initial sync with event filtering")
143143
filterAfterEventID = since
144144
since = ""
145145
}
146146

147147
logger.Debug().Str("user_id", string(userID)).Str("since", since).Msg("syncing messages from matrix")
148148

149-
resp, err := s.matrixClient.Sync(ctx, userID, since)
149+
resp, err := s.matrixClient.Sync(ctx, userID, since, filterAfterEventID)
150150
if err != nil {
151151
// If the token is invalid (e.g. expired or from a different session), retry with a full sync.
152152
if strings.Contains(err.Error(), "Invalid stream token") || strings.Contains(err.Error(), "M_UNKNOWN") {
153153
logger.Warn().Err(err).Msg("invalid stream token, retrying with full sync")
154154
since = ""
155-
resp, err = s.matrixClient.Sync(ctx, userID, since)
155+
resp, err = s.matrixClient.Sync(ctx, userID, since, filterAfterEventID)
156156
}
157157
}
158158
if err != nil {
@@ -166,21 +166,7 @@ func (s *MessageService) FetchMessages(ctx context.Context, req *models.FetchMes
166166
callerIdentifier := s.resolveMatrixIDToIdentifier(string(userID))
167167

168168
for _, room := range resp.Rooms.Join {
169-
// If we are filtering by event ID, check if the event is in this room's timeline.
170-
// If it is, we only want events AFTER it.
171-
// If it's not, we assume the event is older than the timeline window, so we take all events.
172-
startIndex := 0
173-
if filterAfterEventID != "" {
174-
for i, evt := range room.Timeline.Events {
175-
if string(evt.ID) == filterAfterEventID {
176-
startIndex = i + 1
177-
break
178-
}
179-
}
180-
}
181-
182-
for i := startIndex; i < len(room.Timeline.Events); i++ {
183-
evt := room.Timeline.Events[i]
169+
for _, evt := range room.Timeline.Events {
184170
if evt.Type != event.EventMessage {
185171
continue
186172
}
@@ -191,7 +177,7 @@ func (s *MessageService) FetchMessages(ctx context.Context, req *models.FetchMes
191177
isSent := isSentBy(senderMatrixID, string(userID))
192178

193179
// Remap sender to identifier (e.g. "202" or "91201")
194-
msg.Sender = s.resolveMatrixIDToIdentifier(senderMatrixID)
180+
msg.Sender = string(s.resolveMatrixUser(senderMatrixID))
195181

196182
// Determine Recipient
197183
if isSent {

0 commit comments

Comments
 (0)