
Conversation

@muhamadazmy
Contributor

@muhamadazmy muhamadazmy commented Nov 12, 2025

[Ingress-Kafka] Refactor to use ingestion-client

Summary:
Refactor ingress-kafka to leverage the ingestion-client implementation. This replaces
the previous direct write to bifrost, which allows:

- Batching, which increases throughput
- PP becomes the sole writer of its logs (WIP restatedev#3965)

Stack created with Sapling. Best reviewed with ReviewStack.

@muhamadazmy muhamadazmy force-pushed the pr3974 branch 2 times, most recently from 384928a to 45e2daf on November 12, 2025 09:57
@muhamadazmy muhamadazmy marked this pull request as ready for review November 12, 2025 10:53
@muhamadazmy muhamadazmy force-pushed the pr3974 branch 5 times, most recently from 36539bf to eb4dffa on November 13, 2025 08:35
@muhamadazmy muhamadazmy changed the title from [Ingress-Kafka] Refactor to use ingress-core to [Ingress-Kafka] Refactor to use ingress-client on Nov 13, 2025
@muhamadazmy muhamadazmy force-pushed the pr3974 branch 4 times, most recently from 2a6ae6f to 3aa9219 on November 13, 2025 11:27
@muhamadazmy muhamadazmy force-pushed the pr3974 branch 5 times, most recently from 67091b8 to 95472a3 on November 17, 2025 08:24
@muhamadazmy muhamadazmy force-pushed the pr3974 branch 2 times, most recently from 107eab3 to 0e6a70f on December 11, 2025 08:43
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks a lot for updating this PR @muhamadazmy. I think we are really close to merging this PR. I left a few minor questions/comments. The main one being whether there can be a problem when running a mixed v1.5 and v1.6 cluster and what happens to the legacy consumer task that tries to get the dedup information from a v1.5 node.

Comment on lines 595 to 614
debug!(
    "Error while looking up latest dedup info for {partition_id}: {err} .. retrying"
);
Contributor

Should the level escalate if it happens repeatedly?
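
For illustration only, a minimal sketch of what escalating the level could look like, assuming a retry counter (here called attempt) is in scope; this is not code from the PR:

// Hypothetical sketch: escalate from debug! to warn! after repeated failures.
// `attempt` is an assumed retry counter, not part of the actual code.
if attempt < 5 {
    debug!(
        "Error while looking up latest dedup info for {partition_id}: {err} .. retrying"
    );
} else {
    warn!(
        "Error while looking up latest dedup info for {partition_id} (attempt {attempt}): {err} .. retrying"
    );
}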

/// fallback to retry every 2 seconds.
pub connection_retry_policy: RetryPolicy,

/// # Request Batch Size
Contributor

When should I tweak this value as a user? What's the impact on the system?

@v1gnesh

v1gnesh commented Dec 19, 2025

Hey @muhamadazmy, glad to see work on Kafka ingress.
Linking to another PR #3976 (comment), where you said, "I managed to get kafka ingestion rate up to 8000 req/sec."

Is there a bench setup you have (or one that is already in the restate repo) that I can use to measure peak throughput & system behaviour in my deployment of Restate (cluster)?

@muhamadazmy
Contributor Author

muhamadazmy commented Dec 19, 2025

Dear @v1gnesh,

Just to clarify: PR #3976 only introduces the foundational piece (the ingestion client). The actual performance improvements to Kafka ingestion are implemented in this PR (#3975), which makes use of the new ingestion client.

That said, I’m happy to explain how I measure throughput.

Test setup:

  • Kafka: Single-node Kafka cluster using the Docker image confluentinc/cp-kafka:7.5.0
  • Monitoring: Prometheus + Grafana, with Prometheus scraping the Restate node at :5122
  • Test service: Any simple service or workflow will work (e.g., using the Restate TypeScript SDK or another language). The service I use simply echoes back its input. Note that ingestion is separate from invocation processing; ingestion only concerns accepting and persisting invocations, not executing them.
  • Restate: A single Restate node with the service registered and Kafka configured according to the documentation

I prepare a Kafka topic containing exactly 1 million invocations. This allows me to reset the consumer group ID and repeatedly measure throughput under different configurations.

Once the Kafka subscription is created, I monitor throughput in Grafana. The most relevant metrics are:

  • restate_kafka_ingress_requests_total: useful for plotting total ingested invocations and the ingestion rate
  • restate_partition_ingest_request_len: a histogram tracking the number of records per batch
  • restate_partition_ingest_request_size_bytes: a histogram tracking batch size in bytes

Note that the new ingestion client is disabled by default since it's still a new feature. To enable it explicitly, you need to start restate-server with RESTATE_INGRESS__INGESTION__LEGACY_INGESTION=false after this PR is merged (or you can build it yourself, of course).

Contributor

@tillrohrmann tillrohrmann left a comment

Thanks for updating this PR @muhamadazmy. The changes look good to me. I've left a comment for a possible follow-up where we could reconsider whether to use u128 for the ProducerId::Producer and a comment about the usage of silent modifications which I believe are dangerous to use and might result in incorrect behavior.

Comment on lines 454 to 455
"failed to query latest dedup \
sequence number for '{legacy_producer_id}': {status:?}"
Contributor

Maybe include node_id.

Comment on lines +570 to 582
fn dedup_producer_id(
    subscription: &SubscriptionId,
    consumer_group: &str,
    topic: &str,
    partition: i32,
) -> u128 {
    let mut hasher = xxhash_rust::xxh3::Xxh3::new();

    subscription.hash(&mut hasher);
    '\0'.hash(&mut hasher);
    consumer_group.hash(&mut hasher);
    '\0'.hash(&mut hasher);
    topic.hash(&mut hasher);
    '\0'.hash(&mut hasher);
    partition.hash(&mut hasher);

    hasher.digest128()
}
Contributor

Wondering whether u128 is actually overkill for the ProducerId::Producer. How many producers could there be, and could we generate the Kafka producer id based on the Schema (e.g. having a counter value that counts the number of subscriptions, with every new subscription getting the next counter value)? If we didn't hash, then decreasing the producer id value to a u64 should be more than sufficient. Of course, one would have to handle the different partitions, which could be done by using a few bits of the u64 for the partition (I guess that 16 bits should be more than enough for the number of partitions).

Nothing we have to address right now but maybe something to double check before creating the v1.6 release.
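
For illustration, a minimal sketch of the packing idea described above (the per-subscription counter is an assumed input, not something the Schema exposes today):

// Hypothetical sketch: derive a u64 producer id from a deterministic
// per-subscription counter and pack the Kafka partition into the low 16 bits.
fn compact_producer_id(subscription_counter: u64, partition: i32) -> u64 {
    debug_assert!(partition >= 0 && partition < (1 << 16));
    // upper 48 bits: subscription counter, lower 16 bits: partition number
    (subscription_counter << 16) | (partition as u64 & 0xFFFF)
}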

pub enum ProducerId {
    Partition(PartitionId),
    Other(ByteString),
    Producer(U128),
Contributor

Wondering whether u128 is really needed or whether u64 could be sufficient if we generated a deterministic subscription counter from the Schema, for example. Maybe something to consider before we release things.

Contributor Author

@muhamadazmy muhamadazmy Dec 19, 2025

The idea behind using u128 is also to allow external producer ids in case we create a public ingestion API in the future. This way the user can use unique names for their external producers, and we can still calculate deterministic producer ids with a virtually infinite collision-free space, without the need to track producer ids.
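
As a purely illustrative example (not code from this PR), an external producer name could be mapped into the same 128-bit space without any central tracking:

// Hypothetical illustration: a stable 128-bit id derived from an external
// producer name; no registry of producer ids is needed.
fn external_producer_id(name: &str) -> u128 {
    xxhash_rust::xxh3::xxh3_128(name.as_bytes())
}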

@v1gnesh

v1gnesh commented Dec 19, 2025

Thanks for taking the time for a detailed response, @muhamadazmy.
Soon, the kafka consumer lag metric will also be available & useful in this measurement.

I had tested a Restate cluster with 3 container nodes in kubernetes.
Each had 1 CPU and 6Gi RAM (75% for RocksDB), taking on no other work, and just dealing with a Kafka topic with 2M messages.
I could see it reach only ~60 RPS overall, with each of the nodes at ~20.
Wasn't doing any work in the handler either.

So I started messing with the rdkafka config params; right after the subscription was created, the network activity was high (as I was fiddling with the initial request bytes kind of param), but then it went back to ~60.

@muhamadazmy
Contributor Author

@v1gnesh If you would like to try this new ingestion mechanism, you can always build the restate-server binary from this branch directly.

@muhamadazmy muhamadazmy force-pushed the pr3974 branch 2 times, most recently from 8e592ee to 57ff349 on December 29, 2025 13:02
Contributor

@tillrohrmann tillrohrmann left a comment

Thanks a lot for updating this PR @muhamadazmy. Nice work! LGTM. +1 for merging after addressing my minor comments.

if let Some(key) = msg.key() {
    headers.push(Header::new(
        "kafka.key",
        &*base64::prelude::BASE64_URL_SAFE.encode(key),
Contributor

Why are we base64 encoding the key when we require the key to be valid utf-8 here (std::str::from_utf8(&key))?

Contributor Author

This is retained as is from the legacy ingestion. Mainly to avoid unnecessary changes between the two versions.

Comment on lines 370 to 373
// this future will be aborted when the partition is no longer needed, so any exit is a failure
if let Err(err) = self.run_inner().await {
    _ = self.failed.send(err);
}
Contributor

The comment does not seem to be aligned with what is happening here.

Contributor Author

run_inner() will only return on error, but I see why this can be confusing. I will change it so it always signals a failure when it returns, even with an Ok().
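
A rough sketch of that change (the error constructed for the Ok case is an assumption about the surrounding error type):

// Hypothetical sketch: treat any return from run_inner() as a failure,
// even an Ok(()) return, and signal it to the failure watch.
let err = match self.run_inner().await {
    Err(err) => err,
    // error type assumed to be anyhow::Error for this sketch
    Ok(()) => anyhow::anyhow!("consumption loop exited unexpectedly"),
};
_ = self.failed.send(err);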

Comment on lines 446 to 447
"failed to query legacy dedup \
sequence number for producer '{legacy_producer_id}'"
Contributor

Maybe add the node_id to the error message as well.

Comment on lines 472 to 485
debug!(
    restate.subscription.id = %self.builder.subscription().id(),
    messaging.consumer.group.name = self.consumer_group_id,
    "Starting topic '{}' partition '{}' consumption loop",
    self.topic_partition.0,
    self.topic_partition.1
);

let legacy_dedup_offset = self.legacy_dedup_offset().await;
debug!(
    topic=%self.topic_partition.0, kafka_partition=%self.topic_partition.1,
    consumer_group=%self.consumer_group_id,
    "Legacy dedup offset: {legacy_dedup_offset:?}",
);
Contributor

Does it make sense to #[instrument()] run_inner and expose the commonly used fields as part of the span instead of adding them to every log statement?

Contributor Author

Yup, indeed
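
Something along these lines, as a rough sketch (field names and the return type are illustrative, not necessarily what the final change uses):

// Hypothetical sketch of instrumenting run_inner so the common fields are
// recorded once on the span instead of being repeated in every log statement.
#[tracing::instrument(
    skip_all,
    fields(
        restate.subscription.id = %self.builder.subscription().id(),
        messaging.consumer.group.name = %self.consumer_group_id,
        topic = %self.topic_partition.0,
        kafka_partition = %self.topic_partition.1,
    )
)]
async fn run_inner(&mut self) -> anyhow::Result<()> {
    debug!("Starting consumption loop");
    // ... consumption loop body ...
    Ok(())
}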

kafka_partition=%self.topic_partition.1,
consumer_group=%self.consumer_group_id,
offset=%offset,
"store kafka offset",
Contributor

nit

Suggested change
"store kafka offset",
"Store kafka offset",

kafka_partition=%self.topic_partition.1,
consumer_group=%self.consumer_group_id,
offset=%offset,
"skipping kafka message (dedup)"
Contributor

Usually we start log statements with a capital letter.

Contributor Author

Will make sure to follow the convention :)

    self.consumer.store_offset(&self.topic_partition.0, self.topic_partition.1, offset)?;
},
Some(received) = consumer_stream.next() => {
    let msg = received?;
Contributor

If we are exiting the consumption task here, does it make sense to try to drain inflight before exiting or wouldn't it matter because the offset wouldn't be stored on the Kafka broker?

Contributor Author

If the consumer stream fails to receive the next message, it's probably due to a network error, so there's a big chance we will also fail to update the committed offset. But even if some inflight messages have already been ingested, they will get automatically deduped on the next task restart.


/// # Experimental Kafka batch ingestion
///
/// Use the new experimental kafka ingestion path which leverage batching
Contributor

Suggested change
/// Use the new experimental kafka ingestion path which leverage batching
/// Use the new experimental kafka ingestion path which leverages batching

Comment on lines 564 to 566
if let Some(lsn) = &self.status.last_applied_log_lsn {
    self.last_applied_log_lsn_watch.send_replace(*lsn);
}
Contributor

Why aren't we directly using lsn?

Contributor Author

hmm 😅 I can't remember exactly, but obviously a mistake

@muhamadazmy muhamadazmy force-pushed the pr3974 branch 2 times, most recently from d46c641 to 98c293f on January 6, 2026 08:48
Summary:
Refactor ingress-kafka to leverage the `ingestion-client` implementation. This replaces
the previous direct write to bifrost which allows:
- Batching, which increases throughput
- PP becomes the sole writer of its logs (WIP restatedev#3965)
@muhamadazmy muhamadazmy merged commit 7aa55dc into restatedev:main Jan 6, 2026
58 checks passed
@muhamadazmy muhamadazmy deleted the pr3974 branch January 6, 2026 09:39
@github-actions github-actions bot locked and limited conversation to collaborators Jan 6, 2026


Development

Successfully merging this pull request may close these issues.

New ingress API
