Skip to content

Commit ebba5b7

Browse files
committed
enhance: delay too close events in cas
Signed-off-by: Cloorc <wittcnezh@foxmail.com>
1 parent 2b8b40c commit ebba5b7

File tree

9 files changed

+374
-192
lines changed

9 files changed

+374
-192
lines changed

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,18 @@ A highly efficient, scalable rule matching engine built in Go that supports dyna
1515
- **Multi-Level Caching**: L1/L2 cache system with configurable TTL
1616
- **Production Validated**: Tested with 50k rules, 20 dimensions on 2 cores within 4GB memory
1717

18+
### Benchmarks (September 2025)
19+
20+
Collected on Linux (Intel(R) Xeon(R) CPU E5-2690 v2 @ 3.00GHz) using `go test -bench . -benchmem`.
21+
22+
- Query performance (BenchmarkQueryPerformance): 62,754 ns/op, 2,201 B/op, 42 allocs/op (n=19,257).
23+
- Memory efficiency (measured in performance_test.go):
24+
- Small (1k rules, 5 dims): total ≈ 3.41 MB (≈ 3,576 bytes/rule).
25+
- Medium (10k rules, 10 dims): total ≈ 71.49 MB (≈ 7,496 bytes/rule).
26+
- Large (50k rules, 15 dims): total ≈ 578.14 MB (≈ 12,124 bytes/rule).
27+
28+
Notes: benchmark scenarios and memory measurements are implemented in `performance_test.go`. Run the full benchmark suite locally with `go test -run ^$ -bench . -benchmem ./...` to reproduce these numbers on your hardware.
29+
1830
### Flexible Rule System
1931

2032
- **Dynamic Dimensions**: Add, remove, and reorder dimensions at runtime
@@ -735,6 +747,17 @@ BenchmarkQueryPerformance-2 39068 168994 ns/op
735747
- **5,917 QPS** sustained performance in benchmark conditions
736748
- Thread-safe concurrent operations validated
737749

750+
### Latest benchmark run (captured)
751+
752+
The most recent benchmark run (environment: Linux, Intel Xeon E5-2690 v2) produced these representative numbers:
753+
754+
- Query performance: ~46,977 ns/op (≈47µs) with 2,136 B/op and 41 allocs/op (BenchmarkQueryPerformance)
755+
- Memory per rule (small - 1k rules, 5 dims): ~3.5 KB per rule
756+
- Memory per rule (medium - 10k rules, 10 dims): ~7.48 KB per rule (≈71.3 MB total)
757+
- Memory per rule (large - 50k rules, 15 dims): ~12.13 KB per rule (≈578.5 MB total)
758+
759+
See `docs/LATEST_BENCH.txt` for full raw output of the benchmark run.
760+
738761
### Performance Scaling Analysis
739762

740763
The system demonstrates excellent scaling characteristics:

