1- # Copyright (c) Microsoft Corporation.
2- # Licensed under the MIT License.
3-
41import logging
52import json
63import os
74from azure .ai .textanalytics .aio import TextAnalyticsClient
85from azure .core .exceptions import HttpResponseError
9- from azure .core .credentials import AzureKeyCredential
10- import asyncio
116from azure .identity import DefaultAzureCredential
12- from environment import IdentityType , get_identity_type
7+ from tenacity import retry
8+ from tenacity .stop import stop_after_attempt
9+ from tenacity .wait import wait_exponential
10+ import asyncio
1311
1412MAX_TEXT_ELEMENTS = 5120
1513
1614
17- def split_document (document : str , max_size : int ) -> list [ str ] :
18- """Split a document into chunks of max_size.
15+ def split_document (document , max_size ) :
16+ """Split a document into chunks of max_size and filter out any empty strings
1917
2018 Args:
2119 document (str): The document to split.
22- max_size (int): The maximum size of each chunk."""
23- return [document [i : i + max_size ] for i in range (0 , len (document ), max_size )]
20+ max_size (int): The maximum size of each chunk.
2421
25-
26- async def extract_key_phrases_from_text (
27- data : list [str ], max_key_phrase_count : int , retries_left : int = 3
22+ Returns:
23+ list: The list of document chunks."""
24+ return [
25+ document [i : i + max_size ]
26+ for i in range (0 , len (document ), max_size )
27+ if len (document [i : i + max_size ]) > 0
28+ ]
29+
30+
31+ @retry (
32+ reraise = True ,
33+ stop = stop_after_attempt (3 ),
34+ wait = wait_exponential (multiplier = 1 , min = 1 , max = 10 ),
35+ )
36+ async def extract_key_phrases_from_batch (
37+ batch_data : list [str ], max_key_phrase_count : int
2838) -> list [str ]:
29- """Extract key phrases from the text.
39+ """Extract key phrases from text using Azure AI services .
3040
3141 Args:
32- data (list[str]): The text data .
33- max_key_phrase_count (int): The maximum number of key phrases to return.
42+ batch_data (list[str]): The list of text to process .
43+ max_key_phrase_count(int): no of keywords to return
3444
3545 Returns:
36- list[str]: The key phrases extracted from the text."""
37- logging .info ("Python HTTP trigger function processed a request." )
46+ list: The list of key phrases."""
3847
3948 key_phrase_list = []
4049
41- if get_identity_type () == IdentityType .SYSTEM_ASSIGNED :
42- credential = DefaultAzureCredential ()
43- elif get_identity_type () == IdentityType .USER_ASSIGNED :
44- credential = DefaultAzureCredential (
45- managed_identity_client_id = os .environ .get ("FunctionApp__ClientId" )
46- )
47- else :
48- credential = AzureKeyCredential (os .environ .get ("AIService__Language__Key" ))
4950 text_analytics_client = TextAnalyticsClient (
50- endpoint = os .environ .get ("AIService__Language__Endpoint" ),
51- credential = credential ,
51+ endpoint = os .environ ["AIService__Services__Endpoint" ],
52+ credential = DefaultAzureCredential (
53+ managed_identity_client_id = os .environ .get ("FunctionApp__ClientId" )
54+ ),
5255 )
5356
5457 async with text_analytics_client :
5558 try :
56- # Split large documents
57- split_documents = []
58- for doc in data :
59- if len (doc ) > MAX_TEXT_ELEMENTS :
60- split_documents .extend (split_document (doc , MAX_TEXT_ELEMENTS ))
61- else :
62- split_documents .append (doc )
63-
64- result = await text_analytics_client .extract_key_phrases (split_documents )
65- for idx , doc in enumerate (result ):
59+ result = await text_analytics_client .extract_key_phrases (batch_data )
60+ for doc in result :
6661 if not doc .is_error :
6762 key_phrase_list .extend (doc .key_phrases [:max_key_phrase_count ])
6863 else :
69- raise Exception (f"Document { idx } error: { doc .error } " )
64+ raise Exception (f"Document error: { doc .error } " )
7065 except HttpResponseError as e :
71- if e .status_code == 429 and retries_left > 0 : # Rate limiting error
72- wait_time = 2 ** retries_left # Exponential backoff
73- logging .info (
74- "%s Rate limit exceeded. Retrying in %s seconds..." , e , wait_time
75- )
76- await asyncio .sleep (wait_time )
77- return await extract_key_phrases_from_text (
78- data , max_key_phrase_count , retries_left - 1
79- )
80- else :
81- raise Exception (f"An error occurred: { e } " ) from e
66+ logging .error ("An error occurred: %s" , e )
67+ raise e
68+
69+ return key_phrase_list
70+
71+
72+ async def extract_key_phrases_from_text (
73+ data : list [str ], max_key_phrase_count : int
74+ ) -> list [str ]:
75+ """Extract key phrases from text using Azure AI services.
76+
77+ Args:
78+ data (list[str]): The list of text to process.
79+ max_key_phrase_count(int): no of keywords to return"""
80+ logging .info ("Python HTTP trigger function processed a request." )
81+ key_phrase_list = []
82+
83+ split_documents = []
84+ for doc in data :
85+ if len (doc ) > MAX_TEXT_ELEMENTS :
86+ split_documents .extend (split_document (doc , MAX_TEXT_ELEMENTS ))
87+ elif len (doc ) > 0 :
88+ split_documents .append (doc )
89+
90+ # Filter out any empty documents
91+ split_documents = [doc for doc in split_documents if len (doc ) > 0 ]
92+
93+ for i in range (0 , len (split_documents ), 10 ):
94+ key_phrase_list .extend (
95+ await extract_key_phrases_from_batch (
96+ split_documents [i : i + 10 ], max_key_phrase_count
97+ )
98+ )
99+
100+ if len (key_phrase_list ) > max_key_phrase_count :
101+ key_phrase_list = key_phrase_list [:max_key_phrase_count ]
102+ break
82103
83104 return key_phrase_list
84105
@@ -105,26 +126,40 @@ async def process_key_phrase_extraction(
105126 "errors" : None ,
106127 "warnings" : None ,
107128 }
108- extracted_record ["data" ]["key_phrases " ] = await extract_key_phrases_from_text (
129+ extracted_record ["data" ]["keyPhrases " ] = await extract_key_phrases_from_text (
109130 [record ["data" ]["text" ]], max_key_phrase_count
110131 )
111- except Exception as inner_e :
112- logging .error ("key phrase extraction Error: %s" , inner_e )
113- logging .error (
114- "Failed to extract key phrase. Check function app logs for more details of exact failure."
115- )
116- return {
117- "recordId" : record ["recordId" ],
118- "data" : {},
119- "errors" : [
120- {
121- "message" : "Failed to extract key phrase. Check function app logs for more details of exact failure."
122- }
123- ],
124- "warnings" : None ,
125- }
126- else :
127- json_str = json .dumps (extracted_record , indent = 4 )
128-
129- logging .info (f"key phrase extraction output: { json_str } " )
130- return extracted_record
132+ except Exception as e :
133+ logging .error ("key phrase extraction Error: %s" , e )
134+ await asyncio .sleep (10 )
135+ try :
136+ extracted_record = {
137+ "recordId" : record ["recordId" ],
138+ "data" : {},
139+ "errors" : None ,
140+ "warnings" : None ,
141+ }
142+ extracted_record ["data" ][
143+ "keyPhrases"
144+ ] = await extract_key_phrases_from_text (
145+ [record ["data" ]["text" ]], max_key_phrase_count
146+ )
147+ except Exception as inner_e :
148+ logging .error ("key phrase extraction Error: %s" , inner_e )
149+ logging .error (
150+ "Failed to extract key phrase. Check function app logs for more details of exact failure."
151+ )
152+ return {
153+ "recordId" : record ["recordId" ],
154+ "data" : {},
155+ "errors" : [
156+ {
157+ "message" : "Failed to extract key phrase. Check function app logs for more details of exact failure."
158+ }
159+ ],
160+ "warnings" : None ,
161+ }
162+ json_str = json .dumps (extracted_record , indent = 4 )
163+
164+ logging .info (f"key phrase extraction output: { json_str } " )
165+ return extracted_record
0 commit comments