@@ -13,6 +13,7 @@ import (
1313 "github.com/lightninglabs/taproot-assets/fn"
1414 "github.com/lightninglabs/taproot-assets/mssmt"
1515 "github.com/lightninglabs/taproot-assets/tapgarden"
16+ "github.com/lightninglabs/taproot-assets/universe"
1617 "github.com/lightninglabs/taproot-assets/universe/supplycommit"
1718 "github.com/lightningnetwork/lnd/msgmux"
1819 "github.com/lightningnetwork/lnd/protofsm"
@@ -42,6 +43,10 @@ type IssuanceSubscriptions interface {
4243 // issuance events.
4344 RegisterSubscriber (receiver * fn.EventReceiver [fn.Event ],
4445 deliverExisting bool , _ bool ) error
46+
47+ // RemoveSubscriber removes the given subscriber and also stops it from
48+ // processing events.
49+ RemoveSubscriber (subscriber * fn.EventReceiver [fn.Event ]) error
4550}
4651
4752// ManagerCfg is the configuration for the
@@ -216,6 +221,14 @@ func (m *Manager) Start() error {
216221 "state machines: %v" , err )
217222 return
218223 }
224+
225+ // Start a goroutine to handle universe syncer issuance events.
226+ m .ContextGuard .Goroutine (
227+ m .MonitorUniSyncEvents , func (err error ) {
228+ log .Errorf ("MonitorUniIssuanceSyncEvents: %v" ,
229+ err )
230+ },
231+ )
219232 })
220233 if startErr != nil {
221234 return fmt .Errorf ("unable to start manager: %w" , startErr )
@@ -224,6 +237,121 @@ func (m *Manager) Start() error {
224237 return nil
225238}
226239
240+ // handleUniSyncEvent handles a single universe syncer event. If the event is an
241+ // issuance event for an asset group that supports supply commitments, it will
242+ // ensure that a state machine for the asset group exists, creating and
243+ // starting it if necessary.
244+ func (m * Manager ) handleUniSyncEvent (event fn.Event ) error {
245+ // Disregard event if it is not of type
246+ // universe.SyncDiffEvent.
247+ syncDiffEvent , ok := event .(* universe.SyncDiffEvent )
248+ if ! ok {
249+ return nil
250+ }
251+
252+ // If the sync diff is not a new issuance, we disregard it.
253+ universeID := syncDiffEvent .SyncDiff .NewUniverseRoot .ID
254+ if universeID .ProofType != universe .ProofTypeIssuance {
255+ return nil
256+ }
257+
258+ // If the asset is not a group key asset, we
259+ // disregard it.
260+ if universeID .GroupKey == nil {
261+ return nil
262+ }
263+
264+ // If there are no new leaf proofs, we disregard the sync event.
265+ if len (syncDiffEvent .SyncDiff .NewLeafProofs ) == 0 {
266+ return nil
267+ }
268+
269+ // Get genesis asset ID from the first synced leaf and formulate an
270+ // asset specifier.
271+ //
272+ // TODO(ffranr): Revisit this. We select any asset ID to aid in metdata
273+ // retrieval, but we should be able to do this with just the group key.
274+ // However, QueryAssetGroupByGroupKey currently fails for the asset
275+ // group.
276+ assetID := syncDiffEvent .SyncDiff .NewLeafProofs [0 ].Genesis .ID ()
277+
278+ assetSpec := asset .NewSpecifierOptionalGroupPubKey (
279+ assetID , universeID .GroupKey ,
280+ )
281+
282+ // Check that the asset group supports supply
283+ // commitments.
284+ ctx , cancelCtx := m .WithCtxQuitNoTimeout ()
285+ isSupported , err := supplycommit .IsSupplySupported (
286+ ctx , m .cfg .AssetLookup , assetSpec , false ,
287+ )
288+ if err != nil {
289+ return fmt .Errorf ("failed to check supply support: %w" , err )
290+ }
291+ cancelCtx ()
292+
293+ if ! isSupported {
294+ return nil
295+ }
296+
297+ // Fetch the state machine for the asset group, creating and starting it
298+ // if it doesn't exist.
299+ log .Debugf ("Ensure supply verifier state machine for asset " +
300+ "group due to universe syncer issuance event (asset=%s)" ,
301+ assetSpec .String ())
302+ _ , err = m .fetchStateMachine (assetSpec )
303+ if err != nil {
304+ return fmt .Errorf ("unable to get or create state machine: %w" ,
305+ err )
306+ }
307+
308+ return nil
309+ }
310+
311+ // MonitorUniSyncEvents registers an event receiver to receive universe
312+ // syncer issuance events.
313+ //
314+ // NOTE: This method must be run as a goroutine.
315+ func (m * Manager ) MonitorUniSyncEvents () error {
316+ // Register an event receiver to receive universe syncer events. These
317+ // events relate to asset issuance proofs.
318+ eventReceiver := fn.NewEventReceiver [fn.Event ](
319+ fn .DefaultQueueSize ,
320+ )
321+ err := m .cfg .IssuanceSubscriptions .RegisterSubscriber (
322+ eventReceiver , false , true ,
323+ )
324+ if err != nil {
325+ return fmt .Errorf ("unable to register universe syncer " +
326+ "issuance event subscriber: %w" , err )
327+ }
328+
329+ // Ensure we remove the subscriber when we exit.
330+ defer func () {
331+ err := m .cfg .IssuanceSubscriptions .RemoveSubscriber (
332+ eventReceiver ,
333+ )
334+ if err != nil {
335+ log .Errorf ("unable to remove universe syncer " +
336+ "issuance event subscriber: %v" , err )
337+ }
338+ }()
339+
340+ for {
341+ select {
342+ case <- m .Quit :
343+ return nil
344+
345+ case event := <- eventReceiver .NewItemCreated .ChanOut ():
346+ err := m .handleUniSyncEvent (event )
347+ if err != nil {
348+ return fmt .Errorf ("unable to handle " +
349+ "universe issuance sync event: %w" , err )
350+ }
351+ }
352+ }
353+ }
354+
227355// Stop stops the multi state machine manager, which in turn stops all asset
228356// group key specific supply verifier state machines.
229357func (m * Manager ) Stop () error {
0 commit comments