[WIP] Feat: Add RayImageBTSMinhashDeduplicator #897
Dludora wants to merge 4 commits into datajuicer:main from
Conversation
Summary of Changes

Hello @Dludora, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed: this pull request enhances the image processing capabilities by integrating a new distributed image deduplication operator.
Code Review
This pull request introduces a new RayImageBTSMinhashDeduplicator for identifying near-duplicate images using MinHash on visual tokens from a BEiT model, with distributed computation via Ray. The implementation is comprehensive, supporting both CPU and GPU execution and dynamic batch sizing. However, there are several areas for improvement. A critical bug exists due to an undefined attribute in an error log message. There is also significant code duplication in the run method of the RayImageBTSMinhashDeduplicatorWithUid class that should be refactored. Additionally, several blocks of commented-out code and an unused constant should be removed to enhance code clarity. The main class docstring is also misleading, as it describes a different hashing mechanism than the one implemented.
def run(self, dataset, **kwargs):
    self._ensure_actors()

    start_time = time.time()

    if self.use_cuda():
        logger.info("Using GPU for MinHash computation")
        gpu_count = ray_gpu_count()
        if gpu_count == 0:
            logger.error("No GPUs available in Ray cluster")
            raise RuntimeError("No GPUs available in Ray cluster")

        concurrency = max(1, gpu_count)
        gpu_memory = ray_available_gpu_memories()
        if len(gpu_memory):
            min_memory = min(gpu_memory)
            safe_memory = min_memory * 0.8
            estimated_batch_size = int(safe_memory / self.memory_per_sample)
            max_reasonable_batch = 2_000_000
            batch_size = max(1, min(estimated_batch_size, max_reasonable_batch))
        else:
            batch_size = self.minhash_batch_size
    else:
        logger.info("Using CPU for MinHash computation")
        cpu_count = int(ray.cluster_resources().get("CPU", 1))
        total_cluster_memory = int(ray.cluster_resources().get("memory", 0))
        safe_memory_total = total_cluster_memory * 0.8
        concurrency = max(1, cpu_count // 2)
        memory_budget_per_worker = safe_memory_total / concurrency
        bytes_per_sample = self.memory_per_sample * 1024 * 1024
        estimated_batch_size = int(memory_budget_per_worker / bytes_per_sample)
        batch_size = max(32, min(estimated_batch_size, 1024))

    logger.info(f"Using batch size of {batch_size} for CPU MinHash computation")

    def band_existing_uid(table: pa.Table) -> pa.Table:
        if HashKeys.uid not in table.column_names:
            raise ValueError(f"Dataset missing required column: {HashKeys.uid} for {OP_NAME}_with_uid operator.")

        self.band_minhash(table["_minhash"], table[HashKeys.uid])

        return table.drop_columns(["_minhash"])

    from ray.data._internal.util import get_compute_strategy

    compute = get_compute_strategy(ImageMinHashActor, concurrency=int(concurrency))

    dataset = dataset.map_batches(
        ImageMinHashActor,
        fn_constructor_kwargs={
            "model_key": self.model_key,
            "use_cuda": self.use_cuda(),
            "perm_a": self.perm_a,
            "perm_b": self.perm_b,
            "num_permutation": self.num_permutation,
            "batch_size": batch_size,
        },
        fn_kwargs={"image_key": self.image_key, "image_bytes_key": self.image_bytes_key},
        batch_format="pyarrow",
        zero_copy_batch=True,
        compute=compute,
        num_gpus=1 if self.use_cuda() else 0,
        batch_size=batch_size,
    )

    dataset = dataset.map_batches(
        band_existing_uid,
        batch_format="pyarrow",
        zero_copy_batch=True,
    )

    dataset_count = dataset.count()
    logger.info(f"Processed {dataset_count} samples for MinHash calculation.")

    end_time = time.time()
    logger.info(f"MinHash calculation and banding time = {end_time - start_time}")

    start_time = time.time()
    self.merge()
    end_time = time.time()
    logger.info(f"Union-Find merge time = {end_time - start_time}")

    start_time = time.time()
    result = dataset.map_batches(
        self.filter_with_union_find,
        batch_format="pyarrow",
        zero_copy_batch=True,
    )

    end_time = time.time()
    logger.info(f"Filter graph construction time = {end_time - start_time}")

    return result
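For context on what the `perm_a`/`perm_b` arrays passed to `ImageMinHashActor` typically drive: a MinHash signature over a set of visual token IDs is usually computed with universal-hash permutations of the form `(a * x + b) mod p`. A minimal sketch of that idea, not the PR's actual implementation (the helper name and parameter shapes here are illustrative):

```python
import numpy as np

# A Mersenne prime commonly used as the universal-hash modulus; chosen small
# enough here that uint64 products cannot overflow for small token IDs.
MERSENNE_PRIME = (1 << 31) - 1


def minhash_signature(token_ids, perm_a, perm_b):
    """Compute one MinHash value per (a, b) permutation over a token-ID set."""
    tokens = np.asarray(sorted(set(token_ids)), dtype=np.uint64)
    # (num_perm, num_tokens) matrix of hashed tokens; the per-row minimum
    # is the signature entry for that permutation.
    hashes = (perm_a[:, None] * tokens[None, :] + perm_b[:, None]) % MERSENNE_PRIME
    return hashes.min(axis=1)


rng = np.random.default_rng(0)
num_permutation = 64
perm_a = rng.integers(1, MERSENNE_PRIME, num_permutation, dtype=np.uint64)
perm_b = rng.integers(0, MERSENNE_PRIME, num_permutation, dtype=np.uint64)

sig_x = minhash_signature([3, 17, 42, 99], perm_a, perm_b)
sig_y = minhash_signature([3, 17, 42, 100], perm_a, perm_b)
# The fraction of matching signature slots estimates the Jaccard similarity
# of the two token sets.
est = (sig_x == sig_y).mean()
```

The estimate's variance shrinks as `num_permutation` grows, which is the usual trade-off between signature size and accuracy.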
The run method in RayImageBTSMinhashDeduplicatorWithUid duplicates a significant amount of logic from the parent class's run method, particularly the setup for concurrency and batch_size. This makes the code harder to maintain and prone to inconsistencies. This logic should be extracted into a shared private method (e.g., _calculate_run_parameters) in the base class to promote code reuse and reduce redundancy.
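One way to apply that suggestion is to pull the concurrency/batch-size selection into a pure helper that both `run` methods call. This is only a sketch; the helper name `_calculate_run_parameters` comes from the review comment, and the exact signature is up to the author (shown here as a free function so the logic is easy to unit-test):

```python
def calculate_run_parameters(
    use_cuda,
    gpu_count=0,
    gpu_memories=None,
    cpu_count=1,
    total_cluster_memory=0,
    memory_per_sample=25.0,
    default_gpu_batch_size=1024,
):
    """Return (concurrency, batch_size), mirroring the branching in run()."""
    if use_cuda:
        if gpu_count == 0:
            raise RuntimeError("No GPUs available in Ray cluster")
        concurrency = max(1, gpu_count)
        if gpu_memories:
            # Keep a 20% safety margin on the smallest GPU's free memory.
            safe_memory = min(gpu_memories) * 0.8
            batch_size = max(1, min(int(safe_memory / memory_per_sample), 2_000_000))
        else:
            batch_size = default_gpu_batch_size
    else:
        concurrency = max(1, cpu_count // 2)
        budget_per_worker = (total_cluster_memory * 0.8) / concurrency
        bytes_per_sample = memory_per_sample * 1024 * 1024
        batch_size = max(32, min(int(budget_per_worker / bytes_per_sample), 1024))
    return concurrency, batch_size
```

Both `run` implementations would then reduce to a call such as `concurrency, batch_size = self._calculate_run_parameters(...)` followed by the `map_batches` wiring they each need.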
Key Args:
model_name (str): The Hugging Face model ID used for hash computation.
Defaults to "microsoft/beit-base-patch16-224-pt22k".
accelerator (str): The device accelerator to use, either "cpu" or "cuda".
Defaults to "cuda".
memory_per_sample (float): Estimated memory usage per image sample in MB.
Used to dynamically calculate batch size based on available GPU memory.
Defaults to 25 (approx. 25MB per decoded 1080p image).
jaccard_threshold (float): The minimum Jaccard similarity required to consider
two images duplicates. A higher threshold demands closer matches before images
are grouped as duplicates, so more data is preserved. Defaults to 0.7.
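For reference, the standard LSH banding analysis behind a threshold like this: with b bands of r rows each, a pair with Jaccard similarity s lands in the same bucket for at least one band with probability 1 - (1 - s^r)^b, and the S-curve's inflection sits near (1/b)^(1/r). A small illustrative sketch (not the PR's code) of picking (b, r) for a given `num_permutation` and `jaccard_threshold`:

```python
def collision_probability(s, bands, rows):
    """P(two items share at least one LSH band) given Jaccard similarity s."""
    return 1.0 - (1.0 - s ** rows) ** bands


def pick_bands(num_permutation, threshold):
    """Choose (bands, rows) with bands * rows <= num_permutation whose
    approximate S-curve threshold (1/bands)**(1/rows) is closest to `threshold`."""
    best, best_err = (num_permutation, 1), float("inf")
    for rows in range(1, num_permutation + 1):
        bands = num_permutation // rows
        if bands == 0:
            break
        err = abs((1.0 / bands) ** (1.0 / rows) - threshold)
        if err < best_err:
            best, best_err = (bands, rows), err
    return best


bands, rows = pick_bands(num_permutation=256, threshold=0.7)
```

Pairs well above the threshold are caught with high probability, while pairs well below it rarely collide, which is what makes the banding step cheap relative to all-pairs comparison.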