Skip to content

Commit 8957f84

Browse files
authored
Handle request serialisation errors asynchronously (#154)
1 parent 4966e07 commit 8957f84

File tree

3 files changed

+41
-5
lines changed

3 files changed

+41
-5
lines changed

src/network/connection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ export class Connection extends EventEmitter {
350350
} catch (err) {
351351
diagnostic.error = err as Error
352352
connectionsApiChannel.error.publish(diagnostic)
353-
throw err
353+
return callback(err, undefined as unknown as ReturnType)
354354
}
355355

356356
writer.appendFrom(payload).prependLength()

test/clients/producer/producer.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import * as Prometheus from 'prom-client'
44
import { kConnections } from '../../../src/clients/base/base.ts'
55
import {
66
type ClientDiagnosticEvent,
7+
compressionsAlgorithms,
8+
GenericError,
79
initProducerIdV5,
810
instancesChannel,
911
MultipleErrors,
@@ -691,6 +693,30 @@ test('send should auto-initialize idempotent producer if needed', async t => {
691693
strictEqual(result.offsets?.length, 1)
692694
})
693695

696+
test('send should handle synchronuous error during payload creation', async t => {
697+
const producer = createProducer(t, { strict: true })
698+
const testTopic = await createTopic(t)
699+
700+
const compression = 'lz4'
701+
const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD')
702+
t.mock.method(compressionsAlgorithms[compression], 'compressSync', () => { throw expectedError })
703+
704+
await rejects(
705+
706+
async () => {
707+
await producer.send({
708+
messages: [{ topic: testTopic, value: Buffer.from('auto-init-idempotent-message') }],
709+
compression,
710+
})
711+
},
712+
(error: any) => {
713+
strictEqual(error instanceof AggregateError, true)
714+
strictEqual(error.errors[0], expectedError)
715+
return true
716+
}
717+
)
718+
})
719+
694720
test('send should validate options in strict mode', async t => {
695721
const producer = createProducer(t, { strict: true })
696722
const testTopic = await createTopic(t)

test/network/connection.test.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -752,17 +752,27 @@ test('Connection should handle request serialization errors', async t => {
752752
throw new Error('Parser error')
753753
}
754754

755-
// Send a request
756-
await throws(() =>
755+
const requestPromise = new Promise((resolve, reject) => {
757756
connection.send(
758757
0, // apiKey
759758
0, // apiVersion
760759
payloadFn as () => Writer,
761760
parser,
762761
false,
763762
false,
764-
() => {}
765-
))
763+
(err: any) => {
764+
if (err) {
765+
reject(err)
766+
}
767+
768+
resolve(null)
769+
}
770+
)
771+
})
772+
773+
await rejects(() => requestPromise, {
774+
message: 'Serialization error'
775+
})
766776

767777
verifyTracingChannel()
768778
})

0 commit comments

Comments
 (0)