+import json
 import os
-import tempfile
 from contextlib import contextmanager
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Generator, Optional

 from pydantic import Field

-from unstructured_ingest.utils.data_prep import get_data_df, write_data
-from unstructured_ingest.v2.interfaces import Uploader, UploaderConfig
+from unstructured_ingest.utils.data_prep import get_json_data, write_data
+from unstructured_ingest.v2.constants import RECORD_ID_LABEL
+from unstructured_ingest.v2.interfaces import (
+    Uploader,
+    UploaderConfig,
+    UploadStager,
+    UploadStagerConfig,
+)
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.processes.connector_registry import (
     DestinationRegistryEntry,
 )
 from unstructured_ingest.v2.processes.connectors.databricks.volumes import DatabricksPathMixin
 from unstructured_ingest.v2.processes.connectors.sql.databricks_delta_tables import (
     DatabricksDeltaTablesConnectionConfig,
-    DatabricksDeltaTablesUploadStager,
     DatabricksDeltaTablesUploadStagerConfig,
 )
 from unstructured_ingest.v2.types.file_data import FileData
+from unstructured_ingest.v2.utils import get_enhanced_element_id
2329
2430CONNECTOR_TYPE = "databricks_volume_delta_tables"
2531
2632if TYPE_CHECKING :
27- from pandas import DataFrame
33+ pass
2834
2935
3036class DatabricksVolumeDeltaTableUploaderConfig (UploaderConfig , DatabricksPathMixin ):
3137 database : str = Field (description = "Database name" , default = "default" )
32- table_name : str = Field (description = "Table name" )
38+ table_name : Optional [str ] = Field (description = "Table name" , default = None )
39+
40+
41+ class DatabricksVolumeDeltaTableStagerConfig (UploadStagerConfig ):
42+ pass
3343
3444
3545@dataclass
36- class DatabricksVolumeDeltaTableStager (DatabricksDeltaTablesUploadStager ):
37- def write_output (self , output_path : Path , data : list [dict ]) -> Path :
46+ class DatabricksVolumeDeltaTableStager (UploadStager ):
47+ upload_stager_config : DatabricksVolumeDeltaTableStagerConfig = field (
48+ default_factory = DatabricksVolumeDeltaTableStagerConfig
49+ )
50+
51+ def run (
52+ self ,
53+ elements_filepath : Path ,
54+ output_dir : Path ,
55+ output_filename : str ,
56+ file_data : FileData ,
57+ ** kwargs : Any ,
58+ ) -> Path :
3859 # To avoid new line issues when migrating from volumes into delta tables, omit indenting
3960 # and always write it as a json file
61+ output_dir .mkdir (exist_ok = True , parents = True )
62+ output_path = output_dir / output_filename
4063 final_output_path = output_path .with_suffix (".json" )
64+ data = get_json_data (path = elements_filepath )
65+ for element in data :
66+ element ["id" ] = get_enhanced_element_id (element_dict = element , file_data = file_data )
67+ element [RECORD_ID_LABEL ] = file_data .identifier
68+ element ["metadata" ] = json .dumps (element .get ("metadata" , {}))
4169 write_data (path = final_output_path , data = data , indent = None )
4270 return final_output_path
4371
@@ -49,6 +77,29 @@ class DatabricksVolumeDeltaTableUploader(Uploader):
     connector_type: str = CONNECTOR_TYPE
     _columns: Optional[dict[str, str]] = None

+    def init(self, **kwargs: Any) -> None:
+        self.create_destination(**kwargs)
+
+    def create_destination(
+        self, destination_name: str = "unstructuredautocreated", **kwargs: Any
+    ) -> bool:
+        table_name = self.upload_config.table_name or destination_name
+        self.upload_config.table_name = table_name
+        connectors_dir = Path(__file__).parents[1]
+        collection_config_file = connectors_dir / "assets" / "databricks_delta_table_schema.sql"
+        with self.get_cursor() as cursor:
+            cursor.execute("SHOW TABLES")
+            table_names = [r[1] for r in cursor.fetchall()]
+            if table_name in table_names:
+                return False
+            with collection_config_file.open() as schema_file:
+                data_lines = schema_file.readlines()
+            data_lines[0] = data_lines[0].replace("elements", table_name)
+            destination_schema = "".join([line.strip() for line in data_lines])
+            logger.info(f"creating table {table_name} for user")
+            cursor.execute(destination_schema)
+            return True
+
     def precheck(self) -> None:
         with self.connection_config.get_cursor() as cursor:
             cursor.execute("SHOW CATALOGS")
@@ -68,14 +119,6 @@ def precheck(self) -> None:
                         self.upload_config.database, ", ".join(databases)
                     )
                 )
-            cursor.execute(f"SHOW TABLES IN {self.upload_config.database}")
-            table_names = [r[1] for r in cursor.fetchall()]
-            if self.upload_config.table_name not in table_names:
-                raise ValueError(
-                    "Table {} not found in {}".format(
-                        self.upload_config.table_name, ", ".join(table_names)
-                    )
-                )

     def get_output_path(self, file_data: FileData, suffix: str = ".json") -> str:
         filename = Path(file_data.source_identifiers.filename)
@@ -98,51 +141,42 @@ def get_table_columns(self) -> dict[str, str]:
                 self._columns = {desc[0]: desc[1] for desc in cursor.description}
         return self._columns

-    def _fit_to_schema(self, df: "DataFrame", add_missing_columns: bool = True) -> "DataFrame":
-        import pandas as pd
-
-        table_columns = self.get_table_columns()
-        columns = set(df.columns)
-        schema_fields = set(table_columns.keys())
-        columns_to_drop = columns - schema_fields
-        missing_columns = schema_fields - columns
-
-        if columns_to_drop:
-            logger.info(
-                "Following columns will be dropped to match the table's schema: "
-                f"{', '.join(columns_to_drop)}"
-            )
-        if missing_columns and add_missing_columns:
-            logger.info(
-                "Following null filled columns will be added to match the table's schema:"
-                f" {', '.join(missing_columns)}"
+    def can_delete(self) -> bool:
+        existing_columns = self.get_table_columns()
+        return RECORD_ID_LABEL in existing_columns
+
+    def delete_previous_content(self, file_data: FileData) -> None:
+        logger.debug(
+            f"deleting any content with metadata "
+            f"{RECORD_ID_LABEL}={file_data.identifier} "
+            f"from delta table: {self.upload_config.table_name}"
+        )
+        with self.get_cursor() as cursor:
+            cursor.execute(
+                f"DELETE FROM {self.upload_config.table_name} WHERE {RECORD_ID_LABEL} = '{file_data.identifier}'"  # noqa: E501
             )
-
-        df = df.drop(columns=columns_to_drop)
-
-        if add_missing_columns:
-            for column in missing_columns:
-                df[column] = pd.Series()
-        return df
+            results = cursor.fetchall()
+            deleted_rows = results[0][0]
+            logger.debug(f"deleted {deleted_rows} rows from table {self.upload_config.table_name}")

     def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
-        with tempfile.TemporaryDirectory() as temp_dir:
-            df = get_data_df()
-            df = self._fit_to_schema(df=df)
-            temp_path = Path(temp_dir) / path.name
-            df.to_json(temp_path, orient="records", lines=False)
-            with self.get_cursor(staging_allowed_local_path=temp_dir) as cursor:
-                catalog_path = self.get_output_path(file_data=file_data)
-                logger.debug(f"uploading {path.as_posix()} to {catalog_path}")
-                cursor.execute(f"PUT '{temp_path.as_posix()}' INTO '{catalog_path}' OVERWRITE")
-                logger.debug(
-                    f"migrating content from {catalog_path} to "
-                    f"table {self.upload_config.table_name}"
-                )
-                columns = list(df.columns)
-                column_str = ", ".join(columns)
-                sql_statment = f"INSERT INTO `{self.upload_config.table_name}` ({column_str}) SELECT {column_str} FROM json.`{catalog_path}`"  # noqa: E501
-                cursor.execute(sql_statment)
+        if self.can_delete():
+            self.delete_previous_content(file_data=file_data)
+        with self.get_cursor(staging_allowed_local_path=path.parent.as_posix()) as cursor:
+            catalog_path = self.get_output_path(file_data=file_data)
+            logger.debug(f"uploading {path.as_posix()} to {catalog_path}")
+            cursor.execute(f"PUT '{path.as_posix()}' INTO '{catalog_path}' OVERWRITE")
+            logger.debug(
+                f"migrating content from {catalog_path} to "
+                f"table {self.upload_config.table_name}"
+            )
+            data = get_json_data(path=path)
+            columns = data[0].keys()
+            select_columns = ["PARSE_JSON(metadata)" if c == "metadata" else c for c in columns]
+            column_str = ", ".join(columns)
+            select_column_str = ", ".join(select_columns)
+            sql_statment = f"INSERT INTO `{self.upload_config.table_name}` ({column_str}) SELECT {select_column_str} FROM json.`{catalog_path}`"  # noqa: E501
+            cursor.execute(sql_statment)


 databricks_volumes_delta_tables_destination_entry = DestinationRegistryEntry(
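For context, a minimal sketch of the INSERT statement the new run() builds from the staged JSON; the table name, catalog path, and element keys below are made-up placeholders (the real values come from upload_config and get_output_path). It shows why the stager serializes metadata with json.dumps: every column is selected as-is except metadata, which is wrapped in PARSE_JSON() so the serialized string is parsed again on insert.

# Sketch only; element payload, table name, and volume path are hypothetical.
staged_elements = [
    {"id": "el-0", "record_id": "file-123", "metadata": '{"filetype": "pdf"}', "text": "Hello"},
]
table_name = "elements"
catalog_path = "/Volumes/main/default/ingest/doc.pdf.json"

# Mirror the column handling from run(): keep column order from the staged record,
# but swap metadata for PARSE_JSON(metadata) in the SELECT clause.
columns = staged_elements[0].keys()
select_columns = ["PARSE_JSON(metadata)" if c == "metadata" else c for c in columns]
sql = (
    f"INSERT INTO `{table_name}` ({', '.join(columns)}) "
    f"SELECT {', '.join(select_columns)} FROM json.`{catalog_path}`"
)
print(sql)
# Output (wrapped here for readability):
# INSERT INTO `elements` (id, record_id, metadata, text)
#   SELECT id, record_id, PARSE_JSON(metadata), text FROM json.`/Volumes/main/default/ingest/doc.pdf.json`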