Skip to content

Commit a5fe6d4

Browse files
authored
Merge pull request #29 from ks6088ts-labs/feature/issue-28_queue-storage
add queue storage APIs
2 parents 017ba8b + f0eda74 commit a5fe6d4

File tree

9 files changed

+281
-1
lines changed

9 files changed

+281
-1
lines changed

azure_storage_queue.env.sample

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
AZURE_STORAGE_QUEUE_CONNECTION_STRING = "<connection-string>"

backend/fastapi.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from backend.routers import azure_event_grid as azure_event_grid_router
66
from backend.routers import azure_openai as azure_openai_router
77
from backend.routers import azure_storage as azure_storage_router
8+
from backend.routers import azure_storage_queue as azure_storage_queue_router
89
from backend.routers import document_intelligence as document_intelligence_router
910

1011
app = FastAPI(
@@ -16,6 +17,7 @@
1617
app.include_router(azure_storage_router.router)
1718
app.include_router(azure_ai_vision_router.router)
1819
app.include_router(azure_event_grid_router.router)
20+
app.include_router(azure_storage_queue_router.router)
1921

2022

2123
def custom_openapi():
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
from logging import getLogger
2+
3+
from azure.core.paging import ItemPaged
4+
from azure.storage.queue import QueueMessage, QueueServiceClient
5+
6+
from backend.settings.azure_storage_queue import AzureStorageQueueSettings
7+
8+
logger = getLogger(__name__)
9+
10+
11+
class AzureStorageQueueClient:
12+
def __init__(self, settings: AzureStorageQueueSettings):
13+
self.client = QueueServiceClient.from_connection_string(settings.azure_storage_queue_connection_string)
14+
15+
def create_queue(
16+
self,
17+
queue_name: str,
18+
):
19+
queue_client = self.client.get_queue_client(queue_name)
20+
queue_client.create_queue()
21+
logger.info(f"Created queue {queue_name}")
22+
23+
def delete_queue(
24+
self,
25+
queue_name: str,
26+
):
27+
queue_client = self.client.get_queue_client(queue_name)
28+
queue_client.delete_queue()
29+
logger.info(f"Deleted queue {queue_name}")
30+
31+
def send_message(
32+
self,
33+
queue_name: str,
34+
message: str,
35+
) -> QueueMessage:
36+
queue_client = self.client.get_queue_client(queue_name)
37+
sent_message = queue_client.send_message(
38+
content=message,
39+
)
40+
logger.info(f"Sent message to queue {queue_name}")
41+
return sent_message
42+
43+
def get_queue_properties(
44+
self,
45+
queue_name: str,
46+
):
47+
queue_client = self.client.get_queue_client(queue_name)
48+
properties = queue_client.get_queue_properties()
49+
logger.info(f"Got properties of queue {queue_name}")
50+
return properties
51+
52+
def receive_messages(
53+
self,
54+
queue_name: str,
55+
max_messages: int = 1,
56+
) -> ItemPaged[QueueMessage]:
57+
queue_client = self.client.get_queue_client(queue_name)
58+
messages = queue_client.receive_messages(
59+
max_messages=max_messages,
60+
)
61+
logger.info(f"Received messages from queue {queue_name}")
62+
return messages
63+
64+
def delete_message(
65+
self,
66+
queue_name: str,
67+
message_id: str,
68+
pop_receipt: str,
69+
):
70+
queue_client = self.client.get_queue_client(queue_name)
71+
queue_client.delete_message(
72+
message=message_id,
73+
pop_receipt=pop_receipt,
74+
)
75+
logger.info(f"Deleted message from queue {queue_name}")
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from logging import getLogger
2+
3+
from fastapi import APIRouter
4+
from fastapi.responses import JSONResponse
5+
6+
from backend.internals.azure_storage_queue import AzureStorageQueueClient
7+
from backend.schemas import azure_storage_queue as azure_storage_queue_schemas
8+
from backend.settings.azure_storage_queue import AzureStorageQueueSettings
9+
10+
logger = getLogger(__name__)
11+
client = AzureStorageQueueClient(
12+
settings=AzureStorageQueueSettings(),
13+
)
14+
15+
router = APIRouter(
16+
prefix="/azure_storage_queue",
17+
tags=["azure_storage_queue"],
18+
responses={404: {"description": "Not found"}},
19+
)
20+
21+
22+
@router.post(
23+
"/queues/",
24+
response_model=azure_storage_queue_schemas.CreateQueueResponse,
25+
status_code=200,
26+
)
27+
async def create_queue(
28+
body: azure_storage_queue_schemas.CreateQueueRequest,
29+
):
30+
try:
31+
client.create_queue(
32+
queue_name=body.queue_name,
33+
)
34+
except Exception as e:
35+
logger.error(f"Failed to create queue: {e}")
36+
raise
37+
return azure_storage_queue_schemas.CreateQueueResponse(
38+
queue_name=body.queue_name,
39+
)
40+
41+
42+
@router.delete(
43+
"/queues/",
44+
response_model=azure_storage_queue_schemas.DeleteQueueResponse,
45+
status_code=200,
46+
)
47+
async def delete_queue(
48+
body: azure_storage_queue_schemas.DeleteQueueRequest,
49+
):
50+
try:
51+
client.delete_queue(
52+
queue_name=body.queue_name,
53+
)
54+
except Exception as e:
55+
logger.error(f"Failed to delete queue: {e}")
56+
raise
57+
return azure_storage_queue_schemas.DeleteQueueResponse(
58+
queue_name=body.queue_name,
59+
)
60+
61+
62+
@router.post(
63+
"/messages/",
64+
response_model=azure_storage_queue_schemas.SendMessageResponse,
65+
status_code=200,
66+
)
67+
async def send_message(
68+
body: azure_storage_queue_schemas.SendMessageRequest,
69+
):
70+
try:
71+
sent_message = client.send_message(
72+
queue_name=body.queue_name,
73+
message=body.message,
74+
)
75+
logger.info(f"Sent message: {sent_message}")
76+
except Exception as e:
77+
logger.error(f"Failed to send message: {e}")
78+
raise
79+
return azure_storage_queue_schemas.SendMessageResponse()
80+
81+
82+
@router.get(
83+
"/messages/",
84+
status_code=200,
85+
)
86+
async def receive_messages(
87+
queue_name: str,
88+
max_messages: int = 1,
89+
):
90+
try:
91+
messages = client.receive_messages(
92+
queue_name=queue_name,
93+
max_messages=max_messages,
94+
)
95+
logger.info(f"Received messages: {messages}")
96+
message_list = []
97+
for message in messages:
98+
message_list.append(
99+
{
100+
"id": message.id,
101+
"message": message.content,
102+
"pop_receipt": message.pop_receipt,
103+
}
104+
)
105+
except Exception as e:
106+
logger.error(f"Failed to receive messages: {e}")
107+
raise
108+
return JSONResponse(
109+
content=message_list,
110+
)
111+
112+
113+
@router.delete(
114+
"/messages/",
115+
response_model=azure_storage_queue_schemas.DeleteMessageResponse,
116+
status_code=200,
117+
)
118+
async def delete_message(
119+
body: azure_storage_queue_schemas.DeleteMessageRequest,
120+
):
121+
try:
122+
client.delete_message(
123+
queue_name=body.queue_name,
124+
message_id=body.message_id,
125+
pop_receipt=body.pop_receipt,
126+
)
127+
except Exception as e:
128+
logger.error(f"Failed to delete message: {e}")
129+
raise
130+
return azure_storage_queue_schemas.DeleteMessageResponse()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from logging import getLogger
2+
3+
from pydantic import BaseModel
4+
5+
logger = getLogger(__name__)
6+
7+
8+
class CreateQueueRequest(BaseModel):
9+
queue_name: str
10+
11+
12+
class CreateQueueResponse(BaseModel):
13+
queue_name: str
14+
15+
16+
class DeleteQueueRequest(BaseModel):
17+
queue_name: str
18+
19+
20+
class DeleteQueueResponse(BaseModel):
21+
queue_name: str
22+
23+
24+
class SendMessageRequest(BaseModel):
25+
queue_name: str
26+
message: str
27+
28+
29+
class SendMessageResponse(BaseModel):
30+
pass
31+
32+
33+
class DeleteMessageRequest(BaseModel):
34+
queue_name: str
35+
message_id: str
36+
pop_receipt: str
37+
38+
39+
class DeleteMessageResponse(BaseModel):
40+
pass
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from pydantic_settings import BaseSettings, SettingsConfigDict
2+
3+
4+
class AzureStorageQueueSettings(BaseSettings):
5+
azure_storage_queue_connection_string: str = "<connection-string>"
6+
7+
model_config = SettingsConfigDict(
8+
env_file="azure_storage_queue.env",
9+
env_file_encoding="utf-8",
10+
)

docs/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
- [FastAPI > UploadFile class](https://fastapi.tiangolo.com/reference/uploadfile/)
1515
- [FastAPI > Extending OpenAPI](https://fastapi.tiangolo.com/how-to/extending-openapi/)
1616
- [Get started with Azure Blob Storage and Python](https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blob-python-get-started?tabs=sas-token)
17+
- [Quickstart: Azure Queue Storage client library for Python](https://learn.microsoft.com/en-us/azure/storage/queues/storage-quickstart-queues-python?tabs=passwordless%2Croles-azure-portal%2Cenvironment-variable-windows%2Csign-in-azure-cli)
1718
- [Azure Event Grid client library for Python - version 4.19.0](https://learn.microsoft.com/en-us/python/api/overview/azure/eventgrid-readme?view=azure-python)
1819
- [Azure Event Grid Client Library Python Samples](https://learn.microsoft.com/en-us/samples/azure/azure-sdk-for-python/eventgrid-samples/)
1920

poetry.lock

Lines changed: 21 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ python-multipart = "^0.0.9"
3232
azure-storage-blob = "^12.19.1"
3333
azure-ai-vision-imageanalysis = "^1.0.0b2"
3434
azure-eventgrid = "^4.19.0"
35+
azure-storage-queue = "^12.9.0"
3536

3637

3738
[tool.poetry.group.frontend.dependencies]

0 commit comments

Comments
 (0)