@@ -85,7 +85,6 @@ abstract class AbstractRouter(
85
85
86
86
/* *
87
87
* Flushes all pending message parts for all peers
88
- * @see addPendingRpcPart
89
88
*/
90
89
protected fun flushAllPending () {
91
90
pendingRpcParts.pendingPeers.forEach(::flushPending)
@@ -163,7 +162,7 @@ abstract class AbstractRouter(
163
162
164
163
// Validate message
165
164
if (! validateMessageListLimits(msg)) {
166
- logger.debug(" Dropping msg with lists exceeding limits from peer $peer " )
165
+ logger.debug(" Dropping msg with lists exceeding limits from peer {} " , peer )
167
166
return
168
167
}
169
168
@@ -173,7 +172,7 @@ abstract class AbstractRouter(
173
172
.filterIncomingSubscriptions(subscriptions, peersTopics.getByFirst(peer))
174
173
.forEach { handleMessageSubscriptions(peer, it) }
175
174
} catch (e: Exception ) {
176
- logger.debug(" Subscription filter error, ignoring message from peer $peer " , e)
175
+ logger.debug(" Subscription filter error, ignoring message from peer {} " , peer , e)
177
176
return
178
177
}
179
178
@@ -182,7 +181,7 @@ abstract class AbstractRouter(
182
181
}
183
182
184
183
val (msgSubscribed, nonSubscribed) = msg.publishList
185
- .partition { it .topicIDsList.any { it in subscribedTopics } }
184
+ .partition { rpcMsg -> rpcMsg .topicIDsList.any { it in subscribedTopics } }
186
185
187
186
nonSubscribed.forEach { notifyNonSubscribedMessage(peer, it) }
188
187
@@ -207,7 +206,7 @@ abstract class AbstractRouter(
207
206
messageValidator.validate(it)
208
207
true
209
208
} catch (e: Exception ) {
210
- logger.debug(" Invalid pubsub message from peer $peer : $it " , e)
209
+ logger.debug(" Invalid pubsub message from peer {}: {} " , peer, it , e)
211
210
seenMessages[it] = Optional .of(ValidationResult .Invalid )
212
211
notifyUnseenInvalidMessage(peer, it)
213
212
false
@@ -248,8 +247,16 @@ abstract class AbstractRouter(
248
247
{ res, err ->
249
248
when {
250
249
err != null -> logger.warn(" Exception while handling message from peer $peer : ${it.first} " , err)
251
- res == ValidationResult .Invalid -> logger.debug(" Invalid pubsub message from peer $peer : ${it.first} " )
252
- res == ValidationResult .Ignore -> logger.trace(" Ignoring pubsub message from peer $peer : ${it.first} " )
250
+ res == ValidationResult .Invalid -> logger.debug(
251
+ " Invalid pubsub message from peer {}: {}" ,
252
+ peer,
253
+ it.first
254
+ )
255
+ res == ValidationResult .Ignore -> logger.trace(
256
+ " Ignoring pubsub message from peer {}: {}" ,
257
+ peer,
258
+ it.first
259
+ )
253
260
else -> {
254
261
newValidatedMessages(singletonList(it.first), peer)
255
262
flushAllPending()
@@ -273,15 +280,15 @@ abstract class AbstractRouter(
273
280
274
281
override fun onPeerWireException (peer : PeerHandler ? , cause : Throwable ) {
275
282
// exception occurred in protobuf decoders
276
- logger.debug(" Malformed message from $peer : $cause " )
283
+ logger.debug(" Malformed message from {} : {} " , peer, cause )
277
284
peer?.also { notifyMalformedMessage(it) }
278
285
}
279
286
280
287
override fun onServiceException (peer : PeerHandler ? , msg : Any? , cause : Throwable ) {
281
288
if (cause is BadPeerException ) {
282
- logger.debug(" Remote peer ($peer ) misbehaviour on message $msg : $cause " )
289
+ logger.debug(" Remote peer ({} ) misbehaviour on message {} : {} " , peer, msg, cause )
283
290
} else {
284
- logger.warn(" AbstractRouter internal error on message $msg from peer $peer " , cause)
291
+ logger.warn(" AbstractRouter internal error on message {} from peer {} " , msg, peer , cause)
285
292
}
286
293
}
287
294
0 commit comments