Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 118 additions & 150 deletions camelot/parsers/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def column_spread(left, right, col_anchors):
return index_right - index_left


def find_closest_tls( # noqa: C901
def find_closest_tls(
bbox: list[float], tls: list[LTTextLineHorizontal | LTTextLineVertical]
) -> dict[str, LTTextLineHorizontal | LTTextLineVertical | None]:
"""Search for textlines that are closest to the bounding box but outside in all four directions.
Expand Down Expand Up @@ -257,7 +257,8 @@ def search_header_from_body_bbox(
merged_zones = _merge_zones(zones)

max_spread = max(
column_spread(zone[0], zone[1], col_anchors) for zone in merged_zones
(column_spread(zone[0], zone[1], col_anchors) for zone in merged_zones),
default=0,
)

# Accept textlines that cross columns boundaries, as long as they
Expand Down Expand Up @@ -481,13 +482,13 @@ def compute_plausible_gaps(self):
# Calculate gaps between textlines
h_gaps = np.array(
[
h_textlines[i].x0 - h_textlines[i - 1].x0
h_textlines[i].x0 - h_textlines[i - 1].x1
for i in range(1, len(h_textlines))
]
)
v_gaps = np.array(
[
v_textlines[i].y0 - v_textlines[i - 1].y0
v_textlines[i].y0 - v_textlines[i - 1].y1
for i in range(1, len(v_textlines))
]
)
Expand Down Expand Up @@ -527,6 +528,8 @@ def search_table_body(
[x0, y0, x1, y1] or None if not enough textlines are found.
"""
most_aligned_tl = self.most_connected_textline()
if most_aligned_tl is None:
return None
max_h_gap, max_v_gap = gaps_hv

parse_details_search: dict[str, Any] | None = None
Expand Down Expand Up @@ -813,176 +816,141 @@ def __init__(
)

def _generate_table_bbox(self):
"""
Generate table bounding boxes.

This method is refactored to handle two distinct cases:
1. User-provided `table_areas`: It iterates through each area,
running the network detection algorithm only on the text contained
within that specific area.
2. No `table_areas` (Discovery mode): It uses the original logic to
scan the entire page to find tables.
"""
user_provided_bboxes = self._get_user_provided_bboxes()

filtered_textlines = list(
self._get_filtered_textlines()
) # Convert to list if not already
textlines = list(filtered_textlines) if filtered_textlines else []

textlines_processed = (
set()
) # Use a set for O(1) average time complexity for lookups
all_textlines = list(self._get_filtered_textlines())
self.table_bbox_parses = {}
if self.parse_details is not None:
self.parse_details["network_searches"] = []
self.parse_details["bbox_searches"] = []
self.parse_details["col_searches"] = []

while textlines: # Continue while there are textlines to process
bbox_body, gaps_hv = self._get_bbox_body(user_provided_bboxes, textlines)

if bbox_body is None:
break # Exit the loop if no more bbox_body can be generated

tls_in_bbox = textlines_overlapping_bbox(bbox_body, textlines)
if not tls_in_bbox: # If there are no textlines in the bbox, break
break

cols_boundaries = find_columns_boundaries(tls_in_bbox)
cols_anchors = boundaries_to_split_lines(cols_boundaries)

bbox_full = self._get_full_bbox(
user_provided_bboxes,
bbox_body,
tls_in_bbox,
textlines,
cols_anchors,
gaps_hv,
)

if isinstance(bbox_full, list):
bbox_full = tuple(bbox_full)

table_parse = {
"bbox_body": bbox_body,
"cols_boundaries": cols_boundaries,
"cols_anchors": cols_anchors,
"bbox_full": bbox_full,
}

self.table_bbox_parses[bbox_full] = table_parse

if self.parse_details is not None:
self.parse_details["col_searches"].append(table_parse)
if user_provided_bboxes:
# Case 1: User has specified table areas.
for user_bbox in user_provided_bboxes:
textlines_in_area = textlines_overlapping_bbox(user_bbox, all_textlines)
if not textlines_in_area:
continue

text_network = TextNetworks()
text_network.generate(textlines_in_area)
text_network.remove_unconnected_edges()
gaps_hv = text_network.compute_plausible_gaps()

if gaps_hv is None:
bbox_body = user_bbox
else:
bbox_body = text_network.search_table_body(
gaps_hv, self.parse_details
)
if bbox_body is None:
bbox_body = user_bbox

tls_in_body = textlines_overlapping_bbox(bbox_body, textlines_in_area)
if not tls_in_body:
continue

cols_boundaries = find_columns_boundaries(tls_in_body)
cols_anchors = boundaries_to_split_lines(cols_boundaries)

bbox_full = self._get_full_bbox(
[],
bbox_body,
tls_in_body,
textlines_in_area,
cols_anchors,
gaps_hv,
)

# Update processed textlines
textlines_processed.update(tls_in_bbox)
# Filter out processed textlines
textlines = [tl for tl in textlines if tl not in textlines_processed]
if isinstance(bbox_full, list):
bbox_full = tuple(bbox_full)

table_parse = {
"bbox_body": bbox_body,
"cols_boundaries": cols_boundaries,
"cols_anchors": cols_anchors,
"bbox_full": bbox_full,
}
self.table_bbox_parses[bbox_full] = table_parse
if self.parse_details is not None:
self.parse_details["col_searches"].append(table_parse)
else:
# Case 2: No user-specified areas (discovery mode).
textlines = all_textlines
textlines_processed = set()
while textlines:
bbox_body, gaps_hv = self._get_bbox_body(None, textlines)
if bbox_body is None:
break

tls_in_bbox = textlines_overlapping_bbox(bbox_body, textlines)
if not tls_in_bbox:
break

cols_boundaries = find_columns_boundaries(tls_in_bbox)
cols_anchors = boundaries_to_split_lines(cols_boundaries)

bbox_full = self._get_full_bbox(
None,
bbox_body,
tls_in_bbox,
textlines,
cols_anchors,
gaps_hv,
)

# Early exit if all textlines have been processed
if not textlines: # Check if there are no more textlines to process
break
if isinstance(bbox_full, list):
bbox_full = tuple(bbox_full)

table_parse = {
"bbox_body": bbox_body,
"cols_boundaries": cols_boundaries,
"cols_anchors": cols_anchors,
"bbox_full": bbox_full,
}
self.table_bbox_parses[bbox_full] = table_parse
if self.parse_details is not None:
self.parse_details["col_searches"].append(table_parse)

tls_in_full_bbox = textlines_overlapping_bbox(bbox_full, textlines)
textlines_processed.update(tls_in_full_bbox)
textlines = [tl for tl in textlines if tl not in textlines_processed]
if not textlines:
break

def _get_bbox_body(self, user_provided_bboxes, textlines):
if user_provided_bboxes is not None:
if len(user_provided_bboxes) > 0:
return (
user_provided_bboxes.pop(),
None,
) # Return None for gaps_hv if using user bbox

"""
This method is now only used for table discovery on a full page.
The case for user-provided bboxes is handled in _generate_table_bbox.
"""
text_network = TextNetworks()
text_network.generate(textlines)
text_network.remove_unconnected_edges()
gaps_hv = text_network.compute_plausible_gaps()

if gaps_hv is None:
return None, None # End the loop if no gaps can be computed
return None, None

edge_tol_hv = (
gaps_hv[0],
gaps_hv[1] if self.edge_tol is None else self.edge_tol,
)
bbox_body = text_network.search_table_body(
edge_tol_hv,
parse_details=(
self.parse_details["bbox_searches"] if self.parse_details else None
),
)

if self.parse_details is not None:
self.parse_details["network_searches"].append(text_network)

return bbox_body, gaps_hv # Return the computed bbox_body and gaps_hv

def _get_full_bbox(
self,
user_provided_bboxes,
bbox_body,
tls_in_bbox,
textlines,
cols_anchors,
gaps_hv,
):
if user_provided_bboxes is not None:
if len(user_provided_bboxes) > 0:
return bbox_body # Use the existing bbox_body directly
else:
bbox_body_from_tls = bbox_from_textlines(tls_in_bbox)
if bbox_body_from_tls is not None:
return search_header_from_body_bbox(
bbox_body_from_tls, textlines, cols_anchors, gaps_hv[1]
)
return bbox_body

def _get_filtered_textlines(self):
    """Return the page's non-blank textlines, region-filtered.

    Whitespace-only textlines are discarded before applying the
    user-configured region filter.
    """
    candidates = self.horizontal_text + self.vertical_text
    non_blank = [tl for tl in candidates if len(tl.get_text().strip()) > 0]
    return self._apply_regions_filter(non_blank)
self.parse_details["network_searches"].append(
(text_network, gaps_hv, edge_tol_hv)
)

def _get_user_provided_bboxes(self):
if self.table_areas is not None:
return [bbox_from_str(area_str) for area_str in self.table_areas]
return None
bbox_body = text_network.search_table_body(edge_tol_hv, self.parse_details)
return bbox_body, gaps_hv

def _generate_columns_and_rows(self, bbox, user_cols):
    """Derive column and row separators for the table at ``bbox``.

    Parameters
    ----------
    bbox : tuple
        Table bounding box in PDF coordinates; expected to be a key of
        ``self.table_bbox_parses`` when ``user_cols`` is None.
    user_cols : list | None
        User-provided x-coordinates of column separators, or None to use
        the column anchors computed during bbox detection.

    Returns
    -------
    tuple
        ``(cols, rows, v_s, h_s)`` where cols/rows are lists of
        (start, end) coordinate pairs; v_s/h_s are always None here.
        Returns four empty lists when ``bbox`` has no recorded parse.
    """
    # select elements which lie within table_bbox
    self.t_bbox = text_in_bbox_per_axis(
        bbox, self.horizontal_text, self.vertical_text
    )

    # Reading order: top-to-bottom (descending y0), then left-to-right,
    # keeping only textlines that carry visible text.
    all_tls = list(
        sorted(
            filter(
                lambda textline: len(textline.get_text().strip()) > 0,
                self.t_bbox["horizontal"] + self.t_bbox["vertical"],
            ),
            key=lambda textline: (-textline.y0, textline.x0),
        )
    )
    # NOTE(review): bbox_from_textlines presumably returns None for an
    # empty input — this unpack would then raise; confirm upstream
    # guarantees at least one textline in bbox.
    text_x_min, text_y_min, text_x_max, text_y_max = bbox_from_textlines(all_tls)
    # FRHTODO:
    # This algorithm takes the horizontal textlines in the bbox, and groups
    # them into rows based on their bottom y0.
    # That's wrong: it misses the vertical items, and misses out on all
    # the alignment identification work we've done earlier.
    rows_grouped = self._group_rows(all_tls, row_tol=self.row_tol)
    rows = self._join_rows(rows_grouped, text_y_max, text_y_min)

    if user_cols is not None:
        # Bracket the user separators with the text extent, then turn the
        # ordered coordinates into (left, right) column intervals.
        cols = [text_x_min] + user_cols + [text_x_max]
        cols = [(cols[i], cols[i + 1]) for i in range(0, len(cols) - 1)]
    else:
        # Check if the bounding box exists as a key in the dictionary
        if bbox in self.table_bbox_parses:
            parse_details = self.table_bbox_parses[bbox]
            col_anchors = parse_details["cols_anchors"]
            # Pair consecutive anchors into column intervals.
            cols = list(
                map(
                    lambda idx: [col_anchors[idx], col_anchors[idx + 1]],
                    range(0, len(col_anchors) - 1),
                )
            )
        else:
            # Handle the KeyError gracefully by returning empty lists
            # or by performing alternative logic, such as using a default
            # bounding box or skipping the table.
            print(f"Warning: Bounding box {bbox} not found in table_bbox_parses.")
            return [], [], [], []  # Return empty lists for cols, rows, v_s, h_s

    return cols, rows, None, None
Binary file added tests/files/good_energy.pdf
Binary file not shown.
59 changes: 59 additions & 0 deletions tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,62 @@ def test_network_no_infinite_execution(testdir):
)

