Skip to content

Commit 834ce89

Browse files
committed
chore: remove unnecessary Arc for schema provider
1 parent 5525afb commit 834ce89

File tree

2 files changed

+29
-56
lines changed

2 files changed

+29
-56
lines changed

lib/codecs/src/encoding/format/arrow.rs

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,6 @@ pub struct ArrowStreamSerializerConfig {
5555
#[serde(default)]
5656
#[configurable(metadata(docs::examples = true))]
5757
pub allow_nullable_fields: bool,
58-
59-
/// Schema provider for lazy schema loading.
60-
#[serde(skip)]
61-
schema_provider: Option<Arc<dyn SchemaProvider>>,
6258
}
6359

6460
impl std::fmt::Debug for ArrowStreamSerializerConfig {
@@ -72,10 +68,6 @@ impl std::fmt::Debug for ArrowStreamSerializerConfig {
7268
.map(|s| format!("{} fields", s.fields().len())),
7369
)
7470
.field("allow_nullable_fields", &self.allow_nullable_fields)
75-
.field(
76-
"schema_provider",
77-
&self.schema_provider.as_ref().map(|_| "<provider>"),
78-
)
7971
.finish()
8072
}
8173
}
@@ -86,37 +78,20 @@ impl ArrowStreamSerializerConfig {
8678
Self {
8779
schema: Some(schema),
8880
allow_nullable_fields: false,
89-
schema_provider: None,
9081
}
9182
}
9283

93-
/// Create a new ArrowStreamSerializerConfig with a schema provider
94-
pub fn with_provider(mut self, provider: Arc<dyn SchemaProvider>) -> Self {
95-
self.schema = None;
96-
self.schema_provider = Some(provider);
97-
self
98-
}
99-
100-
/// Get the schema provider if one was configured
101-
pub fn provider(&self) -> Option<&Arc<dyn SchemaProvider>> {
102-
self.schema_provider.as_ref()
103-
}
104-
105-
/// Resolve the schema from the provider if present.
106-
pub async fn resolve(&mut self) -> Result<(), ArrowEncodingError> {
107-
// If schema already exists, nothing to do
108-
if self.schema.is_some() {
109-
return Ok(());
110-
}
111-
112-
// Fetch from provider if available
113-
if let Some(provider) = &self.schema_provider {
114-
let schema = provider.get_schema().await?;
115-
self.schema = Some(schema);
116-
Ok(())
117-
} else {
118-
Err(ArrowEncodingError::NoSchemaProvided)
119-
}
84+
/// Resolve the schema from a provider.
85+
///
86+
/// This fetches the schema from the provider and stores it in this config,
87+
/// preserving all other configuration fields (like `allow_nullable_fields`).
88+
pub async fn resolve_with_provider(
89+
&mut self,
90+
provider: &dyn SchemaProvider,
91+
) -> Result<(), ArrowEncodingError> {
92+
let schema = provider.get_schema().await?;
93+
self.schema = Some(schema);
94+
Ok(())
12095
}
12196

12297
/// The data type of events that are accepted by `ArrowStreamEncoder`.

src/sinks/clickhouse/config.rs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Configuration for the `Clickhouse` sink.
22
3-
use std::{fmt, sync::Arc};
3+
use std::fmt;
44

55
use http::{Request, StatusCode, Uri};
66
use hyper::Body;
@@ -290,21 +290,20 @@ impl ClickhouseConfig {
290290
.into());
291291
}
292292

293-
let base_arrow_config = match batch_encoding {
293+
let mut arrow_config = match batch_encoding {
294294
BatchSerializerConfig::ArrowStream(config) => config.clone(),
295295
};
296296

297-
let resolved_config = self
298-
.resolve_arrow_schema(
299-
client,
300-
endpoint.to_string(),
301-
database,
302-
auth,
303-
base_arrow_config,
304-
)
305-
.await?;
297+
self.resolve_arrow_schema(
298+
client,
299+
endpoint.to_string(),
300+
database,
301+
auth,
302+
&mut arrow_config,
303+
)
304+
.await?;
306305

307-
let resolved_batch_config = BatchSerializerConfig::ArrowStream(resolved_config);
306+
let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config);
308307
let arrow_serializer = resolved_batch_config.build()?;
309308
let batch_serializer = BatchSerializer::Arrow(arrow_serializer);
310309
let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer));
@@ -326,8 +325,8 @@ impl ClickhouseConfig {
326325
endpoint: String,
327326
database: &Template,
328327
auth: Option<&Auth>,
329-
base_config: ArrowStreamSerializerConfig,
330-
) -> crate::Result<ArrowStreamSerializerConfig> {
328+
config: &mut ArrowStreamSerializerConfig,
329+
) -> crate::Result<()> {
331330
use super::arrow;
332331

333332
if self.table.is_dynamic() || database.is_dynamic() {
@@ -345,16 +344,15 @@ impl ClickhouseConfig {
345344
database_str, table_str
346345
);
347346

348-
let provider = Arc::new(arrow::ClickHouseSchemaProvider::new(
347+
let provider = arrow::ClickHouseSchemaProvider::new(
349348
client.clone(),
350349
endpoint,
351350
database_str.to_string(),
352351
table_str.to_string(),
353352
auth.cloned(),
354-
));
353+
);
355354

356-
let mut arrow_config = base_config.with_provider(provider);
357-
arrow_config.resolve().await.map_err(|e| {
355+
config.resolve_with_provider(&provider).await.map_err(|e| {
358356
format!(
359357
"Failed to fetch schema for {}.{}: {}.",
360358
database_str, table_str, e
@@ -363,14 +361,14 @@ impl ClickhouseConfig {
363361

364362
debug!(
365363
"Successfully fetched Arrow schema with {} fields",
366-
arrow_config
364+
config
367365
.schema
368366
.as_ref()
369367
.map(|s| s.fields().len())
370368
.unwrap_or(0)
371369
);
372370

373-
Ok(arrow_config)
371+
Ok(())
374372
}
375373
}
376374

0 commit comments

Comments
 (0)