Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 26 additions & 51 deletions backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async def get_dataset_columns(dataset_name: str):
@app.get("/datasets/{dataset_name}/rows")
async def get_dataset_rows(
dataset_name: str,
limit: int = Query(default=50, le=MAX_LIMIT),
limit: int = Query(default=50, ge=1, le=MAX_LIMIT),
offset: int = Query(default=0, ge=0),
columns: Optional[str] = Query(default=None)
):
Expand Down Expand Up @@ -294,60 +294,35 @@ async def get_dataset_rows(
logger.info(f"Returned schema info for corrupted {dataset_name} dataset")

else:
# For other datasets, try normal reading
logger.info(f"Attempting to read {dataset_name} using to_arrow().to_pylist() approach")

# This is the approach that works in the search API
data_list = table.to_arrow().to_pylist()
total_count = len(data_list)

# Apply pagination at the list level
start_idx = offset
end_idx = min(offset + limit, total_count)
paginated_data = data_list[start_idx:end_idx]

# Convert back to Arrow table for consistent processing
if paginated_data:
# Get the schema from the original table
schema = table.schema
try:
# Native pagination: read only the requested rows from disk
total_count = table.count_rows()
end = min(offset + limit, total_count)
if offset >= total_count:
result_table = pa.table({field.name: pa.array([], type=field.type) for field in table.schema})
else:
offsets = list(range(offset, end))
builder = table.take_offsets(offsets)
if column_list:
available_columns = [col for col in column_list if col in [field.name for field in table.schema]]
if available_columns:
builder = builder.select(available_columns)
result_table = builder.to_arrow()

logger.info(f"Read {result_table.num_rows} rows (offset={offset}, limit={limit}) from {dataset_name} ({total_count} total)")

except (AttributeError, TypeError):
# Fallback for older Lance versions without take_offsets/count_rows
logger.info(f"Native pagination unavailable, using Arrow slice for {dataset_name}")
arrow_table = table.to_arrow()
total_count = arrow_table.num_rows

# Apply column selection if specified
if column_list:
# Filter the schema and data
available_columns = [col for col in column_list if col in [field.name for field in schema]]
if available_columns:
# Create filtered data
filtered_data = []
for row in paginated_data:
filtered_row = {col: row.get(col) for col in available_columns}
filtered_data.append(filtered_row)
paginated_data = filtered_data

# Create filtered schema
filtered_fields = [field for field in schema if field.name in available_columns]
schema = pa.schema(filtered_fields)

# Convert the paginated data back to Arrow format
arrays = []
for field in schema:
column_data = [row.get(field.name) for row in paginated_data]
arrays.append(pa.array(column_data, type=field.type))

result_table = pa.Table.from_arrays(arrays, schema=schema)
else:
# Empty result - create empty table with correct schema
schema = table.schema
if column_list:
available_columns = [col for col in column_list if col in [field.name for field in schema]]
available_columns = [col for col in column_list if col in arrow_table.column_names]
if available_columns:
filtered_fields = [field for field in schema if field.name in available_columns]
schema = pa.schema(filtered_fields)

# Create empty arrays for each field
arrays = [pa.array([], type=field.type) for field in schema]
result_table = pa.Table.from_arrays(arrays, schema=schema)
arrow_table = arrow_table.select(available_columns)

logger.info(f"Successfully read {len(paginated_data)} rows from {dataset_name}")
result_table = arrow_table.slice(offset, limit)

except Exception as general_error:
logger.error(f"Reading failed for {dataset_name}: {general_error}")
Expand Down
Loading