Skip to content

Commit 5179918

Browse files
committed
iceberg_loader: Simplify batch_metadata
1 parent bd938d7 commit 5179918

File tree

2 files changed

+3
-36
lines changed

2 files changed

+3
-36
lines changed

src/amp/loaders/implementations/iceberg_loader.py

Lines changed: 3 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ def _load_batch_impl(self, batch: pa.RecordBatch, table_name: str, **kwargs) ->
135135
# Get the Iceberg table (already created by _create_table_from_schema if needed)
136136
mode = kwargs.get('mode', LoadMode.APPEND)
137137
table_identifier = f'{self.config.namespace}.{table_name}'
138-
138+
139139
# Use cached table if available
140140
if table_identifier in self._table_cache:
141141
iceberg_table = self._table_cache[table_identifier]
@@ -408,45 +408,13 @@ def _perform_load_operation(self, iceberg_table: IcebergTable, arrow_table: pa.T
408408

409409
def _get_loader_batch_metadata(self, batch: pa.RecordBatch, duration: float, **kwargs) -> Dict[str, Any]:
410410
"""Get Iceberg-specific metadata for batch operation"""
411-
metadata = {'namespace': self.config.namespace}
412-
413-
# Add partition columns if available
414-
table_name = kwargs.get('table_name')
415-
if table_name and self._table_exists(table_name):
416-
try:
417-
table_info = self.get_table_info(table_name)
418-
metadata['partition_columns'] = table_info.get('partition_columns', [])
419-
except Exception:
420-
metadata['partition_columns'] = []
421-
else:
422-
# For new tables, get partition fields from partition_spec if available
423-
metadata['partition_columns'] = []
424-
425-
return metadata
411+
return {'namespace': self.config.namespace}
426412

427413
def _get_loader_table_metadata(
428414
self, table: pa.Table, duration: float, batch_count: int, **kwargs
429415
) -> Dict[str, Any]:
430416
"""Get Iceberg-specific metadata for table operation"""
431-
metadata = {'namespace': self.config.namespace}
432-
433-
# Add partition columns if available
434-
table_name = kwargs.get('table_name')
435-
if table_name and self._table_exists(table_name):
436-
try:
437-
table_info = self.get_table_info(table_name)
438-
metadata['partition_columns'] = table_info.get('partition_columns', [])
439-
except Exception:
440-
metadata['partition_columns'] = []
441-
else:
442-
# For new tables, get partition fields from partition_spec if available
443-
metadata['partition_columns'] = []
444-
if self.config.partition_spec and hasattr(self.config.partition_spec, 'fields'):
445-
# partition_spec.fields contains partition field definitions
446-
# We'll extract them during table creation
447-
metadata['partition_columns'] = [] # Will be populated after table creation
448-
449-
return metadata
417+
return {'namespace': self.config.namespace}
450418

451419
def _table_exists(self, table_name: str) -> bool:
452420
"""Check if a table exists"""

tests/integration/test_iceberg_loader.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,6 @@ def test_partitioning(self, iceberg_partitioned_config, small_test_data):
228228

229229
assert result.success == True
230230
# Note: Partitioning requires creating PartitionSpec objects now
231-
assert result.metadata['partition_columns'] == []
232231
assert result.metadata['namespace'] == iceberg_partitioned_config['namespace']
233232

234233
def test_timestamp_conversion(self, iceberg_basic_config):

0 commit comments

Comments
 (0)