Skip to content

Commit efa41c7

Browse files
committed
test impl for gap filling
Signed-off-by: AmarnathCJD <[email protected]>
1 parent 7aa92b7 commit efa41c7

File tree

2 files changed

+153
-9
lines changed

2 files changed

+153
-9
lines changed

telegram/formatting.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ func InsertTagsIntoText(text string, tags []Tag) string {
263263
closeTags[int(tag.Offset+tag.Length)] = append(closeTags[int(tag.Offset+tag.Length)], tag)
264264
}
265265

266-
result := make([]uint16, len(utf16Text))
266+
result := make([]uint16, 0, len(utf16Text)*2)
267267
for i := 0; i < len(utf16Text); i++ {
268268
if opening, exists := openTags[i]; exists {
269269
for _, tag := range opening {

telegram/updates.go

Lines changed: 152 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ type UpdateDispatcher struct {
288288
sortTrigger chan any
289289
logger *utils.Logger
290290
openChats map[int64]*openChat
291+
nextUpdatesDeadline time.Time
292+
currentPts int32
291293
}
292294

293295
// creates and populates a new UpdateDispatcher
@@ -1180,22 +1182,32 @@ func (c *Client) AddRawHandler(updateType Update, handler RawHandler) Handle {
11801182
// Sort and Handle all the Incoming Updates
11811183
// Many more types to be added
11821184
func HandleIncomingUpdates(u any, c *Client) bool {
1185+
c.dispatcher.nextUpdatesDeadline = time.Now().Add(time.Minute * 15)
1186+
11831187
UpdateTypeSwitching:
11841188
switch upd := u.(type) {
11851189
case *UpdatesObj:
11861190
go c.Cache.UpdatePeersToCache(upd.Users, upd.Chats)
11871191
for _, update := range upd.Updates {
11881192
switch update := update.(type) {
11891193
case *UpdateNewMessage:
1190-
go c.handleMessageUpdate(update.Message)
1194+
if ok, _ := managePts(c, update.Pts, update.PtsCount); ok {
1195+
go c.handleMessageUpdate(update.Message)
1196+
}
11911197
case *UpdateNewChannelMessage:
1192-
go c.handleMessageUpdate(update.Message)
1198+
if ok, _ := managePts(c, update.Pts, update.PtsCount); ok {
1199+
go c.handleMessageUpdate(update.Message)
1200+
}
11931201
case *UpdateNewScheduledMessage:
11941202
go c.handleMessageUpdate(update.Message)
11951203
case *UpdateEditMessage:
1196-
go c.handleEditUpdate(update.Message)
1204+
if ok, _ := managePts(c, update.Pts, update.PtsCount); ok {
1205+
go c.handleEditUpdate(update.Message)
1206+
}
11971207
case *UpdateEditChannelMessage:
1198-
go c.handleEditUpdate(update.Message)
1208+
if ok, _ := managePts(c, update.Pts, update.PtsCount); ok {
1209+
go c.handleEditUpdate(update.Message)
1210+
}
11991211
case *UpdateBotInlineQuery:
12001212
go c.handleInlineUpdate(update)
12011213
case *UpdateBotCallbackQuery:
@@ -1205,9 +1217,13 @@ UpdateTypeSwitching:
12051217
case *UpdateChannelParticipant:
12061218
go c.handleParticipantUpdate(update)
12071219
case *UpdateDeleteChannelMessages:
1208-
go c.handleDeleteUpdate(update)
1220+
if ok, _ := managePts(c, update.Pts, update.PtsCount); ok {
1221+
go c.handleDeleteUpdate(update)
1222+
}
12091223
case *UpdateDeleteMessages:
1210-
go c.handleDeleteUpdate(update)
1224+
if ok, _ := managePts(c, update.Pts, update.PtsCount); ok {
1225+
go c.handleDeleteUpdate(update)
1226+
}
12111227
case *UpdateBotInlineSend:
12121228
go c.handleInlineSendUpdate(update)
12131229
}
@@ -1230,17 +1246,145 @@ UpdateTypeSwitching:
12301246
case *UpdatesCombined:
12311247
u = upd.Updates
12321248
go c.Cache.UpdatePeersToCache(upd.Users, upd.Chats)
1233-
12341249
goto UpdateTypeSwitching
12351250
case *UpdatesTooLong:
12361251
c.Log.Debug("too many updates, forcing getDifference")
1237-
c.UpdatesGetState() //state, err := c.UpdatesGetState() // TODO: figure out the pts to call here
1252+
fetchUpdates(c) // updatesTooLong, too many updates, shall fetch manually
12381253
default:
12391254
c.Log.Debug("skipping unhanded update type: ", reflect.TypeOf(u), " with value: ", c.JSON(u))
12401255
}
12411256
return true
12421257
}
12431258

1259+
const GETDIFF_LIMIT = 1000
1260+
1261+
func fetchUpdates(c *Client) {
1262+
req := &UpdatesGetDifferenceParams{
1263+
Pts: c.dispatcher.currentPts,
1264+
PtsLimit: GETDIFF_LIMIT,
1265+
PtsTotalLimit: GETDIFF_LIMIT,
1266+
Date: int32(time.Now().Unix()),
1267+
Qts: 0,
1268+
QtsLimit: 0,
1269+
}
1270+
1271+
for {
1272+
c.Log.Debug("getting difference with pts: ", req.Pts, " and limit: ", req.PtsLimit)
1273+
updates, err := c.UpdatesGetDifference(req)
1274+
if err != nil {
1275+
c.Log.Error(errors.Wrap(err, "updates.dispatcher.getDifference"))
1276+
}
1277+
1278+
switch u := updates.(type) {
1279+
case *UpdatesDifferenceObj:
1280+
c.Cache.UpdatePeersToCache(u.Users, u.Chats)
1281+
for _, update := range u.NewMessages {
1282+
switch update.(type) {
1283+
case *MessageObj:
1284+
go c.handleMessageUpdate(update)
1285+
}
1286+
}
1287+
1288+
if len(u.OtherUpdates) > 0 {
1289+
HandleIncomingUpdates(UpdatesObj{Updates: u.OtherUpdates}, c)
1290+
}
1291+
1292+
c.dispatcher.Lock()
1293+
c.dispatcher.currentPts = u.State.Pts
1294+
c.dispatcher.Unlock()
1295+
return
1296+
case *UpdatesDifferenceSlice:
1297+
c.Cache.UpdatePeersToCache(u.Users, u.Chats)
1298+
for _, update := range u.NewMessages {
1299+
switch update.(type) {
1300+
case *MessageObj:
1301+
go c.handleMessageUpdate(update)
1302+
}
1303+
}
1304+
1305+
if len(u.OtherUpdates) > 0 {
1306+
HandleIncomingUpdates(UpdatesObj{Updates: u.OtherUpdates}, c)
1307+
}
1308+
1309+
c.dispatcher.Lock()
1310+
c.dispatcher.currentPts = u.IntermediateState.Pts
1311+
c.dispatcher.Unlock()
1312+
req.Pts = c.dispatcher.currentPts
1313+
case *UpdatesDifferenceTooLong:
1314+
c.dispatcher.currentPts = u.Pts
1315+
case *UpdatesDifferenceEmpty:
1316+
break
1317+
}
1318+
1319+
req.Pts = c.dispatcher.currentPts
1320+
}
1321+
}
1322+
1323+
func managePts(c *Client, pts int32, ptsCount int32) (bool, int) {
1324+
if c.dispatcher.currentPts+ptsCount == pts || c.dispatcher.currentPts == 0 {
1325+
c.dispatcher.Lock()
1326+
c.dispatcher.currentPts = pts
1327+
c.dispatcher.Unlock()
1328+
return true, 0
1329+
}
1330+
1331+
if c.dispatcher.currentPts+ptsCount < pts {
1332+
c.Log.Debug("pts is ahead, fetching difference to fill gap")
1333+
missing := pts - c.dispatcher.currentPts
1334+
updates, err := c.UpdatesGetDifference(&UpdatesGetDifferenceParams{
1335+
Pts: c.dispatcher.currentPts + ptsCount,
1336+
PtsLimit: missing,
1337+
PtsTotalLimit: missing,
1338+
Date: int32(time.Now().Unix()),
1339+
Qts: 0,
1340+
QtsLimit: 0,
1341+
})
1342+
1343+
c.Log.Debug("getting difference with pts: ", c.dispatcher.currentPts+ptsCount, " and limit: ", missing)
1344+
1345+
if err != nil {
1346+
c.Log.Error(errors.Wrap(err, "updates.dispatcher.getDifference"))
1347+
}
1348+
1349+
switch u := updates.(type) {
1350+
case *UpdatesDifferenceObj:
1351+
c.Cache.UpdatePeersToCache(u.Users, u.Chats)
1352+
for _, update := range u.NewMessages {
1353+
switch update.(type) {
1354+
case *MessageObj:
1355+
go c.handleMessageUpdate(update)
1356+
}
1357+
}
1358+
1359+
if len(u.OtherUpdates) > 0 {
1360+
HandleIncomingUpdates(UpdatesObj{Updates: u.OtherUpdates}, c)
1361+
}
1362+
1363+
c.dispatcher.Lock()
1364+
c.dispatcher.currentPts = u.State.Pts
1365+
c.dispatcher.Unlock()
1366+
case *UpdatesDifferenceSlice:
1367+
c.Cache.UpdatePeersToCache(u.Users, u.Chats)
1368+
for _, update := range u.NewMessages {
1369+
switch update.(type) {
1370+
case *MessageObj:
1371+
go c.handleMessageUpdate(update)
1372+
}
1373+
}
1374+
1375+
if len(u.OtherUpdates) > 0 {
1376+
HandleIncomingUpdates(UpdatesObj{Updates: u.OtherUpdates}, c)
1377+
}
1378+
1379+
c.dispatcher.Lock()
1380+
c.dispatcher.currentPts = u.IntermediateState.Pts
1381+
c.dispatcher.Unlock()
1382+
}
1383+
}
1384+
1385+
return false, int(pts - c.dispatcher.currentPts)
1386+
}
1387+
12441388
func (c *Client) GetDifference(Pts, Limit int32) (Message, error) {
12451389
c.Log.Debug("getting difference with pts: ", Pts, " and limit: ", Limit)
12461390

0 commit comments

Comments
 (0)