Skip to content

Commit 78b6bb8

Browse files
committed
feat: API async request functions
1 parent aec07be commit 78b6bb8

File tree

1 file changed

+84
-0
lines changed

1 file changed

+84
-0
lines changed

request/api_async_request.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package request
2+
3+
import (
4+
"github.com/DanielFillol/DataJUD_API_CALLER/models"
5+
"log"
6+
"sync"
7+
)
8+
9+
const API = "1/_search"
10+
11+
// AsyncAPIRequest makes API requests asynchronously
12+
func AsyncAPIRequest(users []models.ReadCsv, numberOfWorkers int, url string, method string, auth string) ([]models.ResponseBody, error) {
13+
// Create a channel to signal when the goroutines are done processing inputs
14+
done := make(chan struct{})
15+
defer close(done)
16+
// Create a channel to receive inputs from
17+
inputCh := StreamInputs(done, users)
18+
19+
// Create a wait group to wait for the worker goroutines to finish
20+
var wg sync.WaitGroup
21+
wg.Add(numberOfWorkers)
22+
23+
// Create a channel to receive results from
24+
resultCh := make(chan models.ResponseBody)
25+
26+
// Spawn worker goroutines to process inputs
27+
var errorOnApiRequests error
28+
for i := 0; i < numberOfWorkers; i++ {
29+
go func() {
30+
// Each worker goroutine consumes inputs from the shared input channel
31+
for input := range inputCh {
32+
// Make the API request and send the response to the result channel
33+
tj, err := defineTJ(input.CNJNumber)
34+
bodyStr, err := APIRequest(url+tj+API, method, auth, input)
35+
resultCh <- bodyStr
36+
if err != nil {
37+
// If there is an error making the API request, print the error
38+
log.Println("error sending request: " + err.Error())
39+
errorOnApiRequests = err
40+
break
41+
}
42+
}
43+
// When the worker goroutine is done processing inputs, signal the wait group
44+
wg.Done()
45+
}()
46+
}
47+
48+
// Wait for all worker goroutines to finish processing inputs
49+
go func() {
50+
wg.Wait()
51+
close(resultCh)
52+
}()
53+
54+
// Return early on error in any given call on API requests
55+
if errorOnApiRequests != nil {
56+
return nil, errorOnApiRequests
57+
}
58+
59+
// Collect results from the result channel and return them as a slice
60+
var results []models.ResponseBody
61+
for result := range resultCh {
62+
results = append(results, result)
63+
}
64+
65+
return results, nil
66+
}
67+
68+
// StreamInputs sends inputs from a slice to a channel
69+
func StreamInputs(done <-chan struct{}, inputs []models.ReadCsv) <-chan models.ReadCsv {
70+
// Create a channel to send inputs to
71+
inputCh := make(chan models.ReadCsv)
72+
go func() {
73+
defer close(inputCh)
74+
for _, input := range inputs {
75+
select {
76+
case inputCh <- input:
77+
case <-done:
78+
// If the done channel is closed prematurely, finish the loop (closing the input channel)
79+
break
80+
}
81+
}
82+
}()
83+
return inputCh
84+
}

0 commit comments

Comments
 (0)