@@ -3,9 +3,9 @@ package poll
33
44import (
55 "context"
6- "errors"
76 "fmt"
87 "log/slog"
8+ "sync"
99 "time"
1010
1111 "advrider-notifier/pkg/notifier"
@@ -36,6 +36,7 @@ type Monitor struct {
3636 emailer Emailer
3737 logger * slog.Logger
3838 cycleNumber int
39+ pollMutex sync.Mutex // Prevents concurrent polling
3940}
4041
4142// New creates a new poll monitor.
@@ -49,7 +50,15 @@ func New(scraper Scraper, store Store, emailer Emailer, logger *slog.Logger) *Mo
4950}
5051
5152// CheckAll checks all subscriptions for new posts.
53+ // This function is protected by a mutex to prevent concurrent polling.
5254func (m * Monitor ) CheckAll (ctx context.Context ) error {
55+ // Try to acquire the lock - if already polling, skip this cycle
56+ if ! m .pollMutex .TryLock () {
57+ m .logger .Warn ("Poll cycle already in progress - skipping this invocation" )
58+ return nil
59+ }
60+ defer m .pollMutex .Unlock ()
61+
5362 m .cycleNumber ++
5463 cycleStart := time .Now ()
5564
@@ -284,6 +293,13 @@ func (m *Monitor) checkThreadForSubscribers(ctx context.Context, info *threadChe
284293 // First check for this subscriber - just record the latest post ID
285294 if thread .LastPostID == "" {
286295 thread .LastPostID = latestPost .ID
296+
297+ m .logger .Info ("Saving initial state for subscriber" ,
298+ "cycle" , m .cycleNumber ,
299+ "email" , email ,
300+ "thread_url" , threadURL ,
301+ "post_id" , latestPost .ID )
302+
287303 if err := m .store .Save (ctx , sub ); err != nil {
288304 m .logger .Error ("Failed to save initial state for subscriber" ,
289305 "cycle" , m .cycleNumber ,
@@ -373,6 +389,12 @@ func (m *Monitor) checkThreadForSubscribers(ctx context.Context, info *threadChe
373389 // Update last post ID after successful notification
374390 thread .LastPostID = latestPost .ID
375391
392+ m .logger .Info ("Saving state after successful notification" ,
393+ "cycle" , m .cycleNumber ,
394+ "email" , email ,
395+ "thread_url" , threadURL ,
396+ "new_last_post_id" , latestPost .ID )
397+
376398 // CRITICAL: Save immediately to prevent duplicate notifications if server crashes
377399 if err := m .store .Save (ctx , sub ); err != nil {
378400 m .logger .Error ("CRITICAL: Notification sent but failed to save state - subscriber may get duplicate notification next cycle" ,
@@ -395,149 +417,33 @@ func (m *Monitor) checkThreadForSubscribers(ctx context.Context, info *threadChe
395417 hasUpdates = true
396418 } else {
397419 // No new posts for this subscriber - just save state to update LastPolledAt
420+ m .logger .Info ("No new posts - saving state" ,
421+ "cycle" , m .cycleNumber ,
422+ "email" , email ,
423+ "thread_url" , threadURL ,
424+ "thread_title" , thread .ThreadTitle )
425+
398426 if err := m .store .Save (ctx , sub ); err != nil {
399- m .logger .Warn ("Failed to save state (no new posts)" ,
427+ m .logger .Error ("Failed to save state (no new posts)" ,
400428 "cycle" , m .cycleNumber ,
401429 "email" , email ,
402430 "thread_url" , threadURL ,
403431 "thread_title" , thread .ThreadTitle ,
404432 "error" , err )
405433 } else {
406434 savedEmails [email ] = true
435+ m .logger .Info ("State saved successfully (no new posts)" ,
436+ "cycle" , m .cycleNumber ,
437+ "email" , email ,
438+ "thread_url" , threadURL ,
439+ "thread_title" , thread .ThreadTitle )
407440 }
408441 }
409442 }
410443
411444 return hasUpdates , savedEmails , nil
412445}
413446
414- // getFirstKey returns the first key from a map (for logging purposes).
415- func getFirstKey (m map [string ][]* notifier.Post ) string {
416- for k := range m {
417- return k
418- }
419- return ""
420- }
421-
422- func (m * Monitor ) checkThread (ctx context.Context , sub * notifier.Subscription , threadID string , thread * notifier.Thread , cache map [string ][]* notifier.Post , now time.Time ) error {
423- m .logger .Info ("Starting thread check" ,
424- "email" , sub .Email ,
425- "thread_id" , threadID ,
426- "thread_url" , thread .ThreadURL ,
427- "last_post_id" , thread .LastPostID )
428-
429- // Check cache first to avoid redundant fetches
430- posts , ok := cache [thread .ThreadURL ]
431- if ! ok {
432- // Use smart fetch to get posts efficiently
433- var title string
434- var err error
435- posts , title , err = m .scraper .SmartFetch (ctx , thread .ThreadURL , thread .LastPostID )
436- if err != nil {
437- return fmt .Errorf ("fetch thread page: %w" , err )
438- }
439- cache [thread .ThreadURL ] = posts
440-
441- // Update thread title if not set
442- if thread .ThreadTitle == "" {
443- thread .ThreadTitle = title
444- m .logger .Info ("Thread title captured" , "title" , title )
445- }
446- }
447-
448- // Update last polled time
449- thread .LastPolledAt = now
450-
451- if len (posts ) == 0 {
452- return errors .New ("no posts found in thread" )
453- }
454-
455- latestPost := posts [len (posts )- 1 ]
456-
457- // Parse the timestamp of the latest post
458- if latestPost .Timestamp != "" {
459- if postTime , err := time .Parse (time .RFC3339 , latestPost .Timestamp ); err == nil {
460- thread .LastPostTime = postTime
461- }
462- }
463-
464- m .logger .Info ("Posts fetched for comparison" ,
465- "total_posts" , len (posts ),
466- "first_post_id" , posts [0 ].ID ,
467- "latest_post_id" , latestPost .ID ,
468- "last_seen_post_id" , thread .LastPostID ,
469- "last_post_time" , thread .LastPostTime .Format (time .RFC3339 ))
470-
471- if thread .LastPostID == "" {
472- // First check - just record the latest post ID and times
473- thread .LastPostID = latestPost .ID
474- if err := m .store .Save (ctx , sub ); err != nil {
475- return fmt .Errorf ("save subscription: %w" , err )
476- }
477- m .logger .Info ("Initial post ID recorded" , "email" , sub .Email , "thread_id" , threadID , "post_id" , latestPost .ID , "title" , thread .ThreadTitle )
478- return nil
479- }
480-
481- // Find all new posts since LastPostID
482- var newPosts []* notifier.Post
483- foundLast := false
484- for i , post := range posts {
485- if foundLast {
486- newPosts = append (newPosts , post )
487- m .logger .Debug ("Found new post" , "index" , i , "post_id" , post .ID , "author" , post .Author )
488- }
489- if post .ID == thread .LastPostID {
490- foundLast = true
491- m .logger .Info ("Found last seen post" , "index" , i , "post_id" , post .ID )
492- }
493- }
494-
495- if ! foundLast && thread .LastPostID != "" {
496- m .logger .Warn ("Last seen post ID not found in fetched posts - possible gap or old post" ,
497- "last_seen_post_id" , thread .LastPostID ,
498- "posts_fetched" , len (posts ),
499- "first_fetched_id" , posts [0 ].ID ,
500- "last_fetched_id" , latestPost .ID )
501- // Treat all fetched posts as new (safer than missing posts)
502- newPosts = posts
503- }
504-
505- if len (newPosts ) > 0 {
506- // Apply safety limit - only send the most recent maxPostsPerEmail posts
507- if len (newPosts ) > maxPostsPerEmail {
508- m .logger .Warn ("Too many new posts, limiting to most recent" ,
509- "email" , sub .Email ,
510- "thread_id" , threadID ,
511- "total_new" , len (newPosts ),
512- "sending" , maxPostsPerEmail )
513- newPosts = newPosts [len (newPosts )- maxPostsPerEmail :]
514- }
515-
516- m .logger .Info ("New posts detected" ,
517- "email" , sub .Email ,
518- "thread_id" , threadID ,
519- "count" , len (newPosts ),
520- "latest_post_id" , latestPost .ID ,
521- "previous" , thread .LastPostID )
522-
523- if err := m .emailer .SendNotification (ctx , sub , thread , newPosts ); err != nil {
524- return fmt .Errorf ("send email: %w" , err )
525- }
526-
527- thread .LastPostID = latestPost .ID
528- if err := m .store .Save (ctx , sub ); err != nil {
529- return fmt .Errorf ("save subscription: %w" , err )
530- }
531- } else {
532- // No new posts, but still save to update LastPolledAt and LastPostTime
533- if err := m .store .Save (ctx , sub ); err != nil {
534- return fmt .Errorf ("save subscription: %w" , err )
535- }
536- }
537-
538- return nil
539- }
540-
541447// calculateInterval determines how often to poll a thread based on activity.
542448func calculateInterval (lastPostTime , lastPolledAt time.Time ) time.Duration {
543449 // If never polled or never seen a post, poll now
0 commit comments