Technical Documentation
The connector takes advantage of a slightly modified version of the official Couchbase Kafka Connector.
The original connector reads a stream of documents from Couchbase and publishes them to Kafka using a regular producer.
Instead of producing documents directly to Kafka, the class CouchbaseReader adds the documents to a queue that is consumed by the connector's task. In this way it is possible to rely on the Kafka Connect API to store and read offsets.
To limit the size of the queue and avoid memory problems, every time a new document arrives on the Couchbase feed, the connector checks how many messages have been received in the last second. If that number exceeds the configured maximum drain rate, the reader waits until the connector task has started consuming the messages in the queue.
couchbase-kafka-connector v1.0.0 Javadoc
The class inherits from SourceConnector and its job is to start the connector tasks using the supplied properties.
The start(Map<String, String> props) method is called when the connector starts. It checks that all the mandatory properties are configured and parses them.
The taskConfigs(int maxTasks) method is then called; its job is to return a set of configurations for the tasks.
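A minimal sketch of what that connector class can look like, assuming the Kafka Connect SourceConnector API; the class name, mandatory property keys, and version string are illustrative assumptions, not the actual implementation:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;

// Sketch of the connector: validates properties in start() and hands the same
// configuration to every task in taskConfigs(). taskClass() is omitted; it would
// return the connector's SourceTask implementation.
public abstract class CouchbaseSourceConnectorSketch extends SourceConnector {
    private Map<String, String> configProperties;

    @Override
    public void start(Map<String, String> props) {
        // Fail fast if a mandatory property is missing (property names are assumed).
        for (String key : new String[] {"couchbase.nodes", "couchbase.bucket", "topic"}) {
            if (!props.containsKey(key)) {
                throw new ConnectException("Missing mandatory property: " + key);
            }
        }
        this.configProperties = props;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Each task receives a copy of the parsed configuration.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(configProperties));
        }
        return configs;
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "1.0.0";
    }
}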
The class extends the SourceTask class and is responsible for publishing data to Kafka.
The start(Map<String, String> props) method creates the Schema used to validate data, loads the saved offsets, and initializes and starts a CouchbaseConnector.
Since the documents inside a Couchbase bucket can have different schemas, a standard schema is used:
{
  "bucket": {
    "type": ["String", "null"]
  },
  "document": {
    "type": ["String", "null"]
  },
  "body": {
    "type": ["String", "null"]
  }
}
where bucket represents the bucket from which the document has been read, document is the document id, and body is the body of the document.
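As a hedged illustration, an equivalent schema can be built with Kafka Connect's SchemaBuilder; the schema name below is an assumption:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Equivalent Connect schema: three optional string fields.
public final class DocumentSchema {
    public static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name("couchbase.document") // assumed name
            .field("bucket", Schema.OPTIONAL_STRING_SCHEMA)
            .field("document", Schema.OPTIONAL_STRING_SCHEMA)
            .field("body", Schema.OPTIONAL_STRING_SCHEMA)
            .build();
}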
Inside the poll() method, the ConnectWriter is asked for new documents to publish on Kafka. If new documents are present, a record is created for each of them using the supplied schema. Before adding the record to the list of records to publish on Kafka, the new offsets for the bucket are calculated.
To calculate the offsets, a local lookup map is used. For every record, the map is asked for the current offset of the partition to which the document belongs. The offset is then incremented and saved back to the map.
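A minimal sketch of that poll() logic follows. The document type, the drainQueue() helper, and the partition/offset key names are illustrative assumptions; only the general flow (drain the queue, build a Struct per document, track per-partition offsets in a lookup map) reflects the description above:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public abstract class PollSketch extends SourceTask {
    private Schema valueSchema;                                // built in start(), see the schema above
    private String topic;                                      // target topic, parsed from the task config
    private final Map<Short, Long> offsets = new HashMap<>();  // lookup map: partition -> offset

    /** Hypothetical representation of a queued Couchbase document. */
    public static final class QueuedDocument {
        final String bucket, id, body;
        final short partition;

        QueuedDocument(String bucket, String id, String body, short partition) {
            this.bucket = bucket;
            this.id = id;
            this.body = body;
            this.partition = partition;
        }
    }

    /** Stand-in for asking the ConnectWriter for the queued documents. */
    protected abstract List<QueuedDocument> drainQueue();

    @Override
    public List<SourceRecord> poll() {
        List<SourceRecord> records = new ArrayList<>();
        for (QueuedDocument doc : drainQueue()) {
            Struct value = new Struct(valueSchema)
                    .put("bucket", doc.bucket)
                    .put("document", doc.id)
                    .put("body", doc.body);

            // Ask the lookup map for the partition's current offset, increment it, store it back.
            long next = offsets.getOrDefault(doc.partition, 0L) + 1;
            offsets.put(doc.partition, next);

            Map<String, String> sourcePartition = new HashMap<>();
            sourcePartition.put("partition", String.valueOf(doc.partition));
            Map<String, Long> sourceOffset = new HashMap<>();
            sourceOffset.put("offset", next);

            records.add(new SourceRecord(sourcePartition, sourceOffset, topic, valueSchema, value));
        }
        return records;
    }
}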
This package contains a slightly modified version of the official Couchbase Kafka Connector.
The major modifications involve the classes CouchbaseReader, ConnectWriter, and SourceTaskContextStateSerializer.
The class is responsible for connecting to Couchbase and creating a stream for receiving mutation messages. In the original version of the connector, every new mutation message was ingested into Kafka through the normal Kafka producer.
Instead of using the normal producer, the method run(final BucketStreamAggregatorState state, final RunMode mode) adds every mutation message to a queue. To prevent the queue from growing faster than messages can be published to Kafka, the method pauses for a second whenever it has received more messages in that second than the configured maximum drain rate.
In the original version of the connector, this class was an implementation of a normal Kafka producer.
To allow the use of the Kafka Connect API, we need a source from which data can be read by the poll() function. This class is a simple implementation of a concurrent queue.
The method addToQueue(final DCPEvent event) checks whether the event contains a mutation message. If the event passes the check, the mutation message is extracted and added to the queue.
The method getQueue() is called every time poll() is called; it simply returns a copy of the queue and clears the local queue.
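A minimal sketch of that queue wrapper, with the DCPEvent handling simplified to plain strings (the extraction of the mutation message from the event is omitted; names other than addToQueue and getQueue are assumptions):

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified stand-in for ConnectWriter: a thin wrapper around a concurrent queue.
public final class QueueWriterSketch {
    private static final Queue<String> QUEUE = new ConcurrentLinkedQueue<>();

    /** Adds a message to the queue; in the real class only mutation messages pass the check. */
    public static void addToQueue(String mutationMessage) {
        if (mutationMessage != null) {
            QUEUE.offer(mutationMessage);
        }
    }

    /** Returns a copy of the queued messages and clears the local queue, as poll() expects. */
    public static List<String> getQueue() {
        List<String> copy = new ArrayList<>();
        String message;
        while ((message = QUEUE.poll()) != null) {
            copy.add(message);
        }
        return copy;
    }
}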
The class is responsible for loading the last offsets for every partition. It is not necessary to save the offsets using this class, since saving them is managed by the Kafka Connect framework.
When the connector is started, the function load(BucketStreamAggregatorState aggregatorState) is called. For every partition, it calls the function load(BucketStreamAggregatorState aggregatorState, Short partition).
Inside this function, the offset of the last document saved on Kafka is loaded and then used to create a BucketStreamState. The BucketStreamState is then used by the CouchbaseReader class to resume reading from Couchbase at the last published document.
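Since the offsets are stored by the Kafka Connect framework, loading the last offset for a partition boils down to a lookup through the task context's OffsetStorageReader. A minimal sketch, assuming the same partition/offset key names used in the poll() sketch above; the construction of the BucketStreamState itself is left out because its exact signature depends on the Couchbase connector version:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.source.SourceTaskContext;

// Illustrative lookup of the last saved offset for a single Couchbase partition.
public final class OffsetLookupSketch {

    /** Returns the last offset stored by Kafka Connect for the given partition, or 0 if none. */
    public static long lastOffset(SourceTaskContext context, short partition) {
        Map<String, String> sourcePartition =
                Collections.singletonMap("partition", String.valueOf(partition));
        Map<String, Object> stored = context.offsetStorageReader().offset(sourcePartition);
        if (stored == null || stored.get("offset") == null) {
            return 0L; // no document published yet for this partition
        }
        // The returned value would feed the BucketStreamState that lets CouchbaseReader
        // resume the stream from the last published document.
        return ((Number) stored.get("offset")).longValue();
    }
}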