diff --git a/src/network/connection.ts b/src/network/connection.ts index 780bdb8..906a071 100644 --- a/src/network/connection.ts +++ b/src/network/connection.ts @@ -350,7 +350,7 @@ export class Connection extends EventEmitter { } catch (err) { diagnostic.error = err as Error connectionsApiChannel.error.publish(diagnostic) - throw err + return callback(err, undefined as unknown as ReturnType) } writer.appendFrom(payload).prependLength() diff --git a/test/clients/producer/producer.test.ts b/test/clients/producer/producer.test.ts index b028d5d..e96eada 100644 --- a/test/clients/producer/producer.test.ts +++ b/test/clients/producer/producer.test.ts @@ -4,6 +4,8 @@ import * as Prometheus from 'prom-client' import { kConnections } from '../../../src/clients/base/base.ts' import { type ClientDiagnosticEvent, + compressionsAlgorithms, + GenericError, initProducerIdV5, instancesChannel, MultipleErrors, @@ -691,6 +693,30 @@ test('send should auto-initialize idempotent producer if needed', async t => { strictEqual(result.offsets?.length, 1) }) +test('send should handle synchronuous error during payload creation', async t => { + const producer = createProducer(t, { strict: true }) + const testTopic = await createTopic(t) + + const compression = 'lz4' + const expectedError = new GenericError('PLT_KFK_UNSUPPORTED_COMPRESSION', 'Avoid RUD') + t.mock.method(compressionsAlgorithms[compression], 'compressSync', () => { throw expectedError }) + + await rejects( + + async () => { + await producer.send({ + messages: [{ topic: testTopic, value: Buffer.from('auto-init-idempotent-message') }], + compression, + }) + }, + (error: any) => { + strictEqual(error instanceof AggregateError, true) + strictEqual(error.errors[0], expectedError) + return true + } + ) +}) + test('send should validate options in strict mode', async t => { const producer = createProducer(t, { strict: true }) const testTopic = await createTopic(t) diff --git a/test/network/connection.test.ts b/test/network/connection.test.ts index a3c2427..139aa30 100644 --- a/test/network/connection.test.ts +++ b/test/network/connection.test.ts @@ -752,8 +752,7 @@ test('Connection should handle request serialization errors', async t => { throw new Error('Parser error') } - // Send a request - await throws(() => + const requestPromise = new Promise((resolve, reject) => { connection.send( 0, // apiKey 0, // apiVersion @@ -761,8 +760,19 @@ test('Connection should handle request serialization errors', async t => { parser, false, false, - () => {} - )) + (err: any) => { + if (err) { + reject(err) + } + + resolve(null) + } + ) + }) + + await rejects(() => requestPromise, { + message: 'Serialization error' + }) verifyTracingChannel() })