Skip to content

Commit f5e5144

Browse files
authored
Merge pull request #285 from philips-software/bugfix/279
Bugfix: unpack batch with failed messages #279
2 parents 00e9c26 + 4164c35 commit f5e5144

File tree

5 files changed

+16
-43
lines changed

5 files changed

+16
-43
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ require (
1111
github.com/loafoe/go-rabbitmq v0.5.0
1212
github.com/opentracing/opentracing-go v1.2.0
1313
github.com/openzipkin/zipkin-go v0.4.1
14-
github.com/philips-software/go-hsdp-api v0.75.0
14+
github.com/philips-software/go-hsdp-api v0.75.6
1515
github.com/prometheus/client_golang v1.14.0
1616
github.com/sirupsen/logrus v1.9.0
1717
github.com/spf13/viper v1.14.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -831,8 +831,8 @@ github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwb
831831
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
832832
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
833833
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
834-
github.com/philips-software/go-hsdp-api v0.75.0 h1:AjYazGCntd6jH2xtGhxqqgy49bjvvAC+4bljvFVDE8k=
835-
github.com/philips-software/go-hsdp-api v0.75.0/go.mod h1:rd6uphXchFcYW2ehT5xWGobAZrIod7qSOLZUqWh61y4=
834+
github.com/philips-software/go-hsdp-api v0.75.6 h1:ic04DulkTUgG3VQ0n2UdVy1cD8mjWgC3nVI//H4+Km4=
835+
github.com/philips-software/go-hsdp-api v0.75.6/go.mod h1:WLknlRw2GiSmtDufXcy28YHOcbQXn3RfB9RrT5cimxQ=
836836
github.com/philips-software/go-hsdp-signer v1.4.0 h1:yg7UILhmI4xJhr/tQiAiQwJL0EZFvLuMqpH2GZ9ygY4=
837837
github.com/philips-software/go-hsdp-signer v1.4.0/go.mod h1:/QehZ/+Aks2t1TFpjhF/7ZSB8PJIIJHzLc03rOqwLw0=
838838
github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=

queue/channel_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ func (n *nilStorer) StoreResources(_ []logging.Resource, count int) (*logging.St
5050
Response: &http.Response{
5151
StatusCode: http.StatusBadRequest,
5252
},
53-
Failed: map[int]logging.Resource{
54-
10: {},
55-
20: {},
53+
Failed: []logging.Resource{
54+
{},
55+
{},
5656
},
5757
}, logging.ErrBatchErrors
5858
}

queue/deliverer.go

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -111,56 +111,29 @@ func BodyToResource(body []byte, m Metrics) (*logging.Resource, error) {
111111
return resource, nil
112112
}
113113

114-
func contains(s []int, e int) bool {
115-
for _, a := range s {
116-
if a == e {
117-
return true
118-
}
119-
}
120-
return false
121-
}
122-
123114
func (pl *Deliverer) flushBatch(ctx context.Context, resources []logging.Resource, count int, queue Queue) (int, error) {
124115
tracer := opentracing.GlobalTracer()
125116
span, _ := opentracing.StartSpanFromContextWithTracer(ctx, tracer, "deliverer_flush_batch")
126117
defer span.Finish()
127118
fmt.Printf("batch flushing %d messages\n", count)
128-
maxLoop := count
129-
l := 0
130119

131-
for {
132-
l++
133-
resp, err := pl.storer.StoreResources(resources, count)
134-
if err == nil { // Happy flow
135-
break
136-
}
120+
resp, err := pl.storer.StoreResources(resources, count)
121+
if err != nil { // Unpack and send individually
137122
if resp == nil {
138123
fmt.Printf("unexpected error for StoreResource(): %v\n", err)
139-
continue
140-
}
141-
nrErrors := len(resp.Failed)
142-
keys := make([]int, 0, nrErrors)
143-
for k := range resp.Failed {
144-
keys = append(keys, k)
124+
return count, err
145125
}
146-
// Remove offending messages and resend
147-
pos := 0
126+
// Unpack and send individual messages
148127
for i := 0; i < count; i++ {
149-
if contains(keys, i) {
128+
fmt.Printf("resending %d\n", i+1)
129+
_, err = pl.storer.StoreResources([]logging.Resource{resources[i]}, 1)
130+
if err != nil {
150131
_ = queue.DeadLetter(resources[i])
151-
continue
132+
fmt.Printf("permanent failure sending %d resource: [%v] error: %v\n", i+1, resources[i], err)
152133
}
153-
resources[pos] = resources[i]
154-
pos++
155-
}
156-
count = pos
157-
fmt.Printf("Found %d errors. Resending %d\n", nrErrors, count)
158-
159-
if l > maxLoop || count <= 0 {
160-
fmt.Printf("Maximum retries reached or nothingt to send. Bailing..\n")
161-
break
162134
}
163135
}
136+
164137
return count, nil
165138
}
166139

queue/deliverer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func TestDroppedMessages(t *testing.T) {
253253
_, _ = io.Copy(&buf, r)
254254

255255
assert.Regexp(t, regexp.MustCompile("batch flushing 23 messages"), buf.String())
256-
assert.Regexp(t, regexp.MustCompile("Found 2 errors. Resending 21"), buf.String())
256+
assert.Regexp(t, regexp.MustCompile("resending 4"), buf.String())
257257
}
258258

259259
func TestEncodeString(t *testing.T) {

0 commit comments

Comments
 (0)