Skip to content

Commit 6a687f4

Browse files
committed
data_process_complete
1 parent 37740da commit 6a687f4

File tree

10 files changed

+169
-174
lines changed

10 files changed

+169
-174
lines changed

Containerfile

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,11 @@ WORKDIR /app
3232
# Copy environment definition file first
3333
COPY environment.yml environment.yml
3434

35-
# Create Conda environment from environment.yml (excluding pip for now)
36-
RUN echo "Creating Conda environment AIGraphX (conda parts)..." && \
37-
conda env create -f environment.yml --name AIGraphX
38-
39-
# Extract pip requirements to a temporary file
40-
RUN echo "Extracting pip requirements..." && \
41-
grep -A 100 -- '- pip:' environment.yml | tail -n +2 | sed 's/^- *//' | sed 's/"//g' > /tmp/requirements_pip.txt
42-
43-
# Install pip dependencies using the temporary file inside the Conda env
44-
RUN echo "Installing pip dependencies..." && \
45-
conda run -n AIGraphX pip install --no-cache-dir -r /tmp/requirements_pip.txt && \
46-
rm /tmp/requirements_pip.txt # Clean up temporary file
35+
# Create Conda environment from environment.yml (Handles both Conda and Pip deps)
36+
RUN echo "Creating Conda environment AIGraphX from environment.yml..." && \
37+
conda env create -f environment.yml --name AIGraphX && \
38+
# Clean up conda caches after creating environment
39+
conda clean -afy
4740

4841
# --- Verification Step (depends on environment install) ---
4942
RUN echo "Verifying uvicorn installation in AIGraphX environment..." && \

