Skip to content

Commit b02b98e

Browse files
authored
fix: Remove ApiEndpoint::name to be consistent with other kinds of sinks (#2316)
* fix: Remove `ApiEndpoint::name` to be consistent with other kinds of sinks * fix e2e tests * fix compilation * fix test
1 parent a2a19f0 commit b02b98e

File tree

54 files changed

+288
-255
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+288
-255
lines changed

dozer-api/src/cache_builder/builder_impl.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,9 @@ impl CacheBuilderImpl {
131131
LogOperation::Op { op } => match op {
132132
Operation::Delete { old } => {
133133
if let Some(meta) = self.building.delete(&old)? {
134-
if let Some((endpoint_name, operations_sender)) = operations_sender {
134+
if let Some((table_name, operations_sender)) = operations_sender {
135135
let operation = types_helper::map_delete_operation(
136-
endpoint_name.clone(),
136+
table_name.clone(),
137137
CacheRecord::new(meta.id, meta.version, old),
138138
);
139139
send_and_log_error(operations_sender, operation);
@@ -157,9 +157,9 @@ impl CacheBuilderImpl {
157157
);
158158
increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);
159159

160-
if let Some((endpoint_name, operations_sender)) = operations_sender {
160+
if let Some((table_name, operations_sender)) = operations_sender {
161161
send_upsert_result(
162-
endpoint_name,
162+
table_name,
163163
operations_sender,
164164
result,
165165
&self.building.get_schema().0,
@@ -178,9 +178,9 @@ impl CacheBuilderImpl {
178178
);
179179
increment_counter!(CACHE_OPERATION_COUNTER_NAME, labels);
180180

181-
if let Some((endpoint_name, operations_sender)) = operations_sender {
181+
if let Some((table_name, operations_sender)) = operations_sender {
182182
send_upsert_result(
183-
endpoint_name,
183+
table_name,
184184
operations_sender,
185185
upsert_result,
186186
&self.building.get_schema().0,
@@ -200,9 +200,9 @@ impl CacheBuilderImpl {
200200

201201
for record in new {
202202
let upsert_result = self.building.insert(&record)?;
203-
if let Some((endpoint_name, operations_sender)) = operations_sender {
203+
if let Some((table_name, operations_sender)) = operations_sender {
204204
send_upsert_result(
205-
endpoint_name,
205+
table_name,
206206
operations_sender,
207207
upsert_result,
208208
&self.building.get_schema().0,
@@ -276,7 +276,7 @@ fn snapshotting_str(snapshotting: bool) -> &'static str {
276276
}
277277

278278
fn send_upsert_result(
279-
endpoint_name: &str,
279+
table_name: &str,
280280
operations_sender: &Sender<GrpcOperation>,
281281
upsert_result: UpsertResult,
282282
schema: &Schema,
@@ -286,7 +286,7 @@ fn send_upsert_result(
286286
match upsert_result {
287287
UpsertResult::Inserted { meta } => {
288288
let op = types_helper::map_insert_operation(
289-
endpoint_name.to_string(),
289+
table_name.to_string(),
290290
CacheRecord::new(meta.id, meta.version, new),
291291
);
292292
send_and_log_error(operations_sender, op);
@@ -303,7 +303,7 @@ fn send_upsert_result(
303303
record
304304
});
305305
let op = types_helper::map_update_operation(
306-
endpoint_name.to_string(),
306+
table_name.to_string(),
307307
CacheRecord::new(old_meta.id, old_meta.version, old),
308308
CacheRecord::new(new_meta.id, new_meta.version, new),
309309
);
@@ -365,7 +365,7 @@ mod tests {
365365

366366
fn test_endpoint_meta(log_id: String) -> EndpointMeta {
367367
EndpointMeta {
368-
name: Default::default(),
368+
table_name: Default::default(),
369369
log_id,
370370
schema: EndpointSchema {
371371
path: Default::default(),

dozer-api/src/cache_builder/endpoint_meta.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,23 @@ use crate::{cache_alias_and_labels, errors::ApiInitError};
99

1010
#[derive(Debug, Clone, PartialEq, Eq)]
1111
pub struct EndpointMeta {
12-
pub name: String,
12+
pub table_name: String,
1313
pub log_id: String,
1414
pub schema: EndpointSchema,
1515
}
1616

1717
impl EndpointMeta {
1818
pub async fn load_from_client(
1919
client: &mut InternalPipelineServiceClient<Channel>,
20-
endpoint: String,
20+
table_name: String,
2121
) -> Result<(Self, LogClient), ApiInitError> {
2222
// We establish the log stream first to avoid tonic auto-reconnecting without us knowing.
23-
let (log_client, schema) = LogClient::new(client, endpoint.clone()).await?;
23+
let (log_client, schema) = LogClient::new(client, table_name.clone()).await?;
2424
let log_id = client.get_id(()).await?.into_inner().id;
2525

2626
Ok((
2727
Self {
28-
name: endpoint,
28+
table_name,
2929
log_id,
3030
schema,
3131
},
@@ -34,12 +34,12 @@ impl EndpointMeta {
3434
}
3535

3636
pub fn cache_alias_and_labels(&self, extra_labels: Labels) -> (String, Labels) {
37-
let (alias, mut labels) = cache_alias_and_labels(self.name.clone());
37+
let (alias, mut labels) = cache_alias_and_labels(self.table_name.clone());
3838
labels.extend(extra_labels);
3939
(alias, labels)
4040
}
4141

4242
pub fn cache_name(&self) -> String {
43-
format!("{}_{}", self.log_id, self.name)
43+
format!("{}_{}", self.log_id, self.table_name)
4444
}
4545
}

dozer-api/src/cache_builder/mod.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ const READ_LOG_RETRY_INTERVAL: Duration = Duration::from_secs(1);
3333
#[derive(Debug)]
3434
pub struct CacheBuilder {
3535
client: InternalPipelineServiceClient<Channel>,
36-
endpoint: String,
36+
table_name: String,
3737
cache_manager: Arc<dyn RwCacheManager>,
3838
serving: Arc<ArcSwap<CacheReader>>,
3939
labels: Labels,
@@ -52,6 +52,7 @@ impl CacheBuilder {
5252
pub async fn new(
5353
cache_manager: Arc<dyn RwCacheManager>,
5454
app_server_url: String,
55+
table_name: String,
5556
endpoint: &ApiEndpoint,
5657
labels: LabelsAndProgress,
5758
) -> Result<(Self, EndpointSchema), ApiInitError> {
@@ -63,7 +64,7 @@ impl CacheBuilder {
6364
error,
6465
})?;
6566
let (endpoint_meta, _) =
66-
EndpointMeta::load_from_client(&mut client, endpoint.name.clone()).await?;
67+
EndpointMeta::load_from_client(&mut client, table_name.clone()).await?;
6768

6869
// Open or create cache.
6970
let cache_write_options = cache_write_options(endpoint.conflict_resolution);
@@ -74,14 +75,15 @@ impl CacheBuilder {
7475
cache_write_options,
7576
)
7677
.map_err(ApiInitError::OpenOrCreateCache)?;
77-
let progress_bar = labels.create_progress_bar(format!("cache: {}", endpoint_meta.name));
78+
let progress_bar =
79+
labels.create_progress_bar(format!("cache: {}", endpoint_meta.table_name));
7880

7981
let log_reader_options = get_log_reader_options(endpoint);
8082

8183
Ok((
8284
Self {
8385
client,
84-
endpoint: endpoint.name.clone(),
86+
table_name,
8587
cache_manager,
8688
serving: Arc::new(ArcSwap::from_pointee(serving)),
8789
labels: labels.labels().clone(),
@@ -106,7 +108,7 @@ impl CacheBuilder {
106108
loop {
107109
// Connect to the endpoint's log.
108110
let Some(connect_result) = runtime.block_on(with_cancel(
109-
connect_until_success(&mut self.client, &self.endpoint),
111+
connect_until_success(&mut self.client, &self.table_name),
110112
cancel,
111113
)) else {
112114
return Ok(());

dozer-api/src/generator/oapi/generator.rs

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@ use serde_json::{json, Value};
1212
pub struct OpenApiGenerator<'a> {
1313
schema: &'a dozer_types::types::Schema,
1414
secondary_indexes: &'a [IndexDefinition],
15+
table_name: String,
1516
endpoint: ApiEndpoint,
1617
server_host: Vec<String>,
1718
}
1819
impl<'a> OpenApiGenerator<'a> {
1920
fn get_singular_name(&self) -> String {
20-
self.endpoint.name.to_string()
21+
self.table_name.to_string()
2122
}
2223
fn get_plural_name(&self) -> String {
23-
format!("{}_array", self.endpoint.name)
24+
format!("{}_array", self.table_name)
2425
}
2526

2627
// Generate first secondary_index as an example
@@ -80,18 +81,18 @@ impl<'a> OpenApiGenerator<'a> {
8081
let responses = Responses {
8182
responses: indexmap::indexmap! {
8283
StatusCode::Code(200) =>
83-
ReferenceOr::Item(create_reference_response(format!("Get by id {}", self.endpoint.name), format!("#/components/schemas/{}", self.get_singular_name())))
84+
ReferenceOr::Item(create_reference_response(format!("Get by id {}", self.table_name), format!("#/components/schemas/{}", self.get_singular_name())))
8485
},
8586
..Default::default()
8687
};
8788
let get_operation = Some(Operation {
88-
tags: vec![format!("{}", self.endpoint.name)],
89+
tags: vec![format!("{}", self.table_name)],
8990
summary: Some("Fetch a single document record by primary key".to_owned()),
9091
description: Some(
9192
"Generated API to fetch a single record. Primary key specified will be used for lookup"
9293
.to_owned(),
9394
),
94-
operation_id: Some(format!("{}-by-id", self.endpoint.name)),
95+
operation_id: Some(format!("{}-by-id", self.table_name)),
9596
parameters: vec![ReferenceOr::Item(Parameter::Path {
9697
parameter_data: ParameterData {
9798
name: "id".to_owned(),
@@ -123,17 +124,17 @@ impl<'a> OpenApiGenerator<'a> {
123124
fn generate_list_route(&self) -> ReferenceOr<PathItem> {
124125
let responses = Responses {
125126
responses: indexmap::indexmap! {
126-
StatusCode::Code(200) => ReferenceOr::Item(create_reference_response(format!("A page array of {}", self.endpoint.name.to_owned()), format!("#/components/schemas/{}",self.get_plural_name())))
127+
StatusCode::Code(200) => ReferenceOr::Item(create_reference_response(format!("A page array of {}", self.table_name.to_owned()), format!("#/components/schemas/{}",self.get_plural_name())))
127128
},
128129
..Default::default()
129130
};
130131
let operation = Some(Operation {
131-
tags: vec![format!("{}", self.endpoint.name)],
132+
tags: vec![format!("{}", self.table_name)],
132133
summary: Some("Fetch multiple documents in the default sort order".to_owned()),
133134
description: Some(
134135
"This is used when no filter expression or sort is needed.".to_owned(),
135136
),
136-
operation_id: Some(format!("list-{}", self.endpoint.name.to_owned())),
137+
operation_id: Some(format!("list-{}", self.table_name.to_owned())),
137138
responses,
138139
..Default::default()
139140
});
@@ -170,10 +171,10 @@ impl<'a> OpenApiGenerator<'a> {
170171
..Default::default()
171172
};
172173
let operation = Some(Operation {
173-
tags: vec![format!("{}", self.endpoint.name)],
174+
tags: vec![format!("{}", self.table_name)],
174175
summary: Some("Count documents based on an expression".to_string()),
175176
description: Some("Count documents based on an expression".to_string()),
176-
operation_id: Some(format!("count-{}", self.endpoint.name)),
177+
operation_id: Some(format!("count-{}", self.table_name)),
177178
request_body: Some(ReferenceOr::Item(request_body)),
178179
responses,
179180
..Default::default()
@@ -194,17 +195,17 @@ impl<'a> OpenApiGenerator<'a> {
194195
};
195196
let responses = Responses {
196197
responses: indexmap::indexmap! {
197-
StatusCode::Code(200) => ReferenceOr::Item(create_reference_response(format!("A page array of {}", self.endpoint.name.to_owned()), format!("#/components/schemas/{}", self.get_plural_name()) ))
198+
StatusCode::Code(200) => ReferenceOr::Item(create_reference_response(format!("A page array of {}", self.table_name.to_owned()), format!("#/components/schemas/{}", self.get_plural_name()) ))
198199
},
199200
..Default::default()
200201
};
201202
let operation = Some(Operation {
202-
tags: vec![format!("{}", self.endpoint.name)],
203+
tags: vec![format!("{}", self.table_name)],
203204
summary: Some("Query documents based on an expression".to_owned()),
204205
description: Some(
205206
"Documents can be queried based on a simple or a composite expression".to_owned(),
206207
),
207-
operation_id: Some(format!("query-{}", self.endpoint.name)),
208+
operation_id: Some(format!("query-{}", self.table_name)),
208209
request_body: Some(ReferenceOr::Item(request_body)),
209210
responses,
210211
..Default::default()
@@ -234,13 +235,13 @@ impl<'a> OpenApiGenerator<'a> {
234235

235236
fn generate_component_schema(&self) -> Components {
236237
let generated_schema =
237-
convert_cache_to_oapi_schema(self.schema.to_owned(), &self.endpoint.name);
238+
convert_cache_to_oapi_schema(self.schema.to_owned(), &self.table_name);
238239

239240
let schemas = indexmap::indexmap! {
240241
self.get_singular_name() => ReferenceOr::Item(generated_schema),
241242
self.get_plural_name() => ReferenceOr::Item(Schema {
242243
schema_data: SchemaData {
243-
description: Some(format!("Array of {}", &self.endpoint.name)),
244+
description: Some(format!("Array of {}", &self.table_name)),
244245
..Default::default()
245246
},
246247
schema_kind: SchemaKind::Type(Type::Array(ArrayType {
@@ -267,17 +268,17 @@ impl<'a> OpenApiGenerator<'a> {
267268
OpenAPI {
268269
openapi: "3.0.0".to_owned(),
269270
info: Info {
270-
title: self.endpoint.name.to_uppercase(),
271+
title: self.table_name.to_uppercase(),
271272
description: Some(format!(
272273
"API documentation for {}. Powered by Dozer Data.",
273-
self.endpoint.name.to_lowercase()
274+
self.table_name.to_lowercase()
274275
)),
275276
version: "1.0.0".to_owned(),
276277
contact: create_contact_info(),
277278
..Default::default()
278279
},
279280
tags: vec![Tag {
280-
name: self.endpoint.name.to_string(),
281+
name: self.table_name.to_string(),
281282
..Default::default()
282283
}],
283284
servers: self
@@ -297,12 +298,14 @@ impl<'a> OpenApiGenerator<'a> {
297298
pub fn new(
298299
schema: &'a dozer_types::types::Schema,
299300
secondary_indexes: &'a [IndexDefinition],
301+
table_name: String,
300302
endpoint: ApiEndpoint,
301303
server_host: Vec<String>,
302304
) -> Self {
303305
Self {
304306
schema,
305307
secondary_indexes,
308+
table_name,
306309
endpoint,
307310
server_host,
308311
}

0 commit comments

Comments
 (0)