Skip to content

Commit 527a1c0

Browse files
shuiyisong and evenyag authored
fix: pipeline loading issue (GreptimeTeam#7491)
* fix: pipeline loading (Signed-off-by: shuiyisong <xixing.sys@gmail.com>)
* chore: change string to str (Signed-off-by: shuiyisong <xixing.sys@gmail.com>)
* chore: minor fix to save returned version (Signed-off-by: shuiyisong <xixing.sys@gmail.com>)
* refactor: introduce PipelineContent (Signed-off-by: shuiyisong <xixing.sys@gmail.com>)
* fix: use found schema (Signed-off-by: shuiyisong <xixing.sys@gmail.com>)
* chore: update CR (Co-authored-by: Yingwen <realevenyag@gmail.com>)
* chore: CR issue (Signed-off-by: shuiyisong <xixing.sys@gmail.com>)

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
1 parent 7e24363 commit 527a1c0

File tree

4 files changed

+118
-77
lines changed

4 files changed

+118
-77
lines changed

src/pipeline/src/error.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -609,10 +609,14 @@ pub enum Error {
609609
},
610610

611611
#[snafu(display(
612-
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. schemas: {}",
613-
schemas
612+
"Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
613+
name,
614+
current_schema,
615+
schemas,
614616
))]
615617
MultiPipelineWithDiffSchema {
618+
name: String,
619+
current_schema: String,
616620
schemas: String,
617621
#[snafu(implicit)]
618622
location: Location,

src/pipeline/src/manager/pipeline_cache.rs

Lines changed: 52 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use std::sync::Arc;
1616
use std::time::Duration;
1717

18+
use common_telemetry::debug;
1819
use datatypes::timestamp::TimestampNanosecond;
1920
use moka::sync::Cache;
2021

@@ -33,10 +34,18 @@ const PIPELINES_CACHE_TTL: Duration = Duration::from_secs(10);
3334
/// to encapsulate inner cache. Only public methods are exposed.
3435
pub(crate) struct PipelineCache {
3536
pipelines: Cache<String, Arc<Pipeline>>,
36-
original_pipelines: Cache<String, (String, TimestampNanosecond)>,
37+
original_pipelines: Cache<String, PipelineContent>,
3738
/// If the pipeline table is invalid, we can use this cache to prevent failures when writing logs through the pipeline
3839
/// The failover cache never expires, but it will be updated when the pipelines cache is updated.
39-
failover_cache: Cache<String, (String, TimestampNanosecond)>,
40+
failover_cache: Cache<String, PipelineContent>,
41+
}
42+
43+
#[derive(Clone, Debug, PartialEq, Eq)]
44+
pub struct PipelineContent {
45+
pub name: String,
46+
pub content: String,
47+
pub version: TimestampNanosecond,
48+
pub schema: String,
4049
}
4150

4251
impl PipelineCache {
@@ -45,12 +54,17 @@ impl PipelineCache {
4554
pipelines: Cache::builder()
4655
.max_capacity(PIPELINES_CACHE_SIZE)
4756
.time_to_live(PIPELINES_CACHE_TTL)
57+
.name("pipelines")
4858
.build(),
4959
original_pipelines: Cache::builder()
5060
.max_capacity(PIPELINES_CACHE_SIZE)
5161
.time_to_live(PIPELINES_CACHE_TTL)
62+
.name("original_pipelines")
63+
.build(),
64+
failover_cache: Cache::builder()
65+
.max_capacity(PIPELINES_CACHE_SIZE)
66+
.name("failover_cache")
5267
.build(),
53-
failover_cache: Cache::builder().max_capacity(PIPELINES_CACHE_SIZE).build(),
5468
}
5569
}
5670

@@ -72,28 +86,24 @@ impl PipelineCache {
7286
);
7387
}
7488

