Skip to content

Commit 4039928

Browse files
committed
Merge branch 'dev' into feature/hash
2 parents 3153014 + 9ba176b commit 4039928

File tree

27 files changed

+1146
-306
lines changed

27 files changed

+1146
-306
lines changed

.gitignore

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
*.dylib
1010

1111
openmesh-core
12-
core
1312

1413
# Test binary, built with `go test -c`
1514
*.test
@@ -36,8 +35,13 @@ go.work
3635
.DS_Store
3736
executable-file
3837
/ignore
38+
/collector/.env
3939
log.txt
4040

41+
# Default cometbft state (config.toml is important)
42+
/default-cometbft-home/data
43+
/default-cometbft-home/config/node_key.json
44+
/default-cometbft-home/config/priv_validator_key.json
4145
# Get rid of logs
4246
logs
4347

collector/collector.go

Lines changed: 25 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import (
44
"context"
55

66
"github.com/ipfs/go-cid"
7-
"github.com/multiformats/go-multicodec"
87
log "github.com/openmesh-network/core/internal/logger"
8+
"github.com/openmesh-network/core/resourcepool"
99
"github.com/sourcegraph/conc"
1010
)
1111

@@ -15,16 +15,14 @@ type Request struct {
1515
}
1616

1717
type Summary struct {
18-
// XXX: This might not be efficient, array of pointers means many cache misses.
19-
// Not sure if the Go compiler will realize we want these sequentially in memory.
20-
// But whatever man we're only doing a handful of these a second.
2118
DataHashes []cid.Cid
2219
}
2320

2421
type CollectorWorker struct {
25-
summary *Summary
26-
request Request
27-
message chan []byte
22+
summary *Summary
23+
request Request
24+
message chan []byte
25+
rpStream *resourcepool.Stream
2826

2927
// Could make these into the same function.
3028
pause chan bool
@@ -43,7 +41,7 @@ type CollectorInstance struct {
4341
subscriptionsCancel context.CancelFunc
4442
}
4543

46-
const WORKER_COUNT = 5
44+
const WORKER_COUNT = 1
4745

4846
// const BUFFER_SIZE_MAX = 1024
4947
// const BUFFER_MAX = 1024
@@ -93,6 +91,7 @@ func (ci *CollectorInstance) SubmitRequests(requestsSortedByPriority []Request)
9391

9492
// Have to declare variable here otherwise go will pass i as value and cause problems.
9593
index := i
94+
9695
subscribeFunc := func() {
9796
r := requestsSortedByPriority[index]
9897

@@ -102,13 +101,14 @@ func (ci *CollectorInstance) SubmitRequests(requestsSortedByPriority []Request)
102101

103102
log.Info("Subscribing ", index)
104103
messageChannel, err := Subscribe(ci.subscriptionsContext, r.Source, r.Source.Topics[r.Topic])
104+
105+
// XXX: Handle this case by skipping this request.
106+
// Worker will do nothing for this period. Maybe optimize this?
105107
if err != nil {
106-
// XXX: Handle this case by skipping this request.
107-
// Panicking now to highlight this.
108-
panic(err)
108+
log.Warn("Couldnt connect to source: ", r.Source.Name, "-", r.Source.Topics[r.Topic], ", skipping instead. Reason: ", err)
109+
} else {
110+
ci.workers[index].message = messageChannel
109111
}
110-
111-
ci.workers[index].message = messageChannel
112112
}
113113

114114
subscribeWaitGroup.Go(subscribeFunc)
@@ -127,42 +127,9 @@ func (ci *CollectorInstance) SubmitRequests(requestsSortedByPriority []Request)
127127
return ci.summariesOld[:maxSummaries]
128128
}
129129

