Conversation
jeqo
left a comment
There was a problem hiding this comment.
@ferbercedo thanks for this PR. This is a beneficial feature for other users, I appreciate you taking the initiative to contribute it.
I have added a few comments to have a more complete solution to this, let's discuss. Also, please consider adding tests and docs to your proposal.
There was a problem hiding this comment.
This sender needs to know if the URL is configured to inject a key or not. Otherwise, it will wastefully transform a key in to a string, and eventually not use it.
Consider adding a flag that is passed from the configuration to have this condition.
| for (final SinkRecord record : records) { | ||
| final String body = recordValueConverter.convert(record); | ||
| httpSender.send(body); | ||
| httpSender.send(body, record.key().toString()); |
There was a problem hiding this comment.
Consider having a recordKeyConverter -- it should only convert known types (e.g. String, Numbers) into String to avoid messing with complex/unexpected conversions.
There was a problem hiding this comment.
Also, what happens when the key is null? Should it use an empty string? fail? use a fallback value?
| final var configOriginalMap = this.config.originalsStrings(); | ||
| final String genericUrl = configOriginalMap.get("http.url"); | ||
| final String newUrl = genericUrl.replace("{key}", key); | ||
| configOriginalMap.replace("http.url", newUrl); | ||
| log.debug("Key replaced in URL. New URL: {}", newUrl); | ||
|
|
||
| final var newConfig = new HttpSinkConfig(configOriginalMap); | ||
| log.debug("Sending request with body: {}", body); |
There was a problem hiding this comment.
Instead of modifying the configs on every execution, I'd prefer a more structured approach to flag this.
Consider:
- adding a new config, e.g.
http.url.inject.key.enable=true/falsedefaults: false. - maybe have a fallback value to use it when key is null or cannot be converted, e.g.
http.url.inject.key.fallback=stringdefaults: null -- if value cannot be converted or null, then fail at runtime. - delegate to the config on how to replace the key with the url by adding a new
config.httpUri(keySupplier.get())
| public interface HttpSender { | ||
| HttpResponse<String> send(final String body); | ||
|
|
||
| HttpResponse<String> send(final String body, String key); |
There was a problem hiding this comment.
Consider passing the key as a function instead of a value:
| HttpResponse<String> send(final String body, String key); | |
| HttpResponse<String> send(final String body, Supplier<String> keySupplier); |
|
Awesome work on this! |
Added functionality to replace '{key}' with registry key to single records.
For example, url: https://example.com/{key}
Record with Key 11, new url: https://example.com/11