Skip to content

Commit 5c5f9ca

Browse files
committed
fix: add runJob helper for non-blocking semaphore pattern
1 parent 717f268 commit 5c5f9ca

File tree

1 file changed

+31
-34
lines changed

1 file changed

+31
-34
lines changed

p2p/database/datastore.go

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,20 @@ type DatastoreOptions struct {
129129
TTL time.Duration
130130
}
131131

132+
// runJob executes fn in a goroutine with semaphore-based concurrency control.
133+
// It will wait for a semaphore slot without blocking the caller, and respects context cancellation.
134+
func (d *Datastore) runJob(ctx context.Context, fn func()) {
135+
go func() {
136+
select {
137+
case d.jobs <- struct{}{}:
138+
fn()
139+
<-d.jobs
140+
case <-ctx.Done():
141+
return
142+
}
143+
}()
144+
}
145+
132146
// NewDatastore connects to datastore and creates the client. This should
133147
// only be called once unless trying to write to different databases.
134148
func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
@@ -159,19 +173,15 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
159173
}
160174

161175
if d.ShouldWriteBlockEvents() {
162-
d.jobs <- struct{}{}
163-
go func() {
176+
d.runJob(ctx, func() {
164177
d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
165-
<-d.jobs
166-
}()
178+
})
167179
}
168180

169181
if d.ShouldWriteBlocks() {
170-
d.jobs <- struct{}{}
171-
go func() {
182+
d.runJob(ctx, func() {
172183
d.writeBlock(ctx, block, td, tfs)
173-
<-d.jobs
174-
}()
184+
})
175185
}
176186
}
177187

@@ -185,11 +195,10 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head
185195
}
186196

187197
for _, h := range headers {
188-
d.jobs <- struct{}{}
189-
go func(header *types.Header) {
198+
header := h
199+
d.runJob(ctx, func() {
190200
d.writeBlockHeader(ctx, header, tfs)
191-
<-d.jobs
192-
}(h)
201+
})
193202
}
194203
}
195204

@@ -203,11 +212,9 @@ func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, has
203212
return
204213
}
205214

206-
d.jobs <- struct{}{}
207-
go func() {
215+
d.runJob(ctx, func() {
208216
d.writeBlockBody(ctx, body, hash, tfs)
209-
<-d.jobs
210-
}()
217+
})
211218
}
212219

213220
// WriteBlockHashes will write the block events to datastore.
@@ -216,11 +223,9 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash
216223
return
217224
}
218225

219-
d.jobs <- struct{}{}
220-
go func() {
226+
d.runJob(ctx, func() {
221227
d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, tfs)
222-
<-d.jobs
223-
}()
228+
})
224229
}
225230

226231
// WriteTransactions will write the transactions and transaction events to datastore.
@@ -230,11 +235,9 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs
230235
}
231236

232237
if d.ShouldWriteTransactions() {
233-
d.jobs <- struct{}{}
234-
go func() {
238+
d.runJob(ctx, func() {
235239
d.writeTransactions(ctx, txs, tfs)
236-
<-d.jobs
237-
}()
240+
})
238241
}
239242

240243
if d.ShouldWriteTransactionEvents() {
@@ -243,11 +246,9 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs
243246
hashes = append(hashes, tx.Hash())
244247
}
245248

246-
d.jobs <- struct{}{}
247-
go func() {
249+
d.runJob(ctx, func() {
248250
d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs)
249-
<-d.jobs
250-
}()
251+
})
251252
}
252253
}
253254

@@ -257,9 +258,7 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, tls time.
257258
return
258259
}
259260

260-
d.jobs <- struct{}{}
261-
go func() {
262-
261+
d.runJob(ctx, func() {
263262
keys := make([]*datastore.Key, 0, len(peers))
264263
dsPeers := make([]*DatastorePeer, 0, len(peers))
265264

@@ -279,9 +278,7 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, tls time.
279278
if err != nil {
280279
log.Error().Err(err).Msg("Failed to write peers")
281280
}
282-
283-
<-d.jobs
284-
}()
281+
})
285282
}
286283

287284
func (d *Datastore) MaxConcurrentWrites() int {

0 commit comments

Comments
 (0)