@@ -101,62 +101,23 @@ async def find_by_permalinks(self, permalinks: List[str]) -> Sequence[Entity]:
101101 return list (result .scalars ().all ())
102102
103103 async def upsert_entity (self , entity : Entity ) -> Entity :
104- """Insert or update entity using a hybrid approach .
104+ """Insert or update entity using simple try/except with database-level conflict resolution .
105105
106- This method provides a cleaner alternative to the try/catch approach
107- for handling permalink and file_path conflicts. It first tries direct
108- insertion, then handles conflicts intelligently.
106+ Handles file_path race conditions by checking for existing entity on IntegrityError.
107+ For permalink conflicts, generates a unique permalink with numeric suffix.
109108
110109 Args:
111110 entity: The entity to insert or update
112111
113112 Returns:
114113 The inserted or updated entity
115114 """
116-
117115 async with db .scoped_session (self .session_maker ) as session :
118116 # Set project_id if applicable and not already set
119117 self ._set_project_id_if_needed (entity )
120118
121- # Check for existing entity with same file_path first
122- existing_by_path = await session .execute (
123- select (Entity ).where (
124- Entity .file_path == entity .file_path , Entity .project_id == entity .project_id
125- )
126- )
127- existing_path_entity = existing_by_path .scalar_one_or_none ()
128-
129- if existing_path_entity :
130- # Update existing entity with same file path
131- for key , value in {
132- "title" : entity .title ,
133- "entity_type" : entity .entity_type ,
134- "entity_metadata" : entity .entity_metadata ,
135- "content_type" : entity .content_type ,
136- "permalink" : entity .permalink ,
137- "checksum" : entity .checksum ,
138- "updated_at" : entity .updated_at ,
139- }.items ():
140- setattr (existing_path_entity , key , value )
141-
142- await session .flush ()
143- # Return with relationships loaded
144- query = (
145- self .select ()
146- .where (Entity .file_path == entity .file_path )
147- .options (* self .get_load_options ())
148- )
149- result = await session .execute (query )
150- found = result .scalar_one_or_none ()
151- if not found : # pragma: no cover
152- raise RuntimeError (
153- f"Failed to retrieve entity after update: { entity .file_path } "
154- )
155- return found
156-
157- # No existing entity with same file_path, try insert
119+ # Try simple insert first
158120 try :
159- # Simple insert for new entity
160121 session .add (entity )
161122 await session .flush ()
162123
@@ -175,20 +136,20 @@ async def upsert_entity(self, entity: Entity) -> Entity:
175136 return found
176137
177138 except IntegrityError :
178- # Could be either file_path or permalink conflict
179139 await session .rollback ()
180140
181- # Check if it's a file_path conflict (race condition)
182- existing_by_path_check = await session .execute (
183- select (Entity ).where (
141+ # Re-query after rollback to get a fresh, attached entity
142+ existing_result = await session .execute (
143+ select (Entity )
144+ .where (
184145 Entity .file_path == entity .file_path , Entity .project_id == entity .project_id
185146 )
147+ .options (* self .get_load_options ())
186148 )
187- race_condition_entity = existing_by_path_check .scalar_one_or_none ()
149+ existing_entity = existing_result .scalar_one_or_none ()
188150
189- if race_condition_entity :
190- # Race condition: file_path conflict detected after our initial check
191- # Update the existing entity instead
151+ if existing_entity :
152+ # File path conflict - update the existing entity
192153 for key , value in {
193154 "title" : entity .title ,
194155 "entity_type" : entity .entity_type ,
@@ -198,25 +159,22 @@ async def upsert_entity(self, entity: Entity) -> Entity:
198159 "checksum" : entity .checksum ,
199160 "updated_at" : entity .updated_at ,
200161 }.items ():
201- setattr (race_condition_entity , key , value )
202-
203- await session .flush ()
204- # Return the updated entity with relationships loaded
205- query = (
206- self .select ()
207- .where (Entity .file_path == entity .file_path )
208- .options (* self .get_load_options ())
209- )
210- result = await session .execute (query )
211- found = result .scalar_one_or_none ()
212- if not found : # pragma: no cover
213- raise RuntimeError (
214- f"Failed to retrieve entity after race condition update: { entity .file_path } "
215- )
216- return found
162+ setattr (existing_entity , key , value )
163+
164+ # Clear and re-add observations
165+ existing_entity .observations .clear ()
166+ for obs in entity .observations :
167+ obs .entity_id = existing_entity .id
168+ existing_entity .observations .append (obs )
169+
170+ await session .commit ()
171+ return existing_entity
172+
217173 else :
218- # Must be permalink conflict - generate unique permalink
219- return await self ._handle_permalink_conflict (entity , session )
174+ # No file_path conflict - must be permalink conflict
175+ # Generate unique permalink and retry
176+ entity = await self ._handle_permalink_conflict (entity , session )
177+ return entity
220178
221179 async def _handle_permalink_conflict (self , entity : Entity , session : AsyncSession ) -> Entity :
222180 """Handle permalink conflicts by generating a unique permalink."""
@@ -237,18 +195,7 @@ async def _handle_permalink_conflict(self, entity: Entity, session: AsyncSession
237195 break
238196 suffix += 1
239197
240- # Insert with unique permalink (no conflict possible now)
198+ # Insert with unique permalink
241199 session .add (entity )
242200 await session .flush ()
243-
244- # Return the inserted entity with relationships loaded
245- query = (
246- self .select ()
247- .where (Entity .file_path == entity .file_path )
248- .options (* self .get_load_options ())
249- )
250- result = await session .execute (query )
251- found = result .scalar_one_or_none ()
252- if not found : # pragma: no cover
253- raise RuntimeError (f"Failed to retrieve entity after insert: { entity .file_path } " )
254- return found
201+ return entity
0 commit comments