Skip to content

Commit d46c641

Browse files
committed
[Ingress-Kafka] Refactor to use ingestion-client
Summary: Refactor ingress-kafka to leverage on `ingestion-client` implementation. This replaces the previous direct write to bifrost which allows: - Batching, which increases throughput - PP becomes the sole writer of its logs (WIP #3965)
1 parent 525854a commit d46c641

File tree

26 files changed

+2423
-307
lines changed

26 files changed

+2423
-307
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/encoding/src/common.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,55 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use bilrost::{
12+
DecodeErrorKind,
13+
encoding::{EmptyState, ForOverwrite, Proxiable},
14+
};
1115
use restate_encoding_derive::BilrostNewType;
1216

13-
use crate::NetSerde;
17+
use crate::{NetSerde, bilrost_encodings::RestateEncoding};
18+
19+
struct U128Tag;
20+
21+
impl Proxiable<U128Tag> for u128 {
22+
type Proxy = (u64, u64);
23+
24+
fn encode_proxy(&self) -> Self::Proxy {
25+
((*self >> 64) as u64, *self as u64)
26+
}
27+
28+
fn decode_proxy(&mut self, proxy: Self::Proxy) -> Result<(), DecodeErrorKind> {
29+
*self = (proxy.0 as u128) << 64 | proxy.1 as u128;
30+
Ok(())
31+
}
32+
}
33+
34+
impl ForOverwrite<RestateEncoding, u128> for () {
35+
fn for_overwrite() -> u128 {
36+
0
37+
}
38+
}
39+
40+
impl EmptyState<RestateEncoding, u128> for () {
41+
fn empty() -> u128 {
42+
0
43+
}
44+
45+
fn is_empty(val: &u128) -> bool {
46+
*val == 0
47+
}
48+
49+
fn clear(val: &mut u128) {
50+
*val = 0;
51+
}
52+
}
53+
54+
bilrost::delegate_proxied_encoding!(
55+
use encoding (::bilrost::encoding::General)
56+
to encode proxied type (u128)
57+
using proxy tag (U128Tag)
58+
with encoding (RestateEncoding)
59+
);
1460

1561
/// A Bilrost compatible U128 type.
1662
#[derive(Debug, Clone, Copy, PartialEq, Eq, BilrostNewType)]

crates/ingress-kafka/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ restate-workspace-hack = { workspace = true }
2020

2121
restate-bifrost = { workspace = true }
2222
restate-core = { workspace = true }
23+
restate-ingestion-client = { workspace = true }
2324
restate-storage-api = { workspace = true }
2425
restate-timer-queue = { workspace = true }
2526
restate-types = { workspace = true }
@@ -30,6 +31,7 @@ anyhow = { workspace = true }
3031
base64 = { workspace = true }
3132
bytes = { workspace = true }
3233
derive_more = { workspace = true }
34+
futures = { workspace = true }
3335
metrics = { workspace = true }
3436
opentelemetry = { workspace = true }
3537
opentelemetry_sdk = { workspace = true }
@@ -44,8 +46,9 @@ thiserror = { workspace = true }
4446
tokio = { workspace = true, features = ["sync", "rt"] }
4547
tracing = { workspace = true }
4648
tracing-opentelemetry = { workspace = true }
49+
xxhash-rust = { workspace = true, features = ["xxh3", "std"] }
4750

4851
[dev-dependencies]
4952
restate-types = { workspace = true, features = ["test-util"] }
5053

51-
base64 = { workspace = true }
54+
base64 = { workspace = true }

0 commit comments

Comments
 (0)