diff --git a/camelot/parsers/network.py b/camelot/parsers/network.py index 4fbcf5f7..edc415c1 100644 --- a/camelot/parsers/network.py +++ b/camelot/parsers/network.py @@ -66,7 +66,7 @@ def column_spread(left, right, col_anchors): return index_right - index_left -def find_closest_tls( # noqa: C901 +def find_closest_tls( bbox: list[float], tls: list[LTTextLineHorizontal | LTTextLineVertical] ) -> dict[str, LTTextLineHorizontal | LTTextLineVertical | None]: """Search for textlines that are closest to the bounding box but outside in all four directions. @@ -257,7 +257,8 @@ def search_header_from_body_bbox( merged_zones = _merge_zones(zones) max_spread = max( - column_spread(zone[0], zone[1], col_anchors) for zone in merged_zones + (column_spread(zone[0], zone[1], col_anchors) for zone in merged_zones), + default=0, ) # Accept textlines that cross columns boundaries, as long as they @@ -481,13 +482,13 @@ def compute_plausible_gaps(self): # Calculate gaps between textlines h_gaps = np.array( [ - h_textlines[i].x0 - h_textlines[i - 1].x0 + h_textlines[i].x0 - h_textlines[i - 1].x1 for i in range(1, len(h_textlines)) ] ) v_gaps = np.array( [ - v_textlines[i].y0 - v_textlines[i - 1].y0 + v_textlines[i].y0 - v_textlines[i - 1].y1 for i in range(1, len(v_textlines)) ] ) @@ -527,6 +528,8 @@ def search_table_body( [x0, y0, x1, y1] or None if not enough textlines are found. """ most_aligned_tl = self.most_connected_textline() + if most_aligned_tl is None: + return None max_h_gap, max_v_gap = gaps_hv parse_details_search: dict[str, Any] | None = None @@ -813,176 +816,141 @@ def __init__( ) def _generate_table_bbox(self): + """ + Generate table bounding boxes. + + This method is refactored to handle two distinct cases: + 1. User-provided `table_areas`: It iterates through each area, + running the network detection algorithm only on the text contained + within that specific area. + 2. 
No `table_areas` (Discovery mode): It uses the original logic to + scan the entire page to find tables. + """ user_provided_bboxes = self._get_user_provided_bboxes() - - filtered_textlines = list( - self._get_filtered_textlines() - ) # Convert to list if not already - textlines = list(filtered_textlines) if filtered_textlines else [] - - textlines_processed = ( - set() - ) # Use a set for O(1) average time complexity for lookups + all_textlines = list(self._get_filtered_textlines()) self.table_bbox_parses = {} if self.parse_details is not None: self.parse_details["network_searches"] = [] self.parse_details["bbox_searches"] = [] self.parse_details["col_searches"] = [] - while textlines: # Continue while there are textlines to process - bbox_body, gaps_hv = self._get_bbox_body(user_provided_bboxes, textlines) - - if bbox_body is None: - break # Exit the loop if no more bbox_body can be generated - - tls_in_bbox = textlines_overlapping_bbox(bbox_body, textlines) - if not tls_in_bbox: # If there are no textlines in the bbox, break - break - - cols_boundaries = find_columns_boundaries(tls_in_bbox) - cols_anchors = boundaries_to_split_lines(cols_boundaries) - - bbox_full = self._get_full_bbox( - user_provided_bboxes, - bbox_body, - tls_in_bbox, - textlines, - cols_anchors, - gaps_hv, - ) - - if isinstance(bbox_full, list): - bbox_full = tuple(bbox_full) - - table_parse = { - "bbox_body": bbox_body, - "cols_boundaries": cols_boundaries, - "cols_anchors": cols_anchors, - "bbox_full": bbox_full, - } - - self.table_bbox_parses[bbox_full] = table_parse - - if self.parse_details is not None: - self.parse_details["col_searches"].append(table_parse) + if user_provided_bboxes: + # Case 1: User has specified table areas. 
+ for user_bbox in user_provided_bboxes: + textlines_in_area = textlines_overlapping_bbox(user_bbox, all_textlines) + if not textlines_in_area: + continue + + text_network = TextNetworks() + text_network.generate(textlines_in_area) + text_network.remove_unconnected_edges() + gaps_hv = text_network.compute_plausible_gaps() + + if gaps_hv is None: + bbox_body = user_bbox + else: + bbox_body = text_network.search_table_body( + gaps_hv, self.parse_details + ) + if bbox_body is None: + bbox_body = user_bbox + + tls_in_body = textlines_overlapping_bbox(bbox_body, textlines_in_area) + if not tls_in_body: + continue + + cols_boundaries = find_columns_boundaries(tls_in_body) + cols_anchors = boundaries_to_split_lines(cols_boundaries) + + bbox_full = self._get_full_bbox( + [], + bbox_body, + tls_in_body, + textlines_in_area, + cols_anchors, + gaps_hv, + ) - # Update processed textlines - textlines_processed.update(tls_in_bbox) - # Filter out processed textlines - textlines = [tl for tl in textlines if tl not in textlines_processed] + if isinstance(bbox_full, list): + bbox_full = tuple(bbox_full) + + table_parse = { + "bbox_body": bbox_body, + "cols_boundaries": cols_boundaries, + "cols_anchors": cols_anchors, + "bbox_full": bbox_full, + } + self.table_bbox_parses[bbox_full] = table_parse + if self.parse_details is not None: + self.parse_details["col_searches"].append(table_parse) + else: + # Case 2: No user-specified areas (discovery mode). 
+ textlines = all_textlines + textlines_processed = set() + while textlines: + bbox_body, gaps_hv = self._get_bbox_body(None, textlines) + if bbox_body is None: + break + + tls_in_bbox = textlines_overlapping_bbox(bbox_body, textlines) + if not tls_in_bbox: + break + + cols_boundaries = find_columns_boundaries(tls_in_bbox) + cols_anchors = boundaries_to_split_lines(cols_boundaries) + + bbox_full = self._get_full_bbox( + None, + bbox_body, + tls_in_bbox, + textlines, + cols_anchors, + gaps_hv, + ) - # Early exit if all textlines have been processed - if not textlines: # Check if there are no more textlines to process - break + if isinstance(bbox_full, list): + bbox_full = tuple(bbox_full) + + table_parse = { + "bbox_body": bbox_body, + "cols_boundaries": cols_boundaries, + "cols_anchors": cols_anchors, + "bbox_full": bbox_full, + } + self.table_bbox_parses[bbox_full] = table_parse + if self.parse_details is not None: + self.parse_details["col_searches"].append(table_parse) + + tls_in_full_bbox = textlines_overlapping_bbox(bbox_full, textlines) + textlines_processed.update(tls_in_full_bbox) + textlines = [tl for tl in textlines if tl not in textlines_processed] + if not textlines: + break def _get_bbox_body(self, user_provided_bboxes, textlines): - if user_provided_bboxes is not None: - if len(user_provided_bboxes) > 0: - return ( - user_provided_bboxes.pop(), - None, - ) # Return None for gaps_hv if using user bbox - + """ + This method is now only used for table discovery on a full page. + The case for user-provided bboxes is handled in _generate_table_bbox. 
+ """ text_network = TextNetworks() text_network.generate(textlines) text_network.remove_unconnected_edges() gaps_hv = text_network.compute_plausible_gaps() if gaps_hv is None: - return None, None # End the loop if no gaps can be computed + return None, None edge_tol_hv = ( gaps_hv[0], gaps_hv[1] if self.edge_tol is None else self.edge_tol, ) - bbox_body = text_network.search_table_body( - edge_tol_hv, - parse_details=( - self.parse_details["bbox_searches"] if self.parse_details else None - ), - ) if self.parse_details is not None: - self.parse_details["network_searches"].append(text_network) - - return bbox_body, gaps_hv # Return the computed bbox_body and gaps_hv - - def _get_full_bbox( - self, - user_provided_bboxes, - bbox_body, - tls_in_bbox, - textlines, - cols_anchors, - gaps_hv, - ): - if user_provided_bboxes is not None: - if len(user_provided_bboxes) > 0: - return bbox_body # Use the existing bbox_body directly - else: - bbox_body_from_tls = bbox_from_textlines(tls_in_bbox) - if bbox_body_from_tls is not None: - return search_header_from_body_bbox( - bbox_body_from_tls, textlines, cols_anchors, gaps_hv[1] - ) - return bbox_body - - def _get_filtered_textlines(self): - all_textlines = [ - t - for t in self.horizontal_text + self.vertical_text - if len(t.get_text().strip()) > 0 - ] - return self._apply_regions_filter(all_textlines) + self.parse_details["network_searches"].append( + (text_network, gaps_hv, edge_tol_hv) + ) - def _get_user_provided_bboxes(self): - if self.table_areas is not None: - return [bbox_from_str(area_str) for area_str in self.table_areas] - return None + bbox_body = text_network.search_table_body(edge_tol_hv, self.parse_details) + return bbox_body, gaps_hv - def _generate_columns_and_rows(self, bbox, user_cols): - # select elements which lie within table_bbox - self.t_bbox = text_in_bbox_per_axis( - bbox, self.horizontal_text, self.vertical_text - ) - all_tls = list( - sorted( - filter( - lambda textline: 
len(textline.get_text().strip()) > 0, - self.t_bbox["horizontal"] + self.t_bbox["vertical"], - ), - key=lambda textline: (-textline.y0, textline.x0), - ) - ) - text_x_min, text_y_min, text_x_max, text_y_max = bbox_from_textlines(all_tls) - # FRHTODO: - # This algorithm takes the horizontal textlines in the bbox, and groups - # them into rows based on their bottom y0. - # That's wrong: it misses the vertical items, and misses out on all - # the alignment identification work we've done earlier. - rows_grouped = self._group_rows(all_tls, row_tol=self.row_tol) - rows = self._join_rows(rows_grouped, text_y_max, text_y_min) - - if user_cols is not None: - cols = [text_x_min] + user_cols + [text_x_max] - cols = [(cols[i], cols[i + 1]) for i in range(0, len(cols) - 1)] - else: - # Check if the bounding box exists as a key in the dictionary - if bbox in self.table_bbox_parses: - parse_details = self.table_bbox_parses[bbox] - col_anchors = parse_details["cols_anchors"] - cols = list( - map( - lambda idx: [col_anchors[idx], col_anchors[idx + 1]], - range(0, len(col_anchors) - 1), - ) - ) - else: - # Handle the KeyError gracefully by returning empty lists - # or by performing alternative logic, such as using a default - # bounding box or skipping the table. - print(f"Warning: Bounding box {bbox} not found in table_bbox_parses.") - return [], [], [], [] # Return empty lists for cols, rows, v_s, h_s - - return cols, rows, None, None diff --git a/tests/files/good_energy.pdf b/tests/files/good_energy.pdf new file mode 100644 index 00000000..506fbd01 Binary files /dev/null and b/tests/files/good_energy.pdf differ diff --git a/tests/test_network.py b/tests/test_network.py index 33d59600..15a0cc07 100644 --- a/tests/test_network.py +++ b/tests/test_network.py @@ -155,3 +155,62 @@ def test_network_no_infinite_execution(testdir): ) assert len(tables) >= 1 + + +# Reported as https://github.com/camelot-dev/camelot/issues/585 +def test_issue_585(testdir): + """Test for GitHub issue #585. 
+ + This test checks that Camelot can successfully extract tables when using + the 'network' flavor with specified 'table_areas' and 'columns', + ensuring that at least one table is detected. + + Parameters + ---------- + testdir : str + The path to the test directory. + + """ + filename = os.path.join(testdir, "multiple_tables.pdf") + tables = camelot.read_pdf( + filename, + flavor="network", + table_areas=["100,700,500,100"], + columns=["150,200,250,300,350,400,450,500"], + ) + assert len(tables) > 0 + + +def test_issue_585_network_flavor_with_table_areas(testdir): + """Test for GitHub issue #585, focusing on the 'network' flavor. + + This test verifies that Camelot's 'network' flavor can detect and + extract a table when a specific 'table_areas' is provided. The issue + reported that this scenario was failing, while the 'lattice' flavor + worked. This test ensures the 'network' flavor now behaves as expected. + + It checks that exactly one table is found in the specified area. + + Parameters + ---------- + testdir : str + The path to the test directory, provided by the testing framework. + This directory should contain the 'good_energy.pdf' file. + + """ + # Use the PDF file mentioned in the GitHub issue + filename = os.path.join(testdir, "good_energy.pdf") + + # The table_areas and columns are taken directly from the issue report + # to replicate the exact conditions. + tables = camelot.read_pdf( + filename, + flavor="network", + table_areas=["46,213,558,180"], + columns=["92,159,262,357,454,534"], + split_text=True, + ) + + # The core of the issue was that no tables were being detected. + # This assertion now checks that exactly one table is found. + assert len(tables) == 1