assert len(tables) >= 1


# Reported as https://github.com/camelot-dev/camelot/issues/585
def test_issue_585(testdir):
    """Regression test for GitHub issue #585.

    Running the ``network`` flavor with explicit ``table_areas`` and
    ``columns`` must still detect at least one table.

    Parameters
    ----------
    testdir : str
        The path to the test directory.

    """
    pdf_path = os.path.join(testdir, "multiple_tables.pdf")
    extracted = camelot.read_pdf(
        pdf_path,
        flavor="network",
        table_areas=["100,700,500,100"],
        columns=["150,200,250,300,350,400,450,500"],
    )
    assert len(extracted) > 0


def test_issue_585_network_flavor_with_table_areas(testdir):
    """Regression test for GitHub issue #585, 'network' flavor specifics.

    The issue reported that the ``network`` flavor detected no table when a
    specific ``table_areas`` was supplied, while the ``lattice`` flavor
    worked. Using the exact area and column coordinates from the report,
    exactly one table must now be extracted.

    Parameters
    ----------
    testdir : str
        The path to the test directory, provided by the testing framework.
        This directory should contain the 'good_energy.pdf' file.

    """
    # PDF and coordinates taken directly from the issue report to
    # replicate the exact failing conditions.
    pdf_path = os.path.join(testdir, "good_energy.pdf")
    extracted = camelot.read_pdf(
        pdf_path,
        flavor="network",
        table_areas=["46,213,558,180"],
        columns=["92,159,262,357,454,534"],
        split_text=True,
    )
    # The core of the issue was that no tables were being detected.
    assert len(extracted) == 1
Loading