Skip to content

Commit c7542a9

Browse files
committed
chore: add comment on cancel
1 parent 2d4638e commit c7542a9

File tree

2 files changed

+35
-13
lines changed

2 files changed

+35
-13
lines changed

src/handler.js

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ function handle ({ context, logger, batchSize = config.blocksBatchSize }) {
5656
const normalizedWantlist = getNormalizedWantlist(wantlist, context, logger)
5757

5858
// Set state of processing blocks in canceled state
59-
updateCanceledState(normalizedWantlist, context)
59+
updateCanceledState(normalizedWantlist, context, logger)
6060

6161
process.nextTick(async () => {
6262
// catch async error in libp2p connection
@@ -186,8 +186,20 @@ async function batchResponse ({ blocks, context, logger }) {
186186
let message = new Message()
187187
for (let i = 0; i < blocks.length; i++) {
188188
const block = blocks[i]
189+
// console.log('block key', block.key)
189190
const canceledItem = context.canceled.get(block.key)
190-
if (!canceledItem || canceledItem !== block.type) {
191+
logger.info({ keyList: context.canceled.keyList }, 'check keyList')
192+
logger.info({ key: block.key }, 'check')
193+
logger.info({ canceled: canceledItem }, 'canceled')
194+
195+
if (canceledItem === block.type) {
196+
const size = messageSize[block.type](block)
197+
telemetry.increaseLabelCount('bitswap-block-success-cancel', [block.type])
198+
telemetry.increaseLabelCount('bitswap-cancel-size', [block.type], size)
199+
200+
logger.info({ key: block.key }, 'delete')
201+
context.canceled.delete(block.key)
202+
} else {
191203
const size = messageSize[block.type](block)
192204

193205
// maxMessageSize MUST BE larger than a single block info/data
@@ -198,12 +210,6 @@ async function batchResponse ({ blocks, context, logger }) {
198210

199211
message.push(block, size, context.protocol)
200212
sentMetrics[block.type](block, size)
201-
} else {
202-
const size = messageSize[block.type](block)
203-
telemetry.increaseLabelCount('bitswap-block-success-cancel', [block.type])
204-
telemetry.increaseLabelCount('bitswap-cancel-size', [block.type], size)
205-
206-
context.canceled.delete(block.key)
207213
}
208214
}
209215

@@ -217,22 +223,27 @@ async function batchResponse ({ blocks, context, logger }) {
217223
}
218224
}
219225

220-
function updateCanceledState (wantList, context) {
226+
function updateCanceledState (wantList, context, logger) {
221227
const { wantedBlocks, wantedHave, canceled } = wantList
222228

223229
// Removed previous canceled blocks
224230
wantedBlocks.forEach(block => {
231+
logger.info({ key: block.key }, 'remove block from canceled')
225232
context.canceled.delete(block.key)
226233
})
227234

228235
wantedHave.forEach(block => {
236+
logger.info({ key: block.key }, 'remove wanted block from canceled')
229237
context.canceled.delete(block.key)
230238
})
231239

232240
// Add new canceled blocks
233241
canceled.forEach(block => {
242+
logger.info({ key: block.key }, 'add block to canceled')
234243
context.canceled.set(block.key, block.wantType)
235244
})
245+
246+
logger.info({ keyList: context.canceled.keyList }, 'check keyList')
236247
}
237248

238249
// end response, close connection

src/service.js

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec
8282
autoDialInterval: connectionConfig.p2p.autoDialInterval
8383
}
8484
})
85+
const cancelsPerPeer = new Map()
8586

8687
const handlerOptions = {
8788
maxInboundStreams: connectionConfig.handler.maxInboundStreams,
@@ -109,7 +110,9 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec
109110
service.handle(protocol, async ({ connection: dial, stream }) => {
110111
try {
111112
const connection = new Connection(stream)
112-
const canceled = new LRU({ max: 200 })
113+
const canceled = cancelsPerPeer.get(dial.remotePeer.toString())
114+
115+
!canceled && console.log('canceled')
113116

114117
const hrTime = process.hrtime()
115118
const connectionId = hrTime[0] * 1000000000 + hrTime[1]
@@ -146,12 +149,13 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec
146149
// another multiplexed stream.
147150
connection.on('end:receive', () => {
148151
// GC canceled LRU on finish
149-
canceled.clear()
152+
logger.info({}, 'end:receive')
150153
connection.close()
151154
})
152155

153156
connection.on('error', err => {
154157
// GC canceled LRU on error
158+
logger.info({}, 'error')
155159
canceled.clear()
156160
logger.error({ err, dial, stream, protocol }, 'Connection error')
157161
})
@@ -163,20 +167,27 @@ async function startService ({ peerId, port, peerAnnounceAddr, awsClient, connec
163167

164168
// TODO move to networking
165169
service.connectionManager.addEventListener('peer:connect', connection => {
170+
cancelsPerPeer.set(
171+
connection.detail.remotePeer.toString(),
172+
new LRU({ max: 200 })
173+
)
166174
try {
167175
telemetry.increaseCount('bitswap-connections')
168176
telemetry.increaseGauge('bitswap-active-connections')
169177
} catch (err) {
170-
logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer connecting')
178+
logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer connecting')
171179
}
172180
})
173181

174182
// TODO move to networking
175183
service.connectionManager.addEventListener('peer:disconnect', connection => {
184+
cancelsPerPeer.delete(
185+
connection.detail.remotePeer.toString()
186+
)
176187
try {
177188
telemetry.decreaseGauge('bitswap-active-connections')
178189
} catch (err) {
179-
logger.warn({ err, remotePeer: connection.remotePeer }, 'Error while peer disconnecting')
190+
logger.warn({ err, remotePeer: connection.detail.remotePeer.toString() }, 'Error while peer disconnecting')
180191
}
181192
})
182193

0 commit comments

Comments
 (0)