Description
When I slice the candidate pairs (a `pd.MultiIndex`) into partitions and call `comparer.compute()` on each slice, some partitions fail with
`ValueError: Length mismatch: Expected axis has 31262 elements, new values have 31250 elements`.
I have checked that the input shapes are all correct, so the problem seems to be in the compute step itself. My guess is that the 12 extra rows come from misplaced columns (10 comparison columns plus the two index columns).
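Besides the shapes, it may also be worth checking that the index of the right-hand frame is unique: judging by the traceback below, the error is raised while recordlinkage re-indexes that frame by the pair labels (`frame_indexing`), and non-unique index labels would make the label lookup return extra rows there, which could equally explain a small surplus of rows. This is only a guess, not a confirmed cause; a minimal diagnostic sketch, reusing `candidates`, `_1851`, and `cols_to_compare` from the code below:

```python
# Diagnostic sketch (assumption: the 12 extra rows come from non-unique index
# labels rather than misplaced columns). Per the traceback, level 1 of the
# candidate pairs is used to index the right-hand frame in frame_indexing.
df2 = _1851[cols_to_compare]
print("df2 index unique:", df2.index.is_unique)

# The partition that fails is candidates[1000000:2000000] (see the log below).
failing_slice = candidates[1000000:2000000]
right_labels = failing_slice.get_level_values(1)

# Count how many labels in the failing slice point at duplicated df2 labels.
dup_labels = df2.index[df2.index.duplicated(keep=False)].unique()
print("slice labels hitting duplicated df2 labels:", right_labels.isin(dup_labels).sum())
```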
```python
import os

import pandas as pd
import recordlinkage
from recordlinkage import Compare

# `comparer` (a configured Compare object), `candidates` (the candidate-pair
# MultiIndex), `_1861`, `_1851`, and `cols_to_compare` are defined earlier in
# the notebook.

# Perform the actual comparisons and store the result
print("Performing comparisons...")


def compute_and_save_partition(comparer, candidates, df1, df2, start, end, partition_path):
    # Check if the partition is already computed and saved
    if os.path.exists(partition_path):
        # Load the partition from the Parquet file
        print(f"reading in {start}:{end}")
        return pd.read_parquet(partition_path)
    else:
        # Compute the partition
        candidate_slice = candidates[start:end]
        print(f"processing partition {partition_path.split('_')[1:-1]}: start={start}, end={end}, candidate_slice: {len(candidate_slice)}")
        if len(candidate_slice) != 1000000:
            raise ValueError(f"candidate_slice has incorrect number of rows: {len(candidate_slice)}")
        print(f"df1 shape: {df1.shape}, df2 shape: {df2.shape}")
        print(f"Candidate slice length: {len(candidate_slice)}")
        partition_result = comparer.compute(candidate_slice, df1, df2)
        print(f"Partition result shape: {partition_result.shape}")
        # Save the computed partition to a Parquet file
        partition_result.to_parquet(partition_path)
        return partition_result


def parallel_compute_and_save(comparer, candidates, df1, df2, output_dir, partition=1000000):
    result = []
    total_candidates = len(candidates)
    for i in range(0, total_candidates, partition):
        end = min(i + partition, total_candidates)
        partition_path = os.path.join(output_dir, f'partition_{i}_{end}.parquet')
        result.append(compute_and_save_partition(comparer, candidates, df1, df2, i, end, partition_path))
    return result


# Set up the output directory
name = "your_dataset_name"  # Replace with your dataset name
output_dir = f"../Output/temp/{name}_compare"
os.makedirs(output_dir, exist_ok=True)

# Replace _1861[cols_to_compare] and _1851[cols_to_compare] with your dataframes and columns to compare
final_result = parallel_compute_and_save(comparer, candidates, _1861[cols_to_compare], _1851[cols_to_compare], output_dir)

# Update the columns of the result dataframe
final_result = pd.concat(final_result)
final_result.columns = ['pname', 'oname', 'sname', 'pname_soundex', 'sname_soundex', 'pname_metaphone', 'sname_metaphone', 'address', 'sname_pop_metaphone', 'dateofbirth']
final_result = final_result.reset_index(drop=True)
```
```
Performing comparisons...
reading in 0:1000000
processing partition ['dataset', 'name', 'compare/partition', '1000000']: start=1000000, end=2000000, candidate_slice: 1000000
df1 shape: (19828561, 10), df2 shape: (17711058, 10)
Candidate slice length: 1000000

_RemoteTraceback                          Traceback (most recent call last)
_RemoteTraceback:
"""
Traceback (most recent call last):
  File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/externals/loky/process_executor.py", line 463, in _process_worker
    r = call_item()
        ^^^^^^^^^^^
  File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/externals/loky/process_executor.py", line 291, in __call__
    return self.fn(*self.args, **self.kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/parallel.py", line 589, in __call__
    return [func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/users/xiet13/.local/lib/python3.11/site-packages/joblib/parallel.py", line 589, in <listcomp>
    return [func(*args, **kwargs)
            ^^^^^^^^^^^^^^^^^^^^^
  File "/users/xiet13/.local/lib/python3.11/site-packages/recordlinkage/base.py", line 33, in _parallel_compare_helper
    return class_obj._compute(pairs, x, x_link)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/users/xiet13/.local/lib/python3.11/site-packages/recordlinkage/base.py", line 668, in _compute
    df_b_indexed = frame_indexing(x_link[sublabels_right], pairs, 1)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/users/xiet13/.local/lib/python3.11/site-packages/recordlinkage/utils.py", line 198, in frame_indexing
    data.index = multi_index
    ^^^^^^^^^^
  File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/generic.py", line 6218, in __setattr__
    return object.__setattr__(self, name, value)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "properties.pyx", line 69, in pandas._libs.properties.AxisProperty.__set__
  File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/generic.py", line 767, in _set_axis
    self._mgr.set_axis(axis, labels)
  File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/internals/managers.py", line 227, in set_axis
    self._validate_set_axis(axis, new_labels)
  File "/opt/apps/hand/miniconda/23.5.2/lib/python3.11/site-packages/pandas/core/internals/base.py", line 85, in _validate_set_axis
    raise ValueError(
ValueError: Length mismatch: Expected axis has 31262 elements, new values have 31250 elements
"""

The above exception was the direct cause of the following exception:

ValueError                                Traceback (most recent call last)
Cell In[31], line 45
     42 os.makedirs(output_dir, exist_ok=True)
     44 # Replace _1861[cols_to_compare] and _1851[cols_to_compare] with your dataframes and columns to compare
---> 45 final_result = parallel_compute_and_save(comparer, candidates, _1861[cols_to_compare], _1851[cols_to_compare], output_dir)
     47 # Update the columns of the result dataframe
     48 final_result = pd.concat(final_result)

Cell In[31], line 36, in parallel_compute_and_save(comparer, candidates, df1, df2, output_dir, partition)
     34 end = min(i + partition, total_candidates)
     35 partition_path = os.path.join(output_dir, f'partition_{i}_{end}.parquet')
---> 36 result.append(compute_and_save_partition(comparer, candidates, df1, df2, i, end, partition_path))
     37 return result

Cell In[31], line 24, in compute_and_save_partition(comparer, candidates, df1, df2, start, end, partition_path)
     22 print(f"df1 shape: {df1.shape}, df2 shape: {df2.shape}")
     23 print(f"Candidate slice length: {len(candidate_slice)}")
---> 24 partition_result = comparer.compute(candidate_slice, df1, df2)
     25 print(f"Partition result shape: {partition_result.shape}")
     26 # Save the computed partition to a Parquet file

File ~/.local/lib/python3.11/site-packages/recordlinkage/base.py:836, in BaseCompare.compute(self, pairs, x, x_link)
    834     results = self._compute(pairs, x, x_link)
    835 elif self.n_jobs > 1:
--> 836     results = self._compute_parallel(pairs, x, x_link, n_jobs=self.n_jobs)
    837 else:
    838     raise ValueError("number of jobs should be positive integer")

File ~/.local/lib/python3.11/site-packages/recordlinkage/base.py:648, in BaseCompare._compute_parallel(self, pairs, x, x_link, n_jobs)
    646 def _compute_parallel(self, pairs, x, x_link=None, n_jobs=1):
    647     df_chunks = index_split(pairs, n_jobs)
--> 648     result_chunks = Parallel(n_jobs=n_jobs)(
    649         delayed(_parallel_compare_helper)(self, chunk, x, x_link)
    650         for chunk in df_chunks
    651     )
    653     result = pandas.concat(result_chunks)
    654     return result

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1952, in Parallel.__call__(self, iterable)
   1946 # The first item from the output is blank, but it makes the interpreter
   1947 # progress until it enters the Try/Except block of the generator and
   1948 # reach the first `yield` statement. This starts the aynchronous
   1949 # dispatch of the tasks to the workers.
   1950 next(output)
-> 1952 return output if self.return_generator else list(output)

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1595, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1592     yield
   1594     with self._backend.retrieval_context():
-> 1595         yield from self._retrieve()
   1597 except GeneratorExit:
   1598     # The generator has been garbage collected before being fully
   1599     # consumed. This aborts the remaining tasks if possible and warn
   1600     # the user if necessary.
   1601     self._exception = True

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1699, in Parallel._retrieve(self)
   1692 while self._wait_retrieval():
   1693
   1694     # If the callback thread of a worker has signaled that its task
   1695     # triggered an exception, or if the retrieval loop has raised an
   1696     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1697     # worker traceback.
   1698     if self._aborting:
-> 1699         self._raise_error_fast()
   1700         break
   1702     # If the next job is not ready for retrieval yet, we just wait for
   1703     # async callbacks to progress.

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:1734, in Parallel._raise_error_fast(self)
   1730 # If this error job exists, immediatly raise the error by
   1731 # calling get_result. This job might not exists if abort has been
   1732 # called directly or if the generator is gc'ed.
   1733 if error_job is not None:
-> 1734     error_job.get_result(self.timeout)

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:736, in BatchCompletionCallBack.get_result(self, timeout)
    730 backend = self.parallel._backend
    732 if backend.supports_retrieve_callback:
    733     # We assume that the result has already been retrieved by the
    734     # callback thread, and is stored internally. It's just waiting to
    735     # be returned.
--> 736     return self._return_or_raise()
    738 # For other backends, the main thread needs to run the retrieval step.
    739 try:

File ~/.local/lib/python3.11/site-packages/joblib/parallel.py:754, in BatchCompletionCallBack._return_or_raise(self)
    752 try:
    753     if self.status == TASK_ERROR:
--> 754         raise self._result
    755     return self._result
    756 finally:

ValueError: Length mismatch: Expected axis has 31262 elements, new values have 31250 elements
```
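For reference, the failure happens on the parallel path (`BaseCompare.compute` → `_compute_parallel` → `_parallel_compare_helper` → `frame_indexing`), where the candidate slice is split again across workers with `index_split`. One way to narrow this down (a sketch, not a confirmed fix) might be to re-run only the failing slice with a single job: if that succeeds, the mismatch is introduced by the parallel split rather than by the slice or the data itself.

```python
# Isolation sketch (assumption: the comparer exposes an n_jobs attribute that
# BaseCompare.compute reads, as shown in the traceback above).
failing_slice = candidates[1000000:2000000]

saved_n_jobs = comparer.n_jobs
comparer.n_jobs = 1
try:
    # Re-run only the failing partition single-threaded and inspect the shape.
    single_job_result = comparer.compute(failing_slice,
                                         _1861[cols_to_compare],
                                         _1851[cols_to_compare])
    print("single-job result shape:", single_job_result.shape)
finally:
    comparer.n_jobs = saved_n_jobs  # restore the original parallel setting
```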