Skip to content

Commit bd71976

Browse files
authored
🐛 Fix the issue of commercial edition database creation failure. #772
2 parents c2bf5e4 + ca6eb85 commit bd71976

File tree

7 files changed

+162
-33
lines changed

7 files changed

+162
-33
lines changed

backend/apps/data_process_app.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
from contextlib import asynccontextmanager
3-
from fastapi import HTTPException, APIRouter, Form, File, UploadFile
3+
from fastapi import HTTPException, APIRouter, Form, File, UploadFile, Header
4+
from typing import Optional
45
import base64
56
import io
67
import time
@@ -35,10 +36,10 @@ async def lifespan(app: APIRouter):
3536

3637

3738
@router.post("", response_model=TaskResponse, status_code=201)
38-
async def create_task(request: TaskRequest):
39+
async def create_task(request: TaskRequest, authorization: Optional[str] = Header(None)):
3940
"""
4041
Create a new data processing task (Process → Forward chain)
41-
42+
4243
Returns task ID immediately. Processing happens in the background.
4344
Tasks are forwarded to Elasticsearch when complete.
4445
"""
@@ -50,7 +51,8 @@ async def create_task(request: TaskRequest):
5051
source_type=request.source_type,
5152
chunking_strategy=request.chunking_strategy,
5253
index_name=request.index_name,
53-
original_filename=request.original_filename
54+
original_filename=request.original_filename,
55+
authorization=authorization
5456
)
5557

