Skip to content

Commit f97166f

Browse files
authored
Enhance HighLevelProducer to take schema serializers (#92)
1 parent 4938194 commit f97166f

File tree

2 files changed

+79
-7
lines changed

2 files changed

+79
-7
lines changed

lib/producer/high-level-producer.js

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,37 @@ function createSerializer(serializer) {
4343
// operation if the number of parameters the function accepts is > 1
4444
return {
4545
apply: applyFn,
46-
async: serializer.length > 1
46+
async: serializer.length > 1,
47+
needsTopic: false
48+
};
49+
}
50+
51+
/**
52+
* Create a serializer that additionally takes the topic name
53+
*
54+
* Method simply wraps a serializer provided by a user
55+
* so it adds context to the error
56+
*
57+
* @returns {function} Serialization function
58+
*/
59+
function createTopicSerializer(serializer) {
60+
var applyFn = function serializationWrapper(t, v, cb) {
61+
try {
62+
return cb ? serializer(t, v, cb) : serializer(t, v);
63+
} catch (e) {
64+
var modifiedError = new Error('Could not serialize value: ' + e.message);
65+
modifiedError.value = v;
66+
modifiedError.serializer = serializer;
67+
throw modifiedError;
68+
}
69+
};
70+
71+
// We can check how many parameters the function has and activate the asynchronous
72+
// operation if the number of parameters the function accepts is > 2
73+
return {
74+
apply: applyFn,
75+
async: serializer.length > 2,
76+
needsTopic: true
4777
};
4878
}
4979

@@ -256,10 +286,20 @@ HighLevelProducer.prototype._modifiedProduce = function(topic, partition, messag
256286

257287
try {
258288
if (this.valueSerializer.async) {
259-
// If this is async we need to give it a callback
260-
this.valueSerializer.apply(message, valueSerializerCallback);
289+
if (this.valueSerializer.needsTopic) {
290+
// If this is async we need to give it a callback
291+
this.valueSerializer.apply(topic, message, valueSerializerCallback);
292+
} else {
293+
// If this is async we need to give it a callback
294+
this.valueSerializer.apply(message, valueSerializerCallback);
295+
}
261296
} else {
262-
var serializedValue = this.valueSerializer.apply(message);
297+
var serializedValue;
298+
if (this.valueSerializer.needsTopic) {
299+
serializedValue = this.valueSerializer.apply(topic, message);
300+
} else {
301+
serializedValue = this.valueSerializer.apply(message);
302+
}
263303
// Check if we were returned a promise in order to support promise behavior
264304
if (serializedValue &&
265305
typeof serializedValue.then === 'function' &&
@@ -272,10 +312,20 @@ HighLevelProducer.prototype._modifiedProduce = function(topic, partition, messag
272312
}
273313

274314
if (this.keySerializer.async) {
275-
// If this is async we need to give it a callback
276-
this.keySerializer.apply(key, keySerializerCallback);
315+
if (this.valueSerializer.needsTopic) {
316+
// If this is async we need to give it a callback
317+
this.keySerializer.apply(topic, key, keySerializerCallback);
318+
} else {
319+
// If this is async we need to give it a callback
320+
this.keySerializer.apply(key, keySerializerCallback);
321+
}
277322
} else {
278-
var serializedKey = this.keySerializer.apply(key);
323+
var serializedKey;
324+
if (this.valueSerializer.needsTopic) {
325+
serializedKey = this.keySerializer.apply(topic, key);
326+
} else {
327+
serializedKey = this.keySerializer.apply(key);
328+
}
279329
// Check if we were returned a promise in order to support promise behavior
280330
if (serializedKey &&
281331
typeof serializedKey.then === 'function' &&
@@ -319,3 +369,21 @@ HighLevelProducer.prototype.setKeySerializer = function(serializer) {
319369
HighLevelProducer.prototype.setValueSerializer = function(serializer) {
320370
this.valueSerializer = createSerializer(serializer);
321371
};
372+
373+
/**
374+
* Set the topic-key serializer
375+
*
376+
* A serializer that takes the topic name in addition to the key.
377+
*/
378+
HighLevelProducer.prototype.setTopicKeySerializer = function(serializer) {
379+
this.keySerializer = createTopicSerializer(serializer);
380+
};
381+
382+
/**
383+
* Set the topic-value serializer
384+
*
385+
* A serializer that takes the topic name in addition to the value.
386+
*/
387+
HighLevelProducer.prototype.setTopicValueSerializer = function(serializer) {
388+
this.valueSerializer = createTopicSerializer(serializer);
389+
};

types/rdkafka.d.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,10 @@ export class HighLevelProducer extends Producer {
296296
setKeySerializer(serializer: (key: any) => MessageKey | Promise<MessageKey>): void;
297297
setValueSerializer(serializer: (value: any, cb: (err: any, value: MessageValue) => void) => void): void;
298298
setValueSerializer(serializer: (value: any) => MessageValue | Promise<MessageValue>): void;
299+
setTopicKeySerializer(serializer: (topic: string, key: any, cb: (err: any, key: MessageKey) => void) => void): void;
300+
setTopicKeySerializer(serializer: (topic: string, key: any) => MessageKey | Promise<MessageKey>): void;
301+
setTopicValueSerializer(serializer: (topic: string, value: any, cb: (err: any, value: MessageValue) => void) => void): void;
302+
setTopicValueSerializer(serializer: (topic: string, value: any) => MessageValue | Promise<MessageValue>): void;
299303
}
300304

301305
export const features: string[];

0 commit comments

Comments
 (0)