Skip to content

Commit b4e50a7

Browse files
nmarcetic0xF0D0
andauthored
[CP-608] Adds data-pipeline work into base. (#68)
* chore: apply pipeline patch Signed-off-by: 0x0f0d0 <goodbumsu@gmail.com> * Fix failing CI tests Signed-off-by: Nikola Marcetic <n.marcetic86@gmail.com> --------- Signed-off-by: 0x0f0d0 <goodbumsu@gmail.com> Signed-off-by: Nikola Marcetic <n.marcetic86@gmail.com> Co-authored-by: 0x0f0d0 <goodbumsu@gmail.com>
1 parent 615721d commit b4e50a7

File tree

14 files changed

+484
-18
lines changed

14 files changed

+484
-18
lines changed

baseapp/abci.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -711,6 +711,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.VerifyVoteExtensionRequest) (r
711711
// must be used.
712712
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error) {
713713
var events []abci.Event
714+
var publishEvents sdk.PublishEvents
714715

715716
if err := app.checkHalt(req.Height, req.Time); err != nil {
716717
return nil, err
@@ -803,6 +804,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
803804
// continue
804805
}
805806

807+
publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...)
806808
events = append(events, beginBlock.Events...)
807809

808810
// Reset the gas meter so that the AnteHandlers aren't required to
@@ -814,7 +816,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
814816
//
815817
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
816818
// vote extensions, so skip those.
817-
txResults, err := app.executeTxs(ctx, req.Txs)
819+
txResults, txEventSet, err := app.executeTxs(ctx, req.Txs)
818820
if err != nil {
819821
return nil, err
820822
}
@@ -823,6 +825,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
823825
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
824826
}
825827

828+
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager()))
829+
826830
endBlock, err := app.endBlock(app.finalizeBlockState.Context())
827831
if err != nil {
828832
return nil, err
@@ -836,9 +840,23 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
836840
// continue
837841
}
838842

843+
publishEvents = append(publishEvents, app.finalizeBlockState.Context().PublishEventManager().Events()...)
844+
839845
events = append(events, endBlock.Events...)
840846
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
841847

848+
events, trueOrder := filterOutPublishEvents(events)
849+
app.flushData = PublishEventFlush{
850+
Height: header.Height,
851+
PrevAppHash: header.AppHash,
852+
BlockEvents: EventSet{
853+
AbciEvents: events,
854+
PublishEvents: publishEvents,
855+
TrueOrder: trueOrder,
856+
},
857+
TxEvents: txEventSet,
858+
}
859+
842860
return &abci.FinalizeBlockResponse{
843861
Events: events,
844862
TxResults: txResults,
@@ -847,11 +865,15 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
847865
}, nil
848866
}
849867

850-
func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
868+
func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, []EventSet, error) {
851869
txResults := make([]*abci.ExecTxResult, 0, len(txs))
870+
txEventSet := make([]EventSet, 0)
871+
852872
for txIdx, rawTx := range txs {
853873
var response *abci.ExecTxResult
854874

875+
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithPublishEventManager(sdk.NewPublishEventManager()))
876+
855877
if memTx, err := app.txDecoder(rawTx); err == nil {
856878
response = app.deliverTx(rawTx, memTx, txIdx)
857879
} else {
@@ -870,14 +892,36 @@ func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecT
870892
// check after every tx if we should abort
871893
select {
872894
case <-ctx.Done():
873-
return nil, ctx.Err()
895+
return nil, nil, ctx.Err()
874896
default:
875897
// continue
876898
}
877-
899+
filtered, order := filterOutPublishEvents(response.Events)
900+
response.Events = filtered
878901
txResults = append(txResults, response)
902+
txEventSet = append(txEventSet, EventSet{
903+
AbciEvents: response.Events,
904+
PublishEvents: app.finalizeBlockState.Context().PublishEventManager().Events(),
905+
TrueOrder: order,
906+
})
879907
}
880-
return txResults, nil
908+
return txResults, txEventSet, nil
909+
}
910+
911+
func filterOutPublishEvents(events []abci.Event) ([]abci.Event, []EventType) {
912+
var filteredEvents []abci.Event
913+
var trueOrder []EventType
914+
915+
for _, e := range events {
916+
if e.Type == sdk.PlaceholderEventType {
917+
trueOrder = append(trueOrder, EventTypePublish)
918+
continue
919+
}
920+
filteredEvents = append(filteredEvents, e)
921+
trueOrder = append(trueOrder, EventTypeAbci)
922+
}
923+
924+
return filteredEvents, trueOrder
881925
}
882926

