WIP: Adding Kafka datasink. #60307
Conversation
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Code Review
This pull request introduces a KafkaDatasink for Ray Data, which is a valuable addition. The implementation is generally well-structured. I've identified a few areas for improvement to enhance robustness and maintainability. My feedback includes suggestions to refactor duplicated code, correct potentially buggy logic in object-to-dictionary conversion, add parameter validation, and fix an incorrect docstring example. Addressing these points will strengthen the new datasink implementation.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Justin Miller <justinrmiller@users.noreply.github.com>
owenowenisme
left a comment
Thanks for the contribution!
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
…n't happen again. Signed-off-by: Justin Miller <justinrmiller@gmail.com>
…er/ray into 58725-Kafka-Datasync
if TYPE_CHECKING:
    from kafka import KafkaProducer
    from kafka.errors import KafkaError, KafkaTimeoutError
Missing runtime imports for Kafka classes
High Severity
KafkaProducer, KafkaError, and KafkaTimeoutError are imported only under TYPE_CHECKING, meaning they won't exist at runtime. When the write method executes, it will fail with a NameError because these names are undefined. The existing kafka_datasource.py correctly handles this by importing inside the function that uses them (e.g., from kafka import KafkaConsumer on line 336 of that file).
Additional Locations (2)
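A minimal sketch of the fix this comment describes: defer the kafka-python imports to the method that uses them, as kafka_datasource.py does. The write body below is illustrative only, and the attribute names assume the constructor stores its arguments directly.

```python
def write(self, blocks, ctx):
    # Import lazily so the module imports cleanly without kafka-python installed
    # and so the names exist at runtime, not only under TYPE_CHECKING.
    from kafka import KafkaProducer
    from kafka.errors import KafkaError, KafkaTimeoutError  # used in the elided error handling

    producer = KafkaProducer(
        bootstrap_servers=self.bootstrap_servers,
        **(self.producer_config or {}),
    )
    ...
```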
key = self._extract_key(row)

# Serialize value
value = self._serialize_value(row)
Redundant row-to-dict conversion per row
Low Severity
_row_to_dict is called twice for each row during processing—once in _extract_key and once in _serialize_value. The PR discussion explicitly noted this redundancy and the author marked it as "addressed", but the duplicate conversion remains. The row could be converted once and passed to both methods.
Additional Locations (2)
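A sketch of the suggested refactor, converting the row once and handing the dict to both helpers; this assumes _extract_key and _serialize_value are changed to accept an already-converted dict.

```python
# Convert once, reuse for both the key and the value.
row_dict = self._row_to_dict(row)

key = self._extract_key(row_dict)

# Serialize value
value = self._serialize_value(row_dict)
```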
try:
    future.get(timeout=0)  # Non-blocking check since we already flushed
except Exception:
    failed_messages += 1
Failed messages silently counted instead of raising exception
Medium Severity
When message delivery fails, the exception is caught and the failure is silently counted in failed_messages. Unlike other datasinks (e.g., BigQuery which raises RuntimeError on write failure), no exception is raised. Since write_kafka returns None, users have no way to know messages failed. This inconsistency with other datasinks could cause silent data loss.
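One way to match the fail-fast behavior of the other datasinks would be to raise after the final flush. The counters below come from the snippets in this PR; the exception type and message wording are assumptions.

```python
# Surface delivery failures instead of silently counting them.
if failed_messages > 0:
    raise RuntimeError(
        f"Failed to deliver {failed_messages} of {total_records} records "
        f"to Kafka topic '{self.topic}'."
    )
```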
    producer_config: Additional Kafka producer configuration (kafka-python format)
    delivery_callback: Optional callback for delivery reports (called with metadata or exception)
"""
VALID_SERIALIZERS = {"json", "string", "bytes"}
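The review summary above also mentions adding parameter validation; one possible use of this VALID_SERIALIZERS set in __init__ is sketched below, assuming it is a class attribute (the error wording is an assumption, not the PR's code).

```python
# Reject unsupported serializer names up front.
if key_serializer not in self.VALID_SERIALIZERS:
    raise ValueError(
        f"key_serializer must be one of {sorted(self.VALID_SERIALIZERS)}, "
        f"got {key_serializer!r}"
    )
if value_serializer not in self.VALID_SERIALIZERS:
    raise ValueError(
        f"value_serializer must be one of {sorted(self.VALID_SERIALIZERS)}, "
        f"got {value_serializer!r}"
    )
```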
self,
topic: str,
bootstrap_servers: str,
key_field: str | None = None,
key_serializer: str = "string",
value_serializer: str = "json",
producer_config: dict[str, Any] | None = None,
delivery_callback: Callable | None = None,
We should also accept and pass concurrency & ray_remote_args
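A rough sketch of the requested plumbing, forwarding the new arguments to Dataset.write_datasink; the parameter names follow other Ray Data write_* APIs, and the body is an assumption rather than the PR's code.

```python
def write_kafka(
    self,
    topic: str,
    bootstrap_servers: str,
    *,
    key_field: str | None = None,
    key_serializer: str = "string",
    value_serializer: str = "json",
    producer_config: dict[str, Any] | None = None,
    delivery_callback: Callable | None = None,
    ray_remote_args: dict[str, Any] | None = None,
    concurrency: int | None = None,
) -> None:
    datasink = KafkaDatasink(
        topic=topic,
        bootstrap_servers=bootstrap_servers,
        key_field=key_field,
        key_serializer=key_serializer,
        value_serializer=value_serializer,
        producer_config=producer_config,
        delivery_callback=delivery_callback,
    )
    # write_datasink already accepts these two knobs, so they only need forwarding.
    self.write_datasink(
        datasink,
        ray_remote_args=ray_remote_args,
        concurrency=concurrency,
    )
```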
@@ -0,0 +1,537 @@
+import json
Let's move this test into test_kafka.py
future.add_errback(
    lambda e: self.delivery_callback(exception=e)
)
futures.append(future)
Do you think we should flush the future buffer once there are N items in it? If we have millions of rows, the futures could accumulate.
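An illustrative sketch of that idea, flushing once the pending-futures buffer reaches a threshold; MAX_PENDING_FUTURES and the bookkeeping below are assumptions, not code from the PR.

```python
MAX_PENDING_FUTURES = 10_000  # illustrative threshold

futures.append(future)
if len(futures) >= MAX_PENDING_FUTURES:
    # Flush, drain the delivery results, and start a fresh buffer.
    producer.flush()
    for pending in futures:
        try:
            pending.get(timeout=0)  # already flushed, so this should not block
        except Exception:
            failed_messages += 1
    futures.clear()
```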
from kafka import KafkaProducer
from kafka.errors import KafkaError, KafkaTimeoutError

from ray.data import Datasink
This will cause a circular import error.
Suggested change:
- from ray.data import Datasink
+ from ray.data.datasource.datasink import Datasink
| """ | ||
| Convenience method to write Ray Dataset to Kafka. | ||
|
|
||
| Example: |
Suggested change:
- Example:
+ Examples:
The ConsumptionAPI decorator matches on the word "Examples" in order to insert its docs.
# Close the producer
producer.close(timeout=5.0)

return {"total_records": total_records, "failed_messages": failed_messages}


Description
This PR adds a Kafka Datasink to Ray, complementing the already existing Kafka Datasource.
Related issues
Closes #58725
Additional information
I will add additional information and tests later.