75-
pub(crate) fn insert_pipeline_str_cache(
76-
&self,
77-
schema: &str,
78-
name: &str,
79-
version: PipelineVersion,
80-
pipeline: (String, TimestampNanosecond),
81-
with_latest: bool,
82-
) {
89+
pub(crate) fn insert_pipeline_str_cache(&self, pipeline: &PipelineContent, with_latest: bool) {
90+
let schema = pipeline.schema.as_str();
91+
let name = pipeline.name.as_str();
92+
let version = pipeline.version;
8393
insert_cache_generic(
8494
&self.original_pipelines,
8595
schema,
8696
name,
87-
version,
97+
Some(version),
8898
pipeline.clone(),
8999
with_latest,
90100
);
91101
insert_cache_generic(
92102
&self.failover_cache,
93103
schema,
94104
name,
95-
version,
96-
pipeline,
105+
Some(version),
106+
pipeline.clone(),
97107
with_latest,
98108
);
99109
}
@@ -112,7 +122,7 @@ impl PipelineCache {
112122
schema: &str,
113123
name: &str,
114124
version: PipelineVersion,
115-
) -> Result<Option<(String, TimestampNanosecond)>> {
125+
) -> Result<Option<PipelineContent>> {
116126
get_cache_generic(&self.failover_cache, schema, name, version)
117127
}
118128

@@ -121,7 +131,7 @@ impl PipelineCache {
121131
schema: &str,
122132
name: &str,
123133
version: PipelineVersion,
124-
) -> Result<Option<(String, TimestampNanosecond)>> {
134+
) -> Result<Option<PipelineContent>> {
125135
get_cache_generic(&self.original_pipelines, schema, name, version)
126136
}
127137

@@ -174,13 +184,13 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
174184
version: PipelineVersion,
175185
) -> Result<Option<T>> {
176186
// lets try empty schema first
177-
let k = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
178-
if let Some(value) = cache.get(&k) {
187+
let emp_key = generate_pipeline_cache_key(EMPTY_SCHEMA_NAME, name, version);
188+
if let Some(value) = cache.get(&emp_key) {
179189
return Ok(Some(value));
180190
}
181191
// use input schema
182-
let k = generate_pipeline_cache_key(schema, name, version);
183-
if let Some(value) = cache.get(&k) {
192+
let schema_k = generate_pipeline_cache_key(schema, name, version);
193+
if let Some(value) = cache.get(&schema_k) {
184194
return Ok(Some(value));
185195
}
186196

@@ -193,14 +203,28 @@ fn get_cache_generic<T: Clone + Send + Sync + 'static>(
193203

194204
match ks.len() {
195205
0 => Ok(None),
196-
1 => Ok(Some(ks.remove(0).1)),
197-
_ => MultiPipelineWithDiffSchemaSnafu {
198-
schemas: ks
199-
.iter()
200-
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
201-
.collect::<Vec<_>>()
202-
.join(","),
206+
1 => {
207+
let (_, value) = ks.remove(0);
208+
Ok(Some(value))
209+
}
210+
_ => {
211+
debug!(
212+
"caches keys: {:?}, emp key: {:?}, schema key: {:?}, suffix key: {:?}",
213+
cache.iter().map(|e| e.0).collect::<Vec<_>>(),
214+
emp_key,
215+
schema_k,
216+
suffix_key
217+
);
218+
MultiPipelineWithDiffSchemaSnafu {
219+
name: name.to_string(),
220+
current_schema: schema.to_string(),
221+
schemas: ks
222+
.iter()
223+
.filter_map(|(k, _)| k.split_once('/').map(|k| k.0))
224+
.collect::<Vec<_>>()
225+
.join(","),
226+
}
227+
.fail()?
203228
}
204-
.fail()?,
205229
}
206230
}

src/pipeline/src/manager/pipeline_operator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ impl PipelineOperator {
220220
.observe(timer.elapsed().as_secs_f64())
221221
})
222222
.await
223+
.map(|p| (p.content, p.version))
223224
}
224225

225226
/// Insert a pipeline into the pipeline table.

src/pipeline/src/manager/table.rs

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use crate::error::{
4646
MultiPipelineWithDiffSchemaSnafu, PipelineNotFoundSnafu, RecordBatchLenNotMatchSnafu, Result,
4747
};
4848
use crate::etl::{Content, Pipeline, parse};
49-
use crate::manager::pipeline_cache::PipelineCache;
49+
use crate::manager::pipeline_cache::{PipelineCache, PipelineContent};
5050
use crate::manager::{PipelineInfo, PipelineVersion};
5151
use crate::metrics::METRIC_PIPELINE_TABLE_FIND_COUNT;
5252
use crate::util::prepare_dataframe_conditions;
@@ -260,17 +260,22 @@ impl PipelineTable {
260260
&self,
261261
schema: &str,
262262
name: &str,
263-
version: PipelineVersion,
263+
input_version: PipelineVersion,
264264
) -> Result<Arc<Pipeline>> {
265-
if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, version)? {
265+
if let Some(pipeline) = self.cache.get_pipeline_cache(schema, name, input_version)? {
266266
return Ok(pipeline);
267267
}
268268

269-
let pipeline = self.get_pipeline_str(schema, name, version).await?;
270-
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline.0)?);
269+
let pipeline_content = self.get_pipeline_str(schema, name, input_version).await?;
270+
let compiled_pipeline = Arc::new(Self::compile_pipeline(&pipeline_content.content)?);
271271

272-
self.cache
273-
.insert_pipeline_cache(schema, name, version, compiled_pipeline.clone(), false);
272+
self.cache.insert_pipeline_cache(
273+
&pipeline_content.schema,
274+
name,
275+
Some(pipeline_content.version),
276+
compiled_pipeline.clone(),
277+
input_version.is_none(),
278+
);
274279
Ok(compiled_pipeline)
275280
}
276281

