Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions frontend/src/lib/types/pending.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ export interface PendingDetection {
source: string;
/** Raw source ID for filtering detections by active stream */
sourceID: string;
/** Number of inference hits accumulated for this pending detection */
hitCount?: number;
}
47 changes: 47 additions & 0 deletions frontend/src/lib/utils/detectionOverlay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,53 @@ describe('diffPendingSnapshot', () => {
const newDetections = diffPendingSnapshot([], curr, 'src1');
expect(newDetections).toHaveLength(1);
});

it('detects increased hitCount as new activity', () => {
const prev = [
{
species: 'Blue Tit',
sourceID: 'src1',
firstDetected: 100,
status: 'active' as const,
hitCount: 3,
},
];
const curr = [
{
species: 'Blue Tit',
sourceID: 'src1',
firstDetected: 100,
status: 'active' as const,
hitCount: 5,
},
];
const newDetections = diffPendingSnapshot(prev, curr, 'src1');
expect(newDetections).toHaveLength(1);
expect(newDetections[0].species).toBe('Blue Tit');
});

it('ignores unchanged hitCount', () => {
const prev = [
{
species: 'Blue Tit',
sourceID: 'src1',
firstDetected: 100,
status: 'active' as const,
hitCount: 3,
},
];
const curr = [
{
species: 'Blue Tit',
sourceID: 'src1',
firstDetected: 100,
status: 'active' as const,
hitCount: 3,
},
];
const newDetections = diffPendingSnapshot(prev, curr, 'src1');
expect(newDetections).toHaveLength(0);
});
});

