New Consumer API Design
This document is a draft of REST Proxy API changes to be made as part of supporting the "new" 0.9+ Java consumer. The "new" Java consumer is sufficiently different from the "old" consumer that it warrants REST API changes.
Note that this document is still very much in draft form. Many APIs have not been specified yet, and some APIs listed here may change.
The "new" Java consumer will not replace the "old" Java consumer in existing REST API endpoints. Instead, a new consumer REST API will be introduced that is powered by the "new" Java consumer.
Exactly how versioning is done is TBD. One option is to introduce a version number in the URL, e.g. /v2/... Another option is to use the content type. Perhaps there are other alternatives to consider, too.
POST /consumers/(string: group_name)
The request JSON object will contain the following configuration keys:
- name
- consumer-configs – forwarded to the Java consumer, e.g. max.poll.records
Response:
- instance_id
- base_uri – the REST proxy instance with the KafkaConsumer instance, where all subsequent poll and commit calls should be made
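As a rough illustration of the create call, the sketch below builds the request and reads the two response fields. It assumes consumer-configs is a JSON object of config name to value, which the draft does not state; the group name, consumer name, and sample response are hypothetical.

```python
import json


def build_create_consumer_request(group_name, name, consumer_configs):
    """Return (method, path, body) for POST /consumers/{group_name}."""
    path = f"/consumers/{group_name}"
    # Assumption: consumer-configs is a flat JSON object of Java consumer settings.
    body = json.dumps({"name": name, "consumer-configs": consumer_configs})
    return "POST", path, body


def parse_create_consumer_response(text):
    """Extract instance_id and base_uri from the response JSON."""
    data = json.loads(text)
    return data["instance_id"], data["base_uri"]


method, path, body = build_create_consumer_request(
    "my_group", "my_consumer", {"max.poll.records": 100}
)

# Hypothetical response, for illustration only.
sample_response = json.dumps({
    "instance_id": "my_consumer",
    "base_uri": "http://proxy-1:8082/consumers/my_group/instances/my_consumer",
})
instance_id, base_uri = parse_create_consumer_response(sample_response)
```

A client would send all later poll and commit calls for this instance to base_uri rather than to whichever proxy host it first contacted.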
DELETE /consumers/(string: group_name)/instances/(string: instance_id)
POST /consumers/(string: group_name)/instances/(string: instance_id)/subscription
The request JSON object will contain the following configuration keys:
- topics – a list of topics, or:
- topic_pattern – a REGEX pattern
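The "topics, or topic_pattern" wording suggests the two keys are mutually exclusive. A minimal sketch of a client-side helper under that reading follows; the function name and the validation rule are assumptions, not part of the draft.

```python
import json


def build_subscription_body(topics=None, topic_pattern=None):
    """Build the JSON body for POST .../subscription.

    Assumption: exactly one of topics or topic_pattern may be supplied.
    """
    if (topics is None) == (topic_pattern is None):
        raise ValueError("provide exactly one of topics or topic_pattern")
    if topics is not None:
        return json.dumps({"topics": list(topics)})
    return json.dumps({"topic_pattern": topic_pattern})


by_list = build_subscription_body(topics=["foo", "bar"])
by_pattern = build_subscription_body(topic_pattern="my_topic.*")
```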
GET /consumers/(string: group_name)/instances/(string: instance_id)/subscription
DELETE /consumers/(string: group_name)/instances/(string: instance_id)/subscription
GET /consumers/(string: group_name)/instances/(string: instance_id)/records
timeout is a query parameter.
Response:
[
  {
    "topic": "foo",
    "key": "a2V5",
    "value": "Y29uZmx1ZW50",
    "partition": 1,
    "offset": 100
  }
]
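The key and value in the sample response look base64-encoded, although the draft does not say so. Under that assumption, a client might build the poll path and decode a response roughly as follows. The helper names are hypothetical, and the unit of timeout is left unspecified because the draft does not define it.

```python
import base64
import json
from urllib.parse import urlencode


def records_path(group_name, instance_id, timeout=None):
    """Build the path for GET .../records, with timeout as a query parameter."""
    path = f"/consumers/{group_name}/instances/{instance_id}/records"
    if timeout is not None:
        path += "?" + urlencode({"timeout": timeout})
    return path


def decode_records(response_text):
    """Parse a records response and base64-decode each key and value."""
    decoded = []
    for record in json.loads(response_text):
        key = record.get("key")
        value = record.get("value")
        decoded.append({
            "topic": record["topic"],
            "partition": record["partition"],
            "offset": record["offset"],
            # Assumption: key and value are base64 strings, or null when absent.
            "key": base64.b64decode(key) if key is not None else None,
            "value": base64.b64decode(value) if value is not None else None,
        })
    return decoded


sample = (
    '[{"topic": "foo", "key": "a2V5", "value": "Y29uZmx1ZW50", '
    '"partition": 1, "offset": 100}]'
)
records = decode_records(sample)
poll_path = records_path("my_group", "my_consumer", timeout=1000)
```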
POST /consumers/(string: group_name)/instances/(string: instance_id)/assignment
Request
[ {"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 1} ]
GET /consumers/(string: group_name)/instances/(string: instance_id)/assignment
Response
[ {"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 1} ]
POST /consumers/(string: group_name)/instances/(string: instance_id)/positions
Seek to an offset
Request
{"topic": "foo", "partition": 0, "offset": 12}
POST /consumers/(string: group_name)/instances/(string: instance_id)/positions/beginning
Seek to the beginning
Request
[{"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 2}]
POST /consumers/(string: group_name)/instances/(string: instance_id)/positions/end
Seek to the end
Request
[{"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 2}]
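The three seek endpoints differ only in their path suffix and body shape: a single object with an offset for the general seek, and a list of topic-partitions for beginning and end. A small sketch of request builders under that reading is shown here; the helper names are hypothetical.

```python
import json


def seek_to_offset(group_name, instance_id, topic, partition, offset):
    """Return (method, path, body) for POST .../positions."""
    path = f"/consumers/{group_name}/instances/{instance_id}/positions"
    body = json.dumps({"topic": topic, "partition": partition, "offset": offset})
    return "POST", path, body


def seek_to_edge(group_name, instance_id, edge, partitions):
    """Return (method, path, body) for POST .../positions/beginning or /end.

    partitions is an iterable of (topic, partition) pairs.
    """
    if edge not in ("beginning", "end"):
        raise ValueError("edge must be 'beginning' or 'end'")
    path = f"/consumers/{group_name}/instances/{instance_id}/positions/{edge}"
    body = json.dumps([{"topic": t, "partition": p} for t, p in partitions])
    return "POST", path, body


offset_request = seek_to_offset("my_group", "my_consumer", "foo", 0, 12)
end_request = seek_to_edge("my_group", "my_consumer", "end", [("foo", 0), ("bar", 2)])
```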
POST /consumers/(string: group_name)/instances/(string: instance_id)/offsets
Optional query parameter: async -- if present, commit asynchronously.
Commit the offsets returned by the last .../records call for all subscribed topics and partitions.
POST /consumers/(string: group_name)/instances/(string: instance_id)/offsets
Optional query parameter: async -- if present, commit asynchronously.
Commit specific offsets
Request
[{"topic": "foo", "partition": 0, "offset": 12}, {"topic": "bar", "partition": 1, "offset": 101}]
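Both commit variants share one endpoint and differ only in whether a body is sent. The sketch below builds either form. The draft says only that async takes effect "if present", so sending async=true is an assumption, as are the helper name and its parameters.

```python
import json
from urllib.parse import urlencode


def build_commit_request(group_name, instance_id, offsets=None, use_async=False):
    """Return (path, body) for POST .../offsets.

    offsets is an optional iterable of (topic, partition, offset) triples.
    With offsets=None the body is None, meaning "commit what the last
    .../records call returned".
    """
    path = f"/consumers/{group_name}/instances/{instance_id}/offsets"
    if use_async:
        # Assumption: the parameter's value is not specified; "true" is a placeholder.
        path += "?" + urlencode({"async": "true"})
    if offsets is None:
        return path, None
    body = json.dumps(
        [{"topic": t, "partition": p, "offset": o} for t, p, o in offsets]
    )
    return path, body


commit_all = build_commit_request("my_group", "my_consumer")
commit_some = build_commit_request(
    "my_group", "my_consumer",
    offsets=[("foo", 0, 12), ("bar", 1, 101)],
    use_async=True,
)
```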
GET /consumers/(string: group_name)/instances/(string: instance_id)/offsets
Get the last committed offset for each of the given partitions (whether the commit was made by this process or another).
Request
[{"topic": "foo", "partition": 0}, {"topic": "bar", "partition": 2}]
The following API endpoints still need to be spec'd and must be included in this new API. Note that several API endpoints (e.g., committed) are not included and can come in later releases. Ordered roughly by importance/likelihood of implementation:
- close (DELETE)
- commit (sync/async/with or without a specified offset)
- seek (general, toBeginning, toEnd)
- assign