Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 32 additions & 76 deletions backend/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,91 +257,47 @@ async def get_dataset_rows(
if invalid_columns:
raise HTTPException(status_code=400, detail=f"Invalid columns: {invalid_columns}")

# For corrupted datasets, provide a helpful schema-only view
# Read rows, falling back to an informational response if the dataset is unreadable
result_table = None
total_count = 0

try:
# Check if this is a known corrupted dataset
if dataset_name == "images":
logger.info(f"Detected images dataset - using schema-only approach due to known corruption")

# Create a schema-based representation instead of reading data
schema = table.schema
schema_info = []

for field in schema:
field_info = {
"column": field.name,
"type": str(field.type),
"nullable": field.nullable
}

# Add special info for vector columns
if (pa.types.is_list(field.type) or pa.types.is_fixed_size_list(field.type)) and pa.types.is_floating(field.type.value_type):
field_info["vector_info"] = {
"is_vector": True,
"element_type": str(field.type.value_type),
"description": "CLIP embedding vectors (corrupted data - schema only)"
}

schema_info.append(field_info)

# Create informative response about the corrupted dataset
info_schema = pa.schema([
pa.field("status", pa.string()),
pa.field("dataset", pa.string()),
pa.field("schema_info", pa.string()),
pa.field("corruption_details", pa.string())
])

info_data = [
["corrupted_but_readable_schema"],
[dataset_name],
[f"Schema: {', '.join([f.name + ':' + str(f.type) for f in schema])}"],
["Lance file corruption detected - bytes range error. Schema available but data unreadable."]
]

result_table = pa.Table.from_arrays(info_data, schema=info_schema)
total_count = 1
try:
# Native pagination: read only the requested rows from disk
total_count = table.count_rows()
end = min(offset + limit, total_count)
if offset >= total_count:
result_table = pa.table({field.name: pa.array([], type=field.type) for field in table.schema})
else:
offsets = list(range(offset, end))
builder = table.take_offsets(offsets)
if column_list:
available_columns = [col for col in column_list if col in [field.name for field in table.schema]]
if available_columns:
builder = builder.select(available_columns)
result_table = builder.to_arrow()

logger.info(f"Returned schema info for corrupted {dataset_name} dataset")
logger.info(f"Read {result_table.num_rows} rows (offset={offset}, limit={limit}) from {dataset_name} ({total_count} total)")

else:
try:
# Native pagination: read only the requested rows from disk
total_count = table.count_rows()
end = min(offset + limit, total_count)
if offset >= total_count:
result_table = pa.table({field.name: pa.array([], type=field.type) for field in table.schema})
else:
offsets = list(range(offset, end))
builder = table.take_offsets(offsets)
if column_list:
available_columns = [col for col in column_list if col in [field.name for field in table.schema]]
if available_columns:
builder = builder.select(available_columns)
result_table = builder.to_arrow()

logger.info(f"Read {result_table.num_rows} rows (offset={offset}, limit={limit}) from {dataset_name} ({total_count} total)")

except (AttributeError, TypeError):
# Fallback for older Lance versions without take_offsets/count_rows
logger.info(f"Native pagination unavailable, using Arrow slice for {dataset_name}")
arrow_table = table.to_arrow()
total_count = arrow_table.num_rows
except (AttributeError, TypeError):
# Fallback for older Lance versions without take_offsets/count_rows
logger.info(f"Native pagination unavailable, using Arrow slice for {dataset_name}")
arrow_table = table.to_arrow()
total_count = arrow_table.num_rows

if column_list:
available_columns = [col for col in column_list if col in arrow_table.column_names]
if available_columns:
arrow_table = arrow_table.select(available_columns)
if column_list:
available_columns = [col for col in column_list if col in arrow_table.column_names]
if available_columns:
arrow_table = arrow_table.select(available_columns)

result_table = arrow_table.slice(offset, limit)
result_table = arrow_table.slice(offset, limit)

except Exception as general_error:
logger.error(f"Reading failed for {dataset_name}: {general_error}")
except Exception as read_error:
# Graceful degradation: any dataset that fails to read (corruption,
# format error, unreadable bytes) returns a single informational row
# instead of a 500.
logger.warning(f"Failed to read rows from {dataset_name}, falling back to informational response: {read_error}")

# Fallback: provide informative error response
error_schema = pa.schema([
pa.field("error", pa.string()),
pa.field("dataset", pa.string()),
Expand All @@ -350,7 +306,7 @@ async def get_dataset_rows(
error_data = [
["Unable to read dataset"],
[dataset_name],
[f"Error: {str(general_error)[:200]}"]
[f"Error: {str(read_error)[:200]}"]
]
result_table = pa.Table.from_arrays(error_data, schema=error_schema)
total_count = 1
Expand Down
Loading