-
Notifications
You must be signed in to change notification settings - Fork 622
Open
Description
Bug Report
If get offsets by timestamp (from Offset.fetch method) and broker responses with empty list, decoder goes to infinite loop due to extra bytes reading.
Environment
- Node version: v11.15.0
- Kafka-node version: 4.1.3
- Kafka version: 2.12-2.1.0
- Number of Brokers: 1
- Number partitions for topic: 1
Sample Code to reproduce behavior
const kafka = require('kafka-node');
const kafkaOptions = {
kafkaHost: '172.30.30.34:9094',
groupId: 'test',
autoCommit: false,
id: 'node_kafka',
fromOffset: 'earliest',
outOfRangeOffset: 'earliest',
autoConnect: true
};
const client = new kafka.KafkaClient(kafkaOptions);
const offset = new kafka.Offset(client);
offset.fetch([
{
topic: 'test',
partition: 0,
time: new Date('2001-10-12').getTime()
},
{
topic: 'test2',
partition: 0,
time: new Date('2001-10-12').getTime()
}
], (err, data) => {
console.log(err, data);
});Debug output
kafka-node:KafkaClient Connect attempt 1 +0ms
kafka-node:KafkaClient Trying to connect to host: 172.30.30.34 port: 9094 +2ms
kafka-node:KafkaClient kafka-node-client createBroker 172.30.30.34:9094 +1ms
kafka-node:KafkaClient kafka-node-client sending versions request to 172.30.30.34:9094 +5ms
kafka-node:KafkaClient broker socket connected {"host":"172.30.30.34","port":9094} +3ms
kafka-node:KafkaClient connected to socket, trying to load initial metadata +1ms
kafka-node:KafkaClient missing apiSupport waiting until broker is ready... +1ms
kafka-node:KafkaClient waitUntilReady [BrokerWrapper 172.30.30.34:9094 (connected: true) (ready: false) (idle: false) (needAuthentication: false) (authenticated: false)] +0ms
kafka-node:KafkaClient Received versions response from 172.30.30.34:9094 +4ms
kafka-node:KafkaClient setting api support to {"21":{"min":0,"max":1,"usable":false},"22":{"min":0,"max":1,"usable":false},"23":{"min":0,"max":2,"usable":false},"24":{"min":0,"max":1,"usable":false},"25":{"min":0,"max":1,"usable":false},"26":{"min":0,"max":1,"usable":false},"27":{"min":0,"max":0,"usable":false},"28":{"min":0,"max":2,"usable":false},"29":{"min":0,"max":1,"usable":false},"30":{"min":0,"max":1,"usable":false},"31":{"min":0,"max":1,"usable":false},"33":{"min":0,"max":1,"usable":false},"34":{"min":0,"max":1,"usable":false},"35":{"min":0,"max":1,"usable":false},"37":{"min":0,"max":1,"usable":false},"38":{"min":0,"max":1,"usable":false},"39":{"min":0,"max":1,"usable":false},"40":{"min":0,"max":1,"usable":false},"41":{"min":0,"max":1,"usable":false},"42":{"min":0,"max":1,"usable":false},"produce":{"min":0,"max":7,"usable":2},"fetch":{"min":0,"max":10,"usable":2},"offset":{"min":0,"max":4,"usable":0},"metadata":{"min":0,"max":7,"usable":1},"leader":{"min":0,"max":1,"usable":false},"stopReplica":{"min":0,"max":0,"usable":false},"updateMetadata":{"min":0,"max":4,"usable":false},"controlledShutdown":{"min":0,"max":1,"usable":false},"offsetCommit":{"min":0,"max":6,"usable":2},"offsetFetch":{"min":0,"max":5,"usable":1},"groupCoordinator":{"min":0,"max":2,"usable":0},"joinGroup":{"min":0,"max":3,"usable":0},"heartbeat":{"min":0,"max":2,"usable":0},"leaveGroup":{"min":0,"max":2,"usable":0},"syncGroup":{"min":0,"max":2,"usable":0},"describeGroups":{"min":0,"max":2,"usable":0},"listGroups":{"min":0,"max":2,"usable":0},"saslHandshake":{"min":0,"max":1,"usable":1},"apiVersions":{"min":0,"max":2,"usable":0},"createTopics":{"min":0,"max":3,"usable":1},"deleteTopics":{"min":0,"max":3,"usable":false},"describeConfigs":{"min":0,"max":2,"usable":0},"saslAuthenticate":{"min":0,"max":0,"usable":0}} +0ms
kafka-node:KafkaClient broker is now ready +0ms
kafka-node:KafkaClient kafka-node-client updated internal metadata +129ms
Possible fix
Just add
if (vars.offsetNum < 0) {
return;
}after 1129 line in protocol.js
Reactions are currently unavailable