130-
func (cw *CollectorWorker) run(ctx context.Context, buffer []byte) {
130+
func (cw *CollectorWorker) run(ctx context.Context) {
131131
log.Info("Started worker.")
132132

133-
if buffer == nil {
134-
panic("Buffer is nil dummy.")
135-
}
136-
if len(buffer) < 100 {
137-
panic("Buffer is too small, is this an error?")
138-
}
139-
140-
// XXX: Maybe move this function to RP? Also it will crash if length == 0
141-
// Also I could move this to another function.
142-
summaryAppend := func(summary *Summary, buffer []byte, length int) {
143-
// TODO: Consider adding:
144-
// - Timestamp.
145-
// - Fragmentation flag (Whether there is a half message or not).
146-
// - Message count.
147-
148-
cidBuilder := cid.V1Builder{
149-
Codec: uint64(multicodec.DagPb),
150-
MhType: uint64(multicodec.Sha2_256),
151-
MhLength: -1,
152-
}
153-
154-
c, err := cidBuilder.Sum(buffer[0:length])
155-
if err != nil {
156-
// If this fails to parse a buffer the input is invalid.
157-
panic(err)
158-
}
159-
160-
summary.DataHashes = append(summary.DataHashes, c)
161-
162-
log.Info("Added ", length, " bytes, now: ", c.String())
163-
}
164-
165-
bufferOffset := 0
166133
printedDebug := false
167134
paused := false
168135
log.Info("Running for loop.")
@@ -177,6 +144,7 @@ func (cw *CollectorWorker) run(ctx context.Context, buffer []byte) {
177144
case <-cw.resume:
178145
log.Info("Worker resumed.")
179146
// Clear the summary cid buffer.
147+
cw.rpStream.Reset()
180148
cw.summary.DataHashes = cw.summary.DataHashes[:0]
181149
paused = false
182150

@@ -189,16 +157,15 @@ func (cw *CollectorWorker) run(ctx context.Context, buffer []byte) {
189157
log.Info("Channel stopped.")
190158

191159
// Flush the buffer!
192-
if len(buffer) > 0 {
193-
log.Info("Flushed")
194-
summaryAppend(cw.summary, buffer, len(buffer))
195-
}
160+
cw.rpStream.Flush()
196161

197162
// Hopefully go will just call memset here...
198-
for i := range buffer {
199-
buffer[i] = 0
200-
}
201-
bufferOffset = 0
163+
log.Debug("One: ", len(cw.rpStream.GetCids()))
164+
log.Debug("Two: ", len(cw.summary.DataHashes))
165+
166+
cw.summary.DataHashes = make([]cid.Cid, len(cw.rpStream.GetCids()))
167+
168+
copy(cw.summary.DataHashes[:], cw.rpStream.GetCids())
202169

203170
log.Info("Worker paused until resume is called.")
204171
paused = true
@@ -215,26 +182,7 @@ func (cw *CollectorWorker) run(ctx context.Context, buffer []byte) {
215182
break
216183
}
217184

218-
if bufferOffset+len(message) > len(buffer) {
219-
// TODO: Add to Resource Pool from here?
220-
summaryAppend(cw.summary, buffer, bufferOffset)
221-
bufferOffset = 0
222-
}
223-
224-
// If the message still doesn't fit, divide it into chunks and add it until it fits.
225-
for len(message) > len(buffer) {
226-
// XXX: Should the cids we post be capped at the length of the buffer?
227-
// Or can they be any size? For now I assume they are capped at the size of the buffer.
228-
// Do we do padding? Need a spreadsheet to "empirically" test this.
229-
summaryAppend(cw.summary, message, len(buffer))
230-
message = message[len(buffer):]
231-
}
232-
233-
// Add message to buffer.
234-
copy(buffer[bufferOffset:], message)
235-
// log.Info("Done here.")
236-
237-
bufferOffset += len(message)
185+
cw.rpStream.Append(message)
238186
}
239187
}
240188
}
@@ -246,14 +194,14 @@ func (ci *CollectorInstance) Start(ctx context.Context) {
246194
ci.ctx = ctx
247195

248196
for i := range ci.workers {
249-
buffer := make([]byte, 4096)
250197
ci.workers[i].pause = make(chan bool)
251198
ci.workers[i].resume = make(chan bool)
252199
ci.workers[i].message = make(chan []byte)
253200
ci.workers[i].summary = &ci.summariesNew[i]
201+
ci.workers[i].rpStream = resourcepool.NewStream()
254202

255203
index := i
256-
runFunc := func() { ci.workers[index].run(ci.ctx, buffer) }
204+
runFunc := func() { ci.workers[index].run(ci.ctx) }
257205

258206
log.Infof("Deploying worker for collector.")
259207
ci.workerWaitGroup.Go(runFunc)

collector/collector_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,5 @@ import (
55
)
66

77
func TestBasic(t *testing.T) {
8-
8+
New()
99
}

collector/data_size1.csv

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
Source,Symbol,Data Size (bytes),Throughput,"Busiest Time (Data collected for 60 seconds Btw 13:28:07, 29 Apr 2024 till 13:29:07, 29 Apr 2024)",Throughput in the interval,messages count received during interval
2+
bybit,publicTrade.BTCUSDT,151389,2523,2024-04-29T13:28:30+10:00,5542.90,217
3+
okx,sprd-books5,318,5,2024-04-29T13:28:00+10:00,10.70,2
4+
okx,sprd-public-trades,114,1,2024-04-29T13:28:00+10:00,11.40,1
5+
coinbase,BTC-USD,116172,1936,2024-04-29T13:28:30+10:00,2657.20,66
6+
binance,btcusdt,131467,2191,2024-04-29T13:28:30+10:00,3328.40,199
7+
okx,sprd-tickers,326,5,2024-04-29T13:28:00+10:00,10.80,2
8+
coinbase,ETH-USD,165348,2755,2024-04-29T13:28:30+10:00,7399.20,185
9+
binance,solusdt,33707,561,2024-04-29T13:28:30+10:00,1138.90,70
10+
binance,ethusdt,249488,4158,2024-04-29T13:28:30+10:00,11899.00,715
11+
coinbase,BTC-ETH,128,2,2024-04-29T13:28:00+10:00,9.00,2
12+
bybit,orderbook.50.BTCUSDT,1010316,16838,2024-04-29T13:28:30+10:00,21417.10,485
13+
polygon-ankr-rpc,,919856,15330,2024-04-29T13:28:20+10:00,17162.30,3
14+
ethereum-ankr-rpc,,218175,3636,2024-04-29T13:28:30+10:00,7719.40,1
15+
bybit,kline.M.BTCUSDT,15137,252,2024-04-29T13:28:40+10:00,300.70,10
16+
bybit,tickers.BTCUSDT,218856,3647,2024-04-29T13:28:30+10:00,7737.50,237
17+
dydx,MATIC-USD,11941,199,2024-04-29T13:28:00+10:00,1079.50,2
18+
dydx,ETH-USD,23918,398,2024-04-29T13:28:00+10:00,1255.10,9
19+
dydx,SOL-USD,10768,179,2024-04-29T13:28:00+10:00,9.00,2
20+
dydx,LINK-USD,10816,180,2024-04-29T13:28:00+10:00,1055.50,2
21+
okx,sprd-bbo-tbt,261,4,2024-04-29T13:28:00+10:00,10.80,2
22+
dydx,BTC-USD,14570,242,2024-04-29T13:28:00+10:00,1083.00,2

0 commit comments

Comments
 (0)