Skip to content

Commit 4b24198

Browse files
authored
Adds context handling to concurrent core
1 parent b083696 commit 4b24198

File tree

4 files changed

+154
-134
lines changed

4 files changed

+154
-134
lines changed

core/concurrent_cache/concurrent_cache.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package concurrent_cache
88

99
import (
10+
"context"
1011
"fmt"
1112
"slices"
1213
"sync"
@@ -196,7 +197,7 @@ func Query(query ...Subquery) []Subquery {
196197
// 5. Merge all queries
197198
// Returns sorted list of subqueries
198199
// 6. Acquire locks and fill in Results
199-
func Lock(queries ...[]Subquery) ([]Result, func(), error) {
200+
func Lock(ctx context.Context, queries ...[]Subquery) ([]Result, func(), error) {
200201
roots := make([][]int, len(queries))
201202
cachesPresent := make(map[resource]struct{}, resourceCount)
202203
// phase 1, requires no locks, check errors, dedupe, and build trees
@@ -211,7 +212,11 @@ func Lock(queries ...[]Subquery) ([]Result, func(), error) {
211212

212213
// phase 3, takes per-resource locks
213214
merged := mergeQueries(queries)
214-
return lockQuery(merged, len(queries))
215+
results, unlocker, err := lockQuery(merged, len(queries))
216+
if err == nil && ctx.Err() != nil {
217+
err = ctx.Err()
218+
}
219+
return results, unlocker, err
215220
}
216221

217222
func assembleQueries(queries [][]Subquery, roots [][]int, cachesPresent map[resource]struct{}) error {

core/concurrent_cache/concurrent_cache_test.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package concurrent_cache
22

33
import (
4+
"context"
45
"fmt"
56
"math/rand"
67
"reflect"
@@ -266,7 +267,7 @@ func TestLock(t *testing.T) {
266267
for i := 0; i < n; i++ {
267268
go func() {
268269
defer wg.Done()
269-
results, unlocker, err := Lock(makeQueries()...)
270+
results, unlocker, err := Lock(context.Background(), makeQueries()...)
270271
defer unlocker()
271272
assert.NoError(t, err)
272273
assert.NotNil(t, results[2].VolumePublication.Upsert)
@@ -479,7 +480,7 @@ func TestLockNeverDeadlocks(t *testing.T) {
479480
for i := 0; i < tc.numQueries; i++ {
480481
queries[i] = generateRandomQuery(queryGenerators[rand.Intn(len(queryGenerators))], tc.maxSubqueriesPerQuery)
481482
}
482-
results, unlock, err := Lock(queries...)
483+
results, unlock, err := Lock(context.Background(), queries...)
483484
defer unlock()
484485
time.Sleep(time.Duration(10+rand.Intn(40)) * time.Millisecond) // Simulate some processing time
485486
for _, operation := range []string{"Upsert", "Delete"} {
@@ -616,7 +617,7 @@ func TestLockWithDependencyChains(t *testing.T) {
616617
var err error
617618

618619
go func() {
619-
results, unlock, err = Lock(tc.querySets...)
620+
results, unlock, err = Lock(context.Background(), tc.querySets...)
620621
time.Sleep(10 * time.Millisecond) // Simulate some processing time
621622
for _, operation := range []string{"Upsert", "Delete"} {
622623
doOperation(t, operation, results)
@@ -776,3 +777,11 @@ func cleanupTestData() {
776777
volumePublications.data = make(map[string]SmartCopier)
777778
volumePublications.unlock()
778779
}
780+
781+
func TestLockCancel(t *testing.T) {
782+
ctx, cancel := context.WithTimeout(context.Background(), -1*time.Second)
783+
defer cancel()
784+
_, unlocker, err := Lock(ctx, Query(UpsertNode("node1")))
785+
defer unlocker()
786+
assert.ErrorContains(t, err, "context deadline exceeded")
787+
}

0 commit comments

Comments
 (0)