@@ -280,14 +285,17 @@ impl PipelineTable {
280285
&self,
281286
schema: &str,
282287
name: &str,
283-
version: PipelineVersion,
284-
) -> Result<(String, TimestampNanosecond)> {
285-
if let Some(pipeline) = self.cache.get_pipeline_str_cache(schema, name, version)? {
288+
input_version: PipelineVersion,
289+
) -> Result<PipelineContent> {
290+
if let Some(pipeline) = self
291+
.cache
292+
.get_pipeline_str_cache(schema, name, input_version)?
293+
{
286294
return Ok(pipeline);
287295
}
288296

289297
let mut pipeline_vec;
290-
match self.find_pipeline(name, version).await {
298+
match self.find_pipeline(name, input_version).await {
291299
Ok(p) => {
292300
METRIC_PIPELINE_TABLE_FIND_COUNT
293301
.with_label_values(&["true"])
@@ -304,8 +312,11 @@ impl PipelineTable {
304312
.inc();
305313
return self
306314
.cache
307-
.get_failover_cache(schema, name, version)?
308-
.ok_or(PipelineNotFoundSnafu { name, version }.build());
315+
.get_failover_cache(schema, name, input_version)?
316+
.context(PipelineNotFoundSnafu {
317+
name,
318+
version: input_version,
319+
});
309320
}
310321
_ => {
311322
// if other error, we should return it
@@ -316,42 +327,40 @@ impl PipelineTable {
316327
};
317328
ensure!(
318329
!pipeline_vec.is_empty(),
319-
PipelineNotFoundSnafu { name, version }
330+
PipelineNotFoundSnafu {
331+
name,
332+
version: input_version
333+
}
320334
);
321335

322336
// if the result is exact one, use it
323337
if pipeline_vec.len() == 1 {
324-
let (pipeline_content, found_schema, version) = pipeline_vec.remove(0);
325-
let p = (pipeline_content, version);
326-
self.cache.insert_pipeline_str_cache(
327-
&found_schema,
328-
name,
329-
Some(version),
330-
p.clone(),
331-
false,
332-
);
333-
return Ok(p);
338+
let pipeline_content = pipeline_vec.remove(0);
339+
340+
self.cache
341+
.insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
342+
return Ok(pipeline_content);
334343
}
335344

336345
// check if there's empty schema pipeline
337346
// if there isn't, check current schema
338347
let pipeline = pipeline_vec
339348
.iter()
340-
.find(|v| v.1 == EMPTY_SCHEMA_NAME)
341-
.or_else(|| pipeline_vec.iter().find(|v| v.1 == schema));
349+
.position(|v| v.schema == EMPTY_SCHEMA_NAME)
350+
.or_else(|| pipeline_vec.iter().position(|v| v.schema == schema))
351+
.map(|idx| pipeline_vec.remove(idx));
342352

343353
// multiple pipeline with no empty or current schema
344354
// throw an error
345-
let (pipeline_content, found_schema, version) =
346-
pipeline.context(MultiPipelineWithDiffSchemaSnafu {
347-
schemas: pipeline_vec.iter().map(|v| v.1.clone()).join(","),
348-
})?;
355+
let pipeline_content = pipeline.with_context(|| MultiPipelineWithDiffSchemaSnafu {
356+
name: name.to_string(),
357+
current_schema: schema.to_string(),
358+
schemas: pipeline_vec.iter().map(|v| v.schema.clone()).join(","),
359+
})?;
349360

350-
let v = *version;
351-
let p = (pipeline_content.clone(), v);
352361
self.cache
353-
.insert_pipeline_str_cache(found_schema, name, Some(v), p.clone(), false);
354-
Ok(p)
362+
.insert_pipeline_str_cache(&pipeline_content, input_version.is_none());
363+
Ok(pipeline_content)
355364
}
356365

357366
/// Insert a pipeline into the pipeline table and compile it.
@@ -378,13 +387,15 @@ impl PipelineTable {
378387
true,
379388
);
380389

381-
self.cache.insert_pipeline_str_cache(
382-
EMPTY_SCHEMA_NAME,
383-
name,
384-
Some(TimestampNanosecond(version)),
385-
(pipeline.to_owned(), TimestampNanosecond(version)),
386-
true,
387-
);
390+
let pipeline_content = PipelineContent {
391+
name: name.to_string(),
392+
content: pipeline.to_string(),
393+
version: TimestampNanosecond(version),
394+
schema: EMPTY_SCHEMA_NAME.to_string(),
395+
};
396+
397+
self.cache
398+
.insert_pipeline_str_cache(&pipeline_content, true);
388399
}
389400

390401
Ok((version, compiled_pipeline))
@@ -466,7 +477,7 @@ impl PipelineTable {
466477
&self,
467478
name: &str,
468479
version: PipelineVersion,
469-
) -> Result<Vec<(String, String, TimestampNanosecond)>> {
480+
) -> Result<Vec<PipelineContent>> {
470481
// 1. prepare dataframe
471482
let dataframe = self
472483
.query_engine
@@ -566,11 +577,12 @@ impl PipelineTable {
566577

567578
let len = pipeline_content.len();
568579
for i in 0..len {
569-
re.push((
570-
pipeline_content.value(i).to_string(),
571-
pipeline_schema.value(i).to_string(),
572-
TimestampNanosecond::new(pipeline_created_at.value(i)),
573-
));
580+
re.push(PipelineContent {
581+
name: name.to_string(),
582+
content: pipeline_content.value(i).to_string(),
583+
version: TimestampNanosecond::new(pipeline_created_at.value(i)),
584+
schema: pipeline_schema.value(i).to_string(),
585+
});
574586
}
575587
}
576588

0 commit comments

Comments (0)