Skip to content

Commit d8c83c6

Browse files
committed
processArchive: set process status to RESULTS when oracle transaction is received
Signed-off-by: p4u <[email protected]>
1 parent f41322b commit d8c83c6

File tree

8 files changed

+109
-22
lines changed

8 files changed

+109
-22
lines changed

oracle/oracle.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,7 @@ func (o *Oracle) OnComputeResults(results *indexertypes.Results, proc *indexerty
170170
}
171171
log.Infof("oracle transaction sent, hash:%x", res.Hash)
172172
}
173+
174+
// OnOracleResults does nothing. Required for implementing the scrutinizer EventListener interface
175+
func (o *Oracle) OnOracleResults(procResults *models.ProcessResult, pid []byte, height uint32) {
176+
}

vochain/censusdownloader/censusdownloader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,6 @@ func (c *CensusDownloader) OnProcessStatusChange(pid []byte,
8989
}
9090

9191
func (c *CensusDownloader) OnProcessResults(pid []byte,
92-
results []*models.QuestionResult, txindex int32) error {
92+
results *models.ProcessResult, txindex int32) error {
9393
return nil
9494
}

vochain/keykeeper/keykeeper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ func (k *KeyKeeper) OnRevealKeys(pid []byte, priv, rev string, txindex int32) {
292292

293293
// OnProcessResults does nothing
294294
func (k *KeyKeeper) OnProcessResults(pid []byte,
295-
results []*models.QuestionResult, txindex int32) error {
295+
results *models.ProcessResult, txindex int32) error {
296296
// do nothing
297297
return nil
298298
}

vochain/process.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ func (v *State) SetProcessResults(pid []byte, result *models.ProcessResult, comm
222222
}
223223
// Call event listeners
224224
for _, l := range v.eventListeners {
225-
if err := l.OnProcessResults(process.ProcessId, result.Votes, v.TxCounter()); err != nil {
225+
if err := l.OnProcessResults(process.ProcessId, result, v.TxCounter()); err != nil {
226226
log.Warnf("onProcessResults callback error: %v", err)
227227
}
228228
}

vochain/processarchive/processarchive.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"go.vocdoni.io/dvote/types"
1414
"go.vocdoni.io/dvote/vochain/scrutinizer"
1515
"go.vocdoni.io/dvote/vochain/scrutinizer/indexertypes"
16+
"go.vocdoni.io/proto/build/go/models"
1617
)
1718

1819
type ProcessArchive struct {
@@ -58,6 +59,21 @@ func (js *jsonStorage) AddProcess(p *Process) error {
5859
return os.WriteFile(filepath.Join(js.datadir, fmt.Sprintf("%x", p.ProcessInfo.ID)), data, 0o644)
5960
}
6061

62+
// GetProcess retreives a process from the js storage
63+
func (js *jsonStorage) GetProcess(pid []byte) (*Process, error) {
64+
if len(pid) != types.ProcessIDsize {
65+
return nil, fmt.Errorf("process not valid")
66+
}
67+
js.lock.Lock()
68+
defer js.lock.Unlock()
69+
data, err := os.ReadFile(filepath.Join(js.datadir, fmt.Sprintf("%x", pid)))
70+
if err != nil {
71+
return nil, err
72+
}
73+
p := &Process{}
74+
return p, json.Unmarshal(data, p)
75+
}
76+
6177
// ProcessExist returns true if a process already existin in the storage
6278
func (js *jsonStorage) ProcessExist(pid []byte) (bool, error) {
6379
js.lock.Lock()
@@ -157,14 +173,27 @@ func (pa *ProcessArchive) ProcessScan(fromBlock int) error {
157173
return nil
158174
}
159175

160-
// OnComputeResults implements the indexer event callback
176+
// OnComputeResults implements the indexer event callback.
177+
// On this event the results are set always and the process info only if it
178+
// does not exist yet in the json storage.
161179
func (pa *ProcessArchive) OnComputeResults(results *indexertypes.Results,
162180
proc *indexertypes.Process, height uint32) {
163-
if err := pa.storage.AddProcess(&Process{ProcessInfo: proc, Results: results}); err != nil {
181+
// Get the process (if exist)
182+
jsProc, err := pa.storage.GetProcess(results.ProcessID)
183+
if err != nil {
184+
if os.IsNotExist(err) { // if it does not exist yet, we create it
185+
jsProc = &Process{ProcessInfo: proc, Results: results}
186+
} else {
187+
log.Errorf("cannot get json store process: %v", err)
188+
return
189+
}
190+
}
191+
jsProc.Results = results
192+
if err := pa.storage.AddProcess(jsProc); err != nil {
164193
log.Errorf("cannot add json process: %v", err)
165194
return
166195
}
167-
log.Infof("stored json process %x", proc.ID)
196+
log.Infof("stored json process %x for compute results event", proc.ID)
168197

169198
// send publish signal
170199
log.Debugf("sending archive publish signal for height %d", height)
@@ -174,6 +203,44 @@ func (pa *ProcessArchive) OnComputeResults(results *indexertypes.Results,
174203
}
175204
}
176205

206+
// OnOracleResults implements the indexer event callback.
207+
// On this event the process status is set to Results.
208+
func (pa *ProcessArchive) OnOracleResults(oracleResults *models.ProcessResult, pid []byte, height uint32) {
209+
jsProc, err := pa.storage.GetProcess(pid)
210+
if err != nil {
211+
if os.IsNotExist(err) { // if it does not exist yet, we create it
212+
proc, err := pa.indexer.ProcessInfo(pid)
213+
if err != nil {
214+
log.Errorf("cannot get process info %x from indexer: %v", pid, err)
215+
return
216+
}
217+
jsProc = &Process{ProcessInfo: proc, Results: nil}
218+
} else {
219+
log.Errorf("cannot get json store process: %v", err)
220+
return
221+
}
222+
}
223+
// Ensure the status is set to RESULTS since OnOracleResults event is called on setProcessResultsTx
224+
jsProc.ProcessInfo.Status = int32(models.ProcessStatus_RESULTS)
225+
jsProc.ProcessInfo.FinalResults = true
226+
// TODO: add signatures from oracles
227+
//jsProc.Results.Signatures = append(jsProc.results.Signatures, oracleResults.Signature)
228+
229+
// Store the process
230+
if err := pa.storage.AddProcess(jsProc); err != nil {
231+
log.Errorf("cannot add json process: %v", err)
232+
return
233+
}
234+
log.Infof("stored json process %x for oracle results transaction event", pid)
235+
236+
// Send publish signal
237+
log.Debugf("sending archive publish signal for height %d", height)
238+
select {
239+
case pa.publish <- true:
240+
default: // do nothing
241+
}
242+
}
243+
177244
// Close closes the process archive
178245
func (pa *ProcessArchive) Close() {
179246
pa.close <- true

vochain/scrutinizer/scrutinizer.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ const (
3434
// events of the tally of a process.
3535
type EventListener interface {
3636
OnComputeResults(results *indexertypes.Results, process *indexertypes.Process, height uint32)
37+
OnOracleResults(oracleResults *models.ProcessResult, pid []byte, height uint32)
3738
}
3839

3940
// AddEventListener adds a new event listener, to receive method calls on block
4041
// events as documented in EventListener.
4142
func (s *Scrutinizer) AddEventListener(l EventListener) {
42-
s.eventListeners = append(s.eventListeners, l)
43+
s.eventOnResults = append(s.eventOnResults, l)
4344
}
4445

4546
// Scrutinizer is the component which makes the accounting of the voting processes
@@ -60,8 +61,8 @@ type Scrutinizer struct {
6061
newTxPool []*indexertypes.TxReference
6162
// list of live processes (those on which the votes will be computed on arrival)
6263
liveResultsProcs sync.Map
63-
// eventListeners is the list of external callbacks that will be executed by the scrutinizer
64-
eventListeners []EventListener
64+
// eventOnResults is the list of external callbacks that will be executed by the scrutinizer
65+
eventOnResults []EventListener
6566
db *badgerhold.Store
6667
// envelopeHeightCache and countTotalEnvelopes are in memory counters that helps reducing the
6768
// access time when GenEnvelopeHeight() is called.
@@ -481,9 +482,14 @@ func (s *Scrutinizer) OnRevealKeys(pid []byte, priv, reveal string, txIndex int3
481482
s.updateProcessPool = append(s.updateProcessPool, pid)
482483
}
483484

484-
// OnProcessResults verifies the results for a process and appends it to the updateProcessPool
485-
func (s *Scrutinizer) OnProcessResults(pid []byte, results []*models.QuestionResult,
485+
// OnProcessResults verifies the results for a process and appends it to the updateProcessPool
486+
func (s *Scrutinizer) OnProcessResults(pid []byte, results *models.ProcessResult,
486487
txIndex int32) error {
488+
// Execute callbacks
489+
for _, l := range s.eventOnResults {
490+
go l.OnOracleResults(results, pid, s.App.Height())
491+
}
492+
487493
// We don't execute any action if the blockchain is being syncronized
488494
if s.App.IsSynchronizing() {
489495
return nil
@@ -498,6 +504,10 @@ func (s *Scrutinizer) OnProcessResults(pid []byte, results []*models.QuestionRes
498504
// This code must be run async in order to not delay the consensus. The results retreival
499505
// could require some time.
500506
go func() {
507+
if results == nil || results.Votes == nil {
508+
log.Errorf("results are nil")
509+
return
510+
}
501511
var myResults *indexertypes.Results
502512
var err error
503513
retries := 50
@@ -519,24 +529,30 @@ func (s *Scrutinizer) OnProcessResults(pid []byte, results []*models.QuestionRes
519529
return
520530
}
521531

522-
myVotes := BuildProcessResult(myResults, nil).GetVotes()
523-
if len(myVotes) != len(results) {
524-
log.Errorf("results validation failed: wrong result questions size")
525-
return
526-
}
527-
for i, q := range results {
532+
myVotes := BuildProcessResult(myResults, results.EntityId).GetVotes()
533+
correct := len(myVotes) != len(results.Votes)
534+
for i, q := range results.GetVotes() {
535+
if !correct {
536+
break
537+
}
528538
if len(q.Question) != len(myVotes[i].Question) {
529539
log.Errorf("results validation failed: wrong question size")
530-
return
540+
correct = false
541+
break
531542
}
532543
for j, v := range q.Question {
533544
if !bytes.Equal(v, myVotes[i].Question[j]) {
534545
log.Errorf("results validation failed: wrong question result")
535-
return
546+
correct = false
547+
break
536548
}
537549
}
538550
}
539-
log.Infof("published results for process %x are correct!", pid)
551+
if correct {
552+
log.Infof("published results for process %x are correct", pid)
553+
} else {
554+
log.Warnf("published results for process %x are not correct", pid)
555+
}
540556
}()
541557
s.updateProcessPool = append(s.updateProcessPool, pid)
542558
return nil

vochain/scrutinizer/vote.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ func (s *Scrutinizer) ComputeResult(processID []byte) error {
277277
}
278278

279279
// Execute callbacks
280-
for _, l := range s.eventListeners {
280+
for _, l := range s.eventOnResults {
281281
go l.OnComputeResults(results, p, height)
282282
}
283283
return nil

vochain/state.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type EventListener interface {
4444
OnCancel(pid []byte, txIndex int32)
4545
OnProcessKeys(pid []byte, encryptionPub, commitment string, txIndex int32)
4646
OnRevealKeys(pid []byte, encryptionPriv, reveal string, txIndex int32)
47-
OnProcessResults(pid []byte, results []*models.QuestionResult, txIndex int32) error
47+
OnProcessResults(pid []byte, results *models.ProcessResult, txIndex int32) error
4848
Commit(height uint32) (err error)
4949
Rollback()
5050
}

0 commit comments

Comments
 (0)