Skip to content
This repository was archived by the owner on Sep 6, 2022. It is now read-only.

Commit 461ca5e

Browse files
Implement computeplan events (#98)
* Implement computeplan events * Initialize event struct * Add Cancel situation and renaming * Rename after review
1 parent 964445f commit 461ca5e

File tree

10 files changed

+80
-31
lines changed

10 files changed

+80
-31
lines changed

chaincode/compute_plan.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ func cancelComputePlan(db *LedgerDB, args []string) (resp outputComputePlan, err
296296
return outputComputePlan{}, err
297297
}
298298

299+
db.AddComputePlanEvent(inp.Key, computeplan.State.Status)
299300
resp.Fill(inp.Key, computeplan)
300301
return resp, nil
301302
}
@@ -334,10 +335,10 @@ func (cp *ComputePlan) SaveState(db *LedgerDB) error {
334335
return db.Put(cp.StateKey, cp.State)
335336
}
336337

337-
// CheckNewTupleStatus check the tuple status (from an updated tuple or a new one)
338+
// UpdateStatus check the tuple status (from an updated tuple or a new one)
338339
// and, if required, it updates the compute plan' status and/or its doneCount.
339340
// It returns true if there is any change to the compute plan, false otherwise.
340-
func (cp *ComputePlan) CheckNewTupleStatus(tupleStatus string) bool {
341+
func (cp *ComputePlan) UpdateStatus(tupleStatus string) bool {
341342
switch cp.State.Status {
342343
case StatusFailed, StatusCanceled:
343344
case StatusDone:
@@ -388,7 +389,7 @@ func (cp *ComputePlan) AddTuple(tupleType AssetType, key, status string) {
388389
cp.TesttupleKeys = append(cp.TesttupleKeys, key)
389390
}
390391
cp.State.TupleCount++
391-
cp.CheckNewTupleStatus(status)
392+
cp.UpdateStatus(status)
392393
}
393394

394395
// UpdateComputePlanState retreive the compute plan if the ID is not empty,
@@ -401,7 +402,8 @@ func UpdateComputePlanState(db *LedgerDB, ComputePlanID, tupleStatus, tupleKey s
401402
if err != nil {
402403
return err
403404
}
404-
if cp.CheckNewTupleStatus(tupleStatus) {
405+
if cp.UpdateStatus(tupleStatus) {
406+
db.AddComputePlanEvent(ComputePlanID, cp.State.Status)
405407
return cp.SaveState(db)
406408
}
407409
return nil

chaincode/compute_plan_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ func TestModelCompositionComputePlanWorkflow(t *testing.T) {
164164

165165
out, err := createComputePlanInternal(db, modelCompositionComputePlan, tag)
166166
assert.NoError(t, err)
167-
assert.NotNil(t, db.tuplesEvent)
168-
assert.Len(t, db.tuplesEvent.CompositeTraintuples, 2)
167+
assert.NotNil(t, db.event)
168+
assert.Len(t, db.event.CompositeTraintuples, 2)
169169

170170
// ensure the returned ranks are correct
171171
validateTupleRank(t, db, 0, out.CompositeTraintupleKeys[0], CompositeTraintupleType)
@@ -179,7 +179,7 @@ func TestModelCompositionComputePlanWorkflow(t *testing.T) {
179179
_, err = logStartCompositeTrain(db, assetToArgs(inputKey{out.CompositeTraintupleKeys[1]}))
180180
assert.NoError(t, err)
181181

182-
db.tuplesEvent = &TuplesEvent{}
182+
db.event = &Event{}
183183
inpLogCompo := inputLogSuccessCompositeTrain{}
184184
inpLogCompo.fillDefaults()
185185
inpLogCompo.Key = out.CompositeTraintupleKeys[0]
@@ -189,12 +189,12 @@ func TestModelCompositionComputePlanWorkflow(t *testing.T) {
189189
inpLogCompo.Key = out.CompositeTraintupleKeys[1]
190190
_, err = logSuccessCompositeTrain(db, assetToArgs(inpLogCompo))
191191
assert.NoError(t, err)
192-
assert.Len(t, db.tuplesEvent.Testtuples, 2)
193-
for _, test := range db.tuplesEvent.Testtuples {
192+
assert.Len(t, db.event.Testtuples, 2)
193+
for _, test := range db.event.Testtuples {
194194
assert.Equalf(t, StatusTodo, test.Status, "blame it on %+v", test)
195195
}
196-
require.Len(t, db.tuplesEvent.Aggregatetuples, 1)
197-
assert.Equal(t, StatusTodo, db.tuplesEvent.Aggregatetuples[0].Status)
196+
require.Len(t, db.event.Aggregatetuples, 1)
197+
assert.Equal(t, StatusTodo, db.event.Aggregatetuples[0].Status)
198198

199199
_, err = logStartAggregate(db, assetToArgs(inputKey{out.AggregatetupleKeys[0]}))
200200
assert.NoError(t, err)
@@ -211,16 +211,16 @@ func TestModelCompositionComputePlanWorkflow(t *testing.T) {
211211
_, err = logStartCompositeTrain(db, assetToArgs(inputKey{out.CompositeTraintupleKeys[3]}))
212212
assert.NoError(t, err)
213213

214-
db.tuplesEvent = &TuplesEvent{}
214+
db.event = &Event{}
215215
inpLogCompo.Key = out.CompositeTraintupleKeys[2]
216216
_, err = logSuccessCompositeTrain(db, assetToArgs(inpLogCompo))
217217
assert.NoError(t, err)
218218

219219
inpLogCompo.Key = out.CompositeTraintupleKeys[3]
220220
_, err = logSuccessCompositeTrain(db, assetToArgs(inpLogCompo))
221221
assert.NoError(t, err)
222-
assert.Len(t, db.tuplesEvent.Testtuples, 2)
223-
for _, test := range db.tuplesEvent.Testtuples {
222+
assert.Len(t, db.event.Testtuples, 2)
223+
for _, test := range db.event.Testtuples {
224224
assert.Equalf(t, StatusTodo, test.Status, "blame it on %+v", test)
225225
}
226226
}
@@ -487,8 +487,8 @@ func TestCancelComputePlan(t *testing.T) {
487487

488488
out, err := createComputePlanInternal(db, modelCompositionComputePlan, tag)
489489
assert.NoError(t, err)
490-
assert.NotNil(t, db.tuplesEvent)
491-
assert.Len(t, db.tuplesEvent.CompositeTraintuples, 2)
490+
assert.NotNil(t, db.event)
491+
assert.Len(t, db.event.CompositeTraintuples, 2)
492492

493493
_, err = cancelComputePlan(db, assetToArgs(inputKey{Key: out.ComputePlanID}))
494494
assert.NoError(t, err)

chaincode/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ require (
1818
github.com/hashicorp/go-version v1.2.0 // indirect
1919
github.com/hyperledger/fabric v1.4.1
2020
github.com/hyperledger/fabric-amcl v0.0.0-20181230093703-5ccba6eab8d6 // indirect
21+
github.com/karrick/godirwalk v1.15.3 // indirect
2122
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
2223
github.com/kr/pretty v0.1.0 // indirect
2324
github.com/leodido/go-urn v1.1.0 // indirect
@@ -35,6 +36,8 @@ require (
3536
github.com/stretchr/objx v0.2.0 // indirect
3637
github.com/stretchr/testify v1.3.0
3738
github.com/sykesm/zap-logfmt v0.0.2 // indirect
39+
github.com/uudashr/gopkgs v2.0.1+incompatible // indirect
40+
github.com/uudashr/gopkgs/v2 v2.1.2 // indirect
3841
go.uber.org/atomic v1.4.0 // indirect
3942
go.uber.org/zap v1.10.0 // indirect
4043
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f // indirect

chaincode/go.sum

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ github.com/hyperledger/fabric-amcl v0.0.0-20181230093703-5ccba6eab8d6 h1:URjjUy3
8484
github.com/hyperledger/fabric-amcl v0.0.0-20181230093703-5ccba6eab8d6/go.mod h1:X+DIyUsaTmalOpmpQfIvFZjKHQedrURQ5t4YqquX7lE=
8585
github.com/ijc/Gotty v0.0.0-20170406111628-a8b993ba6abd h1:anPrsicrIi2ColgWTVPk+TrN42hJIWlfPHSBP9S0ZkM=
8686
github.com/ijc/Gotty v0.0.0-20170406111628-a8b993ba6abd/go.mod h1:3LVOLeyx9XVvwPgrt2be44XgSqndprz1G18rSk8KD84=
87+
github.com/karrick/godirwalk v1.12.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
88+
github.com/karrick/godirwalk v1.15.3 h1:0a2pXOgtB16CqIqXTiT7+K9L73f74n/aNQUnH6Ortew=
89+
github.com/karrick/godirwalk v1.15.3/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
8790
github.com/kisielk/errcheck v1.1.0 h1:ZqfnKyx9KGpRcW04j5nnPDgRgoXUeLh2YFBeFzphcA0=
8891
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
8992
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
@@ -163,6 +166,10 @@ github.com/sykesm/zap-logfmt v0.0.2 h1:czSzn+PIXCOAP/4NAIHTTziIKB8201PzoDkKTn+VR
163166
github.com/sykesm/zap-logfmt v0.0.2/go.mod h1:TerDJT124HaO8UTpZ2wJCipJRAKQ9XONM1mzUabIh6M=
164167
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8 h1:3SVOIvH7Ae1KRYyQWRjXWJEA9sS/c/pjvH++55Gr648=
165168
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
169+
github.com/uudashr/gopkgs v2.0.1+incompatible h1:SuNs9p/XbGcQezR7SguZrZzxqCQozxtd/N8UKBWbWjk=
170+
github.com/uudashr/gopkgs v2.0.1+incompatible/go.mod h1:MtCdKVJkxW7hNKWXPNWfpaeEp8+Ml3Q8myb4yWhn2Hg=
171+
github.com/uudashr/gopkgs/v2 v2.1.2 h1:A0+QH6wqNRHORJnxmqfeuBEsK4nYQ7pgcOHhqpqcrpo=
172+
github.com/uudashr/gopkgs/v2 v2.1.2/go.mod h1:O9VKOuPWrUpVhaxcg7N3QiTrlDhgJb/84Y7b3qaX1rI=
166173
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
167174
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
168175
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=

chaincode/ledger_db.go

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type State struct {
3030
// LedgerDB to access the chaincode database during the lifetime of a SmartContract
3131
type LedgerDB struct {
3232
cc shim.ChaincodeStubInterface
33-
tuplesEvent *TuplesEvent
33+
event *Event
3434
transactionState State
3535
mutex *sync.RWMutex
3636
}
@@ -443,17 +443,17 @@ func (db *LedgerDB) GetNode(key string) (Node, error) {
443443
// High-level functions for events
444444
// ----------------------------------------------
445445

446-
// SendTuplesEvent sends an event with updated tuples if there is any
446+
// SendEvent sends an event with updated tuples if there is any
447447
// Only one event can be sent per transaction
448-
func (db *LedgerDB) SendTuplesEvent() error {
449-
if db.tuplesEvent == nil {
448+
func (db *LedgerDB) SendEvent() error {
449+
if db.event == nil {
450450
return nil
451451
}
452-
payload, err := json.Marshal(*(db.tuplesEvent))
452+
payload, err := json.Marshal(*(db.event))
453453
if err != nil {
454454
return err
455455
}
456-
err = db.cc.SetEvent("tuples-updated", payload)
456+
err = db.cc.SetEvent("chaincode-updates", payload)
457457
if err != nil {
458458
return err
459459
}
@@ -472,8 +472,8 @@ func (db *LedgerDB) AddTupleEvent(tupleKey string) error {
472472
if genericTuple.Status != StatusTodo {
473473
return nil
474474
}
475-
if db.tuplesEvent == nil {
476-
db.tuplesEvent = &TuplesEvent{}
475+
if db.event == nil {
476+
db.event = &Event{}
477477
}
478478
switch genericTuple.AssetType {
479479
case TraintupleType:
@@ -483,31 +483,52 @@ func (db *LedgerDB) AddTupleEvent(tupleKey string) error {
483483
}
484484
out := outputTraintuple{}
485485
out.Fill(db, tuple, tupleKey)
486-
db.tuplesEvent.Traintuples = append(db.tuplesEvent.Traintuples, out)
486+
db.event.Traintuples = append(db.event.Traintuples, out)
487487
case CompositeTraintupleType:
488488
tuple, err := db.GetCompositeTraintuple(tupleKey)
489489
if err != nil {
490490
return err
491491
}
492492
out := outputCompositeTraintuple{}
493493
out.Fill(db, tuple, tupleKey)
494-
db.tuplesEvent.CompositeTraintuples = append(db.tuplesEvent.CompositeTraintuples, out)
494+
db.event.CompositeTraintuples = append(db.event.CompositeTraintuples, out)
495495
case AggregatetupleType:
496496
tuple, err := db.GetAggregatetuple(tupleKey)
497497
if err != nil {
498498
return err
499499
}
500500
out := outputAggregatetuple{}
501501
out.Fill(db, tuple, tupleKey)
502-
db.tuplesEvent.Aggregatetuples = append(db.tuplesEvent.Aggregatetuples, out)
502+
db.event.Aggregatetuples = append(db.event.Aggregatetuples, out)
503503
case TesttupleType:
504504
tuple, err := db.GetTesttuple(tupleKey)
505505
if err != nil {
506506
return err
507507
}
508508
out := outputTesttuple{}
509509
out.Fill(db, tupleKey, tuple)
510-
db.tuplesEvent.Testtuples = append(db.tuplesEvent.Testtuples, out)
510+
db.event.Testtuples = append(db.event.Testtuples, out)
511511
}
512512
return nil
513513
}
514+
515+
// AddComputePlanEvent add the compute plan matching the ID to the event struct
516+
func (db *LedgerDB) AddComputePlanEvent(ComputePlanID, status string) error {
517+
if !stringInSlice(status, []string{StatusCanceled, StatusFailed, StatusDone}) {
518+
return nil
519+
}
520+
if db.event == nil {
521+
db.event = &Event{}
522+
}
523+
cp := eventComputePlan{
524+
ComputePlanID: ComputePlanID,
525+
Status: status,
526+
}
527+
algokeys, err := db.GetIndexKeys("algo~computeplanid~key", []string{"algo", ComputePlanID})
528+
if err != nil {
529+
return err
530+
}
531+
cp.AlgoKeys = algokeys
532+
db.event.ComputePlans = append(db.event.ComputePlans, cp)
533+
return nil
534+
}

chaincode/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ func (t *SubstraChaincode) Invoke(stub shim.ChaincodeStubInterface) peer.Respons
189189
}
190190
// Send event if there is any. It's done in one batch since we can only send
191191
// one event per call
192-
err = db.SendTuplesEvent()
192+
err = db.SendEvent()
193193
if err != nil {
194194
return formatErrorResponse(errors.Internal("could not send event: %s", err.Error()))
195195
}

chaincode/output.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,19 @@ type outputModel struct {
283283
Testtuple outputTesttuple `json:"testtuple"`
284284
}
285285

286-
// TuplesEvent is the collection of tuples sent in an event
287-
type TuplesEvent struct {
286+
// Event is the collection of tuples sent in an event
287+
type Event struct {
288288
Testtuples []outputTesttuple `json:"testtuple"`
289289
Traintuples []outputTraintuple `json:"traintuple"`
290290
CompositeTraintuples []outputCompositeTraintuple `json:"compositeTraintuple"`
291291
Aggregatetuples []outputAggregatetuple `json:"aggregatetuple"`
292+
ComputePlans []eventComputePlan `json:"computePlan"`
293+
}
294+
295+
type eventComputePlan struct {
296+
ComputePlanID string `json:"computePlanID"`
297+
Status string `json:"status"`
298+
AlgoKeys []string `json:"algoKeys"`
292299
}
293300

294301
type outputComputePlan struct {

chaincode/traintuple.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ func (traintuple *Traintuple) Save(db *LedgerDB, traintupleKey string) error {
193193
if err := db.CreateIndex("computePlan~computeplanid~worker~rank~key", []string{"computePlan", traintuple.ComputePlanID, traintuple.Dataset.Worker, strconv.Itoa(traintuple.Rank), traintupleKey}); err != nil {
194194
return err
195195
}
196+
if err := db.CreateIndex("algo~computeplanid~key", []string{"algo", traintuple.ComputePlanID, traintuple.AlgoKey}); err != nil {
197+
return err
198+
}
196199
}
197200
if traintuple.Tag != "" {
198201
err := db.CreateIndex("traintuple~tag~key", []string{"traintuple", traintuple.Tag, traintupleKey})

chaincode/traintuple_composite.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,9 @@ func (traintuple *CompositeTraintuple) Save(db *LedgerDB, traintupleKey string)
240240
if err := db.CreateIndex("computePlan~computeplanid~worker~rank~key", []string{"computePlan", traintuple.ComputePlanID, traintuple.Dataset.Worker, strconv.Itoa(traintuple.Rank), traintupleKey}); err != nil {
241241
return err
242242
}
243+
if err := db.CreateIndex("algo~computeplanid~key", []string{"algo", traintuple.ComputePlanID, traintuple.AlgoKey}); err != nil {
244+
return err
245+
}
243246
}
244247
if traintuple.Tag != "" {
245248
err := db.CreateIndex("compositeTraintuple~tag~key", []string{"compositeTraintuple", traintuple.Tag, traintupleKey})

chaincode/tuple_aggregate.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@ func (tuple *Aggregatetuple) Save(db *LedgerDB, aggregatetupleKey string) error
203203
if err := db.CreateIndex("computePlan~computeplanid~worker~rank~key", []string{"computePlan", tuple.ComputePlanID, tuple.Worker, strconv.Itoa(tuple.Rank), aggregatetupleKey}); err != nil {
204204
return err
205205
}
206+
if err := db.CreateIndex("algo~computeplanid~key", []string{"algo", tuple.ComputePlanID, tuple.AlgoKey}); err != nil {
207+
return err
208+
}
206209
}
207210
if tuple.Tag != "" {
208211
err := db.CreateIndex("aggregatetuple~tag~key", []string{"aggregatetuple", tuple.Tag, aggregatetupleKey})

0 commit comments

Comments
 (0)