@@ -49,7 +49,7 @@ async def make_snowflake_request(
49
49
url = f"{ SNOWFLAKE_BASE_URL } /{ endpoint } "
50
50
51
51
try :
52
- async with httpx .AsyncClient (timeout = 30.0 ) as client :
52
+ async with httpx .AsyncClient (timeout = 60.0 , http2 = True ) as client :
53
53
if method .upper () == "GET" :
54
54
response = await client .request (method , url , headers = headers , params = data )
55
55
else :
@@ -102,8 +102,41 @@ async def execute_snowflake_query(sql: str, snowflake_token: Optional[str] = Non
102
102
# Parse the response to extract data
103
103
if response and "data" in response :
104
104
logger .info (f"Successfully got { len (response ['data' ])} rows from Snowflake" )
105
+
106
+ all_data = response ["data" ]
107
+
108
+ # Check for pagination/partitions
109
+ metadata = response .get ('resultSetMetaData' , {})
110
+ partition_info = metadata .get ('partitionInfo' , [])
111
+
112
+ if len (partition_info ) > 1 :
113
+ logger .info (f"Found { len (partition_info )} partitions, fetching remaining data..." )
114
+
115
+ # Get the statement handle for pagination
116
+ statement_handle = response .get ('statementHandle' )
117
+ if statement_handle :
118
+ # Fetch remaining partitions
119
+ for partition_index in range (1 , len (partition_info )):
120
+ try :
121
+ partition_endpoint = f"statements/{ statement_handle } ?partition={ partition_index } "
122
+ partition_response = await make_snowflake_request (
123
+ partition_endpoint , "GET" , None , snowflake_token
124
+ )
125
+
126
+ if partition_response and "data" in partition_response :
127
+ partition_data = partition_response ["data" ]
128
+ logger .info (f"Fetched partition { partition_index } : { len (partition_data )} rows" )
129
+ all_data .extend (partition_data )
130
+ else :
131
+ logger .warning (f"Failed to fetch partition { partition_index } " )
132
+
133
+ except Exception as e :
134
+ logger .error (f"Error fetching partition { partition_index } : { e } " )
135
+
136
+ logger .info (f"Total rows after fetching all partitions: { len (all_data )} " )
137
+
105
138
success = True
106
- return response [ "data" ]
139
+ return all_data
107
140
elif response and "resultSet" in response :
108
141
# Handle different response formats
109
142
result_set = response ["resultSet" ]
@@ -155,7 +188,7 @@ async def get_issue_labels(issue_ids: List[str], snowflake_token: Optional[str]
155
188
156
189
sql = f"""
157
190
SELECT ISSUE, LABEL
158
- FROM JIRA_LABEL_RHAI
191
+ FROM { SNOWFLAKE_DATABASE } . { SNOWFLAKE_SCHEMA } . JIRA_LABEL_RHAI
159
192
WHERE ISSUE IN ({ ids_str } ) AND LABEL IS NOT NULL
160
193
"""
161
194
@@ -201,7 +234,7 @@ async def get_issue_comments(issue_ids: List[str], snowflake_token: Optional[str
201
234
202
235
sql = f"""
203
236
SELECT ID, ISSUEID, ROLELEVEL, BODY, CREATED, UPDATED
204
- FROM JIRA_COMMENT_NON_PII
237
+ FROM { SNOWFLAKE_DATABASE } . { SNOWFLAKE_SCHEMA } . JIRA_COMMENT_NON_PII
205
238
WHERE ISSUEID IN ({ ids_str } ) AND BODY IS NOT NULL
206
239
ORDER BY ISSUEID, CREATED ASC
207
240
"""
0 commit comments