Skip to content

Commit b6c4de1

Browse files
authored
fix: Initialize rustls's CryptoProvider early in the code (#2312)
Signed-off-by: Sreekanth <[email protected]>
1 parent 5f5af1b commit b6c4de1

File tree

8 files changed

+16
-11
lines changed

8 files changed

+16
-11
lines changed

rust/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/numaflow-core/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ edition = "2021"
66
[features]
77
nats-tests = []
88
pulsar-tests = []
9-
all-tests = ["nats-tests", "pulsar-tests"]
9+
redis-tests = []
10+
all-tests = ["nats-tests", "pulsar-tests", "redis-tests"]
1011

1112
[lints]
1213
workspace = true

rust/numaflow-core/src/source/serving.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,18 @@ mod tests {
139139
}
140140
}
141141

142+
#[cfg(feature = "redis-tests")]
142143
#[tokio::test]
143144
async fn test_serving_source_reader_acker() -> Result<()> {
144145
let settings = Settings {
145146
app_listen_port: 2000,
146147
..Default::default()
147148
};
148149
let settings = Arc::new(settings);
150+
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
151+
// ServingSource starts an Axum HTTPS server in the background. Rustls is used to generate
152+
// self-signed certs when starting the server.
153+
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
149154
let mut serving_source = ServingSource::new(
150155
Arc::clone(&settings),
151156
10,

rust/numaflow/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ numaflow-models.workspace = true
1414
backoff.workspace = true
1515
tokio.workspace = true
1616
tracing.workspace = true
17+
rustls.workspace = true
1718
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

rust/numaflow/src/main.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
1919
)
2020
.with(tracing_subscriber::fmt::layer().with_ansi(false))
2121
.init();
22+
23+
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
24+
rustls::crypto::aws_lc_rs::default_provider()
25+
.install_default()
26+
.expect("Installing default CryptoProvider");
27+
2228
if let Err(e) = run().await {
2329
error!("{e:?}");
2430
return Err(e);

rust/serving/src/app/jetstream_proxy.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,13 @@ use crate::{app::callback::state, Message, MessageWrapper};
3232
// "from_vertex": "a"
3333
// }
3434

35-
const CALLBACK_URL_KEY: &str = "X-Numaflow-Callback-Url";
3635
const NUMAFLOW_RESP_ARRAY_LEN: &str = "Numaflow-Array-Len";
3736
const NUMAFLOW_RESP_ARRAY_IDX_LEN: &str = "Numaflow-Array-Index-Len";
3837

3938
struct ProxyState<T> {
4039
message: mpsc::Sender<MessageWrapper>,
4140
tid_header: String,
4241
callback: state::State<T>,
43-
callback_url: String,
4442
}
4543

4644
pub(crate) async fn jetstream_proxy<T: Clone + Send + Sync + Store + 'static>(
@@ -50,10 +48,6 @@ pub(crate) async fn jetstream_proxy<T: Clone + Send + Sync + Store + 'static>(
5048
message: state.message.clone(),
5149
tid_header: state.settings.tid_header.clone(),
5250
callback: state.callback_state.clone(),
53-
callback_url: format!(
54-
"https://{}:{}/v1/process/callback",
55-
state.settings.host_ip, state.settings.app_listen_port
56-
),
5751
});
5852

5953
let router = Router::new()

rust/serving/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ pub(crate) async fn serve<T>(
3939
where
4040
T: Clone + Send + Sync + Store + 'static,
4141
{
42-
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
43-
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
44-
4542
let (cert, key) = generate_certs()?;
4643

4744
let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())

rust/serving/src/pipeline.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ pub(crate) struct Edge {
6565
/// DCG (directed compute graph) of the pipeline with minimal information build using vertices and edges
6666
/// from the pipeline spec
6767
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
68-
pub(crate) struct PipelineDCG {
68+
pub struct PipelineDCG {
6969
pub(crate) vertices: Vec<Vertex>,
7070
pub(crate) edges: Vec<Edge>,
7171
}

0 commit comments

Comments
 (0)