@@ -97,6 +97,24 @@ class User(Base):
9797 memberships = relationship ("PartitionMembership" , back_populates = "user" , cascade = "all, delete-orphan" )
9898
9999
100+ class FileDomain (Base ):
101+ __tablename__ = "file_domains"
102+
103+ id = Column (Integer , primary_key = True )
104+ file_id = Column (String , nullable = False )
105+ partition_name = Column (
106+ String ,
107+ ForeignKey ("partitions.partition" , ondelete = "CASCADE" ),
108+ nullable = False ,
109+ )
110+ domain = Column (String , nullable = False )
111+
112+ __table_args__ = (
113+ UniqueConstraint ("file_id" , "partition_name" , "domain" , name = "uix_file_domain" ),
114+ Index ("ix_partition_domain" , "partition_name" , "domain" ),
115+ )
116+
117+
100118class PartitionMembership (Base ):
101119 __tablename__ = "partition_memberships"
102120
@@ -136,8 +154,8 @@ def __init__(self, database_url: str, logger=logger):
136154
137155 except Exception as e :
138156 raise VDBConnectionError (
139- f"Failed to connect to database: { e !s } " ,
140- db_url = database_url ,
157+ "An unexpected database error occurred " ,
158+ db_url = str ( database_url ) ,
141159 db_type = "SQLAlchemy" ,
142160 )
143161
@@ -225,10 +243,14 @@ def add_file_to_partition(
225243 session .commit ()
226244 log .info ("Added file successfully" )
227245 return True
228- except Exception :
246+ except Exception as e :
229247 session .rollback ()
230- log .exception ("Error adding file to partition" )
231- raise
248+ log .exception ("Error adding file to partition" , error = str (e ))
249+ raise VDBInsertError (
250+ "An unexpected database error occurred" ,
251+ file_id = file_id ,
252+ partition = partition ,
253+ )
232254
233255 def remove_file_from_partition (self , file_id : str , partition : str ):
234256 """Remove a file from its partition - Optimized without join"""
@@ -246,8 +268,12 @@ def remove_file_from_partition(self, file_id: str, partition: str):
246268 return False
247269 except Exception as e :
248270 session .rollback ()
249- log .error (f"Error removing file: { e } " )
250- raise e
271+ log .exception ("Error removing file" , error = str (e ))
272+ raise VDBDeleteError (
273+ "An unexpected database error occurred" ,
274+ file_id = file_id ,
275+ partition = partition ,
276+ )
251277
252278 def delete_partition (self , partition : str ):
253279 """Delete a partition and all its files"""
@@ -293,6 +319,58 @@ def file_exists_in_partition(self, file_id: str, partition: str):
293319 session .query (File ).filter (File .file_id == file_id , File .partition_name == partition ).exists ()
294320 ).scalar ()
295321
322+ # Domains
323+
324+ def get_file_ids_by_domains (self , partition : str | None , domains : list [str ]) -> list [str ]:
325+ """Get file_ids matching ANY of the given domains in a partition (or all partitions if None)."""
326+ with self .Session () as session :
327+ query = session .query (FileDomain .file_id ).filter (FileDomain .domain .in_ (domains ))
328+ if partition is not None :
329+ query = query .filter (FileDomain .partition_name == partition )
330+ rows = query .distinct ().all ()
331+ return [row [0 ] for row in rows ]
332+
333+ def set_file_domains (self , file_id : str , partition : str , domains : list [str ]):
334+ """Replace all domains for a file with the given list."""
335+ with self .Session () as session :
336+ try :
337+ session .query (FileDomain ).filter (
338+ FileDomain .file_id == file_id ,
339+ FileDomain .partition_name == partition ,
340+ ).delete ()
341+ for domain in domains :
342+ session .add (FileDomain (file_id = file_id , partition_name = partition , domain = domain ))
343+ session .commit ()
344+ except Exception as e :
345+ session .rollback ()
346+ self .logger .exception ("Error setting file domains" , error = str (e ))
347+ raise VDBInsertError (
348+ "An unexpected database error occurred" ,
349+ file_id = file_id ,
350+ partition = partition ,
351+ )
352+
353+ def get_file_domains (self , file_id : str , partition : str ) -> list [str ]:
354+ """Get domains for a file in a partition."""
355+ with self .Session () as session :
356+ rows = (
357+ session .query (FileDomain .domain )
358+ .filter (FileDomain .file_id == file_id , FileDomain .partition_name == partition )
359+ .all ()
360+ )
361+ return [row [0 ] for row in rows ]
362+
363+ def list_partition_domains (self , partition : str ) -> list [str ]:
364+ """List all unique domains in a partition."""
365+ with self .Session () as session :
366+ rows = (
367+ session .query (FileDomain .domain )
368+ .filter (FileDomain .partition_name == partition )
369+ .distinct ()
370+ .all ()
371+ )
372+ return [row [0 ] for row in rows ]
373+
296374 # Users
297375
298376 def create_user (
0 commit comments