@@ -183,10 +183,8 @@ worker_main(Datum main_arg)
183
183
184
184
while (!got_sigterm )
185
185
{
186
- StringInfoData select_query ;
187
- StringInfoData query_insert_response_ok ;
188
- StringInfoData query_insert_response_bad ;
189
- StringInfoData delete_query ;
186
+ int queue_query_rc ;
187
+ int ttl_query_rc ;
190
188
191
189
WaitLatch (& MyProc -> procLatch ,
192
190
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH ,
@@ -197,7 +195,7 @@ worker_main(Datum main_arg)
197
195
CHECK_FOR_INTERRUPTS ();
198
196
199
197
if (!is_extension_loaded ()){
200
- elog (DEBUG2 , "worker_main : extension not yet loaded" );
198
+ elog (DEBUG2 , "pg_net worker : extension not yet loaded" );
201
199
continue ;
202
200
}
203
201
@@ -211,20 +209,6 @@ worker_main(Datum main_arg)
211
209
PushActiveSnapshot (GetTransactionSnapshot ());
212
210
SPI_connect ();
213
211
214
- initStringInfo (& delete_query );
215
-
216
- appendStringInfo (& delete_query , "\
217
- WITH\
218
- rows AS (\
219
- SELECT ctid\
220
- FROM net._http_response\
221
- WHERE created < now() - $1\
222
- ORDER BY created\
223
- LIMIT $2\
224
- )\
225
- DELETE FROM net._http_response r\
226
- USING rows WHERE r.ctid = rows.ctid" );
227
-
228
212
{
229
213
int argCount = 2 ;
230
214
Oid argTypes [2 ];
@@ -236,27 +220,25 @@ worker_main(Datum main_arg)
236
220
argTypes [1 ] = INT4OID ;
237
221
argValues [1 ] = Int32GetDatum (batch_size );
238
222
239
- if (SPI_execute_with_args (delete_query .data , argCount , argTypes , argValues , NULL ,
240
- false, 0 ) != SPI_OK_DELETE )
223
+ ttl_query_rc = SPI_execute_with_args ("\
224
+ WITH\
225
+ rows AS (\
226
+ SELECT ctid\
227
+ FROM net._http_response\
228
+ WHERE created < now() - $1\
229
+ ORDER BY created\
230
+ LIMIT $2\
231
+ )\
232
+ DELETE FROM net._http_response r\
233
+ USING rows WHERE r.ctid = rows.ctid" ,
234
+ argCount , argTypes , argValues , NULL , false, 0 );
235
+
236
+ if (ttl_query_rc != SPI_OK_DELETE )
241
237
{
242
- elog (ERROR , "SPI_exec failed : %s" , delete_query . data );
238
+ ereport (ERROR , errmsg ( "Error expiring response table rows : %s" , SPI_result_code_string ( ttl_query_rc )) );
243
239
}
244
240
}
245
241
246
- initStringInfo (& select_query );
247
-
248
- appendStringInfo (& select_query , "\
249
- WITH\
250
- rows AS (\
251
- SELECT id\
252
- FROM net.http_request_queue\
253
- ORDER BY id\
254
- LIMIT $1\
255
- )\
256
- DELETE FROM net.http_request_queue q\
257
- USING rows WHERE q.id = rows.id\
258
- RETURNING q.id, q.method, q.url, timeout_milliseconds, array(select key || ': ' || value from jsonb_each_text(q.headers)), q.body" );
259
-
260
242
{
261
243
int argCount = 1 ;
262
244
Oid argTypes [1 ];
@@ -265,7 +247,20 @@ worker_main(Datum main_arg)
265
247
argTypes [0 ] = INT4OID ;
266
248
argValues [0 ] = Int32GetDatum (batch_size );
267
249
268
- if (SPI_execute_with_args (select_query .data , argCount , argTypes , argValues , NULL , false, 0 ) == SPI_OK_DELETE_RETURNING )
250
+ queue_query_rc = SPI_execute_with_args ("\
251
+ WITH\
252
+ rows AS (\
253
+ SELECT id\
254
+ FROM net.http_request_queue\
255
+ ORDER BY id\
256
+ LIMIT $1\
257
+ )\
258
+ DELETE FROM net.http_request_queue q\
259
+ USING rows WHERE q.id = rows.id\
260
+ RETURNING q.id, q.method, q.url, timeout_milliseconds, array(select key || ': ' || value from jsonb_each_text(q.headers)), q.body" ,
261
+ argCount , argTypes , argValues , NULL , false, 0 );
262
+
263
+ if (queue_query_rc == SPI_OK_DELETE_RETURNING )
269
264
{
270
265
bool tupIsNull = false;
271
266
@@ -315,7 +310,7 @@ worker_main(Datum main_arg)
315
310
}
316
311
else
317
312
{
318
- elog (ERROR , "SPI_exec failed : %s" , select_query . data );
313
+ ereport (ERROR , errmsg ( "Error getting http request queue : %s" , SPI_result_code_string ( queue_query_rc )) );
319
314
}
320
315
}
321
316
@@ -337,20 +332,13 @@ worker_main(Datum main_arg)
337
332
}
338
333
} while (still_running );
339
334
340
- initStringInfo (& query_insert_response_ok );
341
- appendStringInfo (& query_insert_response_ok , "\
342
- insert into net._http_response(id, status_code, content, headers, content_type, timed_out) values ($1, $2, $3, $4, $5, $6)" );
343
-
344
- initStringInfo (& query_insert_response_bad );
345
- appendStringInfo (& query_insert_response_bad , "\
346
- insert into net._http_response(id, error_msg) values ($1, $2)" );
347
-
348
335
while ((msg = curl_multi_info_read (cm , & msgs_left ))) {
349
336
if (msg -> msg == CURLMSG_DONE ) {
350
337
CURLcode return_code = msg -> data .result ;
351
338
eh = msg -> easy_handle ;
352
339
353
340
if (return_code != CURLE_OK ) {
341
+ int failed_query_rc ;
354
342
int argCount = 2 ;
355
343
Oid argTypes [2 ];
356
344
Datum argValues [2 ];
@@ -365,12 +353,16 @@ worker_main(Datum main_arg)
365
353
argTypes [1 ] = CSTRINGOID ;
366
354
argValues [1 ] = CStringGetDatum (error_msg );
367
355
368
- if (SPI_execute_with_args (query_insert_response_bad .data , argCount , argTypes , argValues , NULL ,
369
- false, 1 ) != SPI_OK_INSERT )
356
+ failed_query_rc = SPI_execute_with_args ("\
357
+ insert into net._http_response(id, error_msg) values ($1, $2)" ,
358
+ argCount , argTypes , argValues , NULL , false, 1 );
359
+
360
+ if (failed_query_rc != SPI_OK_INSERT )
370
361
{
371
- elog (ERROR , "SPI_exec failed: %s" , query_insert_response_bad . data );
362
+ ereport (ERROR , errmsg ( "Error when inserting failed response : %s" , SPI_result_code_string ( failed_query_rc )) );
372
363
}
373
364
} else {
365
+ int succ_query_rc ;
374
366
int argCount = 6 ;
375
367
Oid argTypes [6 ];
376
368
Datum argValues [6 ];
@@ -413,10 +405,13 @@ worker_main(Datum main_arg)
413
405
argValues [5 ] = BoolGetDatum (timedOut );
414
406
nulls [5 ] = ' ' ;
415
407
416
- if (SPI_execute_with_args (query_insert_response_ok .data , argCount , argTypes , argValues , nulls ,
417
- false, 1 ) != SPI_OK_INSERT )
408
+ succ_query_rc = SPI_execute_with_args ("\
409
+ insert into net._http_response(id, status_code, content, headers, content_type, timed_out) values ($1, $2, $3, $4, $5, $6)" ,
410
+ argCount , argTypes , argValues , nulls , false, 1 );
411
+
412
+ if ( succ_query_rc != SPI_OK_INSERT )
418
413
{
419
- elog (ERROR , "SPI_exec failed : %s" , query_insert_response_ok . data );
414
+ ereport (ERROR , errmsg ( "Error when inserting successful response : %s" , SPI_result_code_string ( succ_query_rc )) );
420
415
}
421
416
422
417
pfree (cdata -> body -> data );
0 commit comments