describe('shouldDedup', () => {
Expand Down
18 changes: 13 additions & 5 deletions frontend/src/lib/utils/detectionOverlay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,32 @@ interface PendingEntry {
sourceID: string;
firstDetected: number;
status: 'active' | 'approved' | 'rejected';
hitCount?: number;
}

const DEDUP_INTERVAL_SECONDS = 6;

/**
* Diff two pending snapshots, returning newly appeared species for a given source.
* Diff two pending snapshots, returning species with new activity for a given source.
* Returns species that are newly appeared OR have an increased hitCount (new inference hit).
* Filters by sourceID and ignores rejected status.
*/
export function diffPendingSnapshot(
prev: PendingEntry[],
curr: PendingEntry[],
activeSourceID: string
): PendingEntry[] {
const prevSpecies = new Set(prev.filter(d => d.sourceID === activeSourceID).map(d => d.species));

return curr.filter(
d => d.sourceID === activeSourceID && d.status !== 'rejected' && !prevSpecies.has(d.species)
const prevBySpecies = new Map(
prev.filter(d => d.sourceID === activeSourceID).map(d => [d.species, d])
);

return curr.filter(d => {
if (d.sourceID !== activeSourceID || d.status === 'rejected') return false;
const prevEntry = prevBySpecies.get(d.species);
if (!prevEntry) return true; // New species
// Existing species with increased hit count means a new inference hit
return (d.hitCount ?? 0) > (prevEntry.hitCount ?? 0);
});
}

/**
Expand Down
82 changes: 64 additions & 18 deletions internal/analysis/processor/pending_broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,34 @@ type SSEPendingDetection struct {
FirstDetected int64 `json:"firstDetected"` // Unix timestamp (seconds)
Source string `json:"source"` // Source display name
SourceID string `json:"sourceID"` // Raw source ID for client-side filtering
HitCount int `json:"hitCount"` // Number of inference hits accumulated
}

// sortPendingSnapshot sorts a pending detection snapshot by FirstDetected
// (oldest first), with species name and source ID as tie-breakers for determinism.
// This ordering is required by pendingSnapshotChanged which does index-based comparison.
func sortPendingSnapshot(s []SSEPendingDetection) {
slices.SortFunc(s, func(a, b SSEPendingDetection) int {
if a.FirstDetected != b.FirstDetected {
if a.FirstDetected < b.FirstDetected {
return -1
}
return 1
}
if a.Species != b.Species {
if a.Species < b.Species {
return -1
}
return 1
}
if a.SourceID < b.SourceID {
return -1
}
if a.SourceID > b.SourceID {
return 1
}
return 0
})
Comment on lines +42 to +62
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current sorting logic in sortPendingSnapshot is not fully deterministic for the comparison performed in pendingSnapshotChanged. The comparison function checks SourceID, but the sort function does not use it as a tie-breaker.

If two detections have the same FirstDetected and Species but different SourceIDs, their relative order is not guaranteed. This can cause pendingSnapshotChanged to incorrectly report a change when the order of items happens to differ between snapshots, even if the set of items is identical.

To ensure a stable comparison, you should also sort by SourceID as a final tie-breaker. I've also taken the liberty to simplify the comparison logic slightly for better readability.

slices.SortFunc(s, func(a, b SSEPendingDetection) int {
		if a.FirstDetected < b.FirstDetected {
			return -1
		}
		if a.FirstDetected > b.FirstDetected {
			return 1
		}
		if a.Species < b.Species {
			return -1
		}
		if a.Species > b.Species {
			return 1
		}
		if a.SourceID < b.SourceID {
			return -1
		}
		if a.SourceID > b.SourceID {
			return 1
		}
		return 0
	})
References
  1. This aligns with the principle of ensuring deterministic ordering through explicit sorting to prevent non-deterministic behavior, as seen in the rule for thermal zones. Explicitly sorting by all relevant fields, including SourceID as a tie-breaker, ensures consistent comparisons and avoids spurious change detections.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in latest push — added SourceID as third-level tie-breaker in sortPendingSnapshot.

Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// CalculateVisibilityThreshold computes the minimum hit count for a pending
Expand Down Expand Up @@ -68,27 +96,12 @@ func (p *Processor) SnapshotVisiblePending(minDetections int) []SSEPendingDetect
FirstDetected: item.CreatedAt.Unix(),
Source: p.getDisplayNameForSource(item.Source),
SourceID: item.Source,
HitCount: item.Count,
})
}
p.pendingMutex.RUnlock()

// Sort by FirstDetected (oldest first) for stable ordering across broadcasts.
slices.SortFunc(result, func(a, b SSEPendingDetection) int {
if a.FirstDetected != b.FirstDetected {
if a.FirstDetected < b.FirstDetected {
return -1
}
return 1
}
// Tie-break by species name for determinism.
if a.Species < b.Species {
return -1
}
if a.Species > b.Species {
return 1
}
return 0
})
sortPendingSnapshot(result)

return result
}
Expand All @@ -107,7 +120,9 @@ func (p *Processor) getThumbnailURL(scientificName string) string {
}

// broadcastPendingSnapshot broadcasts a pending detection snapshot via the
// PendingBroadcaster callback. If no broadcaster is set, this is a no-op.
// PendingBroadcaster callback only when the snapshot differs from the last
// broadcast (new species, removed species, or updated hit counts).
// If no broadcaster is set, this is a no-op.
func (p *Processor) broadcastPendingSnapshot(snapshot []SSEPendingDetection) {
p.pendingBroadcasterMu.RLock()
broadcaster := p.PendingBroadcaster
Comment on lines 120 to 128

This comment was marked as outdated.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed — added SourceID as tie-breaker in sortPendingSnapshot.

Expand All @@ -117,6 +132,19 @@ func (p *Processor) broadcastPendingSnapshot(snapshot []SSEPendingDetection) {
return
}

// Skip broadcast if snapshot is identical to the last one sent.
// This prevents spamming SSE clients with repeated messages when
// no new predictions arrived for any visible pending species.
p.lastBroadcastSnapshotMu.Lock()
if !pendingSnapshotChanged(p.lastBroadcastSnapshot, snapshot) {
p.lastBroadcastSnapshotMu.Unlock()
return
}
// Store a copy so subsequent comparisons are independent.
p.lastBroadcastSnapshot = make([]SSEPendingDetection, len(snapshot))
copy(p.lastBroadcastSnapshot, snapshot)
p.lastBroadcastSnapshotMu.Unlock()

broadcaster(snapshot)
}

Expand All @@ -131,7 +159,25 @@ func (p *Processor) buildFlushNotification(item *PendingDetection, status Pendin
FirstDetected: item.CreatedAt.Unix(),
Source: p.getDisplayNameForSource(item.Source),
SourceID: item.Source,
HitCount: item.Count,
}
}

// pendingSnapshotChanged reports whether two sorted pending snapshots differ
// in species composition, hit counts, or status.
func pendingSnapshotChanged(prev, curr []SSEPendingDetection) bool {
if len(prev) != len(curr) {
return true
}
for i := range prev {
if prev[i].Species != curr[i].Species ||
prev[i].SourceID != curr[i].SourceID ||
prev[i].HitCount != curr[i].HitCount ||
prev[i].Status != curr[i].Status {
return true
}
}
return false
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// logPendingBroadcast logs pending broadcast activity at debug level.
Expand Down
141 changes: 141 additions & 0 deletions internal/analysis/processor/pending_broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,144 @@ func TestBuildFlushNotification_UsesCreatedAt(t *testing.T) {
assert.Equal(t, realCreationTime.Unix(), notification.FirstDetected,
"Flush notification should use CreatedAt for display timestamp")
}

func TestSnapshotVisiblePending_IncludesHitCount(t *testing.T) {
t.Parallel()

p := &Processor{
Settings: &conf.Settings{},
pendingDetections: map[string]PendingDetection{
"src1:species_a": {
Detection: Detections{
Result: detection.Result{
Species: detection.Species{
CommonName: "Species A",
ScientificName: "Genus speciesA",
},
},
},
Source: "src1",
CreatedAt: time.Date(2026, 3, 7, 10, 0, 13, 0, time.UTC),
Count: 7,
},
},
}

result := p.SnapshotVisiblePending(4) // threshold = 2, count=7 passes
require.Len(t, result, 1)
assert.Equal(t, 7, result[0].HitCount, "HitCount should match pending detection Count")
}

func TestBuildFlushNotification_IncludesHitCount(t *testing.T) {
t.Parallel()

p := &Processor{Settings: &conf.Settings{}}
item := &PendingDetection{
Detection: Detections{
Result: detection.Result{
Species: detection.Species{
CommonName: "Test Bird",
ScientificName: "Testus birdus",
},
},
},
Source: "src1",
CreatedAt: time.Date(2026, 3, 7, 10, 0, 0, 0, time.UTC),
Count: 5,
}

notif := p.buildFlushNotification(item, PendingStatusApproved)
assert.Equal(t, 5, notif.HitCount, "Flush notification should include HitCount")
}

func TestPendingSnapshotChanged(t *testing.T) {
t.Parallel()

tests := []struct {
name string
prev []SSEPendingDetection
curr []SSEPendingDetection
changed bool
}{
{
name: "both_empty",
prev: []SSEPendingDetection{},
curr: []SSEPendingDetection{},
changed: false,
},
{
name: "identical_snapshots",
prev: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
},
curr: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
},
changed: false,
},
{
name: "new_species_added",
prev: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
},
curr: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
{Species: "Great Tit", SourceID: "src1", HitCount: 2, Status: PendingStatusActive},
},
changed: true,
},
{
name: "species_removed",
prev: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
{Species: "Great Tit", SourceID: "src1", HitCount: 2, Status: PendingStatusActive},
},
curr: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
},
changed: true,
},
{
name: "hit_count_increased",
prev: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
},
curr: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 4, Status: PendingStatusActive},
},
changed: true,
},
{
name: "status_changed",
prev: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusActive},
},
curr: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 3, Status: PendingStatusApproved},
},
changed: true,
},
{
name: "nil_prev_empty_curr",
prev: nil,
curr: []SSEPendingDetection{},
changed: false,
},
{
name: "nil_prev_with_curr",
prev: nil,
curr: []SSEPendingDetection{
{Species: "Blue Tit", SourceID: "src1", HitCount: 2, Status: PendingStatusActive},
},
changed: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
result := pendingSnapshotChanged(tt.prev, tt.curr)
assert.Equal(t, tt.changed, result)
})
}
}
14 changes: 10 additions & 4 deletions internal/analysis/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ type Processor struct {
sseBroadcasterMutex sync.RWMutex // Mutex to protect SSE broadcaster access

// Pending detection broadcast fields
PendingBroadcaster func(snapshot []SSEPendingDetection) // Function to broadcast pending detections via SSE
pendingBroadcasterMu sync.RWMutex // Mutex to protect PendingBroadcaster access
pendingFlushNotifs []SSEPendingDetection // Terminal-state notifications from last flush cycle
pendingFlushNotifsMu sync.Mutex // Mutex to protect pendingFlushNotifs
PendingBroadcaster func(snapshot []SSEPendingDetection) // Function to broadcast pending detections via SSE
pendingBroadcasterMu sync.RWMutex // Mutex to protect PendingBroadcaster access
pendingFlushNotifs []SSEPendingDetection // Terminal-state notifications from last flush cycle
pendingFlushNotifsMu sync.Mutex // Mutex to protect pendingFlushNotifs
lastBroadcastSnapshot []SSEPendingDetection // Last broadcast snapshot for change detection
lastBroadcastSnapshotMu sync.Mutex // Mutex to protect lastBroadcastSnapshot

// Backup system fields (optional)
backupManager any // Use interface{} to avoid import cycle
Expand Down Expand Up @@ -1333,11 +1335,15 @@ func (p *Processor) flushPendingDetections(minDetections int) (pendingCount, flu
Status: PendingStatusActive,
FirstDetected: item.CreatedAt.Unix(),
Source: p.getDisplayNameForSource(item.Source),
SourceID: item.Source,
HitCount: item.Count,
})
}
}
Comment on lines 1335 to 1342

This comment was marked as outdated.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e152e50 — extracted sortPendingSnapshot helper and applied it to the flusher's broadcast snapshot before calling broadcastPendingSnapshot.

logPendingBroadcast(len(broadcastSnapshot), len(terminalNotifs))
broadcastSnapshot = append(broadcastSnapshot, terminalNotifs...)
// Sort for stable comparison in broadcastPendingSnapshot.
sortPendingSnapshot(broadcastSnapshot)
}

p.pendingMutex.Unlock()
Expand Down
Loading