
Commit e54f94b

Merge pull request #1104 from NASA-IMPACT/branch_#1097
Pagination on the Sinequa sql.engine Api
2 parents 52b26a0 + b91405d commit e54f94b

File tree

4 files changed: +371 -87 lines changed


sde_collections/sinequa_api.py

Lines changed: 122 additions & 32 deletions
@@ -1,4 +1,5 @@
 import json
+from collections.abc import Iterator
 from typing import Any
 
 import requests
@@ -61,15 +62,14 @@ class Api:
     def __init__(self, server_name: str = None, user: str = None, password: str = None, token: str = None) -> None:
         self.server_name = server_name
         if server_name not in server_configs:
-            raise ValueError(f"Server name '{server_name}' is not in server_configs")
+            raise ValueError(f"Invalid server configuration: '{server_name}' is not a recognized server name")
 
         self.config = server_configs[server_name]
         self.app_name: str = self.config["app_name"]
         self.query_name: str = self.config["query_name"]
         self.base_url: str = self.config["base_url"]
         self.dev_servers = ["xli", "lrm_dev", "lrm_qa"]
 
-        # Store provided values only
         self._provided_user = user
         self._provided_password = password
         self._provided_token = token
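For reference, the constructor above looks up the server entry in server_configs and reads app_name, query_name, and base_url from it, and get_full_texts later reads index. A minimal sketch of what one entry presumably looks like, based only on the keys referenced in this diff; the values below are illustrative placeholders, not the real configuration:

# Hypothetical server_configs entry; only keys read by Api.__init__ and
# get_full_texts are shown, and every value is an assumed placeholder.
server_configs = {
    "lrm_dev": {
        "app_name": "example-app",        # placeholder
        "query_name": "example-query",    # placeholder
        "base_url": "https://sinequa-dev.example.nasa.gov",  # placeholder
        "index": "example_index",         # required by get_full_texts
    },
}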
@@ -113,7 +113,8 @@ def query(self, page: int, collection_config_folder: str | None = None, source:
             password = self._get_password()
             if not user or not password:
                 raise ValueError(
-                    "User and password are required for the query endpoint on the following servers: {self.dev_servers}"
+                    f"Authentication error: Missing credentials for dev server '{self.server_name}'. "
+                    f"Both username and password are required for servers: {', '.join(self.dev_servers)}"
                 )
             authentication = f"?Password={password}&User={user}"
             url = f"{url}{authentication}"
@@ -135,11 +136,22 @@ def query(self, page: int, collection_config_folder: str | None = None, source:
 
         return self.process_response(url, payload)
 
-    def sql_query(self, sql: str) -> Any:
-        """Executes an SQL query on the configured server using token-based authentication."""
+    def _execute_sql_query(self, sql: str) -> dict:
+        """
+        Executes a SQL query against the Sinequa API.
+
+        Args:
+            sql (str): The SQL query to execute
+
+        Returns:
+            dict: The JSON response from the API containing 'Rows' and 'TotalRowCount'
+
+        Raises:
+            ValueError: If no token is available for authentication
+        """
         token = self._get_token()
         if not token:
-            raise ValueError("A token is required to use the SQL endpoint")
+            raise ValueError("Authentication error: Token is required for SQL endpoint access")
 
         url = f"{self.base_url}/api/v1/engine.sql"
         headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
@@ -153,42 +165,120 @@ def sql_query(self, sql: str) -> Any:
 
         return self.process_response(url, headers=headers, raw_data=raw_payload)
 
-    def get_full_texts(self, collection_config_folder: str, source: str = None) -> Any:
+    def _process_rows_to_records(self, rows: list) -> list[dict]:
         """
-        Retrieves the full texts, URLs, and titles for a specified collection.
+        Converts raw SQL row data into structured record dictionaries.
+
+        Args:
+            rows (list): List of rows, where each row is [url, full_text, title]
 
         Returns:
-            dict: A JSON response containing the results of the SQL query,
-                where each item has 'url', 'text', and 'title'.
-
-        Example:
-            Calling get_full_texts("example_collection") might return:
-            [
-                {
-                    'url': 'http://example.com/article1',
-                    'text': 'Here is the full text of the first article...',
-                    'title': 'Article One Title'
-                },
-                {
-                    'url': 'http://example.com/article2',
-                    'text': 'Here is the full text of the second article...',
-                    'title': 'Article Two Title'
-                }
-            ]
+            list[dict]: List of processed records with url, full_text, and title keys
+
+        Raises:
+            ValueError: If any row doesn't contain exactly 3 elements
+        """
+        processed_records = []
+        for idx, row in enumerate(rows):
+            if len(row) != 3:
+                raise ValueError(
+                    f"Invalid row format at index {idx}: Expected exactly three elements (url, full_text, title). "
+                    f"Received {len(row)} elements."
+                )
+            processed_records.append({"url": row[0], "full_text": row[1], "title": row[2]})
+        return processed_records
+
+    def get_full_texts(self, collection_config_folder: str, source: str = None) -> Iterator[dict]:
+        """
+        Retrieves and yields batches of text records from the SQL database for a given collection.
+        Uses pagination to handle large datasets efficiently.
+
+        Args:
+            collection_config_folder (str): The collection folder to query (e.g., "EARTHDATA", "SMD")
+            source (str, optional): The source to query. If None, defaults to "scrapers" for dev servers
+                or "SDE" for other servers.
+
+        Yields:
+            list[dict]: Batches of records, where each record is a dictionary containing:
+                {
+                    "url": str,        # The URL of the document
+                    "full_text": str,  # The full text content of the document
+                    "title": str       # The title of the document
+                }
+
+        Raises:
+            ValueError: If the server's index is not defined in its configuration
+
+        Example batch:
+            [
+                {
+                    "url": "https://example.nasa.gov/doc1",
+                    "full_text": "This is the content of doc1...",
+                    "title": "Document 1 Title"
+                },
+                {
+                    "url": "https://example.nasa.gov/doc2",
+                    "full_text": "This is the content of doc2...",
+                    "title": "Document 2 Title"
+                }
+            ]
+
+        Note:
+            - Results are paginated in batches of 5000 records
+            - Each batch is processed into clean dictionaries before being yielded
+            - The iterator will stop when either:
+                1. No more rows are returned from the query
+                2. The total count of records has been reached
         """
 
         if not source:
             source = self._get_source_name()
 
         if (index := self.config.get("index")) is None:
-            raise ValueError("Index not defined for this server")
+            raise ValueError(
+                f"Configuration error: Index not defined for server '{self.server_name}'. "
+                "Please update server configuration with the required index."
+            )
 
         sql = f"SELECT url1, text, title FROM {index} WHERE collection = '/{source}/{collection_config_folder}/'"
-        full_text_response = self.sql_query(sql)
-        return self._process_full_text_response(full_text_response)
+
+        page = 0
+        page_size = 5000
+        total_processed = 0
+
+        while True:
+            paginated_sql = f"{sql} SKIP {total_processed} COUNT {page_size}"
+            response = self._execute_sql_query(paginated_sql)
+
+            rows = response.get("Rows", [])
+            if not rows:  # Stop if we get an empty batch
+                break
+
+            yield self._process_rows_to_records(rows)
+
+            total_processed += len(rows)
+            total_count = response.get("TotalRowCount", 0)
+
+            if total_processed >= total_count:  # Stop if we've processed all records
+                break
+
+            page += 1
 
     @staticmethod
-    def _process_full_text_response(full_text_response: str):
-        return [
-            {"url": url, "full_text": full_text, "title": title} for url, full_text, title in full_text_response["Rows"]
-        ]
+    def _process_full_text_response(batch_data: dict):
+        if "Rows" not in batch_data or not isinstance(batch_data["Rows"], list):
+            raise ValueError(
+                "Invalid response format: Expected 'Rows' key with list data in Sinequa server response. "
+                f"Received: {type(batch_data.get('Rows', None))}"
+            )
+
+        processed_data = []
+        for idx, row in enumerate(batch_data["Rows"]):
+            if len(row) != 3:
+                raise ValueError(
+                    f"Invalid row format at index {idx}: Expected exactly three elements (url, full_text, title). "
+                    f"Received {len(row)} elements."
+                )
+            url, full_text, title = row
+            processed_data.append({"url": url, "full_text": full_text, "title": title})
+        return processed_data
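Taken together, these changes turn get_full_texts into a generator: each loop iteration appends SKIP {total_processed} COUNT {page_size} to the base SELECT (so the first two requests run with SKIP 0 COUNT 5000 and SKIP 5000 COUNT 5000), executes it through _execute_sql_query, and yields one processed batch. A minimal consumption sketch under assumed values; the server name "lrm_dev", the "EARTHDATA" folder, and the token are placeholders taken from the examples in this diff, not guaranteed working inputs:

# Hypothetical usage of the paginated full-text API.
api = Api(server_name="lrm_dev", token="example-token")  # placeholder credentials

total = 0
for batch in api.get_full_texts("EARTHDATA"):
    # Each batch is a list of up to 5000 dicts with "url", "full_text", and "title" keys.
    total += len(batch)
    print(f"Got {len(batch)} records; first URL: {batch[0]['url']}")

print(f"Fetched {total} records in total.")

Because batches are yielded as they arrive, a caller never has to hold more than one page of rows in memory, which is what the batched task in tasks.py relies on below.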

sde_collections/tasks.py

Lines changed: 37 additions & 35 deletions
@@ -7,7 +7,7 @@
 from django.conf import settings
 from django.core import management
 from django.core.management.commands import loaddata
-from django.db import IntegrityError
+from django.db import transaction
 
 from config import celery_app
 
@@ -145,44 +145,46 @@ def resolve_title_pattern(title_pattern_id):
     title_pattern.apply()
 
 
-@celery_app.task
+@celery_app.task(soft_time_limit=600)
 def fetch_and_replace_full_text(collection_id, server_name):
     """
-    Task to fetch and replace full text and metadata for all URLs associated with a specified collection
-    from a given server. This task deletes all existing DumpUrl entries for the collection and creates
-    new entries based on the latest fetched data.
-
-    Args:
-        collection_id (int): The identifier for the collection in the database.
-        server_name (str): The name of the server.
-
-    Returns:
-        str: A message indicating the result of the operation, including the number of URLs processed.
+    Task to fetch and replace full text and metadata for a collection.
+    Handles data in batches to manage memory usage.
     """
     collection = Collection.objects.get(id=collection_id)
     api = Api(server_name)
-    documents = api.get_full_texts(collection.config_folder)
 
-    # Step 1: Delete all existing DumpUrl entries for the collection
+    # Step 1: Delete existing DumpUrl entries
     deleted_count, _ = DumpUrl.objects.filter(collection=collection).delete()
-
-    # Step 2: Create new DumpUrl entries from the fetched documents
-    processed_count = 0
-    for doc in documents:
-        try:
-            DumpUrl.objects.create(
-                url=doc["url"],
-                collection=collection,
-                scraped_text=doc.get("full_text", ""),
-                scraped_title=doc.get("title", ""),
-            )
-            processed_count += 1
-        except IntegrityError:
-            # Handle duplicate URL case if needed
-            print(f"Duplicate URL found, skipping: {doc['url']}")
-
-    collection.migrate_dump_to_delta()
-
-    print(f"Processed {processed_count} new records.")
-
-    return f"Successfully processed {len(documents)} records and updated the database."
+    print(f"Deleted {deleted_count} old records.")
+
+    # Step 2: Process data in batches
+    total_processed = 0
+
+    try:
+        for batch in api.get_full_texts(collection.config_folder):
+            # Use bulk_create for efficiency, with a transaction per batch
+            with transaction.atomic():
+                DumpUrl.objects.bulk_create(
+                    [
+                        DumpUrl(
+                            url=record["url"],
+                            collection=collection,
+                            scraped_text=record["full_text"],
+                            scraped_title=record["title"],
+                        )
+                        for record in batch
+                    ]
+                )
+
+            total_processed += len(batch)
+            print(f"Processed batch of {len(batch)} records. Total: {total_processed}")
+
+        # Step 3: Migrate dump URLs to delta URLs
+        collection.migrate_dump_to_delta()
+
+        return f"Successfully processed {total_processed} records and updated the database."
+
+    except Exception as e:
+        print(f"Error processing records: {str(e)}")
+        raise
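Since the task keeps its (collection_id, server_name) signature, callers trigger it as before; only the internals stream batches into bulk_create inside per-batch transactions. A minimal sketch, assuming a valid collection id and one of the configured server names; the id 42 and "lrm_dev" below are placeholders:

# Queue the refresh asynchronously via Celery, or call it inline for debugging.
from sde_collections.tasks import fetch_and_replace_full_text

fetch_and_replace_full_text.delay(42, "lrm_dev")  # hypothetical collection id and server name
# fetch_and_replace_full_text(42, "lrm_dev")      # synchronous call, e.g. from a Django shell

With soft_time_limit=600, Celery raises SoftTimeLimitExceeded inside the task after ten minutes, which the broad except block logs and re-raises so the failure is visible to the worker.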
