1 change: 1 addition & 0 deletions Cargo.lock

6 changes: 6 additions & 0 deletions Cargo.toml
@@ -6,6 +6,10 @@ readme = "README.md"
publish = false
default-run = "vector"

[profile.release]
codegen-units = 1
lto = "fat"

[[bin]]
name = "vector"
path = "src/main.rs"
@@ -63,6 +67,8 @@ vector = { git = "https://github.com/vectordotdev/vector", tag = "v0.49.0", defa
vector-config = { git = "https://github.com/vectordotdev/vector", tag = "v0.49.0", default-features = false }
vector-lib = { git = "https://github.com/vectordotdev/vector", tag = "v0.49.0", default-features = false }
xz2 = { version = "0.1.7" }
lazy_static = "1.4.0"
lru = "0.12.5"

[dev-dependencies]
lazy_static = "1.4.0"
2 changes: 2 additions & 0 deletions proto/tidb.proto
@@ -17,6 +17,8 @@ message TopSQLRecordItem {
map<string, uint64> stmt_kv_exec_count = 4; // target => count
uint64 stmt_duration_sum_ns = 5;
uint64 stmt_duration_count = 6;
uint64 stmt_network_in_bytes = 7; // traffic from client
uint64 stmt_network_out_bytes = 8; // traffic to client
}

message SQLMeta {
11 changes: 11 additions & 0 deletions proto/tikv.proto
@@ -16,6 +16,7 @@ message EmptyResponse {}
message ResourceUsageRecord {
oneof record_oneof {
GroupTagRecord record = 1;
RegionRecord region_record = 2;
}
}

@@ -25,9 +26,19 @@ message GroupTagRecord {
repeated GroupTagRecordItem items = 2;
}

// RegionRecord is a set of resource usage data grouped by region.
message RegionRecord {
uint64 region_id = 1;
repeated GroupTagRecordItem items = 2;
}

message GroupTagRecordItem {
uint64 timestamp_sec = 1;
uint32 cpu_time_ms = 2;
uint32 read_keys = 3;
uint32 write_keys = 4;
uint64 network_in_bytes = 5;
uint64 network_out_bytes = 6;
uint64 logical_read_bytes = 7;
uint64 logical_write_bytes = 8;
}
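
For context, the extended `record_oneof` means every consumer of `ResourceUsageRecord` now has to handle two groupings of the same item type. A minimal sketch of that dispatch, using trimmed hand-written stand-ins for the prost-generated types (the real generated module and field set will differ):

```rust
#![allow(dead_code)] // sketch only; the stand-ins carry fewer fields than the real generated types

struct GroupTagRecordItem { network_in_bytes: u64, network_out_bytes: u64 }
struct GroupTagRecord { items: Vec<GroupTagRecordItem> }
struct RegionRecord { region_id: u64, items: Vec<GroupTagRecordItem> }

// Mirrors the extended `record_oneof`: the existing tag-grouped variant plus the new region-grouped one.
enum RecordOneof {
    Record(GroupTagRecord),
    RegionRecord(RegionRecord),
}
struct ResourceUsageRecord { record_oneof: Option<RecordOneof> }

// A consumer now matches both groupings when reading the per-item network counters.
fn total_network_bytes(record: &ResourceUsageRecord) -> u64 {
    let items = match &record.record_oneof {
        Some(RecordOneof::Record(r)) => &r.items,
        Some(RecordOneof::RegionRecord(r)) => &r.items,
        None => return 0,
    };
    items.iter().map(|i| i.network_in_bytes + i.network_out_bytes).sum()
}

fn main() {
    let rec = ResourceUsageRecord {
        record_oneof: Some(RecordOneof::RegionRecord(RegionRecord {
            region_id: 7,
            items: vec![GroupTagRecordItem { network_in_bytes: 10, network_out_bytes: 5 }],
        })),
    };
    assert_eq!(total_network_bytes(&rec), 15);
}
```
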
22 changes: 14 additions & 8 deletions src/common/deltalake_writer/converter.rs
@@ -23,6 +23,7 @@ impl EventConverter {
schema_manager: &mut SchemaManager,
events: Vec<Event>,
fixed_schema: &Option<Schema>,
default_table_name: Option<&str>,
) -> Result<(RecordBatch, Schema), Box<dyn std::error::Error + Send + Sync>> {
if events.is_empty() {
return Err("No events to convert".into());
@@ -34,14 +35,14 @@
} else {
// Build fixed schema from first event and cache it
let first_event = &events[0];
schema_manager.build_arrow_schema(first_event)?
schema_manager.build_arrow_schema(first_event, default_table_name)?
};

// Convert events to columns
let mut columns: Vec<ArrayRef> = Vec::new();

for field in &schema.fields {
let column = Self::create_column(field, &events)?;
let column = Self::create_column(field, &events, default_table_name)?;
columns.push(column);
}

@@ -54,9 +55,10 @@
fn create_column(
field: &Field,
events: &[Event],
default_table_name: Option<&str>,
) -> Result<ArrayRef, Box<dyn std::error::Error + Send + Sync>> {
match field.data_type() {
DataType::Utf8 => Self::build_string_column(field, events),
DataType::Utf8 => Self::build_string_column(field, events, default_table_name),
DataType::Int64 => Self::build_int64_column(field, events),
DataType::Int32 => Self::build_int32_column(field, events),
DataType::UInt32 => Self::build_uint32_column(field, events),
@@ -79,16 +81,20 @@
fn build_string_column(
field: &Field,
events: &[Event],
default_table_name: Option<&str>,
) -> Result<ArrayRef, Box<dyn std::error::Error + Send + Sync>> {
let mut builder = StringBuilder::with_capacity(events.len(), events.len() * 8);

for event in events.iter() {
if let Event::Log(log_event) = event {
let value_opt = match field.name().as_str() {
"_vector_table" => log_event
.get("_vector_table")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
"_vector_table" => {
if let Some(table_name) = log_event.get("_vector_table").and_then(|v| v.as_str()) {
Some(table_name.to_string())
} else {
default_table_name.map(|s| s.to_string())
}
},
"_vector_source_table" => log_event
.get("_vector_source_table")
.and_then(|v| v.as_str())
@@ -621,7 +627,7 @@ mod tests {
let events = vec![Event::Log(create_test_log_event())];
let field = Field::new("_vector_table", DataType::Utf8, false);

let result = EventConverter::build_string_column(&field, &events);
let result = EventConverter::build_string_column(&field, &events, None);
assert!(result.is_ok());

let array = result.unwrap();
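
Threading `default_table_name` from the sink into column building makes `_vector_table` resolve in two steps: the value carried on the event wins, and the sink-level name is only a fallback. A reduced sketch of that resolution order, using a hypothetical helper rather than the diff's actual code:

```rust
// Hypothetical helper illustrating the fallback order used in build_string_column.
fn resolve_table_name(event_table: Option<&str>, default_table_name: Option<&str>) -> Option<String> {
    event_table
        .map(|s| s.to_string()) // 1. prefer the event's own _vector_table value
        .or_else(|| default_table_name.map(|s| s.to_string())) // 2. fall back to the sink's configured name
}

fn main() {
    assert_eq!(resolve_table_name(Some("orders"), Some("sink_table")), Some("orders".to_string()));
    assert_eq!(resolve_table_name(None, Some("sink_table")), Some("sink_table".to_string()));
    assert_eq!(resolve_table_name(None, None), None);
}
```
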
98 changes: 69 additions & 29 deletions src/common/deltalake_writer/delta_ops.rs
@@ -191,41 +191,81 @@ impl DeltaOpsManager {

info!("Writing to Delta Lake table at: {}", table_uri);

// Use DeltaOps for improved S3 support, following the successful test pattern
let table_ops = self.create_delta_ops(&table_uri).await?;

// Get partition columns from schema manager
let partition_by = schema_manager.get_partition_by(table_name);

// Try to write directly first (avoid load() which can panic in deltalake-core 0.28.1)
info!("Attempting to write to Delta table at {}", table_uri);
// Retry logic for transaction conflicts (concurrent writes)
const MAX_RETRIES: u32 = 3;
const INITIAL_RETRY_DELAY_MS: u64 = 100;

for attempt in 0..MAX_RETRIES {
if attempt > 0 {
// Exponential backoff, doubling per retry: 100ms, then 200ms
let delay_ms = INITIAL_RETRY_DELAY_MS * (1 << (attempt - 1));
let delay = std::time::Duration::from_millis(delay_ms);
info!(
"Retrying write to Delta table (attempt {}/{}) after {:?} delay",
attempt + 1, MAX_RETRIES, delay
);
tokio::time::sleep(delay).await;
}

// Reload table_ops on each attempt to get latest table state
// Use DeltaOps for improved S3 support, following the successful test pattern
let table_ops = self.create_delta_ops(&table_uri).await?;

info!("Attempting to write to Delta table at {} (attempt {}/{})", table_uri, attempt + 1, MAX_RETRIES);

let write_builder =
self.configure_write_builder(table_ops, record_batch.clone(), partition_by);
let write_result = write_builder.await;
let write_builder =
self.configure_write_builder(table_ops, record_batch.clone(), partition_by);
let write_result = write_builder.await;

match write_result {
Ok(table) => {
info!("✅ Successfully wrote to Delta table at {}", table_uri);
info!("Table version: {:?}", table.version());
return Ok(());
}
Err(e) => {
// Check if error is due to table not existing
let error_str = e.to_string();
if error_str.contains("does not exist")
|| error_str.contains("not found")
|| error_str.contains("Not a Delta table")
{
info!(
"Table doesn't exist, will create it. Error was: {}",
error_str
);
// Fall through to table creation below
} else {
// Other error, fail immediately
error!("Failed to write to Delta table: {}", e);
return Err(e.into());
match write_result {
Ok(table) => {
info!("✅ Successfully wrote to Delta table at {}", table_uri);
info!("Table version: {:?}", table.version());
return Ok(());
}
Err(e) => {
let error_str = e.to_string();

// Check if error is due to table not existing
if error_str.contains("does not exist")
|| error_str.contains("not found")
|| error_str.contains("Not a Delta table")
{
info!(
"Table doesn't exist, will create it. Error was: {}",
error_str
);
// Fall through to table creation below
break;
}
// Check if error is due to transaction conflict (retryable)
else if error_str.contains("conflict detected")
|| error_str.contains("Metadata changed since last commit")
|| error_str.contains("concurrent modification")
{
if attempt < MAX_RETRIES - 1 {
warn!(
"Transaction conflict detected (attempt {}/{}): {}. Will retry...",
attempt + 1, MAX_RETRIES, error_str
);
// Continue to retry
continue;
} else {
error!(
"Transaction conflict after {} retries: {}",
MAX_RETRIES, error_str
);
return Err(e.into());
}
} else {
// Other error, fail immediately
error!("Failed to write to Delta table: {}", e);
return Err(e.into());
}
}
}
}
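
The retry loop recreates `DeltaOps` on every attempt so it sees the latest table state, and spaces attempts with a doubling delay: with `INITIAL_RETRY_DELAY_MS = 100` and `MAX_RETRIES = 3`, the waits come out to 100 ms and then 200 ms. A standalone sketch of just that schedule, kept separate from any Delta Lake calls:

```rust
// Standalone sketch of the backoff schedule used above; not the project's code.
use std::time::Duration;

const MAX_RETRIES: u32 = 3;
const INITIAL_RETRY_DELAY_MS: u64 = 100;

/// Delay to sleep before the given 0-based attempt; attempt 0 runs immediately.
fn backoff_delay(attempt: u32) -> Option<Duration> {
    if attempt == 0 || attempt >= MAX_RETRIES {
        None
    } else {
        // Doubles each retry: 100ms for attempt 1, 200ms for attempt 2.
        Some(Duration::from_millis(INITIAL_RETRY_DELAY_MS * (1 << (attempt - 1))))
    }
}

fn main() {
    assert_eq!(backoff_delay(0), None);
    assert_eq!(backoff_delay(1), Some(Duration::from_millis(100)));
    assert_eq!(backoff_delay(2), Some(Duration::from_millis(200)));
}
```
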
14 changes: 13 additions & 1 deletion src/common/deltalake_writer/mod.rs
@@ -74,6 +74,17 @@ impl DeltaLakeWriter {
table_config: DeltaTableConfig,
write_config: WriteConfig,
storage_options: Option<HashMap<String, String>>,
) -> Self {
Self::new_with_options(table_path, table_config, write_config, storage_options, true)
}

/// Create a new Delta Lake writer with options
pub fn new_with_options(
table_path: PathBuf,
table_config: DeltaTableConfig,
write_config: WriteConfig,
storage_options: Option<HashMap<String, String>>,
enable_standard_fields: bool,
) -> Self {
// Initialize S3 handlers if this is an S3 path
if table_path.to_string_lossy().starts_with("s3://") {
@@ -85,7 +96,7 @@ }
}

let type_converter = TypeConverter::new();
let schema_manager = SchemaManager::new(type_converter);
let schema_manager = SchemaManager::new_with_options(type_converter, enable_standard_fields);
let delta_ops_manager = DeltaOpsManager::new(storage_options.clone());

Self {
@@ -125,6 +136,7 @@ impl DeltaLakeWriter {
&mut self.schema_manager,
events,
&self.fixed_arrow_schema,
Some(&self.table_config.name),
)?;

// Cache the schema if not already cached
33 changes: 22 additions & 11 deletions src/common/deltalake_writer/schema.rs
@@ -21,25 +21,33 @@ pub struct SchemaManager {
cached_arrow_schemas: HashMap<String, Schema>,
/// Type converter
type_converter: TypeConverter,
/// Whether to add system fields to the schema
enable_standard_fields: bool,
}

impl SchemaManager {
#[allow(dead_code)]
pub fn new(type_converter: TypeConverter) -> Self {
Self::new_with_options(type_converter, true)
}

pub fn new_with_options(type_converter: TypeConverter, enable_standard_fields: bool) -> Self {
Self {
cached_schemas: HashMap::new(),
cached_arrow_schemas: HashMap::new(),
type_converter,
enable_standard_fields,
}
}

/// Extract and cache schema metadata from event
pub fn extract_and_cache(&mut self, log_event: &LogEvent) -> Option<SchemaMetadata> {
pub fn extract_and_cache(&mut self, log_event: &LogEvent, default_table_name: Option<&str>) -> Option<SchemaMetadata> {
// Get table name for schema cache key
let table_name = log_event
.get("_vector_table")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown_table".to_string());
.unwrap_or_else(|| default_table_name.unwrap_or("unknown_table").to_string());

// Only extract if not already cached
if !self.cached_schemas.contains_key(&table_name) {
@@ -102,27 +110,30 @@ impl SchemaManager {
pub fn build_arrow_schema(
&mut self,
event: &Event,
default_table_name: Option<&str>,
) -> Result<Schema, Box<dyn std::error::Error + Send + Sync>> {
if let Event::Log(log_event) = event {
let mut fields = Vec::new();
let mut added_fields = std::collections::HashSet::new();

// First, extract and cache the MySQL schema metadata from the event
self.extract_and_cache(log_event);
self.extract_and_cache(log_event, default_table_name);

// Get table name for schema lookup
let table_name = log_event
.get("_vector_table")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or_else(|| "unknown_table".to_string());
.unwrap_or_else(|| default_table_name.unwrap_or("unknown_table").to_string());

// Build fixed field list based on cached MySQL schema and Vector system fields

// 1. Add Vector system fields first
fields.extend(self.add_system_fields());
for field in &fields {
added_fields.insert(field.name().to_string());
// 1. Add Vector system fields first (if enabled)
if self.enable_standard_fields {
fields.extend(self.add_system_fields());
for field in &fields {
added_fields.insert(field.name().to_string());
}
}

// 2. Add date field for partitioning (derived from _vector_timestamp)
@@ -302,7 +313,7 @@ mod tests {
log.insert("_schema_metadata", LogValue::Object(schema_meta));

// Extract and cache
let metadata = manager.extract_and_cache(&log);
let metadata = manager.extract_and_cache(&log, None);
assert!(metadata.is_some());

let metadata = metadata.unwrap();
@@ -338,7 +349,7 @@
log.insert("_schema_metadata", LogValue::Object(schema_meta));

// Extract and cache
let metadata = manager.extract_and_cache(&log);
let metadata = manager.extract_and_cache(&log, None);
assert!(metadata.is_some());

let metadata = metadata.unwrap();
@@ -367,7 +378,7 @@
log.insert("_schema_metadata", LogValue::Object(schema_meta));

// Extract and cache
manager.extract_and_cache(&log);
manager.extract_and_cache(&log, None);

// Get partition_by
let partition_by = manager.get_partition_by("test_table");
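
Note that `enable_standard_fields` only gates the system fields the schema manager prepends in `build_arrow_schema`; event-derived fields are added either way. A simplified model of that switch (the field names below are an illustrative subset, not the manager's full list):

```rust
// Simplified model of the enable_standard_fields switch; not the schema manager's actual code.
fn build_field_names(enable_standard_fields: bool, event_fields: &[&str]) -> Vec<String> {
    let mut fields = Vec::new();
    if enable_standard_fields {
        // System fields injected only when the flag is on (illustrative subset).
        fields.extend(["_vector_table", "_vector_source_table", "_vector_timestamp"].map(String::from));
    }
    // Event-derived fields are added in both modes.
    fields.extend(event_fields.iter().map(|s| s.to_string()));
    fields
}

fn main() {
    assert_eq!(build_field_names(false, &["id", "name"]), vec!["id", "name"]);
    assert!(build_field_names(true, &["id"]).contains(&"_vector_table".to_string()));
}
```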