diff --git a/client/client.go b/client/client.go index d230641..4698f10 100644 --- a/client/client.go +++ b/client/client.go @@ -1,17 +1,11 @@ package client import ( - "bytes" - "encoding/json" + "context" "fmt" - "io" - "io/ioutil" "net" "net/http" "net/url" - "path" - "strconv" - "strings" "time" ) @@ -26,7 +20,9 @@ type Job struct { } type LmstfyClient struct { - scheme string + scheme string + endpoint string + Namespace string Token string Host string @@ -38,11 +34,6 @@ type LmstfyClient struct { errorOnNilJob bool // return error when job is nil } -const ( - maxReadTimeout = 600 // second - maxBatchConsumeSize = 100 -) - func NewLmstfyClient(host string, port int, namespace, token string) *LmstfyClient { cli := &http.Client{ Transport: &http.Transport{ @@ -72,8 +63,9 @@ func NewLmstfyWithClient(cli *http.Client, host string, port int, namespace, tok Host: host, Port: port, - scheme: scheme, - httpCli: cli, + scheme: scheme, + endpoint: fmt.Sprintf("%s:%d", host, port), + httpCli: cli, } } @@ -88,35 +80,12 @@ func (c *LmstfyClient) ConfigRetry(retryCount int, backOffMillisecond int) { c.backOff = backOffMillisecond } -func (c *LmstfyClient) getReq(method, relativePath string, query url.Values, body []byte) (req *http.Request, err error) { - targetUrl := url.URL{ - Scheme: c.scheme, - Host: fmt.Sprintf("%s:%d", c.Host, c.Port), - Path: path.Join("/api", c.Namespace, relativePath), - RawQuery: query.Encode(), - } - if body == nil { - req, err = http.NewRequest(method, targetUrl.String(), nil) - if err != nil { - return - } - req.Header.Add("X-Token", c.Token) - return - } - req, err = http.NewRequest(method, targetUrl.String(), bytes.NewReader(body)) - if err != nil { - return - } - req.Header.Add("X-Token", c.Token) - return -} - // Publish a new job to the queue. // - ttlSecond is the time-to-live of the job. If it's zero, job won't expire; if it's positive, the value is the TTL. // - tries is the maximum times the job can be fetched. // - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { - return c.publish(queue, "", data, ttlSecond, tries, delaySecond) + return c.publish(nil, queue, "", data, ttlSecond, tries, delaySecond) } // RePublish delete(ack) the job of the queue and publish the job again. @@ -124,80 +93,7 @@ func (c *LmstfyClient) Publish(queue string, data []byte, ttlSecond uint32, trie // - tries is the maximum times the job can be fetched. // - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. 
func (c *LmstfyClient) RePublish(job *Job, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { - return c.publish(job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond) -} - -func (c *LmstfyClient) publish(queue, ackJobID string, data []byte, ttlSecond uint32, tries uint16, delaySecond uint32) (jobID string, e error) { - query := url.Values{} - query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10)) - query.Add("tries", strconv.FormatUint(uint64(tries), 10)) - query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10)) - retryCount := 0 - relativePath := queue - if ackJobID != "" { - relativePath = path.Join(relativePath, "job", ackJobID) - } -RETRY: - req, err := c.getReq(http.MethodPut, relativePath, query, data) - if err != nil { - return "", &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - - resp, err := c.httpCli.Do(req) - if err != nil { - if retryCount < c.retry { - time.Sleep(time.Duration(c.backOff) * time.Millisecond) - retryCount++ - goto RETRY - } - return "", &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - if resp.StatusCode != http.StatusCreated { - if resp.StatusCode >= 500 && retryCount < c.retry { - time.Sleep(time.Duration(c.backOff) * time.Millisecond) - retryCount++ - resp.Body.Close() - goto RETRY - } - defer resp.Body.Close() - return "", &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - if retryCount < c.retry { - time.Sleep(time.Duration(c.backOff) * time.Millisecond) - retryCount++ - goto RETRY - } - return "", &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - var respData struct { - JobID string `json:"job_id"` - } - err = json.Unmarshal(respBytes, &respData) - if err != nil { - return "", &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return respData.JobID, nil + return c.publish(nil, job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond) } // BatchPublish publish lots of jobs at one time @@ -205,80 +101,7 @@ RETRY: // - tries is the maximum times the job can be fetched. // - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. 
func (c *LmstfyClient) BatchPublish(queue string, jobs []interface{}, ttlSecond uint32, tries uint16, delaySecond uint32) (jobIDs []string, e error) { - query := url.Values{} - query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10)) - query.Add("tries", strconv.FormatUint(uint64(tries), 10)) - query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10)) - retryCount := 0 - relativePath := path.Join(queue, "bulk") - data, err := json.Marshal(jobs) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } -RETRY: - req, err := c.getReq(http.MethodPut, relativePath, query, data) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - - resp, err := c.httpCli.Do(req) - if err != nil { - if retryCount < c.retry { - time.Sleep(time.Duration(c.backOff) * time.Millisecond) - retryCount++ - goto RETRY - } - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - if resp.StatusCode != http.StatusCreated { - if resp.StatusCode >= 500 && retryCount < c.retry { - time.Sleep(time.Duration(c.backOff) * time.Millisecond) - retryCount++ - resp.Body.Close() - goto RETRY - } - defer resp.Body.Close() - return nil, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - if retryCount < c.retry { - time.Sleep(time.Duration(c.backOff) * time.Millisecond) - retryCount++ - goto RETRY - } - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - var respData struct { - JobIDs []string `json:"job_ids"` - } - err = json.Unmarshal(respBytes, &respData) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return respData.JobIDs, nil + return c.batchPublish(nil, queue, jobs, ttlSecond, tries, delaySecond) } // Consume a job. Consuming will decrease the job's tries by 1 first. @@ -287,7 +110,7 @@ RETRY: // - timeoutSecond is the long-polling wait time. If it's zero, this method will return immediately // with or without a job; if it's positive, this method would polling for new job until timeout. func (c *LmstfyClient) Consume(queue string, ttrSecond, timeoutSecond uint32) (job *Job, e error) { - return c.consume(queue, ttrSecond, timeoutSecond, false) + return c.consume(nil, queue, ttrSecond, timeoutSecond, false) } // ConsumeWithFreezeTries a job. Consuming with retries will not decrease the job's tries. @@ -296,101 +119,16 @@ func (c *LmstfyClient) Consume(queue string, ttrSecond, timeoutSecond uint32) (j // - timeoutSecond is the long-polling wait time. If it's zero, this method will return immediately // with or without a job; if it's positive, this method would polling for new job until timeout. func (c *LmstfyClient) ConsumeWithFreezeTries(queue string, ttrSecond, timeoutSecond uint32) (job *Job, e error) { - return c.consume(queue, ttrSecond, timeoutSecond, true) -} - -// consume a job. Consuming will decrease the job's tries by 1 first. -// - ttrSecond is the time-to-run of the job. If the job is not finished before the TTR expires, -// the job will be released for consuming again if the `(tries - 1) > 0`. -// - timeoutSecond is the long-polling wait time. If it's zero, this method will return immediately -// with or without a job; if it's positive, this method would polling for new job until timeout. 
-// - free_tries was used to determine whether to decrease the tries or not when consuming, if the tries -// decreases to 0, the job would move into dead letter. Default was false. -func (c *LmstfyClient) consume(queue string, ttrSecond, timeoutSecond uint32, freezeTries bool) (job *Job, e error) { - if strings.TrimSpace(queue) == "" { - return nil, &APIError{ - Type: RequestErr, - Reason: "Queue name shouldn't be empty", - } - } - if ttrSecond <= 0 { - return nil, &APIError{ - Type: RequestErr, - Reason: "TTR should be > 0", - } - } - if timeoutSecond >= maxReadTimeout { - return nil, &APIError{ - Type: RequestErr, - Reason: fmt.Sprintf("timeout should be < %d", maxReadTimeout), - } - } - query := url.Values{} - query.Add("ttr", strconv.FormatUint(uint64(ttrSecond), 10)) - query.Add("timeout", strconv.FormatUint(uint64(timeoutSecond), 10)) - query.Add("freeze_tries", strconv.FormatBool(freezeTries)) - req, err := c.getReq(http.MethodGet, queue, query, nil) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - switch resp.StatusCode { - case http.StatusNotFound: - discardResponseBody(resp.Body) - if c.errorOnNilJob { - return nil, &APIError{ - Type: ResponseErr, - Reason: "no job or queue was not found", - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return nil, nil - case http.StatusOK: - // continue - default: - return nil, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - job = &Job{} - err = json.Unmarshal(respBytes, job) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return job, nil + return c.consume(nil, queue, ttrSecond, timeoutSecond, true) } // BatchConsume consume some jobs. Consuming will decrease these jobs tries by 1 first. // - ttrSecond is the time-to-run of these jobs. If these jobs are not finished before the TTR expires, -// these job will be released for consuming again if the `(tries - 1) > 0`. +// these jobs will be released for consuming again if the `(tries - 1) > 0`. // - count is the job count of this consume. If it's zero or over 100, this method will return an error. // If it's positive, this method would return some jobs, and it's count is between 0 and count. func (c *LmstfyClient) BatchConsume(queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []*Job, e error) { - return c.batchConsume(queues, count, ttrSecond, timeoutSecond, false) + return c.batchConsume(nil, queues, count, ttrSecond, timeoutSecond, false) } // BatchConsume consume some jobs. Consuming with freeze tries will not decrease these jobs tries. @@ -399,524 +137,141 @@ func (c *LmstfyClient) BatchConsume(queues []string, count, ttrSecond, timeoutSe // - count is the job count of this consume. If it's zero or over 100, this method will return an error. // If it's positive, this method would return some jobs, and it's count is between 0 and count. 
func (c *LmstfyClient) BatchConsumeWithFreezeTries(queues []string, count, ttrSecond, timeoutSecond uint32) (jobs []*Job, e error) { - return c.batchConsume(queues, count, ttrSecond, timeoutSecond, true) -} - -// batchConsume consume some jobs. Consuming will decrease these jobs tries by 1 first. -// - ttrSecond is the time-to-run of these jobs. If these jobs are not finished before the TTR expires, -// these job will be released for consuming again if the `(tries - 1) > 0`. -// - count is the job count of this consume. If it's zero or over 100, this method will return an error. -// If it's positive, this method would return some jobs, and it's count is between 0 and count. -// - free_tries was used to determine whether to decrease the tries or not when consuming, if the tries -// decreases to 0, the job would move into dead letter. Default was false. -func (c *LmstfyClient) batchConsume(queues []string, count, ttrSecond, timeoutSecond uint32, freezeTries bool) (jobs []*Job, e error) { - if len(queues) == 0 { - return nil, &APIError{ - Type: RequestErr, - Reason: "At least one queue was required", - } - } - if ttrSecond <= 0 { - return nil, &APIError{ - Type: RequestErr, - Reason: "TTR should be > 0", - } - } - if count <= 0 || count > maxBatchConsumeSize { - return nil, &APIError{ - Type: RequestErr, - Reason: "COUNT should be > 0", - } - } - if timeoutSecond >= maxReadTimeout { - return nil, &APIError{ - Type: RequestErr, - Reason: fmt.Sprintf("timeout should be < %d", maxReadTimeout), - } - } - - query := url.Values{} - query.Add("ttr", strconv.FormatUint(uint64(ttrSecond), 10)) - query.Add("count", strconv.FormatUint(uint64(count), 10)) - query.Add("timeout", strconv.FormatUint(uint64(timeoutSecond), 10)) - query.Add("freeze_tries", strconv.FormatBool(freezeTries)) - req, err := c.getReq(http.MethodGet, strings.Join(queues, ","), query, nil) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - switch resp.StatusCode { - case http.StatusNotFound: - discardResponseBody(resp.Body) - return nil, nil - case http.StatusOK: - // continue - default: - return nil, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - if count == 1 { - job := &Job{} - err = json.Unmarshal(respBytes, job) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return []*Job{job}, nil - } - jobs = []*Job{} - err = json.Unmarshal(respBytes, &jobs) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return jobs, nil + return c.batchConsume(nil, queues, count, ttrSecond, timeoutSecond, true) } -// Consume from multiple queues with priority. +// ConsumeFromQueues consumes from multiple queues with priority. // The order of the queues in the params implies the priority. eg. // ConsumeFromQueues(120, 5, "queue-a", "queue-b", "queue-c") // if all the queues have jobs to be fetched, the job in `queue-a` will be return. 
func (c *LmstfyClient) ConsumeFromQueues(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error) { - return c.consumeFromQueues(ttrSecond, timeoutSecond, false, queues...) + return c.consumeFromQueues(nil, ttrSecond, timeoutSecond, false, queues...) } +// ConsumeFromQueuesWithFreezeTries consumes from multiple queues with priority. func (c *LmstfyClient) ConsumeFromQueuesWithFreezeTries(ttrSecond, timeoutSecond uint32, queues ...string) (job *Job, e error) { - return c.consumeFromQueues(ttrSecond, timeoutSecond, true, queues...) + return c.consumeFromQueues(nil, ttrSecond, timeoutSecond, true, queues...) } -func (c *LmstfyClient) consumeFromQueues(ttrSecond, timeoutSecond uint32, freezeTries bool, queues ...string) (job *Job, e error) { - if len(queues) == 0 { - return nil, &APIError{ - Type: RequestErr, - Reason: "At least one queue was required", - } - } - if ttrSecond <= 0 { - return nil, &APIError{ - Type: RequestErr, - Reason: "TTR should be > 0", - } - } - if timeoutSecond >= maxReadTimeout { - return nil, &APIError{ - Type: RequestErr, - Reason: fmt.Sprintf("timeout must be < %d when fetch from multiple queues", maxReadTimeout), - } - } - query := url.Values{} - query.Add("ttr", strconv.FormatUint(uint64(ttrSecond), 10)) - query.Add("timeout", strconv.FormatUint(uint64(timeoutSecond), 10)) - query.Add("freeze_tries", strconv.FormatBool(freezeTries)) - req, err := c.getReq(http.MethodGet, strings.Join(queues, ","), query, nil) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - switch resp.StatusCode { - case http.StatusNotFound: - discardResponseBody(resp.Body) - return nil, nil - case http.StatusOK: - // continue - default: - return nil, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - job = &Job{} - err = json.Unmarshal(respBytes, job) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return job, nil -} - -// Mark a job as finished, so it won't be retried by others. +// Ack Marks a job as finished, so it won't be retried by others. func (c *LmstfyClient) Ack(queue, jobID string) *APIError { - req, err := c.getReq(http.MethodDelete, path.Join(queue, "job", jobID), nil, nil) - if err != nil { - return &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusNoContent { - return &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return nil + return c.ack(nil, queue, jobID) } -// Get queue size. how many jobs are ready for consuming +// QueueSize Get queue size. It means how many jobs are waiting in the queue for consuming. 
func (c *LmstfyClient) QueueSize(queue string) (int, *APIError) { - req, err := c.getReq(http.MethodGet, path.Join(queue, "size"), nil, nil) - if err != nil { - return 0, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return 0, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - var respData struct { - Namespace string `json:"namespace"` - Queue string `json:"queue"` - Size int `json:"size"` - } - err = json.Unmarshal(respBytes, &respData) - if err != nil { - return 0, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return respData.Size, nil + return c.queueSize(nil, queue) } -// Peek the job in the head of the queue +// PeekQueue Peeks the job in the head of the queue func (c *LmstfyClient) PeekQueue(queue string) (job *Job, e *APIError) { - req, err := c.getReq(http.MethodGet, path.Join(queue, "peek"), nil, nil) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - switch resp.StatusCode { - case http.StatusNotFound: - discardResponseBody(resp.Body) - return nil, nil - case http.StatusOK: - // continue - default: - return nil, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - job = &Job{} - err = json.Unmarshal(respBytes, job) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return job, nil + return c.peekQueue(nil, queue) } -// Peek a specific job data +// PeekJob Peeks a specific job data func (c *LmstfyClient) PeekJob(queue, jobID string) (job *Job, e *APIError) { - req, err := c.getReq(http.MethodGet, path.Join(queue, "job", jobID), nil, nil) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return nil, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - switch resp.StatusCode { - case http.StatusNotFound: - discardResponseBody(resp.Body) - return nil, nil - case http.StatusOK: - // continue - default: - return nil, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - job = &Job{} - err = json.Unmarshal(respBytes, job) - if err != nil { - return nil, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return job, nil + return c.peekJob(nil, queue, jobID) } -// Peek the deadletter of 
the queue +// PeekDeadLetter Peeks the dead letter of the queue func (c *LmstfyClient) PeekDeadLetter(queue string) (deadLetterSize int, deadLetterHead string, e *APIError) { - req, err := c.getReq(http.MethodGet, path.Join(queue, "deadletter"), nil, nil) - if err != nil { - return 0, "", &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return 0, "", &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, "", &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, "", &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - var respData struct { - Namespace string `json:"namespace"` - Queue string `json:"queue"` - DeadLetterSize int `json:"deadletter_size"` - DeadLetterHead string `json:"deadletter_head"` - } - err = json.Unmarshal(respBytes, &respData) - if err != nil { - return 0, "", &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return respData.DeadLetterSize, respData.DeadLetterHead, nil + return c.peekDeadLetter(nil, queue) } +// RespawnDeadLetter respawns the jobs of the given queue's dead letter func (c *LmstfyClient) RespawnDeadLetter(queue string, limit, ttlSecond int64) (count int, e *APIError) { - if limit <= 0 { - return 0, &APIError{ - Type: RequestErr, - Reason: "limit should be > 0", - } - } - if ttlSecond < 0 { - return 0, &APIError{ - Type: RequestErr, - Reason: "TTL should be >= 0", - } - } - query := url.Values{} - query.Add("limit", strconv.FormatInt(limit, 10)) - query.Add("ttl", strconv.FormatInt(ttlSecond, 10)) - req, err := c.getReq(http.MethodPut, path.Join(queue, "deadletter"), query, nil) - if err != nil { - return 0, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return 0, &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return 0, &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return 0, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - var respData struct { - Count int `json:"count"` - } - err = json.Unmarshal(respBytes, &respData) - if err != nil { - return 0, &APIError{ - Type: ResponseErr, - Reason: err.Error(), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return respData.Count, nil + return c.respawnDeadLetter(nil, queue, limit, ttlSecond) } +// DeleteDeadLetter deletes the given queue's dead letter func (c *LmstfyClient) DeleteDeadLetter(queue string, limit int64) *APIError { - if limit <= 0 { - return &APIError{ - Type: RequestErr, - Reason: "limit should be > 0", - } - } - query := url.Values{} - query.Add("limit", strconv.FormatInt(limit, 10)) - req, err := c.getReq(http.MethodDelete, path.Join(queue, "deadletter"), query, nil) - if err != nil { - return &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - resp, err := c.httpCli.Do(req) - if err != nil { - return &APIError{ - Type: RequestErr, - Reason: err.Error(), - } - } - defer resp.Body.Close() - if resp.StatusCode != 
http.StatusNoContent { - return &APIError{ - Type: ResponseErr, - Reason: parseResponseError(resp), - RequestID: resp.Header.Get("X-Request-ID"), - } - } - return nil + return c.deleteDeadLetter(nil, queue, limit) } -func discardResponseBody(resp io.ReadCloser) { - // discard response body, to make this connection reusable in the http connection pool - ioutil.ReadAll(resp) +// <---------------------------------- THE CONTEXT VERSIONS OF THE METHODS ARE BELOW ----------------------------------> + +// PublishWithContext a context version of Publish +func (c *LmstfyClient) PublishWithContext(ctx context.Context, queue string, data []byte, ttlSecond uint32, tries uint16, + delaySecond uint32) (jobID string, err error) { + return c.publish(ctx, queue, "", data, ttlSecond, tries, delaySecond) } -func parseResponseError(resp *http.Response) string { - respBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Sprintf("Invalid response: %s", err) - } - var errData struct { - Error string `json:"error"` - } - err = json.Unmarshal(respBytes, &errData) - if err != nil { - return fmt.Sprintf("Invalid JSON: %s", err) - } - return fmt.Sprintf("[%d]%s", resp.StatusCode, errData.Error) +// RePublishWithContext a context version of RePublish +func (c *LmstfyClient) RePublishWithContext(ctx context.Context, job *Job, ttlSecond uint32, tries uint16, + delaySecond uint32) (jobID string, err error) { + return c.publish(ctx, job.Queue, job.ID, job.Data, ttlSecond, tries, delaySecond) +} + +// BatchPublishWithContext a context version of BatchPublish +func (c *LmstfyClient) BatchPublishWithContext(ctx context.Context, queue string, jobDataSet []interface{}, + ttlSecond uint32, tries uint16, delaySecond uint32) (jobIDs []string, err error) { + return c.batchPublish(ctx, queue, jobDataSet, ttlSecond, tries, delaySecond) +} + +// ConsumeWithContext a context version of Consume +func (c *LmstfyClient) ConsumeWithContext(ctx context.Context, queue string, ttrSecond, timeoutSecond uint32) (*Job, error) { + return c.consume(ctx, queue, ttrSecond, timeoutSecond, false) +} + +// ConsumeWithFreezeTriesWithContext a context version of ConsumeWithFreezeTries +func (c *LmstfyClient) ConsumeWithFreezeTriesWithContext(ctx context.Context, queue string, ttrSecond, timeoutSecond uint32) (*Job, error) { + return c.consume(ctx, queue, ttrSecond, timeoutSecond, true) +} + +// BatchConsumeWithContext a context version of BatchConsume +func (c *LmstfyClient) BatchConsumeWithContext(ctx context.Context, queues []string, count, ttrSecond, timeoutSecond uint32) ([]*Job, error) { + return c.batchConsume(ctx, queues, count, ttrSecond, timeoutSecond, false) +} + +// BatchConsumeWithFreezeTriesWithContext a context version of BatchConsumeWithFreezeTries +func (c *LmstfyClient) BatchConsumeWithFreezeTriesWithContext(ctx context.Context, queues []string, + count, ttrSecond, timeoutSecond uint32) ([]*Job, error) { + return c.batchConsume(ctx, queues, count, ttrSecond, timeoutSecond, true) +} + +// ConsumeFromQueuesWithContext a context version of ConsumeFromQueues +func (c *LmstfyClient) ConsumeFromQueuesWithContext(ctx context.Context, ttrSecond, timeoutSecond uint32, + queues ...string) (*Job, error) { + return c.consumeFromQueues(ctx, ttrSecond, timeoutSecond, false, queues...) 
+} + +// ConsumeFromQueuesWithFreezeTriesWithContext a context version of ConsumeFromQueuesWithFreezeTries +func (c *LmstfyClient) ConsumeFromQueuesWithFreezeTriesWithContext(ctx context.Context, ttrSecond, timeoutSecond uint32, + queues ...string) (*Job, error) { + return c.consumeFromQueues(ctx, ttrSecond, timeoutSecond, true, queues...) +} + +// AckWithContext a context version of Ack +func (c *LmstfyClient) AckWithContext(ctx context.Context, queue, jobID string) *APIError { + return c.ack(ctx, queue, jobID) +} + +// QueueSizeWithContext a context version of QueueSize +func (c *LmstfyClient) QueueSizeWithContext(ctx context.Context, queue string) (int, *APIError) { + return c.queueSize(ctx, queue) +} + +// PeekQueueWithContext a context version of PeekQueue +func (c *LmstfyClient) PeekQueueWithContext(ctx context.Context, queue string) (*Job, *APIError) { + return c.peekQueue(ctx, queue) +} + +// PeekJobWithContext a context version of PeekJob +func (c *LmstfyClient) PeekJobWithContext(ctx context.Context, queue, jobID string) (*Job, *APIError) { + return c.peekJob(ctx, queue, jobID) +} + +// PeekDeadLetterWithContext a context version of PeekDeadLetter +func (c *LmstfyClient) PeekDeadLetterWithContext(ctx context.Context, queue string) (int, string, *APIError) { + return c.peekDeadLetter(ctx, queue) +} + +// RespawnDeadLetterWithContext a context version of RespawnDeadLetter +func (c *LmstfyClient) RespawnDeadLetterWithContext(ctx context.Context, queue string, limit, ttlSecond int64) (int, *APIError) { + return c.respawnDeadLetter(ctx, queue, limit, ttlSecond) +} + +// DeleteDeadLetterWithContext a context version of DeleteDeadLetter +func (c *LmstfyClient) DeleteDeadLetterWithContext(ctx context.Context, queue string, limit int64) *APIError { + return c.deleteDeadLetter(ctx, queue, limit) } diff --git a/client/client_impl.go b/client/client_impl.go new file mode 100644 index 0000000..59b214f --- /dev/null +++ b/client/client_impl.go @@ -0,0 +1,808 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" +) + +const ( + maxReadTimeout = 600 // second + maxBatchConsumeSize = 100 +) + +// getReq generates an HTTP request +// - ctx is the context of the request, nil if no context +// - method is the HTTP method +// - relativePath is the relative path of the request +// - query is the query parameters +// - body is the request body, nil if no body to send +func (c *LmstfyClient) getReq(ctx context.Context, method, relativePath string, query url.Values, body []byte) ( + req *http.Request, err error) { + targetUrl := (&url.URL{ + Scheme: c.scheme, + Host: c.endpoint, + Path: path.Join("/api", c.Namespace, relativePath), + RawQuery: query.Encode(), + }).String() + + var bodyReader io.Reader + if len(body) > 0 { + bodyReader = bytes.NewReader(body) + } + + if ctx != nil { + req, err = http.NewRequestWithContext(ctx, method, targetUrl, bodyReader) + } else { + req, err = http.NewRequest(method, targetUrl, bodyReader) + } + if err != nil { + return nil, err + } + + req.Header.Add("X-Token", c.Token) + return req, nil +} + +// publish publishes a job to the given queue +func (c *LmstfyClient) publish(ctx context.Context, queue, ackJobID string, data []byte, ttlSecond uint32, tries uint16, + delaySecond uint32) (jobID string, e error) { + query := url.Values{} + query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10)) + query.Add("tries", strconv.FormatUint(uint64(tries), 10)) + 
query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10)) + retryCount := 0 + relativePath := queue + if ackJobID != "" { + relativePath = path.Join(relativePath, "job", ackJobID) + } +RETRY: + req, err := c.getReq(ctx, http.MethodPut, relativePath, query, data) + if err != nil { + return "", &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + + resp, err := c.httpCli.Do(req) + if err != nil { + if retryCount < c.retry { + time.Sleep(time.Duration(c.backOff) * time.Millisecond) + retryCount++ + goto RETRY + } + return "", &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + if resp.StatusCode != http.StatusCreated { + if resp.StatusCode >= 500 && retryCount < c.retry { + time.Sleep(time.Duration(c.backOff) * time.Millisecond) + retryCount++ + resp.Body.Close() + goto RETRY + } + defer resp.Body.Close() + return "", &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + if retryCount < c.retry { + time.Sleep(time.Duration(c.backOff) * time.Millisecond) + retryCount++ + goto RETRY + } + return "", &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + var respData struct { + JobID string `json:"job_id"` + } + err = json.Unmarshal(respBytes, &respData) + if err != nil { + return "", &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return respData.JobID, nil +} + +// batchPublish publish lots of jobs at one time +// - ttlSecond is the time-to-live of the job. If it's zero, the job won't be expired; if it's positive, the value is the TTL. +// - tries is the maximum retries that the job can be reached +// - delaySecond is the duration before the job is released for consuming. When it's zero, no delay is applied. 
+func (c *LmstfyClient) batchPublish(ctx context.Context, queue string, jobs []interface{}, ttlSecond uint32, tries uint16, + delaySecond uint32) (jobIDs []string, e error) { + query := url.Values{} + query.Add("ttl", strconv.FormatUint(uint64(ttlSecond), 10)) + query.Add("tries", strconv.FormatUint(uint64(tries), 10)) + query.Add("delay", strconv.FormatUint(uint64(delaySecond), 10)) + retryCount := 0 + relativePath := path.Join(queue, "bulk") + data, err := json.Marshal(jobs) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } +RETRY: + req, err := c.getReq(ctx, http.MethodPut, relativePath, query, data) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + + resp, err := c.httpCli.Do(req) + if err != nil { + if retryCount < c.retry { + time.Sleep(time.Duration(c.backOff) * time.Millisecond) + retryCount++ + goto RETRY + } + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + if resp.StatusCode != http.StatusCreated { + if resp.StatusCode >= 500 && retryCount < c.retry { + time.Sleep(time.Duration(c.backOff) * time.Millisecond) + retryCount++ + resp.Body.Close() + goto RETRY + } + defer resp.Body.Close() + return nil, &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + if retryCount < c.retry { + time.Sleep(time.Duration(c.backOff) * time.Millisecond) + retryCount++ + goto RETRY + } + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + var respData struct { + JobIDs []string `json:"job_ids"` + } + err = json.Unmarshal(respBytes, &respData) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return respData.JobIDs, nil +} + +// consume a job. Consuming will decrease the job's tries by 1 first. +// - ttrSecond is the time-to-run of the job. If the job is not finished before the TTR expires, +// the job will be released for consuming again if the `(tries - 1) > 0`. +// - timeoutSecond is the long-polling wait time. If it's zero, this method will return immediately +// with or without a job; if it's positive, this method would poll for a new job until timeout. +// - free_tries was used to determine whether to decrease the tries or not when consuming, if the tries +// decrease to 0, the job would move into a dead letter. Default was false. 
+func (c *LmstfyClient) consume(ctx context.Context, queue string, ttrSecond, timeoutSecond uint32, freezeTries bool) ( + job *Job, e error) { + if strings.TrimSpace(queue) == "" { + return nil, &APIError{ + Type: RequestErr, + Reason: "Queue name shouldn't be empty", + } + } + if ttrSecond <= 0 { + return nil, &APIError{ + Type: RequestErr, + Reason: "TTR should be > 0", + } + } + if timeoutSecond >= maxReadTimeout { + return nil, &APIError{ + Type: RequestErr, + Reason: fmt.Sprintf("timeout should be < %d", maxReadTimeout), + } + } + query := url.Values{} + query.Add("ttr", strconv.FormatUint(uint64(ttrSecond), 10)) + query.Add("timeout", strconv.FormatUint(uint64(timeoutSecond), 10)) + query.Add("freeze_tries", strconv.FormatBool(freezeTries)) + req, err := c.getReq(ctx, http.MethodGet, queue, query, nil) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusNotFound: + discardResponseBody(resp.Body) + if c.errorOnNilJob { + return nil, &APIError{ + Type: ResponseErr, + Reason: "no job or queue was not found", + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return nil, nil + case http.StatusOK: + // continue + default: + return nil, &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + job = &Job{} + err = json.Unmarshal(respBytes, job) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return job, nil +} + +// batchConsume consumes some jobs. Consuming will decrease these jobs tries by 1 first. +// - ttrSecond is the time-to-run of these jobs. If these jobs are not finished before the TTR expires, +// these jobs will be released to consume again if the `(tries - 1) > 0`. +// - count is the job count of this consume. If it's zero or over 100, this method will return an error. +// If it's positive, this method would return some jobs, and its count is between 0 and count. +// - freezeTries was used to determine whether to decrease the tries or not when consuming, if the tries +// decrease to 0, the job would move into a dead letter. Default was false. 
+func (c *LmstfyClient) batchConsume(ctx context.Context, queues []string, count, ttrSecond, timeoutSecond uint32, + freezeTries bool) (jobs []*Job, e error) { + if len(queues) == 0 { + return nil, &APIError{ + Type: RequestErr, + Reason: "At least one queue was required", + } + } + if ttrSecond <= 0 { + return nil, &APIError{ + Type: RequestErr, + Reason: "TTR should be > 0", + } + } + if count <= 0 || count > maxBatchConsumeSize { + return nil, &APIError{ + Type: RequestErr, + Reason: "COUNT should be > 0", + } + } + if timeoutSecond >= maxReadTimeout { + return nil, &APIError{ + Type: RequestErr, + Reason: fmt.Sprintf("timeout should be < %d", maxReadTimeout), + } + } + + query := url.Values{} + query.Add("ttr", strconv.FormatUint(uint64(ttrSecond), 10)) + query.Add("count", strconv.FormatUint(uint64(count), 10)) + query.Add("timeout", strconv.FormatUint(uint64(timeoutSecond), 10)) + query.Add("freeze_tries", strconv.FormatBool(freezeTries)) + req, err := c.getReq(ctx, http.MethodGet, strings.Join(queues, ","), query, nil) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusNotFound: + discardResponseBody(resp.Body) + return nil, nil + case http.StatusOK: + // continue + default: + return nil, &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + if count == 1 { + job := &Job{} + err = json.Unmarshal(respBytes, job) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return []*Job{job}, nil + } + jobs = []*Job{} + err = json.Unmarshal(respBytes, &jobs) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return jobs, nil +} + +func (c *LmstfyClient) consumeFromQueues(ctx context.Context, ttrSecond, timeoutSecond uint32, freezeTries bool, + queues ...string) (job *Job, e error) { + if len(queues) == 0 { + return nil, &APIError{ + Type: RequestErr, + Reason: "At least one queue was required", + } + } + if ttrSecond <= 0 { + return nil, &APIError{ + Type: RequestErr, + Reason: "TTR should be > 0", + } + } + if timeoutSecond >= maxReadTimeout { + return nil, &APIError{ + Type: RequestErr, + Reason: fmt.Sprintf("timeout must be < %d when fetch from multiple queues", maxReadTimeout), + } + } + query := url.Values{} + query.Add("ttr", strconv.FormatUint(uint64(ttrSecond), 10)) + query.Add("timeout", strconv.FormatUint(uint64(timeoutSecond), 10)) + query.Add("freeze_tries", strconv.FormatBool(freezeTries)) + req, err := c.getReq(ctx, http.MethodGet, strings.Join(queues, ","), query, nil) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusNotFound: + discardResponseBody(resp.Body) + return nil, nil + case http.StatusOK: + // continue + default: + return nil, &APIError{ + Type: ResponseErr, + 
Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + job = &Job{} + err = json.Unmarshal(respBytes, job) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return job, nil +} + +// ack Marks a job as finished, so it won't be retried by others. +func (c *LmstfyClient) ack(ctx context.Context, queue, jobID string) *APIError { + req, err := c.getReq(ctx, http.MethodDelete, path.Join(queue, "job", jobID), nil, nil) + if err != nil { + return &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + return &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return nil +} + +// queueSize Gets the queue size. it means how many jobs are ready to consume +func (c *LmstfyClient) queueSize(ctx context.Context, queue string) (int, *APIError) { + req, err := c.getReq(ctx, http.MethodGet, path.Join(queue, "size"), nil, nil) + if err != nil { + return 0, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return 0, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return 0, &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + var respData struct { + Namespace string `json:"namespace"` + Queue string `json:"queue"` + Size int `json:"size"` + } + err = json.Unmarshal(respBytes, &respData) + if err != nil { + return 0, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return respData.Size, nil +} + +// peekQueue Peek the job in the head of the queue +func (c *LmstfyClient) peekQueue(ctx context.Context, queue string) (job *Job, e *APIError) { + req, err := c.getReq(ctx, http.MethodGet, path.Join(queue, "peek"), nil, nil) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusNotFound: + discardResponseBody(resp.Body) + return nil, nil + case http.StatusOK: + // continue + default: + return nil, &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + job = &Job{} + err = json.Unmarshal(respBytes, job) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return job, nil +} + +// peekJob Peek a specific job 
data +func (c *LmstfyClient) peekJob(ctx context.Context, queue, jobID string) (job *Job, e *APIError) { + req, err := c.getReq(ctx, http.MethodGet, path.Join(queue, "job", jobID), nil, nil) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return nil, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + switch resp.StatusCode { + case http.StatusNotFound: + discardResponseBody(resp.Body) + return nil, nil + case http.StatusOK: + // continue + default: + return nil, &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + job = &Job{} + err = json.Unmarshal(respBytes, job) + if err != nil { + return nil, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return job, nil +} + +// peekDeadLetter Peek the dead letter of the queue +func (c *LmstfyClient) peekDeadLetter(ctx context.Context, queue string) ( + deadLetterSize int, deadLetterHead string, e *APIError) { + req, err := c.getReq(ctx, http.MethodGet, path.Join(queue, "deadletter"), nil, nil) + if err != nil { + return 0, "", &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return 0, "", &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return 0, "", &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, "", &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + var respData struct { + Namespace string `json:"namespace"` + Queue string `json:"queue"` + DeadLetterSize int `json:"deadletter_size"` + DeadLetterHead string `json:"deadletter_head"` + } + err = json.Unmarshal(respBytes, &respData) + if err != nil { + return 0, "", &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return respData.DeadLetterSize, respData.DeadLetterHead, nil +} + +func (c *LmstfyClient) respawnDeadLetter(ctx context.Context, queue string, limit, ttlSecond int64) ( + count int, e *APIError) { + if limit <= 0 { + return 0, &APIError{ + Type: RequestErr, + Reason: "limit should be > 0", + } + } + if ttlSecond < 0 { + return 0, &APIError{ + Type: RequestErr, + Reason: "TTL should be >= 0", + } + } + query := url.Values{} + query.Add("limit", strconv.FormatInt(limit, 10)) + query.Add("ttl", strconv.FormatInt(ttlSecond, 10)) + req, err := c.getReq(ctx, http.MethodPut, path.Join(queue, "deadletter"), query, nil) + if err != nil { + return 0, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return 0, &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return 0, &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return 0, &APIError{ + Type: ResponseErr, + 
Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + var respData struct { + Count int `json:"count"` + } + err = json.Unmarshal(respBytes, &respData) + if err != nil { + return 0, &APIError{ + Type: ResponseErr, + Reason: err.Error(), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return respData.Count, nil +} + +func (c *LmstfyClient) deleteDeadLetter(ctx context.Context, queue string, limit int64) *APIError { + if limit <= 0 { + return &APIError{ + Type: RequestErr, + Reason: "limit should be > 0", + } + } + query := url.Values{} + query.Add("limit", strconv.FormatInt(limit, 10)) + req, err := c.getReq(ctx, http.MethodDelete, path.Join(queue, "deadletter"), query, nil) + if err != nil { + return &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + resp, err := c.httpCli.Do(req) + if err != nil { + return &APIError{ + Type: RequestErr, + Reason: err.Error(), + } + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { + return &APIError{ + Type: ResponseErr, + Reason: parseResponseError(resp), + RequestID: resp.Header.Get("X-Request-ID"), + } + } + return nil +} + +func discardResponseBody(resp io.ReadCloser) { + // discard response body, to make this connection reusable in the http connection pool + _, _ = ioutil.ReadAll(resp) +} + +func parseResponseError(resp *http.Response) string { + respBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Sprintf("Invalid response: %s", err) + } + var errData struct { + Error string `json:"error"` + } + err = json.Unmarshal(respBytes, &errData) + if err != nil { + return fmt.Sprintf("Invalid JSON: %s", err) + } + return fmt.Sprintf("[%d]%s", resp.StatusCode, errData.Error) +}
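
Reviewer note: below is a minimal, illustrative usage sketch of the context-aware methods introduced by this change. The import path, endpoint, namespace, token, queue name, and the TTL/TTR/timeout values are assumptions for illustration only and are not part of this diff.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/bitleak/lmstfy/client" // assumed import path for this package
)

func main() {
	// Placeholder endpoint and credentials.
	cli := client.NewLmstfyClient("127.0.0.1", 7777, "test-ns", "test-token")
	cli.ConfigRetry(3, 50) // retry publishing up to 3 times with a 50ms back-off

	// Bound the whole publish/consume round trip with a single deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Publish with a 60s TTL, 3 tries and no delay.
	jobID, err := cli.PublishWithContext(ctx, "test-queue", []byte("hello"), 60, 3, 0)
	if err != nil {
		panic(err)
	}
	fmt.Println("published:", jobID)

	// Long-poll for up to 3s; the job must be acked within its 30s TTR.
	job, err := cli.ConsumeWithContext(ctx, "test-queue", 30, 3)
	if err != nil {
		panic(err)
	}
	if job == nil {
		// No job was available before the polling timeout.
		fmt.Println("no job available")
		return
	}
	fmt.Printf("consumed %s: %s\n", job.ID, string(job.Data))

	// Ack the job so it won't be redelivered after the TTR expires.
	if apiErr := cli.AckWithContext(ctx, job.Queue, job.ID); apiErr != nil {
		panic(apiErr)
	}
}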