11package reminderscheduler
22
33import (
4+ "context"
45 "fmt"
56 "time"
67
8+ "github.com/keybase/go-keybase-chat-bot/kbchat/types/chat1"
79 "github.com/keybase/managed-bots/gcalbot/gcalbot"
10+ "github.com/keybase/pipeliner"
811)
912
1013func (r * ReminderScheduler ) sendReminderLoop (shutdownCh chan struct {}) error {
@@ -37,6 +40,10 @@ func (r *ReminderScheduler) sendReminderLoop(shutdownCh chan struct{}) error {
3740
3841func (r * ReminderScheduler ) sendReminders (sendMinute time.Time ) {
3942 timestamp := getReminderTimestamp (sendMinute , 0 )
43+ var sendTasks []struct {
44+ convID chat1.ConvIDStr
45+ message string
46+ }
4047 r .minuteReminders .ForEachReminderMessageInMinute (timestamp , func (msg * ReminderMessage ) {
4148 for duration := range msg .MinuteReminders {
4249 msgTimestamp := getReminderTimestamp (msg .StartTime , duration )
@@ -48,12 +55,17 @@ func (r *ReminderScheduler) sendReminders(sendMinute time.Time) {
4855 } else {
4956 eventSummary = "An event"
5057 }
58+ var message string
5159 if minutesBefore == 0 {
52- r . ChatEcho ( msg . KeybaseConvID , "%s is starting now: %s" , eventSummary , msg .MsgContent )
60+ message = fmt . Sprintf ( "%s is starting now: %s" , eventSummary , msg .MsgContent )
5361 } else {
54- r . ChatEcho ( msg . KeybaseConvID , "%s is starting in %s: %s" ,
62+ message = fmt . Sprintf ( "%s is starting in %s: %s" ,
5563 eventSummary , gcalbot .MinutesBeforeString (minutesBefore ), msg .MsgContent )
5664 }
65+ sendTasks = append (sendTasks , struct {
66+ convID chat1.ConvIDStr
67+ message string
68+ }{msg .KeybaseConvID , message })
5769 delete (msg .MinuteReminders , duration )
5870 r .stats .Count ("sendReminders - reminder" )
5971 }
@@ -65,9 +77,26 @@ func (r *ReminderScheduler) sendReminders(sendMinute time.Time) {
6577 }
6678 })
6779 r .minuteReminders .RemoveMinute (timestamp )
80+
81+ const sendWindow = 10
82+ ctx := context .Background ()
83+ pipe := pipeliner .NewPipeliner (sendWindow )
84+ worker := func (_ context.Context , i int ) error { // nolint:unparam
85+ t := sendTasks [i ]
86+ r .ChatEcho (t .convID , "%s" , t .message )
87+ return nil
88+ }
89+ for i := range sendTasks {
90+ if err := pipe .WaitForRoom (ctx ); err != nil {
91+ break
92+ }
93+ go func () { pipe .CompleteOne (worker (ctx , i )) }()
94+ }
95+ _ = pipe .Flush (ctx )
96+
6897 sendDuration := time .Since (sendMinute )
6998 if sendDuration .Seconds () > 15 {
70- r .Errorf ("sending reminders took %s" , sendDuration .String ())
99+ r .Errorf ("sending %d reminders took %s" , len ( sendTasks ) , sendDuration .String ())
71100 }
72101 r .stats .Value ("sendReminders - duration - seconds" , sendDuration .Seconds ())
73102}
0 commit comments