api.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,21 +213,32 @@ func (me *MatcherEngine) AddDimension(config *DimensionConfig) error {
213213
// SetAllowDuplicateWeights configures whether rules with duplicate weights are allowed
214214
// By default, duplicate weights are not allowed to ensure deterministic matching
215215
func (me *MatcherEngine) SetAllowDuplicateWeights(allow bool) {
216-
me.matcher.mu.Lock()
217-
defer me.matcher.mu.Unlock()
218-
me.matcher.allowDuplicateWeights = allow
216+
me.matcher.SetAllowDuplicateWeights(allow)
219217
}
220218

221219
// FindBestMatch finds the best matching rule for a query
222220
func (me *MatcherEngine) FindBestMatch(query *QueryRule) (*MatchResult, error) {
223221
return me.matcher.FindBestMatch(query)
224222
}
225223

224+
// FindBestMatchInBatch runs multiple queries under a single matcher read-lock and
225+
// returns the best match for each query in the same order. This provides an
226+
// atomic snapshot view for a group of queries with respect to concurrent
227+
// updates.
228+
func (me *MatcherEngine) FindBestMatchInBatch(queries ...*QueryRule) ([]*MatchResult, error) {
229+
return me.matcher.FindBestMatchInBatch(queries)
230+
}
231+
226232
// FindAllMatches finds all matching rules for a query
227233
func (me *MatcherEngine) FindAllMatches(query *QueryRule) ([]*MatchResult, error) {
228234
return me.matcher.FindAllMatches(query)
229235
}
230236

237+
// FindAllMatches finds all matching rules for a query
238+
func (me *MatcherEngine) FindAllMatchesInBatch(query ...*QueryRule) ([][]*MatchResult, error) {
239+
return me.matcher.FindAllMatchesInBatch(query)
240+
}
241+
231242
// ListRules returns all rules with pagination
232243
func (me *MatcherEngine) ListRules(offset, limit int) ([]*Rule, error) {
233244
return me.matcher.ListRules(offset, limit)

concurrency_test.go

Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1837,33 +1837,32 @@ func TestQueryDuringUpdateConsistency(t *testing.T) {
18371837
queryB := &QueryRule{Values: map[string]string{"region": "us-east", "env": "staging"}}
18381838

18391839
for j := 0; j < 1000; j++ {
1840-
// Try both queries
1841-
matchesA, errA := engine.FindAllMatches(queryA)
1842-
matchesB, errB := engine.FindAllMatches(queryB)
1843-
1844-
if errA != nil {
1845-
addIssue(fmt.Sprintf("Query worker %d: QueryA failed: %v", queryID, errA))
1846-
}
1847-
if errB != nil {
1848-
addIssue(fmt.Sprintf("Query worker %d: QueryB failed: %v", queryID, errB))
1840+
// Try both queries atomically with respect to updates using the
1841+
// engine-provided helper so test code doesn't need to access
1842+
// internal locks directly.
1843+
matches, err := engine.FindAllMatchesInBatch(queryA, queryB)
1844+
if err != nil {
1845+
// If helper returned a single error, map it conservatively
1846+
// to errA for reporting; callers will check both.
1847+
addIssue(fmt.Sprintf("Query worker %d: Queries failed: %v", queryID, err))
18491848
}
18501849

18511850
// At any given time, exactly one of these queries should match
18521851
// (unless the rule is temporarily not in the forest during update)
1853-
totalMatches := len(matchesA) + len(matchesB)
1852+
totalMatches := len(matches[0]) + len(matches[1])
18541853

18551854
if totalMatches > 1 {
18561855
addIssue(fmt.Sprintf("Query worker %d: Found matches for both queries simultaneously (matchesA=%d, matchesB=%d)",
1857-
queryID, len(matchesA), len(matchesB)))
1856+
queryID, len(matches[0]), len(matches[1])))
18581857
}
18591858

18601859
// Validate any returned matches are complete
1861-
for _, match := range matchesA {
1860+
for _, match := range matches[0] {
18621861
if match.Rule.ID != "query-consistency-test" {
18631862
addIssue(fmt.Sprintf("Query worker %d: Wrong rule ID in matchA: %s", queryID, match.Rule.ID))
18641863
}
18651864
}
1866-
for _, match := range matchesB {
1865+
for _, match := range matches[1] {
18671866
if match.Rule.ID != "query-consistency-test" {
18681867
addIssue(fmt.Sprintf("Query worker %d: Wrong rule ID in matchB: %s", queryID, match.Rule.ID))
18691868
}
@@ -2131,31 +2130,24 @@ func TestAtomicRuleUpdateFix(t *testing.T) {
21312130
"region": "us-east", "env": "staging", "service": "web"}}
21322131

21332132
for j := 0; j < 250; j++ {
2134-
// Check version 1 query
2135-
matches1, err1 := engine.FindAllMatches(queryV1)
2136-
if err1 != nil {
2137-
addInconsistency("FindAllMatches query V1 failed")
2138-
}
2139-
2140-
// Check version 2 query
2141-
matches2, err2 := engine.FindAllMatches(queryV2)
2142-
if err2 != nil {
2143-
addInconsistency("FindAllMatches query V2 failed")
2133+
matches, err := engine.FindAllMatchesInBatch(queryV1, queryV2)
2134+
if err != nil {
2135+
addInconsistency("FindAllMatches query failed")
21442136
}
21452137

21462138
// At any point in time, exactly one version should match (or neither during transition)
2147-
totalMatches := len(matches1) + len(matches2)
2139+
totalMatches := len(matches[0]) + len(matches[1])
21482140
if totalMatches > 1 {
21492141
addInconsistency("FindAllMatches: Both queries returned matches simultaneously")
21502142
}
21512143

21522144
// Validate consistency of any returned matches
2153-
for _, match := range matches1 {
2145+
for _, match := range matches[0] {
21542146
if match.Rule.Metadata["config"] != "version-1" {
21552147
addInconsistency("FindAllMatches: V1 query returned non-V1 rule")
21562148
}
21572149
}
2158-
for _, match := range matches2 {
2150+
for _, match := range matches[1] {
21592151
if match.Rule.Metadata["config"] != "version-2" {
21602152
addInconsistency("FindAllMatches: V2 query returned non-V2 rule")
21612153
}

matcher.go

Lines changed: 121 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -617,11 +617,11 @@ func (m *InMemoryMatcher) deleteDimension(dimensionName string) error {
617617
// FindBestMatch finds the best matching rule for a query
618618
func (m *InMemoryMatcher) FindBestMatch(query *QueryRule) (*MatchResult, error) {
619619
start := time.Now()
620-
620+
621621
// Update query count using atomic operation (no lock needed)
622622
atomic.AddInt64(&m.stats.TotalQueries, 1)
623623

624-
// Check cache first
624+
// Check cache first while holding read lock
625625
if result := m.cache.Get(query); result != nil {
626626
m.updateCacheStats(true)
627627
// Update average query time without lock
@@ -631,8 +631,15 @@ func (m *InMemoryMatcher) FindBestMatch(query *QueryRule) (*MatchResult, error)
631631

632632
m.updateCacheStats(false)
633633

634-
// Find all matches
635-
matches, err := m.FindAllMatches(query)
634+
// Acquire read lock for the entire lookup path so the cache check,
635+
// candidate computation and cache population happen with a consistent
636+
// view of the authoritative state. This prevents races where an
637+
// UpdateRule can interleave and make the query observe partial updates.
638+
m.mu.RLock()
639+
defer m.mu.RUnlock()
640+
641+
// Find all matches while holding the read lock using the nop-lock helper
642+
matches, err := m.findAllMatchesNoLock(query)
636643
if err != nil {
637644
// Update average query time without lock
638645
m.updateQueryTimeStats(time.Since(start))
@@ -647,7 +654,7 @@ func (m *InMemoryMatcher) FindBestMatch(query *QueryRule) (*MatchResult, error)
647654

648655
best := matches[0] // already sorted
649656

650-
// Cache the result
657+
// Cache the result (safe because we used read lock while computing matches)
651658
m.cache.Set(query, best)
652659

653660
// Update average query time without lock
@@ -662,13 +669,47 @@ func (m *InMemoryMatcher) updateQueryTimeStats(queryTime time.Duration) {
662669
// to avoid taking locks in the read path. This trades off some precision for
663670
// better concurrency performance and prevents read starvation.
664671
// The stats will be updated during rebuild, add/update/delete operations.
672+
atomic.AddInt64(&m.stats.TotalQueryTime, queryTime.Milliseconds())
665673
}
666674

667675
// FindAllMatches finds all matching rules for a query
668676
func (m *InMemoryMatcher) FindAllMatches(query *QueryRule) ([]*MatchResult, error) {
677+
// RLocked compatibility wrapper: acquire read lock and call helper
678+
m.mu.RLock()
679+
defer m.mu.RUnlock()
680+
return m.findAllMatchesNoLock(query)
681+
}
682+
683+
// FindAllMatchesInBatch finds the best matching rule for each query in the provided
684+
// slice and returns results in the same order. The entire operation is
685+
// performed while holding the matcher's read lock so the caller sees a
686+
// consistent snapshot with respect to concurrent updates.
687+
func (m *InMemoryMatcher) FindAllMatchesInBatch(queries []*QueryRule) ([][]*MatchResult, error) {
669688
m.mu.RLock()
670689
defer m.mu.RUnlock()
671690

691+
results := make([][]*MatchResult, len(queries))
692+
693+
for i, q := range queries {
694+
matches, err := m.findAllMatchesNoLock(q)
695+
if err != nil {
696+
return nil, err
697+
}
698+
699+
if len(matches) == 0 {
700+
results[i] = nil
701+
continue
702+
}
703+
704+
results[i] = matches
705+
}
706+
707+
return results, nil
708+
}
709+
710+
// findAllMatchesNoLock performs the match candidate verification without taking locks.
711+
// Caller must hold appropriate locks (RLock/RWMutex) if concurrency safety is required.
712+
func (m *InMemoryMatcher) findAllMatchesNoLock(query *QueryRule) ([]*MatchResult, error) {
672713
// Get candidate rules from appropriate forest index
673714
forestIndex := m.getForestIndex(query.TenantID, query.ApplicationID)
674715
var candidates []RuleWithWeight
@@ -682,7 +723,7 @@ func (m *InMemoryMatcher) FindAllMatches(query *QueryRule) ([]*MatchResult, erro
682723

683724
var matches []*MatchResult
684725

685-
// ATOMIC CONSISTENCY: Double-check approach to prevent race conditions
726+
// Double-check approach to prevent race conditions
686727
// For each candidate from forest, verify it actually matches the query dimensions
687728
// AND exists in m.rules AND its dimensions in m.rules still match the query
688729
for _, candidate := range candidates {
@@ -720,6 +761,65 @@ func (m *InMemoryMatcher) FindAllMatches(query *QueryRule) ([]*MatchResult, erro
720761
return matches, nil
721762
}
722763

764+
// FindBestMatchInBatch finds the best matching rule for each query in the provided
765+
// slice and returns results in the same order. The entire operation is
766+
// performed while holding the matcher's read lock so the caller sees a
767+
// consistent snapshot with respect to concurrent updates.
768+
func (m *InMemoryMatcher) FindBestMatchInBatch(queries []*QueryRule) ([]*MatchResult, error) {
769+
start := time.Now()
770+
771+
// Count these as queries (approximate) for stats
772+
atomic.AddInt64(&m.stats.TotalQueries, int64(len(queries)))
773+
774+
m.mu.RLock()
775+
defer m.mu.RUnlock()
776+
777+
results := make([]*MatchResult, len(queries))
778+
779+
// Collect items to cache after computing (we avoid mutating cache while
780+
// holding the read lock in a way that could conflict with other writers).
781+
type toCacheItem struct {
782+
q *QueryRule
783+
best *MatchResult
784+
}
785+
var toCache []toCacheItem
786+
787+
for i, q := range queries {
788+
// Check cache first
789+
if res := m.cache.Get(q); res != nil {
790+
m.updateCacheStats(true)
791+
results[i] = res
792+
continue
793+
}
794+
795+
m.updateCacheStats(false)
796+
797+
matches, err := m.findAllMatchesNoLock(q)
798+
if err != nil {
799+
return nil, err
800+
}
801+
802+
if len(matches) == 0 {
803+
results[i] = nil
804+
continue
805+
}
806+
807+
best := matches[0]
808+
results[i] = best
809+
toCache = append(toCache, toCacheItem{q: q, best: best})
810+
}
811+
812+
// Populate cache for computed results
813+
for _, item := range toCache {
814+
m.cache.Set(item.q, item.best)
815+
}
816+
817+
// Update average query time without lock
818+
m.updateQueryTimeStats(time.Since(start))
819+
820+
return results, nil
821+
}
822+
723823
// dimensionsEqual checks if two rules have identical dimensions
724824
func (m *InMemoryMatcher) dimensionsEqual(rule1, rule2 *Rule) bool {
725825
if len(rule1.Dimensions) != len(rule2.Dimensions) {
@@ -1044,12 +1144,25 @@ func (m *InMemoryMatcher) GetStats() *MatcherStats {
10441144
m.mu.RLock()
10451145
defer m.mu.RUnlock()
10461146

1047-
// Create a copy to avoid race conditions, using atomic read for TotalQueries
1147+
// Create a copy to avoid race conditions
10481148
stats := *m.stats
1049-
stats.TotalQueries = atomic.LoadInt64(&m.stats.TotalQueries)
1149+
// Guard against divide-by-zero when no queries have been recorded yet
1150+
if stats.TotalQueries > 0 {
1151+
stats.AverageQueryTime = stats.TotalQueryTime / stats.TotalQueries
1152+
} else {
1153+
stats.AverageQueryTime = 0
1154+
}
10501155
return &stats
10511156
}
10521157

1158+
// SetAllowDuplicateWeights configures whether rules with duplicate weights are allowed
1159+
// By default, duplicate weights are not allowed to ensure deterministic matching
1160+
func (m *InMemoryMatcher) SetAllowDuplicateWeights(allow bool) {
1161+
m.mu.Lock()
1162+
defer m.mu.Unlock()
1163+
m.allowDuplicateWeights = allow
1164+
}
1165+
10531166
// SaveToPersistence saves current state to persistence layer
10541167
func (m *InMemoryMatcher) SaveToPersistence() error {
10551168
m.mu.RLock()

0 commit comments

Comments
 (0)