Skip to content

Commit bafd905

Browse files
authored
Merge pull request #1729 from lightninglabs/sync-supply-commit
universe/supplycommit: add done channel to enable sync processing of events
2 parents abb378e + 7438d5c commit bafd905

File tree

5 files changed

+306
-11
lines changed

5 files changed

+306
-11
lines changed

rpcserver.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4085,7 +4085,9 @@ func (r *rpcServer) IgnoreAssetOutPoint(ctx context.Context,
40854085
ignoreEvent := supplycommit.NewIgnoreEvent{
40864086
SignedIgnoreTuple: signedIgnore,
40874087
}
4088-
err = r.cfg.SupplyCommitManager.SendEvent(ctx, assetSpec, &ignoreEvent)
4088+
err = r.cfg.SupplyCommitManager.SendEventSync(
4089+
ctx, assetSpec, &ignoreEvent,
4090+
)
40894091
if err != nil {
40904092
return nil, fmt.Errorf("failed to upsert ignore tuple: %w", err)
40914093
}

universe/supplycommit/multi_sm_manager.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,46 @@ func (m *MultiStateMachineManager) SendEvent(ctx context.Context,
225225
return nil
226226
}
227227

228+
// SendEventSync sends an event to the state machine and waits for it to be
229+
// processed and written to disk. This method provides synchronous confirmation
230+
// that the event has been durably persisted. If the event doesn't support
231+
// synchronous processing (i.e., it's not a SupplyUpdateEvent), this method will
232+
// return an error.
233+
func (m *MultiStateMachineManager) SendEventSync(ctx context.Context,
234+
assetSpec asset.Specifier, event SyncSupplyUpdateEvent) error {
235+
236+
// Only SupplyUpdateEvents can be processed synchronously.
237+
supplyEvent, ok := event.(SupplyUpdateEvent)
238+
if !ok {
239+
return fmt.Errorf("event type %T does not support "+
240+
"synchronous processing", event)
241+
}
242+
243+
// We'll use this channel to signal when the event has been processed.
244+
done := make(chan error, 1)
245+
246+
// As the event has already been created, we'll set the done channel
247+
// directly.
248+
switch e := supplyEvent.(type) {
249+
case *NewMintEvent:
250+
e.Done = done
251+
case *NewBurnEvent:
252+
e.Done = done
253+
case *NewIgnoreEvent:
254+
e.Done = done
255+
default:
256+
return fmt.Errorf("unexpected supply update event type: %T",
257+
supplyEvent)
258+
}
259+
260+
// Send the event to the state machine, the pause and wait until it
261+
// signals that the event has been fully processed.
262+
if err := m.SendEvent(ctx, assetSpec, supplyEvent); err != nil {
263+
return err
264+
}
265+
return event.WaitForDone(ctx)
266+
}
267+
228268
// CanHandle determines if the state machine associated with the given asset
229269
// specifier can handle the given message. If a state machine for the asset
230270
// group does not exist, it will be created and started.

universe/supplycommit/state_machine_test.go

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func newTestMintEvent(t *testing.T, scriptKey *btcec.PublicKey,
9393
LeafKey: leafKey,
9494
IssuanceProof: issuanceProof,
9595
MintHeight: 1000,
96+
Done: make(chan error, 1),
9697
}
9798
}
9899

@@ -326,6 +327,32 @@ func (h *supplyCommitTestHarness) sendEvent(event Event) {
326327
h.t.Helper()
327328

328329
h.stateMachine.SendEvent(context.Background(), event)
330+
331+
// If this is a SyncSupplyUpdateEvent with a done channel, wait for it.
332+
if syncEvent, ok := event.(SyncSupplyUpdateEvent); ok {
333+
// Check if the event has a done channel by attempting to wait.
334+
// We use a short timeout to avoid blocking indefinitely.
335+
ctx, cancel := context.WithTimeout(
336+
context.Background(), testTimeout,
337+
)
338+
defer cancel()
339+
340+
err := syncEvent.WaitForDone(ctx)
341+
342+
switch {
343+
// If the done channel is nil, this is async mode - not an
344+
// error.
345+
case errors.Is(err, ErrNilDoneChannel):
346+
return
347+
348+
// If we got a timeout, that's a real error.
349+
case errors.Is(err, ErrEventTimeout):
350+
require.NoError(h.t, err, "sync event timed out")
351+
}
352+
353+
// For other errors (e.g., database errors), tests should verify
354+
// them through other means (e.g., assertNoStateTransitions).
355+
}
329356
}
330357

