-
-
Notifications
You must be signed in to change notification settings - Fork 100
Expand file tree
/
Copy pathprocessor.go
More file actions
2072 lines (1830 loc) · 84.5 KB
/
processor.go
File metadata and controls
2072 lines (1830 loc) · 84.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// processor.go
package processor
import (
"context"
"crypto/rand"
"fmt"
"math"
"os"
"os/exec"
"path/filepath"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/tphakala/birdnet-go/internal/analysis/jobqueue"
"github.com/tphakala/birdnet-go/internal/analysis/species"
"github.com/tphakala/birdnet-go/internal/birdnet"
"github.com/tphakala/birdnet-go/internal/birdweather"
"github.com/tphakala/birdnet-go/internal/conf"
"github.com/tphakala/birdnet-go/internal/datastore"
"github.com/tphakala/birdnet-go/internal/detection"
"github.com/tphakala/birdnet-go/internal/imageprovider"
"github.com/tphakala/birdnet-go/internal/logger"
"github.com/tphakala/birdnet-go/internal/mqtt"
"github.com/tphakala/birdnet-go/internal/myaudio"
"github.com/tphakala/birdnet-go/internal/notification"
"github.com/tphakala/birdnet-go/internal/observability"
"github.com/tphakala/birdnet-go/internal/privacy"
"github.com/tphakala/birdnet-go/internal/securefs"
"github.com/tphakala/birdnet-go/internal/spectrogram"
"github.com/tphakala/birdnet-go/internal/suncalc"
)
// Compile-time assertion to ensure *spectrogram.PreRenderer implements PreRendererSubmit
var _ PreRendererSubmit = (*spectrogram.PreRenderer)(nil)
// Species identification constants for filtering
const (
speciesDog = "dog"
speciesHuman = "human"
)
// DefaultFlushInterval is the interval for checking and flushing pending detections
const DefaultFlushInterval = 1 * time.Second
// Processor represents the main processing unit for audio analysis.
type Processor struct {
Settings *conf.Settings
Ds datastore.Interface // Legacy - to be removed after migration
Repo datastore.DetectionRepository // New - preferred for detection operations
Bn *birdnet.BirdNET
log logger.Logger // Logger inherited from analysis package with "processor" child module
BwClient *birdweather.BwClient
bwClientMutex sync.RWMutex // Mutex to protect BwClient access
MqttClient mqtt.Client
mqttMutex sync.RWMutex // Mutex to protect MQTT client access
BirdImageCache *imageprovider.BirdImageCache
EventTracker *EventTracker
eventTrackerMu sync.RWMutex // Mutex to protect EventTracker access
NewSpeciesTracker *species.SpeciesTracker // Tracks new species detections
speciesTrackerMu sync.RWMutex // Mutex to protect NewSpeciesTracker access
lastSyncAttempt time.Time // Last time sync was attempted
syncMutex sync.Mutex // Mutex to protect sync operations
syncInProgress atomic.Bool // Flag to prevent overlapping syncs
LastDogDetection map[string]time.Time // keep track of dog barks per audio source
LastHumanDetection map[string]time.Time // keep track of human vocal per audio source
Metrics *observability.Metrics
DynamicThresholds map[string]*DynamicThreshold
thresholdsMutex sync.RWMutex // Mutex to protect access to DynamicThresholds
pendingDetections map[string]PendingDetection
pendingMutex sync.RWMutex // RWMutex to protect access to pendingDetections (RLock for snapshots)
lastDogDetectionLog map[string]time.Time
dogDetectionMutex sync.Mutex
detectionMutex sync.RWMutex // Mutex to protect LastDogDetection and LastHumanDetection maps
controlChan chan string
JobQueue *jobqueue.JobQueue // Queue for managing job retries
workerCancel context.CancelFunc // Function to cancel worker goroutines
thresholdsCtx context.Context // Context for threshold persistence/cleanup goroutines
thresholdsCancel context.CancelFunc // Function to cancel threshold persistence/cleanup goroutines
flusherCtx context.Context // Context for pending detections flusher goroutine
flusherCancel context.CancelFunc // Function to cancel flusher goroutine
preRenderer PreRendererSubmit // Spectrogram pre-renderer for background generation
preRendererOnce sync.Once // Ensures pre-renderer is initialized only once
// SSE related fields
SSEBroadcaster func(note *datastore.Note, birdImage *imageprovider.BirdImage) error // Function to broadcast detection via SSE
sseBroadcasterMutex sync.RWMutex // Mutex to protect SSE broadcaster access
// Pending detection broadcast fields
PendingBroadcaster func(snapshot []SSEPendingDetection) // Function to broadcast pending detections via SSE
pendingBroadcasterMu sync.RWMutex // Mutex to protect PendingBroadcaster access
pendingFlushNotifs []SSEPendingDetection // Terminal-state notifications from last flush cycle
pendingFlushNotifsMu sync.Mutex // Mutex to protect pendingFlushNotifs
lastBroadcastSnapshot []SSEPendingDetection // Last broadcast snapshot for change detection
lastBroadcastSnapshotMu sync.Mutex // Mutex to protect lastBroadcastSnapshot
// Backup system fields (optional)
backupManager any // Use interface{} to avoid import cycle
backupScheduler any // Use interface{} to avoid import cycle
backupMutex sync.RWMutex
// Log deduplication (extracted to separate type for SRP)
logDedup *LogDeduplicator // Handles log deduplication logic
// Extended capture fields
extendedCaptureSpecies map[string]bool // Resolved set of scientific names eligible for extended capture
extendedCaptureAll bool // True when all species qualify (empty species list)
extendedCaptureMu sync.RWMutex // Protects extendedCaptureSpecies and extendedCaptureAll
// Daylight filter fields
daylightFilterSpecies map[string]bool // Resolved set of scientific names to filter during daylight
daylightFilterAll bool // Currently unused; empty species list resolves to filter-nothing
daylightFilterMu sync.RWMutex // Protects daylightFilterSpecies and daylightFilterAll
sunCalc *suncalc.SunCalc // Injected sun calculator for daylight determination
// Cached taxonomy database (lazy-loaded, shared across init functions)
taxonomyDB *birdnet.TaxonomyDatabase
taxonomyDBOnce sync.Once
}
type Detections struct {
CorrelationID string // Unique detection identifier for log correlation
pcmData3s []byte // 3s PCM data containing the detection
Result detection.Result // Detection result containing highest match
Results []detection.AdditionalResult // Additional BirdNET prediction results
}
// PendingDetection struct represents a single detection held in memory,
// including its last updated timestamp and a deadline for flushing it to the worker queue.
type PendingDetection struct {
Detection Detections // The detection data
Confidence float64 // Confidence level of the detection
Source string // Audio source of the detection, RTSP URL or audio card name
FirstDetected time.Time // Back-dated time for audio clip extraction (startTime from analysis buffer)
CreatedAt time.Time // Real wall-clock time when detection was first created (for display)
LastUpdated time.Time // Last time this detection was updated
FlushDeadline time.Time // Deadline by which the detection must be processed
Count int // Number of times this detection has been updated
ExtendedCapture bool // Whether this detection uses extended capture
MaxDeadline time.Time // Absolute max flush time (FirstDetected + maxDuration)
}
// pendingDetectionKey creates a composite key for the pendingDetections map
// that includes the source ID to prevent cross-source data corruption when
// multiple audio sources detect the same species concurrently.
func pendingDetectionKey(sourceID, speciesName string) string {
return sourceID + ":" + speciesName
}
// suggestLevelForDisabledFilter provides smart recommendations for filter levels
// when filtering is disabled (level 0). It analyzes current overlap settings
// and suggests an appropriate filter level that matches the user's configuration.
func suggestLevelForDisabledFilter(overlap float64) {
recommendedLevel, _ := getRecommendedLevelForOverlap(overlap)
if recommendedLevel > 0 {
GetLogger().Info("False positive filtering is disabled",
logger.Int("current_level", 0),
logger.Float64("current_overlap", overlap),
logger.Int("recommended_level", recommendedLevel),
logger.String("recommended_level_name", getLevelName(recommendedLevel)),
logger.String("recommendation", fmt.Sprintf("Consider enabling filtering with level %d (%s) which matches your current overlap %.1f",
recommendedLevel, getLevelName(recommendedLevel), overlap)),
logger.String("operation", "false_positive_filter_config"))
// Notify users through the web UI
notification.NotifyInfo(
"False Positive Filtering Disabled",
fmt.Sprintf("Your system can support Level %d (%s) filtering with your current overlap of %.1f. Enable it in settings to reduce false detections from wind, cars, and other noise.",
recommendedLevel, getLevelName(recommendedLevel), overlap),
)
} else {
GetLogger().Info("False positive filtering is disabled",
logger.Int("current_level", 0),
logger.String("operation", "false_positive_filter_config"))
}
}
// validateOverlapForLevel checks if the current overlap is sufficient for the
// configured filter level and provides warnings/recommendations if not optimal.
func validateOverlapForLevel(level int, overlap, minOverlap float64, minDetections int) {
if overlap < minOverlap {
// Overlap is too low for this level
recommendedForCurrent, _ := getRecommendedLevelForOverlap(overlap)
GetLogger().Warn("Overlap below recommended minimum for filtering level",
logger.Int("level", level),
logger.String("level_name", getLevelName(level)),
logger.Float64("min_overlap", minOverlap),
logger.Float64("current_overlap", overlap),
logger.Int("min_detections", minDetections),
logger.String("hardware_req", getHardwareRequirementForLevel(level)),
logger.Int("recommended_level_for_overlap", recommendedForCurrent),
logger.String("operation", "false_positive_filter_config"))
// Warn users through the web UI
notification.NotifyWarning(
"analysis",
"Filter Level May Not Work Optimally",
fmt.Sprintf("Level %d (%s) filtering requires overlap %.1f or higher, but current overlap is %.1f. Consider increasing overlap to %.1f or using Level %d (%s) instead.",
level, getLevelName(level), minOverlap, overlap, minOverlap, recommendedForCurrent, getLevelName(recommendedForCurrent)),
)
} else {
// Configuration is good
GetLogger().Info("False positive filtering configured",
logger.Int("level", level),
logger.String("level_name", getLevelName(level)),
logger.Float64("overlap", overlap),
logger.Float64("min_overlap", minOverlap),
logger.Int("min_detections", minDetections),
logger.String("hardware_req", getHardwareRequirementForLevel(level)),
logger.String("operation", "false_positive_filter_config"))
}
}
// warnAboutHardwareRequirements checks if high filter levels (4-5) have
// sufficient hardware performance based on overlap settings and inference time.
func warnAboutHardwareRequirements(level int, overlap float64) {
if level >= 4 {
// Check if overlap is within valid range for calculation
if overlap >= 3.0 {
GetLogger().Warn("Overlap value too high for hardware calculation",
logger.Float64("overlap", overlap),
logger.Float64("max_valid", 2.9),
logger.String("operation", "false_positive_filter_config"))
} else {
stepSize := 3.0 - overlap
maxInferenceTime := stepSize * 1000 // Convert to ms
GetLogger().Warn("High filtering level requires fast hardware",
logger.Int("level", level),
logger.Float64("required_inference_ms", maxInferenceTime),
logger.String("operation", "false_positive_filter_config"))
}
}
}
// validateAndLogFilterConfig validates false positive filter configuration,
// logs appropriate messages, and sends UI notifications. This function handles
// all validation, logging, and user notification for the false positive filter.
func validateAndLogFilterConfig(settings *conf.Settings) {
// Validate configuration
if err := settings.Realtime.FalsePositiveFilter.Validate(); err != nil {
GetLogger().Error("Invalid false positive filter configuration, falling back to level 0",
logger.Error(err),
logger.Int("fallback_level", 0),
logger.String("operation", "false_positive_filter_validation"))
// Reset to safe default
settings.Realtime.FalsePositiveFilter.Level = 0
}
level := settings.Realtime.FalsePositiveFilter.Level
overlap := settings.BirdNET.Overlap
minOverlap := getMinimumOverlapForLevel(level)
// Calculate what minDetections will be with current settings
minDetections := calculateMinDetectionsFromSettings(settings)
if level == 0 {
// Smart migration: suggest a level based on current overlap
suggestLevelForDisabledFilter(overlap)
} else {
// Filtering is enabled - validate overlap and warn about hardware if needed
validateOverlapForLevel(level, overlap, minOverlap, minDetections)
warnAboutHardwareRequirements(level, overlap)
}
}
// initLogDeduplicator creates and configures the log deduplicator.
func initLogDeduplicator(settings *conf.Settings) *LogDeduplicator {
healthCheckInterval := 60 * time.Second
if settings.Realtime.LogDeduplication.HealthCheckIntervalSeconds > 0 {
if settings.Realtime.LogDeduplication.HealthCheckIntervalSeconds > 3600 {
healthCheckInterval = time.Hour
GetLogger().Warn("Log deduplication health check interval capped at 1 hour",
logger.Int("requested_seconds", settings.Realtime.LogDeduplication.HealthCheckIntervalSeconds),
logger.Int("capped_seconds", 3600),
logger.String("operation", "config_validation"))
} else {
healthCheckInterval = time.Duration(settings.Realtime.LogDeduplication.HealthCheckIntervalSeconds) * time.Second
}
}
return NewLogDeduplicator(DeduplicationConfig{
HealthCheckInterval: healthCheckInterval,
Enabled: settings.Realtime.LogDeduplication.Enabled,
})
}
// initSpeciesTracker initializes the species tracker if enabled.
// Returns nil if tracking is disabled or configuration is invalid.
func initSpeciesTracker(settings *conf.Settings, ds datastore.Interface) *species.SpeciesTracker {
if !settings.Realtime.SpeciesTracking.Enabled {
return nil
}
if err := settings.Realtime.SpeciesTracking.Validate(); err != nil {
GetLogger().Error("Invalid species tracking configuration, disabling tracking",
logger.Error(err),
logger.String("operation", "species_tracking_validation"))
settings.Realtime.SpeciesTracking.Enabled = false
return nil
}
hemisphereAwareTracking := settings.Realtime.SpeciesTracking
if hemisphereAwareTracking.SeasonalTracking.Enabled {
hemisphereAwareTracking.SeasonalTracking = conf.GetSeasonalTrackingWithHemisphere(
hemisphereAwareTracking.SeasonalTracking,
settings.BirdNET.Latitude,
)
}
tracker := species.NewTrackerFromSettings(ds, &hemisphereAwareTracking)
if err := tracker.InitFromDatabase(); err != nil {
GetLogger().Error("Failed to initialize species tracker from database, continuing with new detections",
logger.Error(err),
logger.String("operation", "species_tracker_init"))
}
hemisphere := conf.DetectHemisphere(settings.BirdNET.Latitude)
GetLogger().Info("Species tracking enabled",
logger.Int("window_days", settings.Realtime.SpeciesTracking.NewSpeciesWindowDays),
logger.Int("sync_interval_minutes", settings.Realtime.SpeciesTracking.SyncIntervalMinutes),
logger.String("hemisphere", hemisphere),
logger.Float64("latitude", settings.BirdNET.Latitude),
logger.String("operation", "species_tracking_config"))
return tracker
}
// initBirdWeatherClient initializes the BirdWeather client if enabled.
func (p *Processor) initBirdWeatherClient(settings *conf.Settings) {
if !settings.Realtime.Birdweather.Enabled {
return
}
bwClient, err := birdweather.New(settings)
if err != nil {
GetLogger().Error("Failed to create BirdWeather client",
logger.Error(err),
logger.String("operation", "birdweather_client_init"),
logger.String("integration", "birdweather"))
return
}
p.SetBwClient(bwClient)
}
// initDynamicThresholds loads and starts persistence for dynamic thresholds if enabled.
func (p *Processor) initDynamicThresholds(settings *conf.Settings) {
if !settings.Realtime.DynamicThreshold.Enabled {
return
}
if err := p.loadDynamicThresholdsFromDB(); err != nil {
GetLogger().Debug("Starting with fresh dynamic thresholds",
logger.String("reason", err.Error()),
logger.String("operation", "load_dynamic_thresholds"))
}
p.startThresholdPersistence()
p.startThresholdCleanup()
}
// New creates a new Processor with the given dependencies.
// The parentLog parameter should be the analysis package logger, which will be used to create
// a child logger with ".processor" suffix for hierarchical logging (e.g., "analysis.processor").
func New(settings *conf.Settings, ds datastore.Interface, bn *birdnet.BirdNET, metrics *observability.Metrics, birdImageCache *imageprovider.BirdImageCache, parentLog logger.Logger) *Processor {
// Create child logger from parent for hierarchical logging
var procLog logger.Logger
if parentLog != nil {
procLog = parentLog.Module("processor")
} else {
// Fallback to global logger if parent not provided
procLog = logger.Global().Module("analysis.processor")
}
p := &Processor{
Settings: settings,
Ds: ds,
Repo: datastore.NewDetectionRepository(ds, nil), // Bridge to new domain model
Bn: bn,
log: procLog,
BirdImageCache: birdImageCache,
EventTracker: NewEventTrackerWithConfig(
time.Duration(settings.Realtime.Interval)*time.Second,
settings.Realtime.Species.Config,
),
Metrics: metrics,
LastDogDetection: make(map[string]time.Time),
LastHumanDetection: make(map[string]time.Time),
DynamicThresholds: make(map[string]*DynamicThreshold),
pendingDetections: make(map[string]PendingDetection),
lastDogDetectionLog: make(map[string]time.Time),
controlChan: make(chan string, 10), // Buffered channel to prevent blocking
JobQueue: jobqueue.NewJobQueue(), // Initialize the job queue
}
// Initialize log deduplicator with configuration from settings
p.logDedup = initLogDeduplicator(settings)
// Validate detection window configuration
captureLength := time.Duration(settings.Realtime.Audio.Export.Length) * time.Second
preCaptureLength := time.Duration(settings.Realtime.Audio.Export.PreCapture) * time.Second
detectionWindow := max(time.Duration(0), captureLength-preCaptureLength)
// Warn if detection window is very short (may affect overlap-based filtering)
minRecommendedWindow := 3 * time.Second
if detectionWindow < minRecommendedWindow {
GetLogger().Warn("Detection window very short, may affect accuracy",
logger.Float64("window_seconds", detectionWindow.Seconds()),
logger.Float64("capture_length_seconds", captureLength.Seconds()),
logger.Float64("pre_capture_seconds", preCaptureLength.Seconds()),
logger.Float64("min_recommended_seconds", minRecommendedWindow.Seconds()),
logger.String("operation", "config_validation"))
}
// Validate that audio export length fits within the capture buffer.
// The capture buffer is a ring buffer that holds recent audio; requesting more
// audio than the buffer can hold results in silent truncation.
captureBufferSeconds := conf.DefaultCaptureBufferSeconds
if settings.Realtime.ExtendedCapture.Enabled && settings.Realtime.ExtendedCapture.CaptureBufferSeconds > 0 {
captureBufferSeconds = settings.Realtime.ExtendedCapture.CaptureBufferSeconds
}
if settings.Realtime.Audio.Export.Length > captureBufferSeconds {
GetLogger().Error("Audio export length exceeds capture buffer size, audio will be truncated",
logger.Int("export_length_seconds", settings.Realtime.Audio.Export.Length),
logger.Int("capture_buffer_seconds", captureBufferSeconds),
logger.String("recommendation", fmt.Sprintf("Reduce audio.export.length to %d or less, or increase the capture buffer", captureBufferSeconds)),
logger.String("operation", "config_validation"))
notification.NotifyWarning(
"analysis",
"Audio Export Length Exceeds Buffer",
fmt.Sprintf("Audio export length (%ds) exceeds the capture buffer (%ds). Exported audio clips will be truncated. Reduce export length to %d seconds or less.",
settings.Realtime.Audio.Export.Length, captureBufferSeconds, captureBufferSeconds),
)
}
// Validate and log false positive filter configuration
validateAndLogFilterConfig(settings)
// Initialize species tracker if enabled
p.NewSpeciesTracker = initSpeciesTracker(settings, ds)
// Start the detection processor
p.startDetectionProcessor()
// Start the worker pool for action processing
p.startWorkerPool()
// Create context for pending detections flusher
p.flusherCtx, p.flusherCancel = context.WithCancel(context.Background())
// Start the held detection flusher
p.pendingDetectionsFlusher()
// Initialize BirdWeather client if enabled
p.initBirdWeatherClient(settings)
// Initialize MQTT client if enabled in settings
p.initializeMQTT(settings)
// Start the job queue
p.JobQueue.Start()
// Initialize dynamic thresholds if enabled
p.initDynamicThresholds(settings)
// Validate extended capture configuration
if settings.Realtime.ExtendedCapture.Enabled {
preCapture := settings.Realtime.Audio.Export.PreCapture
if err := settings.Realtime.ExtendedCapture.Validate(preCapture); err != nil {
GetLogger().Error("Invalid extended capture configuration, disabling",
logger.Any("error", err),
logger.String("operation", "extended_capture_validation"))
settings.Realtime.ExtendedCapture.Enabled = false
}
// Warn about memory usage for large buffers
if settings.Realtime.ExtendedCapture.Enabled && settings.Realtime.ExtendedCapture.MaxDuration > conf.DefaultExtendedCaptureMaxDuration {
bytesPerSecond := conf.SampleRate * (conf.BitDepth / 8) * conf.NumChannels
effectiveBufferSeconds := settings.Realtime.ExtendedCapture.CaptureBufferSeconds
if effectiveBufferSeconds <= 0 {
effectiveBufferSeconds = conf.DefaultCaptureBufferSeconds
}
bufferMB := effectiveBufferSeconds * bytesPerSecond / (1024 * 1024)
GetLogger().Warn("Extended capture with large buffer configured",
logger.Int("buffer_mb_per_source", bufferMB),
logger.Int("capture_buffer_seconds", effectiveBufferSeconds),
logger.Int("max_duration_seconds", settings.Realtime.ExtendedCapture.MaxDuration),
logger.String("operation", "extended_capture_memory_check"))
}
}
// Initialize extended capture species filter if enabled
p.initExtendedCapture()
// Initialize spectrogram pre-renderer if mode is "prerender"
if settings.Realtime.Dashboard.Spectrogram.IsPreRenderEnabled() {
p.initPreRenderer()
}
return p
}
// Start goroutine to process detections from the queue
func (p *Processor) startDetectionProcessor() {
// Add structured logging for detection processor startup
GetLogger().Info("Starting detection processor",
logger.String("operation", "detection_processor_startup"))
go func() {
// ResultsQueue is fed by myaudio.ProcessData()
for item := range birdnet.ResultsQueue {
// Pass by value since we own the data (see queue.go ownership comment)
p.processDetections(item)
}
// Add structured logging when processor stops
GetLogger().Info("Detection processor stopped",
logger.String("operation", "detection_processor_shutdown"))
}()
}
// processDetections examines each detection from the queue, updating held detections
// with new or higher-confidence instances and setting an appropriate flush deadline.
//
//nolint:gocritic // hugeParam: Pass by value is intentional - avoids pointer dereferencing in hot path
func (p *Processor) processDetections(item birdnet.Results) {
// Add structured logging for detection pipeline entry
GetLogger().Debug("Processing detections from queue",
logger.String("source", item.Source.DisplayName),
logger.Time("start_time", item.StartTime),
logger.Int("results_count", len(item.Results)),
logger.Int64("elapsed_time_ms", item.ElapsedTime.Milliseconds()),
logger.String("operation", "process_detections_entry"))
// Detection window sets wait time before a detection is considered final and is flushed.
// This represents the duration to wait from NOW (detection creation time) before flushing,
// allowing overlapping analyses to accumulate confirmations for false positive filtering.
captureLength := time.Duration(p.Settings.Realtime.Audio.Export.Length) * time.Second
preCaptureLength := time.Duration(p.Settings.Realtime.Audio.Export.PreCapture) * time.Second
// Ensure detectionWindow is non-negative to prevent early flushes
detectionWindow := max(time.Duration(0), captureLength-preCaptureLength)
// processResults() returns a slice of detections, we iterate through each and process them
// detections are put into pendingDetections map where they are held until flush deadline is reached
// once deadline is reached detections are delivered to workers for actions (save to db etc) processing
detectionResults := p.processResults(item)
// Log processing results with deduplication to prevent spam
p.logDetectionResults(item.Source.ID, len(item.Results), len(detectionResults))
for i := range detectionResults {
det := detectionResults[i]
commonName := strings.ToLower(det.Result.Species.CommonName)
confidence := det.Result.Confidence
// Lock the mutex to ensure thread-safe access to shared resources
p.pendingMutex.Lock()
now := time.Now()
mapKey := pendingDetectionKey(item.Source.ID, commonName)
if existing, exists := p.pendingDetections[mapKey]; exists {
// Update the existing detection if it's already in pendingDetections map
oldConfidence := existing.Confidence
existing.LastUpdated = now
if confidence > existing.Confidence {
existing.Detection = det
existing.Confidence = confidence
existing.Source = item.Source.ID
// Add structured logging for confidence update
GetLogger().Debug("Updated pending detection with higher confidence",
logger.String("species", commonName),
logger.Float64("old_confidence", oldConfidence),
logger.Float64("new_confidence", confidence),
logger.Int("count", existing.Count+1),
logger.String("operation", "update_pending_detection"))
}
existing.Count++
p.pendingDetections[mapKey] = existing
} else {
// Create a new pending detection if it doesn't exist
// Add structured logging for new pending detection
GetLogger().Info("Created new pending detection",
logger.String("species", commonName),
logger.Float64("confidence", confidence),
logger.String("source", item.Source.DisplayName),
logger.Time("flush_deadline", now.Add(detectionWindow)),
logger.String("operation", "create_pending_detection"))
p.pendingDetections[mapKey] = PendingDetection{
Detection: det,
Confidence: confidence,
Source: item.Source.ID,
FirstDetected: item.StartTime,
CreatedAt: now,
LastUpdated: item.StartTime,
// FlushDeadline is relative to NOW (not startTime) to ensure it's always in the future.
// startTime is backdated for audio extraction, but FlushDeadline needs to be a future deadline.
FlushDeadline: now.Add(detectionWindow),
Count: 1,
}
}
// Apply extended capture if species qualifies
if p.isExtendedCaptureSpecies(det.Result.Species.ScientificName) {
p.applyExtendedCapture(mapKey, now, detectionWindow)
}
// Update the dynamic threshold for this species if enabled
p.updateDynamicThreshold(commonName, confidence)
// Unlock the mutex to allow other goroutines to access shared resources
p.pendingMutex.Unlock()
}
// Broadcast updated pending detections snapshot for "currently hearing" UI.
// This runs after all new detections are incorporated into pendingDetections.
minDet := p.calculateMinDetections()
snapshot := p.SnapshotVisiblePending(minDet)
p.broadcastPendingSnapshot(snapshot)
}
// processResults processes the results from the BirdNET prediction and returns a list of detections.
//
//nolint:gocritic // hugeParam: Pass by value is intentional - avoids pointer dereferencing in hot path
func (p *Processor) processResults(item birdnet.Results) []Detections {
// Pre-allocate slice with capacity for all results
detections := make([]Detections, 0, len(item.Results))
// Collect processing time metric
if p.Settings.Realtime.Telemetry.Enabled && p.Metrics != nil && p.Metrics.BirdNET != nil {
p.Metrics.BirdNET.SetProcessTime(float64(item.ElapsedTime.Milliseconds()))
}
// Sync species tracker if needed
p.syncSpeciesTrackerIfNeeded()
// Process each result in item.Results
for _, result := range item.Results {
// Parse and validate species information
scientificName, commonName, speciesCode, speciesLowercase := p.parseAndValidateSpecies(result, item)
// Skip if either scientific or common name is missing (partial/invalid parsing)
if scientificName == "" || commonName == "" {
if p.Settings.Debug {
GetLogger().Debug("Skipping partially parsed species",
logger.String("scientific_name", scientificName),
logger.String("common_name", commonName),
logger.String("species_code", speciesCode),
logger.String("species_lowercase", speciesLowercase),
logger.String("original_species", result.Species),
logger.Float32("confidence", result.Confidence),
logger.String("operation", "validate_species"))
}
continue // Skip invalid or partially parsed species
}
// Handle dog and human detection, this sets LastDogDetection and LastHumanDetection which is
// later used to discard detection if privacy filter or dog bark filters are enabled in settings.
p.handleDogDetection(item, speciesLowercase, result)
p.handleHumanDetection(item, speciesLowercase, result)
// Determine confidence threshold and check filters
baseThreshold := p.getBaseConfidenceThreshold(commonName, scientificName)
// Check if detection should be filtered
shouldSkip, _ := p.shouldFilterDetection(result, commonName, scientificName, speciesLowercase, baseThreshold, item.Source.ID)
if shouldSkip {
continue
}
// Create the detection
det := p.createDetection(item, result, scientificName, commonName, speciesCode)
detections = append(detections, det)
}
return detections
}
// parseAndValidateSpecies parses species information and validates it
//
//nolint:gocritic // hugeParam: Pass by value is intentional - avoids pointer dereferencing in hot path
func (p *Processor) parseAndValidateSpecies(result datastore.Results, item birdnet.Results) (scientificName, commonName, speciesCode, speciesLowercase string) {
// Use BirdNET's EnrichResultWithTaxonomy to get species information
scientificName, commonName, speciesCode = p.Bn.EnrichResultWithTaxonomy(result.Species)
// Skip processing if we couldn't parse the species properly (either name missing)
if commonName == "" || scientificName == "" {
if p.Settings.Debug {
GetLogger().Debug("Skipping species with invalid format",
logger.String("species", result.Species),
logger.Float32("confidence", result.Confidence),
logger.String("operation", "species_format_validation"))
}
return "", "", "", ""
}
// Log placeholder taxonomy codes if using custom model
if p.Settings.BirdNET.ModelPath != "" && p.Settings.Debug && speciesCode != "" {
if len(speciesCode) == 8 && (speciesCode[:2] == "XX" || (speciesCode[0] >= 'A' && speciesCode[0] <= 'Z' && speciesCode[1] >= 'A' && speciesCode[1] <= 'Z')) {
GetLogger().Debug("using placeholder taxonomy code",
logger.String("taxonomy_code", speciesCode),
logger.String("scientific_name", scientificName),
logger.String("common_name", commonName),
logger.String("operation", "taxonomy_code_assignment"))
}
}
// Convert species to lowercase for case-insensitive comparison
speciesLowercase = strings.ToLower(commonName)
if speciesLowercase == "" && scientificName != "" {
speciesLowercase = strings.ToLower(scientificName)
}
return
}
// shouldFilterDetection checks if a detection should be filtered out
func (p *Processor) shouldFilterDetection(result datastore.Results, commonName, scientificName, speciesLowercase string, baseThreshold float32, source string) (shouldFilter bool, confidenceThreshold float32) {
// Check human detection privacy filter
if strings.Contains(strings.ToLower(commonName), speciesHuman) && result.Confidence > baseThreshold {
return true, 0 // Filter out human detections for privacy
}
// Determine confidence threshold
if p.Settings.Realtime.DynamicThreshold.Enabled {
// Check if this species has a custom user-configured threshold (> 0)
// Species may be in Config only for custom actions/interval without threshold set
// Use lookupSpeciesConfig to support both common name and scientific name lookups
config, exists := lookupSpeciesConfig(p.Settings.Realtime.Species.Config, commonName, scientificName)
isCustomThreshold := exists && config.Threshold > 0
confidenceThreshold = p.getAdjustedConfidenceThreshold(speciesLowercase, baseThreshold, isCustomThreshold)
} else {
confidenceThreshold = baseThreshold
}
// Check confidence threshold
if result.Confidence <= confidenceThreshold {
if p.Settings.Debug {
GetLogger().Debug("Detection filtered out due to low confidence",
logger.String("species", result.Species),
logger.Float32("confidence", result.Confidence),
logger.Float32("threshold", confidenceThreshold),
logger.String("source", p.getDisplayNameForSource(source)),
logger.String("operation", "confidence_filter"))
}
return true, confidenceThreshold
}
// Check species inclusion filter
if !p.Settings.IsSpeciesIncluded(result.Species) {
if p.Settings.Debug {
GetLogger().Debug("species not on included list",
logger.String("species", result.Species),
logger.Float32("confidence", result.Confidence),
logger.String("operation", "species_inclusion_filter"))
}
return true, confidenceThreshold
}
return false, confidenceThreshold
}
// createDetection creates a detection object with all necessary information
//
//nolint:gocritic // hugeParam: Pass by value is intentional - avoids pointer dereferencing in hot path
func (p *Processor) createDetection(item birdnet.Results, result datastore.Results, scientificName, commonName, speciesCode string) Detections {
// Create file name for audio clip
clipName := p.generateClipName(scientificName, result.Confidence)
// Get capture length and pre-capture length for detection end time calculation
captureLength := time.Duration(p.Settings.Realtime.Audio.Export.Length) * time.Second
preCaptureLength := time.Duration(p.Settings.Realtime.Audio.Export.PreCapture) * time.Second
// Set begin and end time for note
beginTime := item.StartTime
endTime := item.StartTime.Add(captureLength - preCaptureLength)
// Get occurrence probability for this species at detection time
occurrence := p.Bn.GetSpeciesOccurrenceAtTime(result.Species, item.StartTime)
// Compute detection time once to ensure Result has consistent timestamp
// This prevents date mismatch around midnight when time.Now() would be called separately
detectionTime := time.Now().Add(-detection.DetectionTimeOffset)
// Create the detection.Result
detectionResult := p.createDetectionResult(
detectionTime,
beginTime, endTime,
scientificName, commonName, speciesCode,
float64(result.Confidence),
item.Source, clipName,
item.ElapsedTime, occurrence)
// Convert additional results from datastore.Results to detection.AdditionalResult.
// Exclude the primary species since it's already stored as Detection.LabelID.
additionalResults := convertToAdditionalResults(item.Results, scientificName)
// Update species tracker if enabled
p.speciesTrackerMu.RLock()
tracker := p.NewSpeciesTracker
p.speciesTrackerMu.RUnlock()
if tracker != nil {
tracker.UpdateSpecies(scientificName, item.StartTime)
}
// Generate unique correlation ID for detection tracking
correlationID := p.generateCorrelationID(commonName, item.StartTime)
return Detections{
CorrelationID: correlationID,
pcmData3s: item.PCMdata,
Result: detectionResult,
Results: additionalResults,
}
}
// createDetectionResult creates a detection.Result from the given parameters.
// detectionTime should be pre-computed by the caller to ensure timestamp consistency
// with other detection artifacts (e.g., Note) created from the same analysis.
func (p *Processor) createDetectionResult(
detectionTime time.Time,
beginTime, endTime time.Time,
scientificName, commonName, speciesCode string,
confidence float64,
source datastore.AudioSource, clipName string,
elapsedTime time.Duration, occurrence float64) detection.Result {
// Resolve audio source info from registry
audioSource := p.resolveAudioSource(source)
return detection.Result{
Timestamp: detectionTime,
SourceNode: p.Settings.Main.Name,
AudioSource: audioSource,
BeginTime: beginTime,
EndTime: endTime,
Species: detection.Species{
ScientificName: scientificName,
CommonName: commonName,
Code: speciesCode,
},
Confidence: math.Round(confidence*100) / 100,
Latitude: p.Settings.BirdNET.Latitude,
Longitude: p.Settings.BirdNET.Longitude,
Threshold: p.Settings.BirdNET.Threshold,
Sensitivity: p.Settings.BirdNET.Sensitivity,
ClipName: clipName,
ProcessingTime: elapsedTime,
Occurrence: math.Max(0.0, math.Min(1.0, occurrence)),
Model: detection.DefaultModelInfo(),
}
}
// resolveAudioSource resolves the audio source details from the registry.
// Mirrors NewWithSpeciesInfo lookup order: connection string first, then ID.
func (p *Processor) resolveAudioSource(source datastore.AudioSource) detection.AudioSource {
// Default to using the source directly, including type determination
audioSource := detection.AudioSource{
ID: source.ID,
SafeString: source.SafeString,
DisplayName: source.DisplayName,
Type: detection.DetermineSourceType(source.SafeString),
}
// Try to get additional details from registry
// Use same lookup order as NewWithSpeciesInfo: connection string first, then ID
registry := myaudio.GetRegistry()
if registry != nil {
if existingSource, exists := registry.GetSourceByConnection(source.ID); exists {
audioSource.ID = existingSource.ID
audioSource.SafeString = existingSource.SafeString
audioSource.DisplayName = existingSource.DisplayName
audioSource.Type = detection.DetermineSourceType(existingSource.SafeString)
} else if existingSource, exists := registry.GetSourceByID(source.ID); exists {
audioSource.ID = existingSource.ID
audioSource.SafeString = existingSource.SafeString
audioSource.DisplayName = existingSource.DisplayName
audioSource.Type = detection.DetermineSourceType(existingSource.SafeString)
}
}
return audioSource
}
// convertToAdditionalResults converts a slice of datastore.Results to detection.AdditionalResult,
// deduplicating by scientific name and keeping the highest confidence for each species.
// The primary species is excluded since it's already stored as Detection.LabelID.
// Custom BirdNET classifiers can have the same species at multiple positions in the label file,
// producing duplicate entries in prediction results that would cause UNIQUE constraint violations.
func convertToAdditionalResults(results []datastore.Results, primaryScientificName string) []detection.AdditionalResult {
additional := make([]detection.AdditionalResult, 0, len(results))
seen := make(map[string]int, len(results)) // scientificName → index in additional
for _, r := range results {
sp := detection.ParseSpeciesString(r.Species)
if sp.ScientificName == primaryScientificName {
continue
}
if idx, exists := seen[sp.ScientificName]; exists {
// Keep the higher confidence entry
if float64(r.Confidence) > additional[idx].Confidence {
additional[idx] = detection.AdditionalResult{
Species: sp,
Confidence: float64(r.Confidence),
}
}
continue
}
seen[sp.ScientificName] = len(additional)
additional = append(additional, detection.AdditionalResult{
Species: sp,
Confidence: float64(r.Confidence),
})
}
return additional
}
// syncSpeciesTrackerIfNeeded syncs the species tracker if conditions are met
func (p *Processor) syncSpeciesTrackerIfNeeded() {
p.speciesTrackerMu.RLock()
tracker := p.NewSpeciesTracker
p.speciesTrackerMu.RUnlock()
if tracker != nil {
// Rate limit sync operations to avoid excessive goroutines
p.syncMutex.Lock()
if time.Since(p.lastSyncAttempt) >= time.Minute {
// Check if sync is already in progress
if !p.syncInProgress.Load() {
p.lastSyncAttempt = time.Now()
p.syncInProgress.Store(true) // Mark sync as in progress
go func() {
defer p.syncInProgress.Store(false) // Always clear the flag when done
if err := tracker.SyncIfNeeded(); err != nil {
GetLogger().Error("failed to sync species tracker",
logger.Error(err),
logger.String("operation", "species_tracker_sync"))
}
}()
}
}
p.syncMutex.Unlock()
}
}
// handleDogDetection handles the detection of dog barks and updates the last detection timestamp.
//
//nolint:gocritic // hugeParam: Pass by value is intentional - avoids pointer dereferencing in hot path
func (p *Processor) handleDogDetection(item birdnet.Results, speciesLowercase string, result datastore.Results) {
if p.Settings.Realtime.DogBarkFilter.Enabled && strings.Contains(speciesLowercase, speciesDog) &&
result.Confidence > p.Settings.Realtime.DogBarkFilter.Confidence {
GetLogger().Info("dog detection filtered",
logger.Float32("confidence", result.Confidence),
logger.Float32("threshold", float32(p.Settings.Realtime.DogBarkFilter.Confidence)),
logger.String("source", item.Source.DisplayName),
logger.String("operation", "dog_bark_filter"))
p.detectionMutex.Lock()
p.LastDogDetection[item.Source.ID] = item.StartTime
p.detectionMutex.Unlock()
}
}
// handleHumanDetection handles the detection of human vocalizations and updates the last detection timestamp.
//
//nolint:gocritic // hugeParam: Pass by value is intentional - avoids pointer dereferencing in hot path
func (p *Processor) handleHumanDetection(item birdnet.Results, speciesLowercase string, result datastore.Results) {
// only check this if privacy filter is enabled
if p.Settings.Realtime.PrivacyFilter.Enabled && strings.Contains(speciesLowercase, "human ") &&
result.Confidence > p.Settings.Realtime.PrivacyFilter.Confidence {
GetLogger().Info("human detection filtered",
logger.Float32("confidence", result.Confidence),
logger.Float32("threshold", float32(p.Settings.Realtime.PrivacyFilter.Confidence)),
logger.String("source", item.Source.DisplayName),
logger.String("operation", "privacy_filter"))
// put human detection timestamp into LastHumanDetection map. This is used to discard
// bird detections if a human vocalization is detected after the first detection
p.detectionMutex.Lock()
p.LastHumanDetection[item.Source.ID] = item.StartTime
p.detectionMutex.Unlock()
}
}
// getBaseConfidenceThreshold retrieves the confidence threshold for a species, using custom or global thresholds.
// It supports lookup by both common name and scientific name for consistency with include/exclude matching.
func (p *Processor) getBaseConfidenceThreshold(commonName, scientificName string) float32 {
// Check if species has a custom threshold using both common and scientific name lookup
if config, exists := lookupSpeciesConfig(p.Settings.Realtime.Species.Config, commonName, scientificName); exists {
if p.Settings.Debug {
GetLogger().Debug("using custom confidence threshold",
logger.String("commonName", commonName),
logger.String("scientificName", scientificName),
logger.Float64("threshold", config.Threshold),
logger.String("operation", "custom_threshold_lookup"))
}
return float32(config.Threshold)
}
// Fall back to global threshold
return float32(p.Settings.BirdNET.Threshold)
}