Skip to content

Commit c8cedb2

Browse files
committed
fix: worker cancel
1 parent a5787c0 commit c8cedb2

File tree

1 file changed

+74
-84
lines changed

1 file changed

+74
-84
lines changed

worker/worker.go

Lines changed: 74 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -181,106 +181,96 @@ func (w *Worker) Run() {
181181

182182
func (w *Worker) runWorker() {
183183
for job := range w.jobs {
184-
select {
185-
case <-w.ctx.Done():
186-
return
187-
default:
188-
func() {
189-
defer func() {
190-
w.jobResult <- job
191-
}()
192-
filePath := job.FilePath
193-
req := new(blazehttp.Request)
194-
if w.useEmbedFS {
195-
if err := req.ReadFileFromFS(testcases.EmbedTestCasesFS, filePath); err != nil {
196-
job.Result.Err = fmt.Sprintf("read request file: %s from embed fs error: %s\n", filePath, err)
197-
return
198-
}
199-
} else {
200-
if err := req.ReadFile(filePath); err != nil {
201-
job.Result.Err = fmt.Sprintf("read request file: %s error: %s\n", filePath, err)
202-
return
203-
}
184+
func() {
185+
defer func() {
186+
w.jobResult <- job
187+
}()
188+
filePath := job.FilePath
189+
req := new(blazehttp.Request)
190+
if w.useEmbedFS {
191+
if err := req.ReadFileFromFS(testcases.EmbedTestCasesFS, filePath); err != nil {
192+
job.Result.Err = fmt.Sprintf("read request file: %s from embed fs error: %s\n", filePath, err)
193+
return
204194
}
205-
206-
if w.reqHost != "" {
207-
req.SetHost(w.reqHost)
208-
} else {
209-
req.SetHost(w.addr)
195+
} else {
196+
if err := req.ReadFile(filePath); err != nil {
197+
job.Result.Err = fmt.Sprintf("read request file: %s error: %s\n", filePath, err)
198+
return
210199
}
200+
}
211201

212-
if w.reqPerSession {
213-
// one http request one connection
214-
req.SetHeader("Connection", "close")
215-
}
202+
if w.reqHost != "" {
203+
req.SetHost(w.reqHost)
204+
} else {
205+
req.SetHost(w.addr)
206+
}
216207

217-
req.CalculateContentLength()
208+
if w.reqPerSession {
209+
// one http request one connection
210+
req.SetHeader("Connection", "close")
211+
}
218212

219-
start := time.Now()
220-
conn := blazehttp.Connect(w.addr, w.isHttps, w.timeout)
221-
if conn == nil {
222-
job.Result.Err = fmt.Sprintf("connect to %s failed!\n", w.addr)
223-
return
224-
}
225-
nWrite, err := req.WriteTo(*conn)
226-
if err != nil {
227-
job.Result.Err = fmt.Sprintf("send request poc: %s length: %d error: %s", filePath, nWrite, err)
228-
return
229-
}
213+
req.CalculateContentLength()
230214

231-
rsp := new(blazehttp.Response)
232-
if err = rsp.ReadConn(*conn); err != nil {
233-
job.Result.Err = fmt.Sprintf("read poc file: %s response, error: %s", filePath, err)
234-
return
235-
}
236-
elap := time.Since(start).Nanoseconds()
237-
(*conn).Close()
238-
job.Result.Success = true
239-
if strings.HasSuffix(job.FilePath, "white") {
240-
job.Result.IsWhite = true // white case
241-
}
215+
start := time.Now()
216+
conn := blazehttp.Connect(w.addr, w.isHttps, w.timeout)
217+
if conn == nil {
218+
job.Result.Err = fmt.Sprintf("connect to %s failed!\n", w.addr)
219+
return
220+
}
221+
nWrite, err := req.WriteTo(*conn)
222+
if err != nil {
223+
job.Result.Err = fmt.Sprintf("send request poc: %s length: %d error: %s", filePath, nWrite, err)
224+
return
225+
}
242226

243-
code := rsp.GetStatusCode()
244-
job.Result.StatusCode = code
245-
if code != w.blockStatusCode {
246-
job.Result.IsPass = true
247-
}
248-
job.Result.TimeCost = elap
249-
}()
250-
}
227+
rsp := new(blazehttp.Response)
228+
if err = rsp.ReadConn(*conn); err != nil {
229+
job.Result.Err = fmt.Sprintf("read poc file: %s response, error: %s", filePath, err)
230+
return
231+
}
232+
elap := time.Since(start).Nanoseconds()
233+
(*conn).Close()
234+
job.Result.Success = true
235+
if strings.HasSuffix(job.FilePath, "white") {
236+
job.Result.IsWhite = true // white case
237+
}
238+
239+
code := rsp.GetStatusCode()
240+
job.Result.StatusCode = code
241+
if code != w.blockStatusCode {
242+
job.Result.IsPass = true
243+
}
244+
job.Result.TimeCost = elap
245+
}()
251246
}
252247
}
253248

254249
func (w *Worker) processJobResult() {
255250
for job := range w.jobResult {
256-
select {
257-
case <-w.ctx.Done():
258-
return
259-
default:
260-
if job.Result.Success {
261-
w.result.Success++
262-
w.result.SuccessTimeCost += job.Result.TimeCost
263-
if job.Result.IsWhite {
264-
if job.Result.IsPass {
265-
w.result.TN++
266-
} else {
267-
w.result.FP++
268-
}
251+
if job.Result.Success {
252+
w.result.Success++
253+
w.result.SuccessTimeCost += job.Result.TimeCost
254+
if job.Result.IsWhite {
255+
if job.Result.IsPass {
256+
w.result.TN++
269257
} else {
270-
if job.Result.IsPass {
271-
w.result.FN++
272-
} else {
273-
w.result.TP++
274-
}
258+
w.result.FP++
275259
}
276260
} else {
277-
w.result.Error++
278-
}
279-
if w.resultCh != nil {
280-
r := *w.result
281-
r.Job = job
282-
w.resultCh <- &r
261+
if job.Result.IsPass {
262+
w.result.FN++
263+
} else {
264+
w.result.TP++
265+
}
283266
}
267+
} else {
268+
w.result.Error++
269+
}
270+
if w.resultCh != nil {
271+
r := *w.result
272+
r.Job = job
273+
w.resultCh <- &r
284274
}
285275
}
286276
}

0 commit comments

Comments
 (0)