|
29 | 29 | from tqdm.auto import tqdm |
30 | 30 |
|
31 | 31 | import datachain.sql.sqlite |
32 | | -from datachain import semver |
33 | 32 | from datachain.data_storage import AbstractDBMetastore, AbstractWarehouse |
34 | 33 | from datachain.data_storage.db_engine import DatabaseEngine |
35 | 34 | from datachain.data_storage.schema import DefaultSchema |
@@ -692,61 +691,6 @@ def get_dataset_sources( |
692 | 691 | for row in self.db.execute(query, cursor=cur) |
693 | 692 | ] |
694 | 693 |
|
695 | | - def merge_dataset_rows( |
696 | | - self, |
697 | | - src: DatasetRecord, |
698 | | - dst: DatasetRecord, |
699 | | - src_version: str, |
700 | | - dst_version: str, |
701 | | - ) -> None: |
702 | | - dst_empty = False |
703 | | - |
704 | | - if not self.db.has_table(self.dataset_table_name(src, src_version)): |
705 | | - # source table doesn't exist, nothing to do |
706 | | - return |
707 | | - |
708 | | - src_dr = self.dataset_rows(src, src_version).table |
709 | | - |
710 | | - if not self.db.has_table(self.dataset_table_name(dst, dst_version)): |
711 | | - # destination table doesn't exist, create it |
712 | | - self.create_dataset_rows_table( |
713 | | - self.dataset_table_name(dst, dst_version), |
714 | | - columns=src_dr.columns, |
715 | | - ) |
716 | | - dst_empty = True |
717 | | - |
718 | | - dst_dr = self.dataset_rows(dst, dst_version).table |
719 | | - merge_fields = [c.name for c in src_dr.columns if c.name != "sys__id"] |
720 | | - select_src = select(*(getattr(src_dr.columns, f) for f in merge_fields)) |
721 | | - |
722 | | - if dst_empty: |
723 | | - # we don't need union, but just select from source to destination |
724 | | - insert_query = sqlite.insert(dst_dr).from_select(merge_fields, select_src) |
725 | | - else: |
726 | | - dst_version_latest = None |
727 | | - # find the previous version of the destination dataset |
728 | | - dst_previous_versions = [ |
729 | | - v.version |
730 | | - for v in dst.versions # type: ignore [union-attr] |
731 | | - if semver.compare(v.version, dst_version) == -1 |
732 | | - ] |
733 | | - if dst_previous_versions: |
734 | | - dst_version_latest = max(dst_previous_versions) |
735 | | - |
736 | | - dst_dr_latest = self.dataset_rows(dst, dst_version_latest).table |
737 | | - |
738 | | - select_dst_latest = select( |
739 | | - *(getattr(dst_dr_latest.c, f) for f in merge_fields) |
740 | | - ) |
741 | | - union_query = sqlalchemy.union(select_src, select_dst_latest) |
742 | | - insert_query = ( |
743 | | - sqlite.insert(dst_dr) |
744 | | - .from_select(merge_fields, union_query) |
745 | | - .prefix_with("OR IGNORE") |
746 | | - ) |
747 | | - |
748 | | - self.db.execute(insert_query) |
749 | | - |
750 | 694 | def prepare_entries(self, entries: "Iterable[File]") -> Iterable[dict[str, Any]]: |
751 | 695 | return (e.model_dump() for e in entries) |
752 | 696 |
|
|
0 commit comments