Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 66 additions & 9 deletions chainnotifier_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ type ChainNotifierClient interface {
chan error, error)

RegisterSpendNtfn(ctx context.Context,
outpoint *wire.OutPoint, pkScript []byte, heightHint int32) (
chan *chainntnfs.SpendDetail, chan error, error)
outpoint *wire.OutPoint, pkScript []byte, heightHint int32,
optFuncs ...NotifierOption) (chan *chainntnfs.SpendDetail,
chan error, error)
}

type chainNotifierClient struct {
Expand Down Expand Up @@ -111,8 +112,18 @@ func (s *chainNotifierClient) RawClientWithMacAuth(
}

func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
outpoint *wire.OutPoint, pkScript []byte, heightHint int32) (
chan *chainntnfs.SpendDetail, chan error, error) {
outpoint *wire.OutPoint, pkScript []byte, heightHint int32,
optFuncs ...NotifierOption) (chan *chainntnfs.SpendDetail, chan error,
error) {

opts := DefaultNotifierOptions()
for _, optFunc := range optFuncs {
optFunc(opts)
}
if opts.IncludeBlock {
return nil, nil, fmt.Errorf("option IncludeBlock is not " +
"supported by RegisterSpendNtfn")
}

var rpcOutpoint *chainrpc.Outpoint
if outpoint != nil {
Expand Down Expand Up @@ -148,7 +159,7 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
if err != nil {
return err
}
spendChan <- &chainntnfs.SpendDetail{
spend := &chainntnfs.SpendDetail{
SpentOutPoint: &wire.OutPoint{
Hash: *outpointHash,
Index: d.SpendingOutpoint.Index,
Expand All @@ -159,7 +170,24 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
SpendingHeight: int32(d.SpendingHeight),
}

return nil
select {
case spendChan <- spend:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

processReorg := func() {
if opts.ReOrgChan == nil {
return
}

select {
case opts.ReOrgChan <- struct{}{}:
case <-ctx.Done():
return
}
}

s.wg.Add(1)
Expand All @@ -172,12 +200,35 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
return
}

c, ok := spendEvent.Event.(*chainrpc.SpendEvent_Spend)
if ok {
switch c := spendEvent.Event.(type) {
case *chainrpc.SpendEvent_Spend:
err := processSpendDetail(c.Spend)
if err != nil {
errChan <- err

return
}

// If we're running in re-org aware mode, then
// we don't return here, since we might want to
// be informed about the new block we got
// confirmed in after a re-org.
if opts.ReOrgChan == nil {
return
}

case *chainrpc.SpendEvent_Reorg:
processReorg()

// Nil event, should never happen.
case nil:
errChan <- fmt.Errorf("spend event empty")
return

// Unexpected type.
default:
errChan <- fmt.Errorf("spend event has " +
"unexpected type")
return
}
}
Expand Down Expand Up @@ -256,14 +307,20 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context,
return
}

confChan <- &chainntnfs.TxConfirmation{
conf := &chainntnfs.TxConfirmation{
BlockHeight: c.Conf.BlockHeight,
BlockHash: blockHash,
Tx: tx,
TxIndex: c.Conf.TxIndex,
Block: block,
}

select {
case confChan <- conf:
case <-ctx.Done():
return
}

// If we're running in re-org aware mode, then
// we don't return here, since we might want to
// be informed about the new block we got
Expand Down