Skip to content

Commit 3f99e87

Browse files
committed
feat: impl pubsub v2 and general fixes
1 parent 880f093 commit 3f99e87

File tree

11 files changed

+66
-168
lines changed

11 files changed

+66
-168
lines changed

bootstrap/function/adapter/contrib/cloud.google.com/pubsub/v1/subscriber.go

Lines changed: 37 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package pubsub
22

33
import (
4+
"cloud.google.com/go/pubsub"
45
"context"
56
"fmt"
6-
"math"
7-
"time"
8-
9-
"cloud.google.com/go/pubsub"
107
"github.com/xgodev/boost/bootstrap/function"
118
"github.com/xgodev/boost/model/errors"
129
"github.com/xgodev/boost/wrapper/log"
10+
"github.com/xgodev/boost/wrapper/log/contrib/rs/zerolog/v1"
11+
12+
"math"
13+
"time"
1314

1415
"github.com/cloudevents/sdk-go/v2/event"
1516
"github.com/google/uuid"
@@ -62,63 +63,45 @@ func (l *Subscriber[T]) Subscribe(ctx context.Context) error {
6263

6364
// processMessage processes each message, retries if needed, and applies backoff
6465
func (l *Subscriber[T]) processMessage(ctx context.Context, msg *pubsub.Message) error {
66+
ctx = zerolog.NewLogger().ToContext(ctx)
67+
68+
retryCount := 0
69+
6570
in, err := l.generateCloudEvent(msg)
6671
if err != nil {
6772
msg.Nack()
6873
return errors.Wrap(err, errors.Internalf("could not generate CloudEvent: %s", err.Error()))
6974
}
7075

71-
if _, err := l.handler(ctx, in); err != nil {
72-
a := 0
73-
if msg.DeliveryAttempt != nil {
74-
a = *msg.DeliveryAttempt
76+
for {
77+
// Timeout por tentativa
78+
msgCtx, cancel := context.WithTimeout(ctx, l.options.ProcessTimeout)
79+
80+
// Processes the event via handler
81+
if _, err := l.handler(msgCtx, in); err != nil {
82+
cancel()
83+
retryCount++
84+
85+
log.Ctx(ctx, *l).Warnf("msgID=%s handler failed (attempt %d/%d): %v\nPayload: %s", msg.ID, retryCount, l.options.RetryLimit, err, string(msg.Data))
86+
87+
// Check retry limit
88+
if l.options.RetryLimit != -1 && retryCount >= l.options.RetryLimit {
89+
return errors.Wrap(err, errors.Internalf("max retry limit reached"))
90+
}
91+
92+
// Apply backoff if enabled
93+
if l.options.Backoff {
94+
l.applyBackoff(retryCount)
95+
}
96+
97+
// Retry processing the message
98+
continue
7599
}
76-
log.Ctx(ctx, *l).Warnf("msgID=%s handler failed (attempt %d): %v | Payload: %s", msg.ID, a, err, string(msg.Data))
77-
return err
78-
}
79100

80-
//if _, err := l.handler(ctx, in); err != nil {
81-
// log.Ctx(ctx, *l).Warnf("msgID=%s handler failed (attempt %d): %v | Payload: %s", msg.ID, *msg.DeliveryAttempt, err, string(msg.Data))
82-
// return err
83-
//}
84-
85-
//msg.DeliveryAttempt
86-
//
87-
//for {
88-
// // Timeout por tentativa
89-
// //msgCtx, cancel := context.WithTimeout(ctx, l.options.ProcessTimeout)
90-
//
91-
// // Processes the event via handler
92-
// if _, err := l.handler(ctx, in); err != nil {
93-
// a := 0
94-
// if msg.DeliveryAttempt != nil {
95-
// a = *msg.DeliveryAttempt
96-
// }
97-
//
98-
// log.Ctx(ctx, *l).Warnf("msgID=%s handler failed (attempt %d): %v | Payload: %s", msg.ID, a, err, string(msg.Data))
99-
// return err
100-
// //cancel()
101-
// retryCount++
102-
//
103-
// // Check retry limit
104-
// if l.options.RetryLimit != -1 && retryCount >= l.options.RetryLimit {
105-
// return errors.Wrap(err, errors.Internalf("max retry limit reached"))
106-
// }
107-
//
108-
// // Apply backoff if enabled
109-
// if l.options.Backoff {
110-
// l.applyBackoff(retryCount)
111-
// }
112-
//
113-
// // Retry processing the message
114-
// continue
115-
// }
116-
//
117-
// //cancel()
118-
// // Acknowledge the message after successful processing
119-
// msg.Ack()
120-
// break
121-
//}
101+
cancel()
102+
msg.Ack()
103+
break
104+
}
122105

123106
return nil
124107
}
@@ -195,4 +178,5 @@ func (l *Subscriber[T]) applyBackoff(retryCount int) {
195178
backoffTime = l.options.MaxBackoff
196179
}
197180

181+
time.Sleep(backoffTime)
198182
}

bootstrap/function/middleware/logger/logger.go

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package logger
22

33
import (
4+
"encoding/json"
45
"fmt"
5-
6+
"github.com/cloudevents/sdk-go/v2/event"
67
"github.com/xgodev/boost/extra/middleware"
78
"github.com/xgodev/boost/model/errors"
89
"github.com/xgodev/boost/wrapper/log"
@@ -33,48 +34,39 @@ func NewAnyErrorMiddlewareWithOptions[T any](options *Options) middleware.AnyErr
3334
}
3435

3536
func (c *Logger[T]) Exec(ctx *middleware.AnyErrorContext[T], exec middleware.AnyErrorExecFunc[T], fallbackFunc middleware.AnyErrorReturnFunc[T]) (T, error) {
36-
//logCtx := zerolog.NewLogger().ToContext(ctx.GetContext())
37-
//ctx.SetContext(logCtx)
37+
logger := log.FromContext(ctx.GetContext()).WithTypeOf(*c)
38+
lm := c.logger(logger)
3839

3940
e, err := ctx.Next(exec, fallbackFunc)
4041
if err != nil {
41-
log.Ctx(ctx.GetContext(), *c).Warnf("handle with error: %s", err.Error())
42+
logger.Error(err.Error())
4243
if c.options.ErrorStack {
4344
fmt.Println(errors.ErrorStack(err))
4445
}
46+
}
4547

48+
var events []*event.Event
49+
50+
switch r := any(e).(type) {
51+
case []*event.Event:
52+
events = r
53+
case *event.Event:
54+
if r == nil {
55+
return e, err
56+
}
57+
events = []*event.Event{r}
58+
default:
4659
return e, err
4760
}
48-
//
49-
//var events []*event.Event
50-
//
51-
//switch r := any(e).(type) {
52-
//case []*event.Event:
53-
// events = r
54-
//case *event.Event:
55-
// if r == nil {
56-
// return e, err
57-
// }
58-
// events = []*event.Event{r}
59-
//default:
60-
// return e, err
61-
//}
62-
63-
//output, err := json.Marshal(e)
64-
//if err != nil {
65-
// log.FromContext(ctx.GetContext()).Errorf("error on marshall event for logging. %s", err.Error())
66-
//} else {
67-
// log.FromContext(ctx.GetContext()).WithField("output", output).Info("event sent")
68-
//}
69-
//
70-
//for _, ev := range events {
71-
// output, err := json.Marshal(ev)
72-
// if err != nil {
73-
// log.FromContext(ctx.GetContext()).Errorf("error on marshall event for logging. %s", err.Error())
74-
// } else {
75-
// log.FromContext(ctx.GetContext()).WithField("output", output).Info("event sent")
76-
// }
77-
//}
61+
62+
for _, ev := range events {
63+
j, err := json.Marshal(ev)
64+
if err != nil {
65+
logger.Errorf("error on marshall event for logging. %s", err.Error())
66+
} else {
67+
lm(string(j))
68+
}
69+
}
7870

7971
return e, err
8072
}

bootstrap/function/middleware/recovery/recover.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func NewAnyErrorMiddleware[T any]() middleware.AnyErrorMiddleware[T] {
2323
func (c *Recovery[T]) Exec(ctx *middleware.AnyErrorContext[T], exec middleware.AnyErrorExecFunc[T], fallbackFunc middleware.AnyErrorReturnFunc[T]) (res T, err error) {
2424
defer func() {
2525
if r := recover(); r != nil {
26-
runtimeCallback := FancyHandleError(r)
26+
runtimeCallback := GenerateStackTrace(r)
2727

2828
log.FromContext(ctx.GetContext()).WithTypeOf(*c).WithField("callers", runtimeCallback).Errorf("recovering: %v", r)
2929
err = fmt.Errorf(runtimeCallback)
@@ -32,36 +32,22 @@ func (c *Recovery[T]) Exec(ctx *middleware.AnyErrorContext[T], exec middleware.A
3232

3333
res, err = ctx.Next(exec, fallbackFunc)
3434

35-
//if r := recover(); r != nil {
36-
// log.FromContext(ctx.GetContext()).WithTypeOf(*c).Errorf("recovering: %v", r)
37-
// err = fmt.Errorf(FancyHandleError(r))
38-
//}
3935
return res, err
4036
}
4137

42-
// this logs the function name as well.
43-
func FancyHandleError(err any) string {
44-
// notice that we're using 1, so it will actually log the where
45-
// the error happened, 0 = this function, we don't want that.
46-
//pc, filename, line, _ := runtime.Caller(1)
38+
func GenerateStackTrace(err any) string {
4739
var pcs [10]uintptr
4840
n := runtime.Callers(1, pcs[:])
4941
iter := runtime.CallersFrames(pcs[:n])
5042

5143
b := strings.Builder{}
5244
for {
5345
f, more := iter.Next()
54-
//fmt.Printf(" %s %s:%d\n", f.Function, f.File, f.Line)
55-
5646
b.WriteString(fmt.Sprintf("%s %s:%d;", f.Function, f.File, f.Line))
5747
if !more {
5848
break
5949
}
6050
}
6151

6252
return b.String()
63-
64-
//log.Printf("[error] in %s[%s:%d] %v", runtime.FuncForPC(pc).Name(), filename, line, err)
65-
//return fmt.Sprintf("[error] in %s[%s:%d] %v %v", runtime.FuncForPC(pc).Name(), filename, line, err, n)
66-
6753
}
Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package pubsub
22

33
import (
4-
"context"
54
"sync"
65

76
fxpubsub "github.com/xgodev/boost/fx/modules/factory/contrib/cloud.google.com/pubsub/v2"
8-
"github.com/xgodev/boost/wrapper/publisher"
97
"github.com/xgodev/boost/wrapper/publisher/driver/contrib/cloud.google.com/pubsub/v2"
108
"go.uber.org/fx"
119
)
@@ -23,21 +21,9 @@ func Module() fx.Option {
2321
fxpubsub.Module(),
2422
fx.Provide(
2523
pubsub.New,
26-
shut,
2724
),
2825
)
2926
})
3027

3128
return options
3229
}
33-
34-
func shut(lc fx.Lifecycle, pub publisher.Driver) fx.Option {
35-
lc.Append(fx.Hook{
36-
OnStop: func(ctx context.Context) error {
37-
pub.Close()
38-
return nil
39-
},
40-
})
41-
42-
return fx.Options()
43-
}

wrapper/publisher/driver.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,4 @@ import (
99

1010
type Driver interface {
1111
Publish(context.Context, []*cloudevents.Event) ([]PublishOutput, error)
12-
Close()
1312
}

wrapper/publisher/driver/contrib/cloud.google.com/pubsub/v1/pubsub.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@ type client struct {
2424
topics map[string]*pubsub.Topic
2525
}
2626

27-
func (p *client) Stop() {
28-
//TODO implement me
29-
panic("implement me")
30-
}
31-
3227
// NewWithConfigPath returns a publisher configured by a file path.
3328
func NewWithConfigPath(ctx context.Context, c *pubsub.Client, path string) (publisher.Driver, error) {
3429
options, err := NewOptionsWithPath(path)

wrapper/publisher/driver/contrib/cloud.google.com/pubsub/v2/pubsub.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,6 @@ type client struct {
2222
topics map[string]*pb.Publisher
2323
}
2424

25-
func (p *client) Stop() {
26-
//TODO implement me
27-
panic("implement me")
28-
}
29-
3025
// NewWithConfigPath returns a publisher configured by a file path.
3126
func NewWithConfigPath(ctx context.Context, c *pb.Client, path string) (publisher.Driver, error) {
3227
options, err := NewOptionsWithPath(path)

wrapper/publisher/driver/contrib/confluentinc/confluent-kafka-go/v2/confluent.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,6 @@ type client struct {
1919
options *Options
2020
}
2121

22-
func (p *client) Close() {
23-
//TODO implement me
24-
panic("implement me")
25-
}
26-
27-
func (p *client) Stop() {
28-
//TODO implement me
29-
panic("implement me")
30-
}
31-
3222
// NewWithConfigPath returns connection with options from config path.
3323
func NewWithConfigPath(ctx context.Context, producer *kafka.Producer, path string) (publisher.Driver, error) {
3424
options, err := NewOptionsWithPath(path)

wrapper/publisher/driver/contrib/lovoo/goka/v1/goka.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,6 @@ type client struct {
1717
emitter *g.Emitter
1818
}
1919

20-
func (p *client) Close() {
21-
//TODO implement me
22-
panic("implement me")
23-
}
24-
25-
func (p *client) Stop() {
26-
//TODO implement me
27-
panic("implement me")
28-
}
29-
3020
// New creates a new Kafka client.
3121
func New(emitter *g.Emitter) publisher.Driver {
3222
return &client{emitter: emitter}

wrapper/publisher/driver/contrib/nats-io/nats.go/v1/nats.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,6 @@ type client struct {
1818
conn *nats.Conn
1919
}
2020

21-
func (p *client) Close() {
22-
//TODO implement me
23-
panic("implement me")
24-
}
25-
26-
func (p *client) Stop() {
27-
//TODO implement me
28-
panic("implement me")
29-
}
30-
3121
// New creates a new NATS client.
3222
func New(conn *nats.Conn) publisher.Driver {
3323
return &client{conn: conn}

0 commit comments

Comments
 (0)