5658
return TaskResponse(task_id=task_result.id)
@@ -114,7 +116,7 @@ async def process_sync_endpoint(
114116

115117

116118
@router.post("/batch", response_model=BatchTaskResponse, status_code=201)
117-
async def create_batch_tasks(request: BatchTaskRequest):
119+
async def create_batch_tasks(request: BatchTaskRequest, authorization: Optional[str] = Header(None)):
118120
"""
119121
Create multiple data processing tasks at once (individual Process → Forward chains)
120122
@@ -147,7 +149,8 @@ async def create_batch_tasks(request: BatchTaskRequest):
147149
source_type=source_type,
148150
chunking_strategy=chunking_strategy,
149151
index_name=index_name,
150-
original_filename=original_filename
152+
original_filename=original_filename,
153+
authorization=authorization
151154
)
152155

153156
task_ids.append(task_result.id)

backend/apps/file_management_app.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,8 @@ async def process_files(
150150
files: List[dict] = Body(..., description="List of file details to process, including path_or_url and filename"),
151151
chunking_strategy: Optional[str] = Body("basic"),
152152
index_name: str = Body(...),
153-
destination: str = Body(...)
153+
destination: str = Body(...),
154+
authorization: Optional[str] = Header(None)
154155
):
155156
"""
156157
Trigger data processing for a list of uploaded files.
@@ -162,7 +163,8 @@ async def process_files(
162163
process_params = ProcessParams(
163164
chunking_strategy=chunking_strategy,
164165
source_type=destination,
165-
index_name=index_name
166+
index_name=index_name,
167+
authorization=authorization
166168
)
167169

168170
process_result = await trigger_data_process(files, process_params)

backend/consts/model.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ class ProcessParams(BaseModel):
191191
chunking_strategy: Optional[str] = "basic"
192192
source_type: str
193193
index_name: str
194+
authorization: Optional[str] = None
194195

195196

196197
class OpinionRequest(BaseModel):

backend/data_process/tasks.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ def process(self, source: str, source_type: str,
280280
@app.task(bind=True, base=LoggingTask, name='data_process.tasks.forward', queue='forward_q')
281281
def forward(self, processed_data: Dict, index_name: str,
282282
source: str, source_type: str = 'minio',
283-
original_filename: Optional[str] = None) -> Dict:
283+
original_filename: Optional[str] = None, authorization: Optional[str] = None) -> Dict:
284284
"""
285285
Vectorize and store processed chunks in Elasticsearch
286286
@@ -290,6 +290,7 @@ def forward(self, processed_data: Dict, index_name: str,
290290
source: Original source path (for metadata)
291291
source_type: The type of the source("local", "minio")
292292
original_filename: The original name of the file
293+
authorization: Authorization header for API calls
293294
294295
Returns:
295296
Dict containing storage results and metadata
@@ -381,6 +382,8 @@ async def index_documents():
381382
route_url = f"/indices/{original_index_name}/documents"
382383
full_url = elasticsearch_url + route_url
383384
headers = {"Content-Type": "application/json"}
385+
if authorization:
386+
headers["Authorization"] = authorization
384387

385388
max_retries = 5
386389
retry_delay = 5
@@ -540,8 +543,9 @@ async def index_documents():
540543
raise
541544

542545
@app.task(bind=True, base=LoggingTask, name='data_process.tasks.process_and_forward')
543-
def process_and_forward(self, source: str, source_type: str,
544-
chunking_strategy: str, index_name: Optional[str] = None, original_filename: Optional[str] = None) -> str:
546+
def process_and_forward(self, source: str, source_type: str,
547+
chunking_strategy: str, index_name: Optional[str] = None,
548+
original_filename: Optional[str] = None, authorization: Optional[str] = None) -> str:
545549
"""
546550
Combined task that chains processing and forwarding
547551
@@ -553,6 +557,7 @@ def process_and_forward(self, source: str, source_type: str,
553557
chunking_strategy: Strategy for chunking the document
554558
index_name: Name of the index to store documents
555559
original_filename: The original name of the file
560+
authorization: Authorization header for API calls
556561
**params: Additional parameters
557562
558563
Returns:
@@ -573,7 +578,8 @@ def process_and_forward(self, source: str, source_type: str,
573578
index_name=index_name,
574579
source=source,
575580
source_type=source_type,
576-
original_filename=original_filename
581+
original_filename=original_filename,
582+
authorization=authorization
577583
).set(queue='forward_q')
578584
)
579585

backend/utils/file_management_utils.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ async def trigger_data_process(files: List[dict], process_params: ProcessParams)
3333
if not files:
3434
return None
3535

36+
# Build headers with authorization
37+
headers = {
38+
"Authorization": f"Bearer {process_params.authorization}"
39+
}
40+
3641
# Build source data list
3742
if len(files) == 1:
3843
# Single file request
@@ -47,7 +52,7 @@ async def trigger_data_process(files: List[dict], process_params: ProcessParams)
4752

4853
try:
4954
async with httpx.AsyncClient() as client:
50-
response = await client.post(f"{DATA_PROCESS_SERVICE}/tasks", json=payload, timeout=30.0)
55+
response = await client.post(f"{DATA_PROCESS_SERVICE}/tasks", headers=headers, json=payload, timeout=30.0)
5156

5257
if response.status_code == 201:
5358
return response.json()
@@ -79,7 +84,7 @@ async def trigger_data_process(files: List[dict], process_params: ProcessParams)
7984

8085
try:
8186
async with httpx.AsyncClient() as client:
82-
response = await client.post(f"{DATA_PROCESS_SERVICE}/tasks/batch", json=payload, timeout=30.0)
87+
response = await client.post(f"{DATA_PROCESS_SERVICE}/tasks/batch", headers=headers, json=payload, timeout=30.0)
8388

8489
if response.status_code == 201:
8590
return response.json()

test/backend/app/test_data_process_app.py

Lines changed: 65 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
from unittest.mock import patch, MagicMock, AsyncMock
99
import os
1010
import sys
11-
from fastapi import FastAPI, HTTPException, Form, Body
12-
from typing import Dict, Any
11+
from fastapi import FastAPI, HTTPException, Form, Body, Header
12+
from typing import Dict, Any, Optional
1313
from fastapi.testclient import TestClient
1414
from PIL import Image
1515
import pytest
@@ -127,9 +127,16 @@ def setup_mock_routes(self):
127127
"""
128128
@self.app.post("/tasks")
129129
@pytest.mark.asyncio
130-
async def create_task(request: Dict[str, Any] = Body(None)):
131-
# Simulate task creation
132-
task_result = self.process_and_forward.delay.return_value
130+
async def create_task(request: Dict[str, Any] = Body(None), authorization: Optional[str] = Header(None)):
131+
# Simulate task creation by actually calling the mock
132+
task_result = self.process_and_forward.delay(
133+
source=request.get("source"),
134+
source_type=request.get("source_type"),
135+
chunking_strategy=request.get("chunking_strategy"),
136+
index_name=request.get("index_name"),
137+
original_filename=request.get("original_filename"),
138+
authorization=authorization
139+
)
133140
return {"task_id": task_result.id}
134141

135142
@self.app.post("/tasks/process")
@@ -302,16 +309,67 @@ def test_create_task(self):
302309
"source_type": "file",
303310
"chunking_strategy": "basic",
304311
"index_name": self.index_name,
312+
"original_filename": "test-file.pdf",
305313
"additional_params": {"param1": "value1"}
306314
}
307315

308-
# Execute request
316+
# Execute request with authorization header
317+
response = self.client.post(
318+
"/tasks",
319+
json=request_data,
320+
headers={"Authorization": "Bearer test-token"}
321+
)
322+
323+
# Assert expectations
324+
self.assertEqual(response.status_code, 200) # Mock route defaults to 200
325+
self.assertEqual(response.json(), {"task_id": self.task_id})
326+
327+
# Verify that process_and_forward.delay was called with authorization parameter
328+
self.process_and_forward.delay.assert_called_once_with(
329+
source=self.source,
330+
source_type="file",
331+
chunking_strategy="basic",
332+
index_name=self.index_name,
333+
original_filename="test-file.pdf",
334+
authorization="Bearer test-token"
335+
)
336+
337+
def test_create_task_without_authorization(self):
338+
"""
339+
Test creating a new task without authorization header.
340+
Verifies that the endpoint works when authorization is not provided.
341+
"""
342+
# Set up mock
343+
mock_task = MagicMock()
344+
mock_task.id = self.task_id
345+
self.process_and_forward.delay.return_value = mock_task
346+
347+
# Test data
348+
request_data = {
349+
"source": self.source,
350+
"source_type": "file",
351+
"chunking_strategy": "basic",
352+
"index_name": self.index_name,
353+
"original_filename": "test-file.pdf"
354+
}
355+
356+
# Execute request without authorization header
309357
response = self.client.post("/tasks", json=request_data)
310358

311359
# Assert expectations
312-
self.assertEqual(response.status_code, 200) # FastAPI test client defaults to 200
360+
self.assertEqual(response.status_code, 200) # Mock route defaults to 200
313361
self.assertEqual(response.json(), {"task_id": self.task_id})
314362

363+
# Verify that process_and_forward.delay was called with None authorization
364+
self.process_and_forward.delay.assert_called_once_with(
365+
source=self.source,
366+
source_type="file",
367+
chunking_strategy="basic",
368+
index_name=self.index_name,
369+
original_filename="test-file.pdf",
370+
authorization=None
371+
)
372+
315373
def test_process_sync_endpoint_success(self):
316374
"""
317375
Test synchronous processing endpoint success case.

test/backend/app/test_file_management_app.py

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -122,20 +122,45 @@ async def test_upload_files_success(mock_files):
122122
assert len(response.json()["uploaded_filenames"]) == 2
123123

124124
@pytest.mark.asyncio
125-
async def test_upload_files_processing_error(mock_files):
126-
with patch("backend.apps.file_management_app.save_upload_file") as mock_save, \
127-
patch("backend.apps.file_management_app.trigger_data_process") as mock_trigger, \
128-
patch("backend.apps.file_management_app.upload_dir", Path("test_uploads")):
125+
async def test_process_files_success(mock_files):
126+
with patch("backend.apps.file_management_app.trigger_data_process") as mock_trigger:
129127

130-
# Configure mocks
131-
mock_save.return_value = asyncio.Future()
132-
mock_save.return_value.set_result(True)
128+
# Configure mock for successful processing - return result directly
129+
mock_trigger.return_value = {
130+
"task_id": "task_123",
131+
"status": "processing"
132+
}
133+
134+
# Create test client
135+
with TestClient(app) as client:
136+
response = client.post(
137+
"/file/process",
138+
json={
139+
"files": [
140+
{"path_or_url": "/test/path/test.txt", "filename": "test.txt"}
141+
],
142+
"chunking_strategy": "basic",
143+
"index_name": "test_index",
144+
"destination": "local"
145+
},
146+
headers={"authorization": "Bearer test_token"}
147+
)
133148

134-
# Use AsyncMock to properly handle async function mocking
135-
mock_trigger.side_effect = AsyncMock(return_value={
149+
# Assertions
150+
assert response.status_code == 201
151+
assert "message" in response.json()
152+
assert "process_tasks" in response.json()
153+
assert response.json()["message"] == "Files processing triggered successfully"
154+
155+
@pytest.mark.asyncio
156+
async def test_process_files_processing_error(mock_files):
157+
with patch("backend.apps.file_management_app.trigger_data_process") as mock_trigger:
158+
159+
# Configure mock for processing error - return result directly
160+
mock_trigger.return_value = {
136161
"status": "error",
137162
"message": "Processing failed"
138-
})
163+
}
139164

140165
# Create test client
141166
with TestClient(app) as client:
@@ -145,15 +170,44 @@ async def test_upload_files_processing_error(mock_files):
145170
"files": [
146171
{"path_or_url": "/test/path/test.txt", "filename": "test.txt"}
147172
],
148-
"chunking_strategy": "paragraph",
173+
"chunking_strategy": "basic",
149174
"index_name": "test_index",
150175
"destination": "local"
151-
}
176+
},
177+
headers={"authorization": "Bearer test_token"}
178+
)
179+
180+
# Assertions
181+
assert response.status_code == 500
182+
assert "error" in response.json()
183+
assert response.json()["error"] == "Processing failed"
184+
185+
@pytest.mark.asyncio
186+
async def test_process_files_none_result(mock_files):
187+
with patch("backend.apps.file_management_app.trigger_data_process") as mock_trigger:
188+
189+
# Configure mock to return None directly
190+
mock_trigger.return_value = None
191+
192+
# Create test client
193+
with TestClient(app) as client:
194+
response = client.post(
195+
"/file/process",
196+
json={
197+
"files": [
198+
{"path_or_url": "/test/path/test.txt", "filename": "test.txt"}
199+
],
200+
"chunking_strategy": "by_title",
201+
"index_name": "test_index",
202+
"destination": "minio"
203+
},
204+
headers={"authorization": "Bearer test_token"}
152205
)
153206

154207
# Assertions
155208
assert response.status_code == 500
156209
assert "error" in response.json()
210+
assert response.json()["error"] == "Data process service failed"
157211

158212
@pytest.mark.asyncio
159213
async def test_upload_files_no_files(mock_files):

0 commit comments

Comments
 (0)