Skip to content

Commit e10840a

Browse files
tests
1 parent 54e8d3c commit e10840a

File tree

1 file changed

+292
-0
lines changed

1 file changed

+292
-0
lines changed

provider/buffered/provider_test.go

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
//go:build go1.25
2+
// +build go1.25
3+
4+
package buffered
5+
6+
import (
7+
"bytes"
8+
"sync"
9+
"testing"
10+
"testing/synctest"
11+
"time"
12+
13+
"github.com/ipfs/go-datastore"
14+
"github.com/ipfs/go-test/random"
15+
"github.com/libp2p/go-libp2p-kad-dht/provider/internal"
16+
mh "github.com/multiformats/go-multihash"
17+
)
18+
19+
var _ internal.Provider = (*fakeProvider)(nil)
20+
21+
type fakeProvider struct {
22+
mu sync.Mutex
23+
provideOnceCalls [][]mh.Multihash
24+
startProvidingCalls []startProvidingCall
25+
stopProvidingCalls [][]mh.Multihash
26+
27+
// Signal when operations are processed
28+
processed chan struct{}
29+
}
30+
31+
type startProvidingCall struct {
32+
force bool
33+
keys []mh.Multihash
34+
}
35+
36+
func (f *fakeProvider) ProvideOnce(keys ...mh.Multihash) error {
37+
f.mu.Lock()
38+
defer f.mu.Unlock()
39+
if len(keys) > 0 {
40+
f.provideOnceCalls = append(f.provideOnceCalls, keys)
41+
if f.processed != nil {
42+
select {
43+
case f.processed <- struct{}{}:
44+
default:
45+
}
46+
}
47+
}
48+
return nil
49+
}
50+
51+
func (f *fakeProvider) StartProviding(force bool, keys ...mh.Multihash) error {
52+
f.mu.Lock()
53+
defer f.mu.Unlock()
54+
if len(keys) > 0 {
55+
f.startProvidingCalls = append(f.startProvidingCalls, startProvidingCall{
56+
force: force,
57+
keys: keys,
58+
})
59+
if f.processed != nil {
60+
select {
61+
case f.processed <- struct{}{}:
62+
default:
63+
}
64+
}
65+
}
66+
return nil
67+
}
68+
69+
func (f *fakeProvider) StopProviding(keys ...mh.Multihash) error {
70+
f.mu.Lock()
71+
defer f.mu.Unlock()
72+
if len(keys) > 0 {
73+
f.stopProvidingCalls = append(f.stopProvidingCalls, keys)
74+
if f.processed != nil {
75+
select {
76+
case f.processed <- struct{}{}:
77+
default:
78+
}
79+
}
80+
}
81+
return nil
82+
}
83+
84+
func (f *fakeProvider) Clear() int {
85+
// Unused
86+
return 0
87+
}
88+
89+
func (f *fakeProvider) RefreshSchedule() error {
90+
// Unused
91+
return nil
92+
}
93+
94+
func (f *fakeProvider) Close() error {
95+
// Unused
96+
return nil
97+
}
98+
99+
func newFakeProvider() *fakeProvider {
100+
return &fakeProvider{
101+
processed: make(chan struct{}, 10), // Buffered channel for test signaling
102+
}
103+
}
104+
105+
func TestQueueingMechanism(t *testing.T) {
106+
synctest.Test(t, func(t *testing.T) {
107+
fake := newFakeProvider()
108+
ds := datastore.NewMapDatastore()
109+
provider := New(fake, ds,
110+
WithDsName("test1"),
111+
WithIdleWriteTime(time.Millisecond),
112+
WithBatchSize(10))
113+
defer provider.Close()
114+
115+
keys := random.Multihashes(3)
116+
117+
// Queue various operations
118+
if err := provider.ProvideOnce(keys[0]); err != nil {
119+
t.Fatalf("ProvideOnce failed: %v", err)
120+
}
121+
if err := provider.StartProviding(false, keys[1]); err != nil {
122+
t.Fatalf("StartProviding failed: %v", err)
123+
}
124+
if err := provider.StartProviding(true, keys[2]); err != nil {
125+
t.Fatalf("StartProviding (force) failed: %v", err)
126+
}
127+
if err := provider.StopProviding(keys[0]); err != nil {
128+
t.Fatalf("StopProviding failed: %v", err)
129+
}
130+
131+
// Wait for operations to be processed by expecting 4 signals
132+
for i := 0; i < 4; i++ {
133+
select {
134+
case <-fake.processed:
135+
case <-time.After(time.Second):
136+
t.Fatalf("Timeout waiting for operation %d to be processed", i+1)
137+
}
138+
}
139+
140+
// Verify all operations were dequeued and processed
141+
if len(fake.provideOnceCalls) != 1 {
142+
t.Errorf("Expected 1 ProvideOnce call, got %d", len(fake.provideOnceCalls))
143+
} else if len(fake.provideOnceCalls[0]) != 1 || !bytes.Equal(fake.provideOnceCalls[0][0], keys[0]) {
144+
t.Errorf("Expected ProvideOnce call with keys[0], got %v", fake.provideOnceCalls[0])
145+
}
146+
147+
if len(fake.startProvidingCalls) != 2 {
148+
t.Errorf("Expected 2 StartProviding calls, got %d", len(fake.startProvidingCalls))
149+
} else {
150+
// Check that we have one force=true call and one force=false call
151+
foundForce := false
152+
foundRegular := false
153+
for _, call := range fake.startProvidingCalls {
154+
if call.force {
155+
foundForce = true
156+
if len(call.keys) != 1 || !bytes.Equal(call.keys[0], keys[2]) {
157+
t.Errorf("Expected force StartProviding call with keys[2], got %v", call.keys)
158+
}
159+
} else {
160+
foundRegular = true
161+
if len(call.keys) != 1 || !bytes.Equal(call.keys[0], keys[1]) {
162+
t.Errorf("Expected regular StartProviding call with keys[1], got %v", call.keys)
163+
}
164+
}
165+
}
166+
if !foundForce {
167+
t.Errorf("Expected to find a StartProviding call with force=true")
168+
}
169+
if !foundRegular {
170+
t.Errorf("Expected to find a StartProviding call with force=false")
171+
}
172+
}
173+
174+
if len(fake.stopProvidingCalls) != 1 {
175+
t.Errorf("Expected 1 StopProviding call, got %d", len(fake.stopProvidingCalls))
176+
} else if len(fake.stopProvidingCalls[0]) != 1 || !bytes.Equal(fake.stopProvidingCalls[0][0], keys[0]) {
177+
t.Errorf("Expected StopProviding call with keys[0], got %v", fake.stopProvidingCalls[0])
178+
}
179+
})
180+
}
181+
182+
func TestStartProvidingAfterStopProvidingRemovesStopOperation(t *testing.T) {
183+
// Test the core logic directly by calling getOperations with known data
184+
t.Run("DirectTest", func(t *testing.T) {
185+
key := random.Multihashes(1)[0]
186+
187+
// Create batch data that simulates StopProviding followed by StartProviding
188+
stopData := toBytes(stopProvidingOp, key)
189+
startData := toBytes(startProvidingOp, key)
190+
191+
dequeued := [][]byte{stopData, startData}
192+
ops, err := getOperations(dequeued) // We need to create this helper
193+
if err != nil {
194+
t.Fatalf("getOperations failed: %v", err)
195+
}
196+
197+
// StartProviding should be present
198+
if len(ops[startProvidingOp]) != 1 || !bytes.Equal(ops[startProvidingOp][0], key) {
199+
t.Errorf("Expected StartProviding operation with key, got %v", ops[startProvidingOp])
200+
}
201+
202+
// StopProviding should be canceled (empty)
203+
if len(ops[stopProvidingOp]) != 0 {
204+
t.Errorf("Expected StopProviding operations to be canceled, got %v", ops[stopProvidingOp])
205+
}
206+
})
207+
}
208+
209+
func TestMultipleOperationsOnSameKey(t *testing.T) {
210+
// Test the core batch processing logic directly
211+
t.Run("DirectTest", func(t *testing.T) {
212+
key := random.Multihashes(1)[0]
213+
214+
// Create batch data with multiple operations on same key
215+
ops := [][]byte{
216+
toBytes(stopProvidingOp, key), // StopProviding
217+
toBytes(forceStartProvidingOp, key), // StartProviding(force=true)
218+
toBytes(stopProvidingOp, key), // StopProviding again
219+
toBytes(startProvidingOp, key), // StartProviding(force=false)
220+
}
221+
222+
processed, err := getOperations(ops)
223+
if err != nil {
224+
t.Fatalf("getOperations failed: %v", err)
225+
}
226+
227+
// Should have 2 StartProviding operations
228+
if len(processed[startProvidingOp]) != 1 {
229+
t.Errorf("Expected 1 StartProviding (force=false) operation, got %d", len(processed[startProvidingOp]))
230+
}
231+
if len(processed[forceStartProvidingOp]) != 1 {
232+
t.Errorf("Expected 1 StartProviding (force=true) operation, got %d", len(processed[forceStartProvidingOp]))
233+
}
234+
235+
// StopProviding should be canceled (empty) because StartProviding operations were in same batch
236+
if len(processed[stopProvidingOp]) != 0 {
237+
t.Errorf("Expected 0 StopProviding operations (should be canceled), got %d", len(processed[stopProvidingOp]))
238+
}
239+
})
240+
}
241+
242+
func TestBatchProcessing(t *testing.T) {
243+
synctest.Test(t, func(t *testing.T) {
244+
fake := newFakeProvider()
245+
ds := datastore.NewMapDatastore()
246+
provider := New(fake, ds,
247+
WithDsName("test4"),
248+
WithBatchSize(3), // Process 3 operations at once
249+
WithIdleWriteTime(time.Second))
250+
defer provider.Close()
251+
252+
// Queue multiple keys - total of 3 operations (2 from ProvideOnce + 1 from StartProviding)
253+
keys := random.Multihashes(3)
254+
255+
if err := provider.ProvideOnce(keys[0], keys[1]); err != nil {
256+
t.Fatalf("ProvideOnce failed: %v", err)
257+
}
258+
if err := provider.StartProviding(false, keys[2]); err != nil {
259+
t.Fatalf("StartProviding failed: %v", err)
260+
}
261+
262+
// Wait for batch to be triggered (should process all 3 operations in one batch)
263+
// Expect 2 signals: 1 for ProvideOnce (with 2 keys), 1 for StartProviding (with 1 key)
264+
for i := 0; i < 2; i++ {
265+
select {
266+
case <-fake.processed:
267+
case <-time.After(time.Second):
268+
t.Fatalf("Timeout waiting for operation %d to be processed", i+1)
269+
}
270+
}
271+
272+
// Close to ensure all operations are flushed
273+
provider.Close()
274+
275+
// Verify operations were batched correctly
276+
totalProvideOnceCalls := 0
277+
for _, call := range fake.provideOnceCalls {
278+
totalProvideOnceCalls += len(call)
279+
}
280+
if totalProvideOnceCalls != 2 {
281+
t.Errorf("Expected 2 total keys in ProvideOnce calls, got %d", totalProvideOnceCalls)
282+
}
283+
284+
totalStartProvidingCalls := 0
285+
for _, call := range fake.startProvidingCalls {
286+
totalStartProvidingCalls += len(call.keys)
287+
}
288+
if totalStartProvidingCalls != 1 {
289+
t.Errorf("Expected 1 total key in StartProviding calls, got %d", totalStartProvidingCalls)
290+
}
291+
})
292+
}

0 commit comments

Comments
 (0)