# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
3-
43import logging
54import json
65import os
76from azure .ai .textanalytics .aio import TextAnalyticsClient
87from azure .core .exceptions import HttpResponseError
9- from azure .core .credentials import AzureKeyCredential
10- import asyncio
118from azure .identity import DefaultAzureCredential
12- from environment import IdentityType , get_identity_type
9+ from tenacity import retry
10+ from tenacity .stop import stop_after_attempt
11+ from tenacity .wait import wait_exponential
12+ import asyncio
1313
1414MAX_TEXT_ELEMENTS = 5120
1515
1616
def split_document(document: str, max_size: int) -> list[str]:
    """Split a document into consecutive chunks of at most max_size characters.

    Args:
        document (str): The document to split.
        max_size (int): The maximum size of each chunk. Must be positive.

    Returns:
        list[str]: The non-empty document chunks, in their original order.
            An empty document yields an empty list.

    Raises:
        ValueError: If max_size is not a positive integer."""
    if max_size <= 0:
        # A zero step makes range() raise and a negative step silently yields
        # no chunks at all (dropping the document), so reject both up front.
        raise ValueError("max_size must be a positive integer")

    # Stepping through the document by max_size can never produce an empty
    # slice, so the chunks need no further filtering.
    return [document[i : i + max_size] for i in range(0, len(document), max_size)]
31+
32+
@retry(
    reraise=True,
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10),
)
async def extract_key_phrases_from_batch(
    batch_data: list[str], max_key_phrase_count: int
) -> list[str]:
    """Extract key phrases from one batch of text using Azure AI services.

    The retry decorator re-runs the whole call when it raises, for at most
    three attempts with an exponential wait between them, and re-raises the
    last exception once the attempts are used up.

    Args:
        batch_data (list[str]): The list of text to process in a single request.
        max_key_phrase_count (int): The maximum number of key phrases to keep
            from each document in the batch.

    Returns:
        list[str]: The key phrases of all documents in the batch, concatenated
            in document order.

    Raises:
        HttpResponseError: If the service request fails.
        Exception: If the service reports an error for any document."""

    key_phrase_list = []

    # NOTE(review): DefaultAzureCredential is imported from the synchronous
    # azure.identity package while the client comes from the aio package -
    # confirm whether the azure.identity.aio credential should be used here.
    text_analytics_client = TextAnalyticsClient(
        endpoint=os.environ["AIService__Services__Endpoint"],
        credential=DefaultAzureCredential(
            managed_identity_client_id=os.environ.get("FunctionApp__ClientId")
        ),
    )

    async with text_analytics_client:
        try:
            result = await text_analytics_client.extract_key_phrases(batch_data)
            for doc in result:
                if not doc.is_error:
                    key_phrase_list.extend(doc.key_phrases[:max_key_phrase_count])
                else:
                    raise Exception(f"Document error: {doc.error}")
        except HttpResponseError as e:
            logging.error("An error occurred: %s", e)
            # Bare raise re-raises the active exception unchanged so the retry
            # decorator (and ultimately the caller) sees the original error.
            raise

    return key_phrase_list
72+
73+
async def extract_key_phrases_from_text(
    data: list[str], max_key_phrase_count: int
) -> list[str]:
    """Extract key phrases from text using Azure AI services.

    Documents longer than MAX_TEXT_ELEMENTS are split into chunks, empty
    documents are dropped, and the remaining texts are sent to the service in
    batches until enough key phrases have been collected.

    Args:
        data (list[str]): The list of text to process.
        max_key_phrase_count (int): The maximum number of key phrases to return.

    Returns:
        list[str]: At most max_key_phrase_count key phrases, in the order the
            documents were processed."""
    logging.info("Python HTTP trigger function processed a request.")

    # Number of documents sent per request.
    # NOTE(review): presumably the service's per-request document limit - confirm.
    batch_size = 10

    split_documents = []
    for doc in data:
        if len(doc) > MAX_TEXT_ELEMENTS:
            split_documents.extend(split_document(doc, MAX_TEXT_ELEMENTS))
        elif doc:
            # Empty documents are skipped here, so no later filtering is needed.
            split_documents.append(doc)

    key_phrase_list = []
    for start in range(0, len(split_documents), batch_size):
        key_phrase_list.extend(
            await extract_key_phrases_from_batch(
                split_documents[start : start + batch_size], max_key_phrase_count
            )
        )

        # Stop as soon as the requested number of phrases is reached; using >=
        # avoids one more service call whose results would only be discarded.
        if len(key_phrase_list) >= max_key_phrase_count:
            key_phrase_list = key_phrase_list[:max_key_phrase_count]
            break

    return key_phrase_list
84107
@@ -105,26 +128,40 @@ async def process_key_phrase_extraction(
105128 "errors" : None ,
106129 "warnings" : None ,
107130 }
108- extracted_record ["data" ]["key_phrases " ] = await extract_key_phrases_from_text (
131+ extracted_record ["data" ]["keyPhrases " ] = await extract_key_phrases_from_text (
109132 [record ["data" ]["text" ]], max_key_phrase_count
110133 )
111- except Exception as inner_e :
112- logging .error ("key phrase extraction Error: %s" , inner_e )
113- logging .error (
114- "Failed to extract key phrase. Check function app logs for more details of exact failure."
115- )
116- return {
117- "recordId" : record ["recordId" ],
118- "data" : {},
119- "errors" : [
120- {
121- "message" : "Failed to extract key phrase. Check function app logs for more details of exact failure."
122- }
123- ],
124- "warnings" : None ,
125- }
126- else :
127- json_str = json .dumps (extracted_record , indent = 4 )
128-
129- logging .info (f"key phrase extraction output: { json_str } " )
130- return extracted_record
134+ except Exception as e :
135+ logging .error ("key phrase extraction Error: %s" , e )
136+ await asyncio .sleep (10 )
137+ try :
138+ extracted_record = {
139+ "recordId" : record ["recordId" ],
140+ "data" : {},
141+ "errors" : None ,
142+ "warnings" : None ,
143+ }
144+ extracted_record ["data" ][
145+ "keyPhrases"
146+ ] = await extract_key_phrases_from_text (
147+ [record ["data" ]["text" ]], max_key_phrase_count
148+ )
149+ except Exception as inner_e :
150+ logging .error ("key phrase extraction Error: %s" , inner_e )
151+ logging .error (
152+ "Failed to extract key phrase. Check function app logs for more details of exact failure."
153+ )
154+ return {
155+ "recordId" : record ["recordId" ],
156+ "data" : {},
157+ "errors" : [
158+ {
159+ "message" : "Failed to extract key phrase. Check function app logs for more details of exact failure."
160+ }
161+ ],
162+ "warnings" : None ,
163+ }
164+ json_str = json .dumps (extracted_record , indent = 4 )
165+
166+ logging .info (f"key phrase extraction output: { json_str } " )
167+ return extracted_record
0 commit comments