Skip to content

Commit 8829153

Browse files
committed
[Ingress-Kafka] Refactor to use ingestion-client
Summary: Refactor ingress-kafka to leverage on `ingestion-client` implementation. This replaces the previous direct write to bifrost which allows: - Batching, which increases throughput - PP becomes the sole writer of its logs (WIP #3965)
1 parent 384853c commit 8829153

File tree

26 files changed

+2439
-335
lines changed

26 files changed

+2439
-335
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/encoding/src/common.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,55 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use bilrost::{
12+
DecodeErrorKind,
13+
encoding::{EmptyState, ForOverwrite, Proxiable},
14+
};
1115
use restate_encoding_derive::BilrostNewType;
1216

13-
use crate::NetSerde;
17+
use crate::{NetSerde, bilrost_encodings::RestateEncoding};
18+
19+
struct U128Tag;
20+
21+
impl Proxiable<U128Tag> for u128 {
22+
type Proxy = (u64, u64);
23+
24+
fn encode_proxy(&self) -> Self::Proxy {
25+
((*self >> 64) as u64, *self as u64)
26+
}
27+
28+
fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> {
29+
*self = (proxy.0 as u128) << 64 | proxy.1 as u128;
30+
Ok(())
31+
}
32+
}
33+
34+
impl ForOverwrite<RestateEncoding, u128> for () {
35+
fn for_overwrite() -> u128 {
36+
0
37+
}
38+
}
39+
40+
impl EmptyState<RestateEncoding, u128> for () {
41+
fn empty() -> u128 {
42+
0
43+
}
44+
45+
fn is_empty(val: &u128) -> bool {
46+
*val == 0
47+
}
48+
49+
fn clear(val: &mut u128) {
50+
*val = 0;
51+
}
52+
}
53+
54+
bilrost::delegate_proxied_encoding!(
55+
use encoding (::bilrost::encoding::General)
56+
to encode proxied type (u128)
57+
using proxy tag (U128Tag)
58+
with encoding (RestateEncoding)
59+
);
1460

1561
/// A Bilrost compatible U128 type.
1662
#[derive(Debug, Clone, Copy, PartialEq, Eq, BilrostNewType)]

crates/ingress-kafka/Cargo.toml

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ license.workspace = true
88
publish = false
99

1010
[features]
11+
# todo(azmy): remove "legacy-ingestion" from default features
12+
# in v1.7.0
1113
default = []
1214
options_schema = ["dep:schemars"]
15+
# legacy-ingestion introduced in v1.6.0
16+
legacy-ingestion = []
1317
# support for OIDC based authentication via the Kafka consumer, building with rdkafka/curl-static does not work as it
1418
# fails with "SSL certificate OpenSSL verify result: unable to get local issuer certificate (20) (-1)" when trying to
1519
# obtain the OIDC token.
@@ -20,6 +24,7 @@ restate-workspace-hack = { workspace = true }
2024

2125
restate-bifrost = { workspace = true }
2226
restate-core = { workspace = true }
27+
restate-ingestion-client = { workspace = true }
2328
restate-serde-util = { workspace = true }
2429
restate-storage-api = { workspace = true }
2530
restate-timer-queue = { workspace = true }
@@ -31,6 +36,7 @@ anyhow = { workspace = true }
3136
base64 = { workspace = true }
3237
bytes = { workspace = true }
3338
derive_more = { workspace = true }
39+
futures = { workspace = true }
3440
metrics = { workspace = true }
3541
opentelemetry = { workspace = true }
3642
opentelemetry_sdk = { workspace = true }
@@ -39,14 +45,19 @@ parking_lot = { workspace = true }
3945
# https://github.com/fede1024/rust-rdkafka/pull/803. The PR bumps librdkafka to 2.12.1 and enables WITH_CURL for
4046
# librdkafka if the feature curl-static is enabled. Additionally, it cherry-picks https://github.com/confluentinc/librdkafka/pull/5182
4147
# which prevents pulling in curl if it is not activated. The additional fixes in fix-build-script fix the musl build.
42-
rdkafka = { version = "0.38", git = "https://github.com/restatedev/rust-rdkafka.git", rev = "e92cad90eff797a0dc29fa524cabb89b602ae234", features = ["libz-static", "cmake-build", "ssl-vendored"] }
48+
rdkafka = { version = "0.38", git = "https://github.com/restatedev/rust-rdkafka.git", rev = "e92cad90eff797a0dc29fa524cabb89b602ae234", features = [
49+
"libz-static",
50+
"cmake-build",
51+
"ssl-vendored",
52+
] }
4353
schemars = { workspace = true, optional = true }
4454
thiserror = { workspace = true }
4555
tokio = { workspace = true, features = ["sync", "rt"] }
4656
tracing = { workspace = true }
4757
tracing-opentelemetry = { workspace = true }
58+
xxhash-rust = { workspace = true, features = ["xxh3", "std"] }
4859

4960
[dev-dependencies]
5061
restate-types = { workspace = true, features = ["test-util"] }
5162

52-
base64 = { workspace = true }
63+
base64 = { workspace = true }

0 commit comments

Comments
 (0)