331358
func (h *supplyCommitTestHarness) expectInsertPendingUpdate(
@@ -1418,7 +1445,9 @@ func TestSupplyUpdateEventTypes(t *testing.T) {
14181445

14191446
// Finally, we assert that the RawProof field of the decoded
14201447
// event matches the original serialized proof bytes, confirming
1421-
// a successful round trip.
1448+
// a successful round trip. Note: We don't encode/decode the Done
1449+
// channel, so we clear it before comparison.
1450+
originalMintEvent.Done = nil
14221451
require.Equal(t, originalMintEvent, decodedMintEvent)
14231452

14241453
require.Equal(
@@ -1903,3 +1932,81 @@ func TestStateAndEventMethods(t *testing.T) {
19031932
require.False(t, (&DefaultState{}).IsTerminal())
19041933
})
19051934
}
1935+
1936+
// TestSyncSupplyUpdateEventEdgeCases tests edge cases for synchronous event
1937+
// handling.
1938+
func TestSyncSupplyUpdateEventEdgeCases(t *testing.T) {
1939+
t.Parallel()
1940+
1941+
testScriptKey := test.RandPubKey(t)
1942+
defaultAssetSpec := asset.NewSpecifierFromId(testAssetID)
1943+
1944+
// Test that if we get an error during processing, it propagates..
1945+
t.Run("error_propagation", func(t *testing.T) {
1946+
h := newSupplyCommitTestHarness(t, &harnessCfg{
1947+
initialState: &DefaultState{},
1948+
assetSpec: defaultAssetSpec,
1949+
})
1950+
h.start()
1951+
defer h.stopAndAssert()
1952+
1953+
mintEvent := newTestMintEvent(
1954+
t, testScriptKey, randOutPoint(t),
1955+
)
1956+
1957+
// Expect the insert to fail.
1958+
insertErr := errors.New("database error")
1959+
h.mockStateLog.On(
1960+
"InsertPendingUpdate", mock.Anything, h.cfg.assetSpec,
1961+
mintEvent,
1962+
).Return(insertErr).Once()
1963+
1964+
h.expectFailure(insertErr)
1965+
h.sendEvent(mintEvent)
1966+
1967+
// The sendEvent method should have already waited and checked
1968+
// the error. We're verifying that the test harness correctly
1969+
// propagated the error.
1970+
h.assertNoStateTransitions()
1971+
})
1972+
1973+
// If we cancel the context, WaitForDone should return an error.
1974+
t.Run("context_cancellation", func(t *testing.T) {
1975+
mintEvent := newTestMintEvent(
1976+
t, testScriptKey, randOutPoint(t),
1977+
)
1978+
1979+
ctx, cancel := context.WithCancel(context.Background())
1980+
cancel()
1981+
1982+
err := mintEvent.WaitForDone(ctx)
1983+
require.Error(t, err, "expected context cancellation error")
1984+
require.ErrorIs(t, err, ErrEventTimeout)
1985+
require.ErrorIs(t, err, context.Canceled)
1986+
})
1987+
1988+
// Passing a nil done channel should return an error.
1989+
t.Run("nil_done_channel", func(t *testing.T) {
1990+
mintEvent := &NewMintEvent{
1991+
LeafKey: universe.AssetLeafKey{
1992+
BaseLeafKey: universe.BaseLeafKey{
1993+
OutPoint: randOutPoint(t),
1994+
ScriptKey: &asset.ScriptKey{
1995+
PubKey: testScriptKey,
1996+
},
1997+
},
1998+
AssetID: testAssetID,
1999+
},
2000+
IssuanceProof: universe.Leaf{
2001+
GenesisWithGroup: universe.GenesisWithGroup{
2002+
Genesis: testGenesis,
2003+
},
2004+
},
2005+
MintHeight: 1000,
2006+
}
2007+
2008+
err := mintEvent.WaitForDone(context.Background())
2009+
require.Error(t, err, "expected error for nil done channel")
2010+
require.ErrorIs(t, err, ErrNilDoneChannel)
2011+
})
2012+
}

universe/supplycommit/states.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package supplycommit
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"io"
78

@@ -20,6 +21,14 @@ var (
2021
// ErrInvalidStateTransition is returned when we receive an unexpected
2122
// event for a given state.
2223
ErrInvalidStateTransition = fmt.Errorf("invalid state transition")
24+
25+
// ErrEventTimeout is returned when waiting for a synchronous event
26+
// times out due to context cancellation.
27+
ErrEventTimeout = fmt.Errorf("event processing timeout")
28+
29+
// ErrNilDoneChannel is returned when attempting to wait for a
30+
// synchronous event that doesn't have a done channel.
31+
ErrNilDoneChannel = fmt.Errorf("done channel is nil")
2332
)
2433

2534
// Event is a special interface used to create the equivalent of a sum-type, but
@@ -96,15 +105,68 @@ type SupplyUpdateEvent interface {
96105
Encode(io.Writer) error
97106
}
98107

108+
// SyncSupplyUpdateEvent is an interface that extends SupplyUpdateEvent with
109+
// the ability to signal completion when the event has been processed.
110+
type SyncSupplyUpdateEvent interface {
111+
SupplyUpdateEvent
112+
113+
// SignalDone signals completion on the done channel if it exists.
114+
// It sends the error (or nil for success) and does not block.
115+
SignalDone(error)
116+
117+
// WaitForDone waits for the event to be processed by waiting on its
118+
// done channel. This is a helper method that can be used by callers who
119+
// want synchronous confirmation that their event has been persisted.
120+
WaitForDone(context.Context) error
121+
}
122+
123+
// waitForDone is an internal helper function that waits for an event to be
124+
// processed by waiting on its done channel.
125+
func waitForDone(ctx context.Context, done chan error) error {
126+
if done == nil {
127+
return ErrNilDoneChannel
128+
}
129+
130+
select {
131+
case err := <-done:
132+
return err
133+
case <-ctx.Done():
134+
return fmt.Errorf("%w: %w", ErrEventTimeout, ctx.Err())
135+
}
136+
}
137+
99138
// NewIgnoreEvent signals that a caller wishes to update the ignore portion of
100139
// the supply tree with a new outpoint + script key combo.
101140
type NewIgnoreEvent struct {
102141
universe.SignedIgnoreTuple
142+
143+
// Done is an optional channel that will receive an error (or nil for
144+
// success) when the event has been processed and written to disk. If
145+
// nil, the event is processed asynchronously.
146+
Done chan error
103147
}
104148

105149
// eventSealed is a special method that is used to seal the interface.
106150
func (n *NewIgnoreEvent) eventSealed() {}
107151

152+
// SignalDone signals completion on the done channel if it exists. It sends the
153+
// error (or nil for success) and does not block.
154+
func (n *NewIgnoreEvent) SignalDone(err error) {
155+
if n.Done != nil {
156+
select {
157+
case n.Done <- err:
158+
default:
159+
}
160+
}
161+
}
162+
163+
// WaitForDone waits for the event to be processed by waiting on its done
164+
// channel. This is a helper method that can be used by callers who want
165+
// synchronous confirmation that their event has been persisted.
166+
func (n *NewIgnoreEvent) WaitForDone(ctx context.Context) error {
167+
return waitForDone(ctx, n.Done)
168+
}
169+
108170
// BlockHeight returns the block height of the update.
109171
func (n *NewIgnoreEvent) BlockHeight() uint32 {
110172
return n.SignedIgnoreTuple.IgnoreTuple.Val.BlockHeight
@@ -141,15 +203,42 @@ func (n *NewIgnoreEvent) Encode(w io.Writer) error {
141203
// SupplyUpdateEvent interface.
142204
var _ SupplyUpdateEvent = (*NewIgnoreEvent)(nil)
143205

206+
// A compile time assertion to ensure that NewIgnoreEvent implements the
207+
// SyncSupplyUpdateEvent interface.
208+
var _ SyncSupplyUpdateEvent = (*NewIgnoreEvent)(nil)
209+
144210
// NewBurnEvent signals that a caller wishes to update the burn portion of
145211
// the supply tree with a new burnt asset.
146212
type NewBurnEvent struct {
147213
universe.BurnLeaf
214+
215+
// Done is an optional channel that will receive an error (or nil for
216+
// success) when the event has been processed and written to disk.
217+
// If nil, the event is processed asynchronously.
218+
Done chan error
148219
}
149220

150221
// eventSealed is a special method that is used to seal the interface.
151222
func (n *NewBurnEvent) eventSealed() {}
152223

224+
// SignalDone signals completion on the done channel if it exists. It sends
225+
// the error (or nil for success) and does not block.
226+
func (n *NewBurnEvent) SignalDone(err error) {
227+
if n.Done != nil {
228+
select {
229+
case n.Done <- err:
230+
default:
231+
}
232+
}
233+
}
234+
235+
// WaitForDone waits for the event to be processed by waiting on its done
236+
// channel. This is a helper method that can be used by callers who want
237+
// synchronous confirmation that their event has been persisted.
238+
func (n *NewBurnEvent) WaitForDone(ctx context.Context) error {
239+
return waitForDone(ctx, n.Done)
240+
}
241+
153242
// BlockHeight returns the block height of the update.
154243
func (n *NewBurnEvent) BlockHeight() uint32 {
155244
return n.BurnLeaf.BurnProof.BlockHeight
@@ -187,6 +276,10 @@ func (n *NewBurnEvent) Encode(w io.Writer) error {
187276
// SupplyUpdateEvent interface.
188277
var _ SupplyUpdateEvent = (*NewBurnEvent)(nil)
189278

279+
// A compile time assertion to ensure that NewBurnEvent implements the
280+
// SyncSupplyUpdateEvent interface.
281+
var _ SyncSupplyUpdateEvent = (*NewBurnEvent)(nil)
282+
190283
// NewMintEvent signals that a caller wishes to update the mint portion of the
191284
// supply tree with a new minted asset.
192285
type NewMintEvent struct {
@@ -198,6 +291,11 @@ type NewMintEvent struct {
198291

199292
// MintHeight is the height of the block that contains the mint.
200293
MintHeight uint32
294+
295+
// Done is an optional channel that will receive an error (or nil for
296+
// success) when the event has been processed and written to disk.
297+
// If nil, the event is processed asynchronously.
298+
Done chan error
201299
}
202300

203301
// BlockHeight returns the block height of the update.
@@ -208,6 +306,24 @@ func (n *NewMintEvent) BlockHeight() uint32 {
208306
// eventSealed is a special method that is used to seal the interface.
209307
func (n *NewMintEvent) eventSealed() {}
210308

309+
// SignalDone signals completion on the done channel if it exists. It sends
310+
// the error (or nil for success) and does not block.
311+
func (n *NewMintEvent) SignalDone(err error) {
312+
if n.Done != nil {
313+
select {
314+
case n.Done <- err:
315+
default:
316+
}
317+
}
318+
}
319+
320+
// WaitForDone waits for the event to be processed by waiting on its done
321+
// channel. This is a helper method that can be used by callers who want
322+
// synchronous confirmation that their event has been persisted.
323+
func (n *NewMintEvent) WaitForDone(ctx context.Context) error {
324+
return waitForDone(ctx, n.Done)
325+
}
326+
211327
// ScriptKey returns the script key that is used to identify the target
212328
// asset.
213329
func (n *NewMintEvent) ScriptKey() asset.SerializedKey {
@@ -279,6 +395,10 @@ func (n *NewMintEvent) Decode(r io.Reader) error {
279395
// SupplyUpdateEvent interface.
280396
var _ SupplyUpdateEvent = (*NewMintEvent)(nil)
281397

398+
// A compile time assertion to ensure that NewMintEvent implements the
399+
// SyncSupplyUpdateEvent interface.
400+
var _ SyncSupplyUpdateEvent = (*NewMintEvent)(nil)
401+
282402
// DefaultState is the idle state of the state machine. We start in this state
283403
// when there are no pending changes that need to committed.
284404
//

0 commit comments

Comments
 (0)