-
-
Notifications
You must be signed in to change notification settings - Fork 100
Expand file tree
/
Copy pathpending_broadcast.go
More file actions
191 lines (171 loc) · 6.92 KB
/
pending_broadcast.go
File metadata and controls
191 lines (171 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package processor
import (
"slices"
"github.com/tphakala/birdnet-go/internal/imageprovider"
"github.com/tphakala/birdnet-go/internal/logger"
)
// pendingBroadcastBufferSize is the channel buffer size used for pending
// detection broadcasts.
const pendingBroadcastBufferSize = 10
// PendingDetectionStatus represents the lifecycle state of a pending detection.
// It is a string type so the value is serialized verbatim in the "status"
// field of SSEPendingDetection.
type PendingDetectionStatus string

const (
	// PendingStatusActive marks a detection that is still pending; it is the
	// status assigned to every entry returned by SnapshotVisiblePending.
	PendingStatusActive PendingDetectionStatus = "active"
	// PendingStatusApproved is a terminal status for a flushed detection
	// that was approved.
	PendingStatusApproved PendingDetectionStatus = "approved"
	// PendingStatusRejected is a terminal status for a flushed detection
	// that was rejected.
	PendingStatusRejected PendingDetectionStatus = "rejected"
)
// SSEPendingDetection is the lightweight DTO sent over SSE for pending detections.
// It contains only the fields needed for the "currently hearing" dashboard card.
// Field order and JSON tags define the wire format consumed by clients.
type SSEPendingDetection struct {
	// Species is the common name of the detected species.
	Species string `json:"species"` // Common name
	// ScientificName is the scientific name of the detected species.
	ScientificName string `json:"scientificName"` // Scientific name
	// Thumbnail is the bird image URL; it is empty when no image is available.
	Thumbnail string `json:"thumbnail"` // Bird image URL
	// Status is the lifecycle state of the detection.
	Status PendingDetectionStatus `json:"status"` // "active", "approved", "rejected"
	// FirstDetected is when the pending detection was created.
	FirstDetected int64 `json:"firstDetected"` // Unix timestamp (seconds)
	// Source is the human-readable name of the source.
	Source string `json:"source"` // Source display name
	// SourceID is the unmodified source identifier.
	SourceID string `json:"sourceID"` // Raw source ID for client-side filtering
	// HitCount is how many hits the pending detection has accumulated.
	HitCount int `json:"hitCount"` // Number of inference hits accumulated
}
// sortPendingSnapshot orders a pending detection snapshot in place.
//
// Entries are ordered by FirstDetected ascending (oldest first). Ties are
// broken by Species and then by SourceID, both in ascending string order, so
// the result is deterministic. pendingSnapshotChanged compares snapshots
// index by index and therefore relies on this ordering.
func sortPendingSnapshot(s []SSEPendingDetection) {
	slices.SortFunc(s, func(x, y SSEPendingDetection) int {
		// Cases are evaluated top to bottom, so each later key is only
		// consulted when all earlier keys compare equal.
		switch {
		case x.FirstDetected < y.FirstDetected:
			return -1
		case x.FirstDetected > y.FirstDetected:
			return 1
		case x.Species < y.Species:
			return -1
		case x.Species > y.Species:
			return 1
		case x.SourceID < y.SourceID:
			return -1
		case x.SourceID > y.SourceID:
			return 1
		default:
			return 0
		}
	})
}
// CalculateVisibilityThreshold returns the number of hits a pending detection
// needs before it is shown in the "currently hearing" card.
//
// The threshold is a quarter of minDetections (integer division), raised to at
// least 2 and then capped at minDetections. The cap matters for small values:
// with minDetections=1 an uncapped minimum of 2 would let a detection be
// approved (flushed) after a single hit without ever having been visible.
func CalculateVisibilityThreshold(minDetections int) int {
	const (
		quarterDivisor = 4 // threshold is 25% of minDetections
		lowerBound     = 2 // minimum applied before the minDetections cap
	)
	// The cap is applied last so the result never exceeds minDetections.
	return min(max(lowerBound, minDetections/quarterDivisor), minDetections)
}
// SnapshotVisiblePending builds a sorted snapshot of the pending detections
// whose hit count has reached the visibility threshold derived from
// minDetections (see CalculateVisibilityThreshold). Every returned entry has
// status PendingStatusActive, and the result is ordered by sortPendingSnapshot.
//
// The method takes pendingMutex for reading while it walks the pending map,
// so the caller must NOT hold pendingMutex.
func (p *Processor) SnapshotVisiblePending(minDetections int) []SSEPendingDetection {
	minHits := CalculateVisibilityThreshold(minDetections)

	p.pendingMutex.RLock()
	visible := make([]SSEPendingDetection, 0, len(p.pendingDetections))
	for k := range p.pendingDetections {
		pending := p.pendingDetections[k]
		if pending.Count >= minHits {
			sciName := pending.Detection.Result.Species.ScientificName
			visible = append(visible, SSEPendingDetection{
				Species:        pending.Detection.Result.Species.CommonName,
				ScientificName: sciName,
				Thumbnail:      p.getThumbnailURL(sciName),
				Status:         PendingStatusActive,
				FirstDetected:  pending.CreatedAt.Unix(),
				Source:         p.getDisplayNameForSource(pending.Source),
				SourceID:       pending.Source,
				HitCount:       pending.Count,
			})
		}
	}
	p.pendingMutex.RUnlock()

	// Sorting happens after the lock is released; it only touches the local copy.
	sortPendingSnapshot(visible)
	return visible
}
// getThumbnailURL looks up scientificName in the bird image cache and returns
// the URL produced by imageprovider.ProxyImageURL for it.
//
// An empty string is returned when no cache is configured, when the cache
// lookup fails, or when the cached entry is a negative entry or has no URL.
func (p *Processor) getThumbnailURL(scientificName string) string {
	if p.BirdImageCache == nil {
		return ""
	}

	img, err := p.BirdImageCache.Get(scientificName)
	// err is checked first so img is only inspected after a successful lookup.
	usable := err == nil && !img.IsNegativeEntry() && img.URL != ""
	if !usable {
		return ""
	}
	return imageprovider.ProxyImageURL(scientificName)
}
// broadcastPendingSnapshot hands snapshot to the PendingBroadcaster callback,
// but only when it differs from the previously broadcast snapshot according
// to pendingSnapshotChanged. This keeps SSE clients from receiving repeated,
// identical messages when nothing changed for any visible pending species.
// When no broadcaster is configured the call does nothing.
func (p *Processor) broadcastPendingSnapshot(snapshot []SSEPendingDetection) {
	// Read the callback under its own lock, then invoke it outside any lock.
	p.pendingBroadcasterMu.RLock()
	send := p.PendingBroadcaster
	p.pendingBroadcasterMu.RUnlock()
	if send == nil {
		return
	}

	p.lastBroadcastSnapshotMu.Lock()
	changed := pendingSnapshotChanged(p.lastBroadcastSnapshot, snapshot)
	if changed {
		// Keep a private copy so later comparisons do not alias the
		// caller's slice.
		stored := make([]SSEPendingDetection, len(snapshot))
		copy(stored, snapshot)
		p.lastBroadcastSnapshot = stored
	}
	p.lastBroadcastSnapshotMu.Unlock()

	if changed {
		send(snapshot)
	}
}
// buildFlushNotification converts a flushed pending detection into the SSE
// DTO, stamping it with the given terminal status (approved or rejected).
// The thumbnail and source display name are resolved through
// getThumbnailURL and getDisplayNameForSource.
func (p *Processor) buildFlushNotification(item *PendingDetection, status PendingDetectionStatus) SSEPendingDetection {
	sciName := item.Detection.Result.Species.ScientificName

	var note SSEPendingDetection
	note.Species = item.Detection.Result.Species.CommonName
	note.ScientificName = sciName
	note.Thumbnail = p.getThumbnailURL(sciName)
	note.Status = status
	note.FirstDetected = item.CreatedAt.Unix()
	note.Source = p.getDisplayNameForSource(item.Source)
	note.SourceID = item.Source
	note.HitCount = item.Count
	return note
}
// pendingSnapshotChanged reports whether curr differs from prev.
//
// Snapshots differ when their lengths differ or when any pair of entries at
// the same index differs in Species, SourceID, HitCount, or Status. Other
// fields are not compared. Both slices are expected to be sorted the same way
// (see sortPendingSnapshot) because the comparison is positional.
func pendingSnapshotChanged(prev, curr []SSEPendingDetection) bool {
	if len(prev) != len(curr) {
		return true
	}
	for i := range curr {
		before, after := &prev[i], &curr[i]
		same := before.Species == after.Species &&
			before.SourceID == after.SourceID &&
			before.HitCount == after.HitCount &&
			before.Status == after.Status
		if !same {
			return true
		}
	}
	return false
}
// logPendingBroadcast writes a debug-level log entry with the number of
// active and terminal pending detections in a broadcast. Nothing is logged
// when neither count is positive.
func logPendingBroadcast(activeCount, terminalCount int) {
	if activeCount <= 0 && terminalCount <= 0 {
		return
	}
	GetLogger().Debug("Broadcasting pending detections",
		logger.Int("active", activeCount),
		logger.Int("terminal", terminalCount),
		logger.String("operation", "pending_broadcast"))
}