883927
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
@@ -967,7 +1011,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
9671011
rms.SetCommitHeader(header)
9681012
}
9691013

970-
app.cms.Commit()
1014+
commitId := app.cms.Commit()
1015+
1016+
app.flushData.NewAppHash = commitId.Hash
1017+
app.PublishBlockEvents(app.flushData)
9711018

9721019
resp := &abci.CommitResponse{
9731020
RetainHeight: retainHeight,

baseapp/abci_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -564,7 +564,7 @@ func TestABCI_CheckTx(t *testing.T) {
564564
anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, counterKey)) }
565565
suite := NewBaseAppSuite(t, anteOpt)
566566

567-
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey})
567+
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, counterKey, false})
568568

569569
nTxs := int64(5)
570570
_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
@@ -618,7 +618,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) {
618618
require.NoError(t, err)
619619

620620
deliverKey := []byte("deliver-key")
621-
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
621+
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})
622622

623623
nBlocks := 3
624624
txPerHeight := 5
@@ -668,10 +668,10 @@ func TestABCI_FinalizeBlock_MultiMsg(t *testing.T) {
668668
require.NoError(t, err)
669669

670670
deliverKey := []byte("deliver-key")
671-
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
671+
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})
672672

673673
deliverKey2 := []byte("deliver-key2")
674-
baseapptestutil.RegisterCounter2Server(suite.baseApp.MsgServiceRouter(), Counter2ServerImpl{t, capKey1, deliverKey2})
674+
baseapptestutil.RegisterCounter2Server(suite.baseApp.MsgServiceRouter(), Counter2ServerImpl{t, capKey1, deliverKey2, false})
675675

676676
// run a multi-msg tx
677677
// with all msgs the same route

baseapp/baseapp.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ type BaseApp struct {
205205
EnableStreamer bool
206206
StreamEvents chan StreamEvents
207207

208+
EnablePublish bool
209+
PublishEvents chan PublishEventFlush
210+
flushData PublishEventFlush
211+
208212
traceFlightRecorder *metrics.TraceRecorder
209213
}
210214

@@ -227,6 +231,7 @@ func NewBaseApp(
227231
sigverifyTx: true,
228232
queryGasLimit: math.MaxUint64,
229233
StreamEvents: make(chan StreamEvents),
234+
PublishEvents: make(chan PublishEventFlush),
230235
}
231236

232237
for _, option := range options {

baseapp/baseapp_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ func TestAnteHandlerGasMeter(t *testing.T) {
219219
require.NoError(t, err)
220220

221221
deliverKey := []byte("deliver-key")
222-
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
222+
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})
223223

224224
tx := newTxCounter(t, suite.txConfig, 0, 0)
225225
txBytes, err := suite.txConfig.TxEncoder()(tx)
@@ -562,7 +562,7 @@ func TestBaseAppAnteHandler(t *testing.T) {
562562
suite := NewBaseAppSuite(t, anteOpt)
563563

564564
deliverKey := []byte("deliver-key")
565-
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey})
565+
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey, false})
566566

567567
_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
568568
ConsensusParams: &cmtproto.ConsensusParams{},
@@ -636,7 +636,7 @@ func TestBaseAppPostHandler(t *testing.T) {
636636
}
637637

638638
suite := NewBaseAppSuite(t, anteOpt)
639-
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo")})
639+
baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, []byte("foo"), false})
640640

641641
_, err := suite.baseApp.InitChain(&abci.InitChainRequest{
642642
ConsensusParams: &cmtproto.ConsensusParams{},

baseapp/publish_event.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package baseapp
2+
3+
import (
4+
abci "github.com/cometbft/cometbft/abci/types"
5+
types "github.com/cosmos/cosmos-sdk/types"
6+
)
7+
8+
type EventType byte
9+
10+
const (
11+
EventTypeAbci EventType = iota
12+
EventTypePublish
13+
)
14+
15+
type EventSet struct {
16+
AbciEvents []abci.Event
17+
PublishEvents types.PublishEvents
18+
TrueOrder []EventType
19+
}
20+
21+
type PublishEventFlush struct {
22+
Height int64
23+
PrevAppHash []byte
24+
NewAppHash []byte
25+
BlockEvents EventSet
26+
TxEvents []EventSet
27+
}
28+
29+
func (app *BaseApp) PublishBlockEvents(flush PublishEventFlush) {
30+
if app.EnablePublish {
31+
app.PublishEvents <- flush
32+
}
33+
}

0 commit comments

Comments
 (0)