Skip to content

Commit 625f9c1

Browse files
committed
[Ingress-Kafka] Refactor to use ingestion-client
Summary: Refactor ingress-kafka to leverage on `ingestion-client` implementation. This replaces the previous direct write to bifrost which allows: - Batching, which increases throughput - PP becomes the sole writer of its logs (WIP #3965)
1 parent 2628c79 commit 625f9c1

File tree

26 files changed

+2421
-309
lines changed

26 files changed

+2421
-309
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/encoding/src/common.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,55 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use bilrost::{
12+
DecodeErrorKind,
13+
encoding::{EmptyState, ForOverwrite, Proxiable},
14+
};
1115
use restate_encoding_derive::BilrostNewType;
1216

13-
use crate::NetSerde;
17+
use crate::{NetSerde, bilrost_encodings::RestateEncoding};
18+
19+
struct U128Tag;
20+
21+
impl Proxiable<U128Tag> for u128 {
22+
type Proxy = (u64, u64);
23+
24+
fn encode_proxy(&self) -> Self::Proxy {
25+
((*self >> 64) as u64, *self as u64)
26+
}
27+
28+
fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> {
29+
*self = (proxy.0 as u128) << 64 | proxy.1 as u128;
30+
Ok(())
31+
}
32+
}
33+
34+
impl ForOverwrite<RestateEncoding, u128> for () {
35+
fn for_overwrite() -> u128 {
36+
0
37+
}
38+
}
39+
40+
impl EmptyState<RestateEncoding, u128> for () {
41+
fn empty() -> u128 {
42+
0
43+
}
44+
45+
fn is_empty(val: &u128) -> bool {
46+
*val == 0
47+
}
48+
49+
fn clear(val: &mut u128) {
50+
*val = 0;
51+
}
52+
}
53+
54+
bilrost::delegate_proxied_encoding!(
55+
use encoding (::bilrost::encoding::General)
56+
to encode proxied type (u128)
57+
using proxy tag (U128Tag)
58+
with encoding (RestateEncoding)
59+
);
1460

1561
/// A Bilrost compatible U128 type.
1662
#[derive(Debug, Clone, Copy, PartialEq, Eq, BilrostNewType)]

crates/ingress-kafka/Cargo.toml

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ restate-workspace-hack = { workspace = true }
2020

2121
restate-bifrost = { workspace = true }
2222
restate-core = { workspace = true }
23+
restate-ingestion-client = { workspace = true }
2324
restate-serde-util = { workspace = true }
2425
restate-storage-api = { workspace = true }
2526
restate-timer-queue = { workspace = true }
@@ -31,6 +32,7 @@ anyhow = { workspace = true }
3132
base64 = { workspace = true }
3233
bytes = { workspace = true }
3334
derive_more = { workspace = true }
35+
futures = { workspace = true }
3436
metrics = { workspace = true }
3537
opentelemetry = { workspace = true }
3638
opentelemetry_sdk = { workspace = true }
@@ -39,14 +41,19 @@ parking_lot = { workspace = true }
3941
# https://github.com/fede1024/rust-rdkafka/pull/803. The PR bumps librdkafka to 2.12.1 and enables WITH_CURL for
4042
# librdkafka if the feature curl-static is enabled. Additionally, it cherry-picks https://github.com/confluentinc/librdkafka/pull/5182
4143
# which prevents pulling in curl if it is not activated. The additional fixes in fix-build-script fix the musl build.
42-
rdkafka = { version = "0.38", git = "https://github.com/restatedev/rust-rdkafka.git", rev = "e92cad90eff797a0dc29fa524cabb89b602ae234", features = ["libz-static", "cmake-build", "ssl-vendored"] }
44+
rdkafka = { version = "0.38", git = "https://github.com/restatedev/rust-rdkafka.git", rev = "e92cad90eff797a0dc29fa524cabb89b602ae234", features = [
45+
"libz-static",
46+
"cmake-build",
47+
"ssl-vendored",
48+
] }
4349
schemars = { workspace = true, optional = true }
4450
thiserror = { workspace = true }
4551
tokio = { workspace = true, features = ["sync", "rt"] }
4652
tracing = { workspace = true }
4753
tracing-opentelemetry = { workspace = true }
54+
xxhash-rust = { workspace = true, features = ["xxh3", "std"] }
4855

4956
[dev-dependencies]
5057
restate-types = { workspace = true, features = ["test-util"] }
5158

52-
base64 = { workspace = true }
59+
base64 = { workspace = true }

0 commit comments

Comments
 (0)