diff --git a/backend/app.py b/backend/app.py index 5dc6b57..a618153 100644 --- a/backend/app.py +++ b/backend/app.py @@ -224,7 +224,7 @@ async def get_dataset_columns(dataset_name: str): @app.get("/datasets/{dataset_name}/rows") async def get_dataset_rows( dataset_name: str, - limit: int = Query(default=50, le=MAX_LIMIT), + limit: int = Query(default=50, ge=1, le=MAX_LIMIT), offset: int = Query(default=0, ge=0), columns: Optional[str] = Query(default=None) ): @@ -294,60 +294,35 @@ async def get_dataset_rows( logger.info(f"Returned schema info for corrupted {dataset_name} dataset") else: - # For other datasets, try normal reading - logger.info(f"Attempting to read {dataset_name} using to_arrow().to_pylist() approach") - - # This is the approach that works in the search API - data_list = table.to_arrow().to_pylist() - total_count = len(data_list) - - # Apply pagination at the list level - start_idx = offset - end_idx = min(offset + limit, total_count) - paginated_data = data_list[start_idx:end_idx] - - # Convert back to Arrow table for consistent processing - if paginated_data: - # Get the schema from the original table - schema = table.schema + try: + # Native pagination: read only the requested rows from disk + total_count = table.count_rows() + end = min(offset + limit, total_count) + if offset >= total_count: + result_table = pa.table({field.name: pa.array([], type=field.type) for field in table.schema}) + else: + offsets = list(range(offset, end)) + builder = table.take_offsets(offsets) + if column_list: + available_columns = [col for col in column_list if col in [field.name for field in table.schema]] + if available_columns: + builder = builder.select(available_columns) + result_table = builder.to_arrow() + + logger.info(f"Read {result_table.num_rows} rows (offset={offset}, limit={limit}) from {dataset_name} ({total_count} total)") + + except (AttributeError, TypeError): + # Fallback for older Lance versions without take_offsets/count_rows + logger.info(f"Native pagination unavailable, using Arrow slice for {dataset_name}") + arrow_table = table.to_arrow() + total_count = arrow_table.num_rows - # Apply column selection if specified - if column_list: - # Filter the schema and data - available_columns = [col for col in column_list if col in [field.name for field in schema]] - if available_columns: - # Create filtered data - filtered_data = [] - for row in paginated_data: - filtered_row = {col: row.get(col) for col in available_columns} - filtered_data.append(filtered_row) - paginated_data = filtered_data - - # Create filtered schema - filtered_fields = [field for field in schema if field.name in available_columns] - schema = pa.schema(filtered_fields) - - # Convert the paginated data back to Arrow format - arrays = [] - for field in schema: - column_data = [row.get(field.name) for row in paginated_data] - arrays.append(pa.array(column_data, type=field.type)) - - result_table = pa.Table.from_arrays(arrays, schema=schema) - else: - # Empty result - create empty table with correct schema - schema = table.schema if column_list: - available_columns = [col for col in column_list if col in [field.name for field in schema]] + available_columns = [col for col in column_list if col in arrow_table.column_names] if available_columns: - filtered_fields = [field for field in schema if field.name in available_columns] - schema = pa.schema(filtered_fields) - - # Create empty arrays for each field - arrays = [pa.array([], type=field.type) for field in schema] - result_table = pa.Table.from_arrays(arrays, schema=schema) + arrow_table = arrow_table.select(available_columns) - logger.info(f"Successfully read {len(paginated_data)} rows from {dataset_name}") + result_table = arrow_table.slice(offset, limit) except Exception as general_error: logger.error(f"Reading failed for {dataset_name}: {general_error}")