Commit 4fe8d93

chore(cubestore): Refactoring of job processing (#7094)

1 parent 7aaffd5 commit 4fe8d93

22 files changed: +2073 −864 lines changed
Lines changed: 221 additions & 0 deletions

@@ -0,0 +1,221 @@
use crate::config::injection::DIService;
use crate::config::Config;
use crate::import::ImportService;
use crate::metastore::job::{Job, JobType};
use crate::metastore::table::Table;
use crate::metastore::{MetaStore, RowKey, TableId};
use crate::queryplanner::trace_data_loaded::DataLoadedSize;
use crate::store::compaction::CompactionService;
use crate::store::ChunkDataStore;
use crate::CubeError;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

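/// Outcome of a processed job; currently tracks only how much data the job
/// loaded (zero for job types that do not measure it).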
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct JobProcessResult {
    data_loaded_size: usize,
}

impl JobProcessResult {
    pub fn new(data_loaded_size: usize) -> Self {
        Self { data_loaded_size }
    }

    pub fn data_loaded_size(&self) -> usize {
        self.data_loaded_size
    }
}

impl Default for JobProcessResult {
    fn default() -> Self {
        Self {
            data_loaded_size: 0,
        }
    }
}

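/// Entry point for job execution. Registered with the DI container (see
/// `di_service!` below) so callers depend on the trait rather than a concrete
/// processor.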
#[async_trait]
pub trait JobProcessor: DIService + Send + Sync {
    async fn wait_processing_loops(&self);
    async fn stop_processing_loops(&self) -> Result<(), CubeError>;
    async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError>;
}

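/// In-process implementation of `JobProcessor`: it keeps no processing loops
/// of its own (the loop methods are no-ops) and delegates each job to a
/// `JobIsolatedProcessor`.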
pub struct JobProcessorImpl {
    processor: Arc<JobIsolatedProcessor>,
}

impl JobProcessorImpl {
    pub fn new(
        meta_store: Arc<dyn MetaStore>,
        chunk_store: Arc<dyn ChunkDataStore>,
        compaction_service: Arc<dyn CompactionService>,
        import_service: Arc<dyn ImportService>,
    ) -> Arc<Self> {
        Arc::new(Self {
            processor: JobIsolatedProcessor::new(
                meta_store,
                chunk_store,
                compaction_service,
                import_service,
            ),
        })
    }
}

#[async_trait]
impl JobProcessor for JobProcessorImpl {
    async fn wait_processing_loops(&self) {}

    async fn stop_processing_loops(&self) -> Result<(), CubeError> {
        Ok(())
    }

    async fn process_job(&self, job: Job) -> Result<JobProcessResult, CubeError> {
        self.processor.process_separate_job(&job).await
    }
}

crate::di_service!(JobProcessorImpl, [JobProcessor]);

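/// Processes a single job directly against the underlying services. Suitable
/// for running in a separate worker process; job types that only make sense
/// in the main process are rejected in `process_separate_job`.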
pub struct JobIsolatedProcessor {
    meta_store: Arc<dyn MetaStore>,
    chunk_store: Arc<dyn ChunkDataStore>,
    compaction_service: Arc<dyn CompactionService>,
    import_service: Arc<dyn ImportService>,
}

impl JobIsolatedProcessor {
    pub fn new(
        meta_store: Arc<dyn MetaStore>,
        chunk_store: Arc<dyn ChunkDataStore>,
        compaction_service: Arc<dyn CompactionService>,
        import_service: Arc<dyn ImportService>,
    ) -> Arc<Self> {
        Arc::new(Self {
            meta_store,
            chunk_store,
            compaction_service,
            import_service,
        })
    }

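    /// Resolves all four service dependencies from the config's DI injector.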
    pub async fn new_from_config(config: &Config) -> Arc<Self> {
        Self::new(
            config.injector().get_service_typed().await,
            config.injector().get_service_typed().await,
            config.injector().get_service_typed().await,
            config.injector().get_service_typed().await,
        )
    }

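    /// Dispatches on the job type. Each arm first checks that the job's row
    /// reference points at the metastore table expected for that job type,
    /// failing via `fail_job_row_key` otherwise.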
    pub async fn process_separate_job(&self, job: &Job) -> Result<JobProcessResult, CubeError> {
        match job.job_type() {
            JobType::PartitionCompaction => {
                if let RowKey::Table(TableId::Partitions, partition_id) = job.row_reference() {
                    let compaction_service = self.compaction_service.clone();
                    let partition_id = *partition_id;
                    let data_loaded_size = DataLoadedSize::new();
                    let r = compaction_service
                        .compact(partition_id, data_loaded_size.clone())
                        .await;
                    r?;
                    Ok(JobProcessResult::new(data_loaded_size.get()))
                } else {
                    Self::fail_job_row_key(job)
                }
            }
            JobType::MultiPartitionSplit => {
                if let RowKey::Table(TableId::MultiPartitions, id) = job.row_reference() {
                    let compaction_service = self.compaction_service.clone();
                    let id = *id;
                    compaction_service.split_multi_partition(id).await?;
                    Ok(JobProcessResult::default())
                } else {
                    Self::fail_job_row_key(job)
                }
            }
            JobType::FinishMultiSplit => {
                if let RowKey::Table(TableId::MultiPartitions, multi_part_id) = job.row_reference()
                {
                    let meta_store = self.meta_store.clone();
                    let compaction_service = self.compaction_service.clone();
                    let multi_part_id = *multi_part_id;
                    for p in meta_store.find_unsplit_partitions(multi_part_id).await? {
                        compaction_service
                            .finish_multi_split(multi_part_id, p)
                            .await?
                    }

                    Ok(JobProcessResult::default())
                } else {
                    Self::fail_job_row_key(job)
                }
            }
            JobType::TableImport => {
                if let RowKey::Table(TableId::Tables, table_id) = job.row_reference() {
                    let import_service = self.import_service.clone();
                    let table_id = *table_id;
                    import_service.import_table(table_id).await?;
                    Ok(JobProcessResult::default())
                } else {
                    Self::fail_job_row_key(job)
                }
            }
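            // CSV import of a single location. Streaming locations are
            // rejected up front: they must be handled in the main process.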
            JobType::TableImportCSV(location) => {
                if Table::is_stream_location(&location) {
                    return Err(CubeError::internal(
                        "Streaming import cannot be processed in separate process".to_string(),
                    ));
                }
                if let RowKey::Table(TableId::Tables, table_id) = job.row_reference() {
                    let table_id = *table_id;
                    let import_service = self.import_service.clone();
                    let location = location.to_string();
                    let data_loaded_size = Some(DataLoadedSize::new());
                    import_service
                        .clone()
                        .import_table_part(table_id, &location, data_loaded_size.clone())
                        .await?;
                    Ok(JobProcessResult::new(
                        data_loaded_size.map_or(0, |d| d.get()),
                    ))
                } else {
                    Self::fail_job_row_key(job)
                }
            }
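            // Repartitioning is offloaded only for persisted chunks; an
            // in-memory chunk exists solely in the main process.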
            JobType::RepartitionChunk => {
                if let RowKey::Table(TableId::Chunks, chunk_id) = job.row_reference() {
                    let chunk_id = *chunk_id;
                    let chunk = self.meta_store.get_chunk(chunk_id).await?;
                    if chunk.get_row().in_memory() {
                        return Err(CubeError::internal(
                            "In-memory chunk cannot be repartitioned in separate process"
                                .to_string(),
                        ));
                    }
                    let data_loaded_size = DataLoadedSize::new();
                    self.chunk_store
                        .repartition_chunk(chunk_id, data_loaded_size.clone())
                        .await?;
                    Ok(JobProcessResult::new(data_loaded_size.get()))
                } else {
                    Self::fail_job_row_key(job)
                }
            }
            _ => Err(CubeError::internal(format!(
                "Job {:?} cannot be processed in separate process",
                job.job_type()
            ))),
        }
    }

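    /// Shared error path for jobs whose row reference does not match the
    /// table expected for their job type.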
    fn fail_job_row_key(job: &Job) -> Result<JobProcessResult, CubeError> {
        Err(CubeError::internal(format!(
            "Incorrect row key for {:?}: {:?}",
            job,
            job.row_reference()
        )))
    }
}
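
For orientation, a minimal sketch of a call site for the new trait, assuming a fully built Config whose injector can resolve the four services (mirroring new_from_config above). run_single_job and the log line are hypothetical, not part of this commit:

// Hypothetical call site (sketch, not part of this commit).
async fn run_single_job(config: &Config, job: Job) -> Result<(), CubeError> {
    let processor: Arc<dyn JobProcessor> = JobProcessorImpl::new(
        config.injector().get_service_typed().await, // Arc<dyn MetaStore>
        config.injector().get_service_typed().await, // Arc<dyn ChunkDataStore>
        config.injector().get_service_typed().await, // Arc<dyn CompactionService>
        config.injector().get_service_typed().await, // Arc<dyn ImportService>
    );
    let result = processor.process_job(job).await?;
    // data_loaded_size() stays 0 for job types that do not track loaded data.
    log::info!("job done, data loaded: {}", result.data_loaded_size());
    processor.stop_processing_loops().await
}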