aigraphx/repositories/neo4j_repo.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ async def _execute_query(
8181
logger.error("Neo4j driver not available or invalid in _execute_query")
8282
raise ConnectionError("Neo4j driver is not available.")
8383

84-
async with self.driver.session() as session:
84+
async with self.driver.session(database=self.db_name) as session:
8585
try:
8686
# Use execute_write for automatic transaction management
8787
await session.execute_write(lambda tx: tx.run(query, parameters))
@@ -95,6 +95,22 @@ async def _execute_query(
9595
logger.error(f"Parameters: {parameters}")
9696
raise # Re-raise to indicate failure
9797

98+
async def reset_database(self) -> None:
99+
"""Clears all nodes and relationships from the Neo4j database."""
100+
if not self.driver or not hasattr(self.driver, "session"): # Basic check
101+
logger.error("Neo4j driver not available or invalid in reset_database")
102+
raise ConnectionError("Neo4j driver is not available.")
103+
104+
logger.warning("Executing query to delete all nodes and relationships...")
105+
query = "MATCH (n) DETACH DELETE n"
106+
try:
107+
# Use _execute_query to handle the transaction
108+
await self._execute_query(query)
109+
logger.info("Successfully cleared the Neo4j database.")
110+
except Exception as e:
111+
logger.error(f"Failed to clear Neo4j database: {e}")
112+
raise # Re-raise after logging
113+
98114
async def create_or_update_paper_node(
99115
self, pwc_id: str, title: Optional[str] = None
100116
) -> None:

compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ services:
5151
volumes:
5252
- .:/app:z # ':z' is important for SELinux systems like Fedora
5353
# Mount data and logs directories as named volumes
54-
- aigraphx_data:/app/data
54+
- ./data:/app/data
5555
- aigraphx_logs:/app/logs
5656
# Define dependencies: The app service depends on the database services
5757
depends_on:

data/faiss_index.bin

1.64 MB
Binary file not shown.

data/models_faiss.index

7.39 MB
Binary file not shown.

data/models_faiss_ids.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

data/papers_faiss_ids.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

data/pg_load_checkpoint.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3429
1+
5044

scripts/load_postgres.py

Lines changed: 79 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,9 @@ async def insert_pwc_relation(
295295
"""Inserts related items (tasks, datasets) for a paper."""
296296
if not items:
297297
return
298-
table_name = f"pwc_{relation_type}" # e.g., pwc_tasks, pwc_datasets
299-
column_name = f"{relation_type[:-1]}_name" # e.g., task_name, dataset_name
298+
# Correctly generate table and column names
299+
table_name = f"pwc_{relation_type}s" # e.g., pwc_tasks, pwc_datasets
300+
column_name = f"{relation_type}_name" # e.g., task_name, dataset_name
300301

301302
# Prepare data tuples: (paper_id, item_name)
302303
data_tuples = [(paper_id, item) for item in items]
@@ -372,69 +373,104 @@ async def insert_model_paper_link(
372373
async def process_batch(
373374
conn: psycopg.AsyncConnection, batch: List[Tuple[int, Dict[str, Any]]]
374375
) -> int:
375-
"""Processes a batch of model data within a single transaction."""
376+
"""Processes a batch of records within a single transaction."""
376377
processed_in_batch = 0
377-
# Start a transaction for the batch
378+
successful_lines_in_batch = 0 # Track successful lines within the batch
378379
async with conn.transaction():
379-
for line_num, model_record in batch:
380-
hf_model_id = model_record.get("hf_model_id")
381-
if not hf_model_id:
382-
logger.warning(f"Skipping line {line_num}: Missing 'hf_model_id'.")
383-
continue
384-
380+
logger.debug(f"Starting transaction for batch of {len(batch)} records.")
381+
for line_num, record in batch:
385382
try:
386-
# 1. Insert/Update HF Model
387-
await insert_hf_model(conn, model_record)
383+
# Assume record processing starts successfully unless exception occurs
384+
record_processed_successfully = True
385+
386+
# --- Process Hugging Face Model ---
387+
hf_model_id = record.get("hf_model_id")
388+
if hf_model_id:
389+
await insert_hf_model(conn, record)
390+
else:
391+
logger.warning(f"Record on line {line_num} missing hf_model_id.")
392+
record_processed_successfully = False # Mark as unsuccessful if critical ID missing
393+
# continue # Optionally skip further processing for this record
388394

389-
# 2. Process linked papers
390-
linked_papers = model_record.get("linked_papers", [])
395+
# --- Process Linked Papers (Iterate through the list) ---
396+
linked_papers = record.get("linked_papers", [])
391397
if not isinstance(linked_papers, list):
392-
logger.warning(
393-
f"Skipping papers for model {hf_model_id} on line {line_num}: 'linked_papers' is not a list."
394-
)
395-
linked_papers = []
398+
logger.warning(f"Record on line {line_num}: 'linked_papers' is not a list. Skipping paper processing.")
399+
linked_papers = [] # Treat as empty list
400+
401+
# Use a flag to track if *any* paper was successfully processed for this model record
402+
at_least_one_paper_processed = False
396403

397404
for paper_data in linked_papers:
398405
if not isinstance(paper_data, dict):
399-
logger.warning(
400-
f"Skipping invalid paper entry for model {hf_model_id} on line {line_num}: not a dictionary."
401-
)
402-
continue
406+
logger.warning(f"Skipping invalid paper entry for model {hf_model_id} on line {line_num}: not a dictionary.")
407+
continue # Skip this invalid paper entry
403408

404-
# 3. Get or Insert Paper
409+
# --- Process Single Paper (Get or Insert) ---
410+
# Pass the individual paper_data dictionary here
405411
paper_id = await get_or_insert_paper(conn, paper_data)
406412

413+
# --- Link Model and Paper (only if both exist) ---
414+
if hf_model_id and paper_id:
415+
await insert_model_paper_link(conn, hf_model_id, paper_id)
416+
at_least_one_paper_processed = True # Mark success if linked
417+
# Log cases where linking didn't happen (optional)
418+
# elif paper_id and not hf_model_id:
419+
# logger.debug(f"Paper on line {line_num} processed but no HF model ID.")
420+
# elif hf_model_id and not paper_id:
421+
# logger.debug(f"HF model {hf_model_id} on line {line_num} processed but paper insertion failed.")
422+
423+
# --- Process PWC Relations and Repositories (only if paper was successfully inserted/found) ---
407424
if paper_id:
408-
# 4. Link Model to Paper (Call updated function)
409-
await insert_model_paper_link(
410-
conn,
411-
hf_model_id,
412-
paper_id,
413-
)
414-
415-
# 5. Insert PWC Relations (Tasks, Datasets, Repos)
425+
# IMPORTANT: Get pwc_entry from paper_data, not the top-level record
416426
pwc_entry = paper_data.get("pwc_entry") or {}
417427
await insert_pwc_relation(
418-
conn, paper_id, "tasks", pwc_entry.get("tasks")
428+
conn, paper_id, "task", pwc_entry.get("tasks")
429+
)
430+
await insert_pwc_relation(
431+
conn, paper_id, "method", pwc_entry.get("methods")
419432
)
420433
await insert_pwc_relation(
421-
conn, paper_id, "datasets", pwc_entry.get("datasets")
434+
conn,
435+
paper_id,
436+
"dataset",
437+
pwc_entry.get("datasets_used"), # Assuming field name
422438
)
423-
# await insert_pwc_relation(conn, paper_id, "methods", pwc_entry.get("methods")) # Uncomment if methods are added
424439
await insert_pwc_repositories(
425440
conn, paper_id, pwc_entry.get("repositories")
426441
)
442+
at_least_one_paper_processed = True # Also mark success here
443+
else:
444+
# If get_or_insert_paper returned None, the paper processing failed for this entry
445+
logger.warning(f"Failed to get or insert paper for entry in linked_papers on line {line_num}. Paper data: {paper_data}")
446+
# Consider if this failure should mark the whole model record as failed
447+
# record_processed_successfully = False
448+
449+
# Increment the main counter only if the model record itself was deemed successful
450+
# (e.g., hf_model_id was present, and potentially if at least one paper linked if required)
451+
if record_processed_successfully: # Adjust this condition based on requirements
452+
processed_in_batch += 1
453+
successful_lines_in_batch += 1 # Increment success counter
454+
logger.debug(f"Successfully processed record from line {line_num} (including linked papers if any).")
455+
else:
456+
logger.warning(f"Marked record from line {line_num} as processed with errors/skips.")
457+
# Even if marked as error, we might count it towards the total lines *attempted* in the batch
458+
processed_in_batch += 1
427459

428-
processed_in_batch += 1
429460
except Exception as e:
430-
logger.error(
431-
f"Error processing record for model {hf_model_id} on line {line_num}: {e}"
432-
)
433-
logger.error(traceback.format_exc())
434-
# Decide whether to continue batch or raise error to rollback transaction
435-
# For robustness, log error and continue processing other records in batch
436-
437-
return processed_in_batch
461+
# Log detailed error including traceback and the problematic record line number
462+
tb_str = traceback.format_exc()
463+
logger.error(f"Error processing record from line {line_num}: {e}\\nRecord: {record}\\nTraceback:\\n{tb_str}")
464+
# Re-raise the exception to trigger the automatic rollback of conn.transaction()
465+
raise # This will rollback the *entire* batch
466+
467+
# If the 'with conn.transaction()' block completes without exceptions, commit is automatic.
468+
# If an exception occurs, rollback is automatic.
469+
logger.debug(f"Transaction for batch completed (Commit or Rollback occurred). Successfully processed {successful_lines_in_batch} lines in this attempt.")
470+
# Return the count of lines successfully processed within the transaction
471+
# If an exception caused rollback, this will be 0 from the perspective of the DB, but we return the count *attempted* before failure.
472+
# Let's return successful_lines_in_batch to be more accurate about what potentially committed.
473+
return successful_lines_in_batch
438474

439475

440476
async def main(input_file_path: str, reset_db: bool, reset_checkpoint: bool) -> None:

0 commit comments

Comments
 (0)