Skip to content

Commit f41322b

Browse files
committed
processArchive: use scrutinizer and add initial scan
Instead of subscribe directly to the vochain events, subscribe to the scrutinizer, this way we have access to a more standarized Process details and the full results (including weight). In addition perform an initial scan over all existing processes to ensure all of them are saved into the process archive. Signed-off-by: p4u <[email protected]>
1 parent c5db5fb commit f41322b

File tree

6 files changed

+99
-84
lines changed

6 files changed

+99
-84
lines changed

oracle/oracle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (o *Oracle) NewProcess(process *models.Process) error {
103103
// OnComputeResults is called once a process result is computed by the scrutinizer.
104104
// The Oracle will build and send a RESULTS transaction to the Vochain.
105105
// The transaction includes the final results for the process.
106-
func (o *Oracle) OnComputeResults(results *indexertypes.Results) {
106+
func (o *Oracle) OnComputeResults(results *indexertypes.Results, proc *indexertypes.Process, h uint32) {
107107
log.Infof("launching on computeResults callback for process %x", results.ProcessID)
108108
// check vochain process status
109109
vocProcessData, err := o.VochainApp.State.Process(results.ProcessID, true)

service/vochain.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,13 +129,17 @@ func Vochain(vconfig *config.VochainCfg, waitForSync bool,
129129

130130
// Process Archiver
131131
if vconfig.ProcessArchive {
132+
if sc == nil {
133+
err = fmt.Errorf("process archive needs indexer enabled")
134+
return
135+
}
132136
ipfs, ok := storage.(*data.IPFSHandle)
133137
if !ok {
134138
log.Warnf("storage is not IPFS, archive publishing disabled")
135139
}
136140
log.Infof("starting process archiver on %s", vconfig.ProcessArchiveDataDir)
137141
processarchive.NewProcessArchive(
138-
vnode,
142+
sc,
139143
ipfs,
140144
vconfig.ProcessArchiveDataDir,
141145
vconfig.ProcessArchiveKey,

vochain/processarchive/processarchive.go

Lines changed: 82 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package processarchive
33
import (
44
"encoding/json"
55
"fmt"
6-
"math/big"
76
"os"
87
"path/filepath"
98
"sync"
@@ -12,24 +11,22 @@ import (
1211
"go.vocdoni.io/dvote/data"
1312
"go.vocdoni.io/dvote/log"
1413
"go.vocdoni.io/dvote/types"
15-
"go.vocdoni.io/dvote/vochain"
16-
"go.vocdoni.io/proto/build/go/models"
14+
"go.vocdoni.io/dvote/vochain/scrutinizer"
15+
"go.vocdoni.io/dvote/vochain/scrutinizer/indexertypes"
1716
)
1817

1918
type ProcessArchive struct {
20-
vochain *vochain.BaseApplication
19+
indexer *scrutinizer.Scrutinizer
2120
ipfs *data.IPFSHandle
2221
storage *jsonStorage
23-
pprocs []*Process
2422
publish chan (bool)
2523
lastUpdate time.Time
2624
close chan (bool)
2725
}
2826

2927
type Process struct {
30-
Process *models.Process `json:"process"`
31-
Votes uint32 `json:"votes"`
32-
Weight *big.Int `json:"weight,omitempty"`
28+
ProcessInfo *indexertypes.Process `json:"process"`
29+
Results *indexertypes.Results `json:"results"`
3330
}
3431

3532
type jsonStorage struct {
@@ -48,7 +45,7 @@ func NewJsonStorage(datadir string) (*jsonStorage, error) {
4845

4946
// AddProcess adds an entire process to js
5047
func (js *jsonStorage) AddProcess(p *Process) error {
51-
if p == nil || p.Process == nil || len(p.Process.ProcessId) != types.ProcessIDsize {
48+
if p == nil || p.ProcessInfo == nil || len(p.ProcessInfo.ID) != types.ProcessIDsize {
5249
return fmt.Errorf("process not valid")
5350
}
5451
data, err := json.MarshalIndent(p, "", "\t")
@@ -58,7 +55,20 @@ func (js *jsonStorage) AddProcess(p *Process) error {
5855
js.lock.Lock()
5956
defer js.lock.Unlock()
6057
// TO-DO: use https://github.com/google/renameio
61-
return os.WriteFile(filepath.Join(js.datadir, fmt.Sprintf("%x", p.Process.ProcessId)), data, 0o644)
58+
return os.WriteFile(filepath.Join(js.datadir, fmt.Sprintf("%x", p.ProcessInfo.ID)), data, 0o644)
59+
}
60+
61+
// ProcessExist returns true if a process already existin in the storage
62+
func (js *jsonStorage) ProcessExist(pid []byte) (bool, error) {
63+
js.lock.Lock()
64+
defer js.lock.Unlock()
65+
if _, err := os.Stat(filepath.Join(js.datadir, fmt.Sprintf("%x", pid))); err != nil {
66+
if os.IsNotExist(err) {
67+
return false, nil
68+
}
69+
return false, err
70+
}
71+
return true, nil
6272
}
6373

6474
// NewProcessArchive creates a new instance of the process archiver.
@@ -67,19 +77,29 @@ func (js *jsonStorage) AddProcess(p *Process) error {
6777
// The key parameter must be either a valid IPFS base64 encoded private key
6878
// or empty (a new key will be generated).
6979
// If ipfs is nil, only JSON archive storage will be performed.
70-
func NewProcessArchive(v *vochain.BaseApplication, ipfs *data.IPFSHandle,
80+
func NewProcessArchive(s *scrutinizer.Scrutinizer, ipfs *data.IPFSHandle,
7181
datadir, key string) (*ProcessArchive, error) {
7282
js, err := NewJsonStorage(datadir)
7383
if err != nil {
7484
return nil, fmt.Errorf("could not create process archive: %w", err)
7585
}
7686
ir := &ProcessArchive{
77-
vochain: v,
87+
indexer: s,
7888
ipfs: ipfs,
7989
storage: js,
8090
publish: make(chan (bool), 1),
8191
close: make(chan (bool), 1), // TO-DO: use a context
8292
}
93+
94+
// Perform an initial scan to add previous processes
95+
// This might not be required since fast-sync should be able to find all processes,
96+
// but for security reasons we perform this redundant scan over the previous processes.
97+
// For instance, if the process archive format is changed, without this initial scan
98+
// the node should synchronize the blockchain from scratch.
99+
if err := ir.ProcessScan(0); err != nil {
100+
return nil, err
101+
}
102+
83103
if ipfs != nil {
84104
if err := ir.AddKey(key); err != nil {
85105
return nil, err
@@ -93,78 +113,68 @@ func NewProcessArchive(v *vochain.BaseApplication, ipfs *data.IPFSHandle,
93113
ir.publish <- true
94114
go ir.publishLoop()
95115
}
96-
v.State.AddEventListener(ir)
97-
return ir, nil
98-
}
116+
// Subscribe to events for new processes
117+
s.AddEventListener(ir)
99118

100-
// Rollback resets the pending process list
101-
func (i *ProcessArchive) Rollback() {
102-
i.pprocs = []*Process{}
119+
return ir, nil
103120
}
104121

105-
// Commit adds pending processes to the process archive
106-
func (i *ProcessArchive) Commit(height uint32) error {
107-
for _, p := range i.pprocs {
108-
if err := i.storage.AddProcess(p); err != nil {
109-
log.Errorf("cannot add json process: %v", err)
122+
// ProcessScan search for previous (not added) processes to the archive and adds them.
123+
// This method is build for being executed only once (when bootstraping) since new
124+
// processes will automatically be added to the archive by event callbacks.
125+
func (pa *ProcessArchive) ProcessScan(fromBlock int) error {
126+
pids, err := pa.indexer.ProcessList(nil, fromBlock,
127+
int(pa.indexer.ProcessCount(nil)), "", 0, "", "RESULTS", true)
128+
if err != nil {
129+
return err
130+
}
131+
log.Infof("scanning blockchain processes from block %d", fromBlock)
132+
added := 0
133+
startTime := time.Now()
134+
for _, p := range pids {
135+
exists, err := pa.storage.ProcessExist(p)
136+
if err != nil {
137+
log.Warnf("processScan: %v", err)
110138
continue
111139
}
112-
log.Infof("stored json process %x", p.Process.ProcessId)
113-
}
114-
// publish to IPFS if there is a new process with results
115-
if len(i.pprocs) > 0 && i.ipfs != nil {
116-
log.Debugf("sending archive publish signal for height %d", height)
117-
select {
118-
case i.publish <- true:
119-
default: // do nothing
140+
if exists {
141+
continue
142+
}
143+
procInfo, err := pa.indexer.ProcessInfo(p)
144+
if err != nil {
145+
return err
146+
}
147+
results, err := pa.indexer.GetResults(p)
148+
if err != nil {
149+
return err
120150
}
151+
if err := pa.storage.AddProcess(&Process{ProcessInfo: procInfo, Results: results}); err != nil {
152+
log.Warnf("processScan: %v", err)
153+
}
154+
added++
121155
}
156+
log.Infof("archive scan added %d archive processes, took %s", added, time.Since(startTime))
122157
return nil
123158
}
124159

125-
// OnProcessResults adds the process & results to the pending process list
126-
func (i *ProcessArchive) OnProcessResults(pid []byte,
127-
results []*models.QuestionResult, txindex int32) error {
128-
process, err := i.vochain.State.Process(pid, false)
129-
if err != nil {
130-
return fmt.Errorf("cannot get process %x info: %w", pid, err)
160+
// OnComputeResults implements the indexer event callback
161+
func (pa *ProcessArchive) OnComputeResults(results *indexertypes.Results,
162+
proc *indexertypes.Process, height uint32) {
163+
if err := pa.storage.AddProcess(&Process{ProcessInfo: proc, Results: results}); err != nil {
164+
log.Errorf("cannot add json process: %v", err)
165+
return
131166
}
132-
process.Results = &models.ProcessResult{Votes: results, ProcessId: pid}
133-
votes := i.vochain.State.CountVotes(pid, false)
134-
i.pprocs = append(i.pprocs, &Process{
135-
Votes: votes,
136-
Process: process,
137-
Weight: new(big.Int).SetUint64(uint64(votes)), // TODO: return the correct weight
138-
})
139-
return nil
140-
}
167+
log.Infof("stored json process %x", proc.ID)
141168

142-
// Close closes the process archive
143-
func (i *ProcessArchive) Close() {
144-
i.close <- true
169+
// send publish signal
170+
log.Debugf("sending archive publish signal for height %d", height)
171+
select {
172+
case pa.publish <- true:
173+
default: // do nothing
174+
}
145175
}
146176

147-
// NOT USED but required for implementing the interface
148-
149-
// OnCancel does nothing
150-
func (i *ProcessArchive) OnCancel(pid []byte, txindex int32) {}
151-
152-
// OnVote does nothing
153-
func (i *ProcessArchive) OnVote(v *models.Vote, txindex int32) {}
154-
155-
// OnNewTx does nothing
156-
func (i *ProcessArchive) OnNewTx(blockHeight uint32, txIndex int32) {}
157-
158-
// OnProcessKeys does nothing
159-
func (i *ProcessArchive) OnProcessKeys(pid []byte, pub, com string, txindex int32) {}
160-
161-
// OnRevealKeys does nothing
162-
func (i *ProcessArchive) OnRevealKeys(pid []byte, priv, rev string, txindex int32) {}
163-
164-
// OnProcessStatusChange does nothing
165-
func (i *ProcessArchive) OnProcessStatusChange(pid []byte,
166-
status models.ProcessStatus, txindex int32) {
177+
// Close closes the process archive
178+
func (pa *ProcessArchive) Close() {
179+
pa.close <- true
167180
}
168-
169-
// OnProcess does nothing
170-
func (i *ProcessArchive) OnProcess(pid, eid []byte, censusRoot, censusURI string, txindex int32) {}

vochain/scrutinizer/indexertypes/results.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ const (
1919

2020
// Results holds the final results and relevant process info for a vochain process
2121
type Results struct {
22-
ProcessID types.HexBytes `badgerholdKey:"ProcessID"`
23-
Votes [][]*big.Int
24-
Weight *big.Int
25-
EnvelopeHeight uint64
22+
ProcessID types.HexBytes `badgerholdKey:"ProcessID" json:"processId"`
23+
Votes [][]*big.Int `json:"votes"`
24+
Weight *big.Int `json:"weight"`
25+
EnvelopeHeight uint64 `json:"envelopeHeight"`
2626
EnvelopeType *models.EnvelopeType `json:"envelopeType"`
2727
VoteOpts *models.ProcessVoteOptions `json:"voteOptions"`
28-
Signatures []types.HexBytes
29-
Final bool
30-
BlockHeight uint32
28+
Signatures []types.HexBytes `json:"signatures"`
29+
Final bool `json:"final"`
30+
BlockHeight uint32 `json:"blockHeight"`
3131
}
3232

3333
// String formats the results in a human-readable string

vochain/scrutinizer/scrutinizer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const (
3333
// EventListener is an interface used for executing custom functions during the
3434
// events of the tally of a process.
3535
type EventListener interface {
36-
OnComputeResults(results *indexertypes.Results)
36+
OnComputeResults(results *indexertypes.Results, process *indexertypes.Process, height uint32)
3737
}
3838

3939
// AddEventListener adds a new event listener, to receive method calls on block

vochain/scrutinizer/vote.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,8 @@ func (s *Scrutinizer) GetEnvelopeHeight(processID []byte) (uint64, error) {
236236
// ComputeResult process a finished voting, compute the results and saves it in the Storage.
237237
// Once this function is called, any future live vote event for the processId will be discarted.
238238
func (s *Scrutinizer) ComputeResult(processID []byte) error {
239-
log.Debugf("computing results for %x", processID)
239+
height := s.App.Height()
240+
log.Debugf("computing results on height %d for %x", height, processID)
240241

241242
// Get process from database
242243
p, err := s.ProcessInfo(processID)
@@ -277,7 +278,7 @@ func (s *Scrutinizer) ComputeResult(processID []byte) error {
277278

278279
// Execute callbacks
279280
for _, l := range s.eventListeners {
280-
go l.OnComputeResults(results)
281+
go l.OnComputeResults(results, p, height)
281282
}
282283
return nil
283284
}

0 commit comments

Comments
 (0)