Skip to content

Commit a2a4ecd

Browse files
authored
feat: Postgres log store implementation + Events API (#248)
* test: Improve logstore event test suite * feat: Logstore query events by destinations * feat: Logstore query events by status * chore: Migrator force * feat: Better cursor pagination with time_id column * refactor: Update init migration to support table partition * refactor: Better list event SQL query * feat: Persist delivery response * chore: Format delivery response * feat: Tenant authz check for delivery list query
1 parent 1487349 commit a2a4ecd

File tree

27 files changed

+768
-191
lines changed

27 files changed

+768
-191
lines changed

cmd/migrate/main.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,23 @@ func run(ctx context.Context, cfg *config.Config) error {
7878
} else {
7979
fmt.Println("no migrations rolled back")
8080
}
81+
case "force":
82+
// $ outpost migration force <version>
83+
84+
if len(os.Args) < 3 {
85+
return fmt.Errorf("version is required")
86+
}
87+
88+
version, err := strconv.Atoi(os.Args[2])
89+
if err != nil {
90+
return err
91+
}
92+
93+
if err := migrator.Force(ctx, version); err != nil {
94+
return err
95+
}
96+
fmt.Println("migrations forced")
97+
8198
default:
8299
return fmt.Errorf("invalid command: %s", command)
83100
}

internal/deliverymq/messagehandler.go

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"fmt"
77
"time"
88

9-
"github.com/google/uuid"
109
"github.com/hookdeck/outpost/internal/alert"
1110
"github.com/hookdeck/outpost/internal/backoff"
1211
"github.com/hookdeck/outpost/internal/consumer"
@@ -81,7 +80,7 @@ type messageHandler struct {
8180
}
8281

8382
type Publisher interface {
84-
PublishEvent(ctx context.Context, destination *models.Destination, event *models.Event) error
83+
PublishEvent(ctx context.Context, destination *models.Destination, event *models.Event) (*models.Delivery, error)
8584
}
8685

8786
type LogPublisher interface {
@@ -193,7 +192,14 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
193192
_, span := h.eventTracer.Deliver(ctx, &deliveryEvent, destination)
194193
defer span.End()
195194

196-
if err := h.publisher.PublishEvent(ctx, destination, &deliveryEvent.Event); err != nil {
195+
delivery, err := h.publisher.PublishEvent(ctx, destination, &deliveryEvent.Event)
196+
if err != nil {
197+
// If delivery is nil, it means no delivery was made.
198+
// This is an unexpected error and considered a pre-delivery error.
199+
if delivery == nil {
200+
return &PreDeliveryError{err: err}
201+
}
202+
197203
h.logger.Ctx(ctx).Error("failed to publish event",
198204
zap.Error(err),
199205
zap.String("delivery_event_id", deliveryEvent.ID),
@@ -202,10 +208,10 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
202208

203209
if h.shouldScheduleRetry(deliveryEvent, err) {
204210
if retryErr := h.scheduleRetry(ctx, deliveryEvent); retryErr != nil {
205-
return h.logDeliveryResult(ctx, &deliveryEvent, destination, errors.Join(err, retryErr))
211+
return h.logDeliveryResult(ctx, &deliveryEvent, destination, delivery, errors.Join(err, retryErr))
206212
}
207213
}
208-
return h.logDeliveryResult(ctx, &deliveryEvent, destination, deliveryErr)
214+
return h.logDeliveryResult(ctx, &deliveryEvent, destination, delivery, deliveryErr)
209215
}
210216

211217
// Handle successful delivery
@@ -216,38 +222,20 @@ func (h *messageHandler) doHandle(ctx context.Context, deliveryEvent models.Deli
216222
zap.Error(err),
217223
zap.String("delivery_event_id", deliveryEvent.ID),
218224
zap.String("retry_id", deliveryEvent.GetRetryID()))
219-
return h.logDeliveryResult(ctx, &deliveryEvent, destination, err)
225+
return h.logDeliveryResult(ctx, &deliveryEvent, destination, delivery, err)
220226
}
221227
logger.Audit("scheduled retry canceled",
222228
zap.String("delivery_event_id", deliveryEvent.ID),
223229
zap.String("retry_id", deliveryEvent.GetRetryID()))
224230
}
225-
return h.logDeliveryResult(ctx, &deliveryEvent, destination, nil)
226-
}
227-
228-
func (h *messageHandler) hasDeliveryError(err error) bool {
229-
var delErr *DeliveryError
230-
return errors.As(err, &delErr)
231+
return h.logDeliveryResult(ctx, &deliveryEvent, destination, delivery, nil)
231232
}
232233

233-
func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *models.DeliveryEvent, destination *models.Destination, err error) error {
234+
func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *models.DeliveryEvent, destination *models.Destination, delivery *models.Delivery, err error) error {
234235
logger := h.logger.Ctx(ctx)
235236

236237
// Set up delivery record
237-
deliveryEvent.Delivery = &models.Delivery{
238-
ID: uuid.New().String(),
239-
DeliveryEventID: deliveryEvent.ID,
240-
EventID: deliveryEvent.Event.ID,
241-
DestinationID: deliveryEvent.DestinationID,
242-
Time: time.Now(),
243-
}
244-
245-
// Check for delivery failures in the error chain
246-
if h.hasDeliveryError(err) {
247-
deliveryEvent.Delivery.Status = models.DeliveryStatusFailed
248-
} else {
249-
deliveryEvent.Delivery.Status = models.DeliveryStatusOK
250-
}
238+
deliveryEvent.Delivery = delivery
251239

252240
logger.Audit("event delivered",
253241
zap.String("delivery_event_id", deliveryEvent.ID),
@@ -294,7 +282,7 @@ func (h *messageHandler) logDeliveryResult(ctx context.Context, deliveryEvent *m
294282

295283
func (h *messageHandler) handleAlertAttempt(ctx context.Context, deliveryEvent *models.DeliveryEvent, destination *models.Destination, err error) {
296284
attempt := alert.DeliveryAttempt{
297-
Success: deliveryEvent.Delivery.Status == models.DeliveryStatusOK,
285+
Success: deliveryEvent.Delivery.Status == models.DeliveryStatusSuccess,
298286
DeliveryEvent: deliveryEvent,
299287
Destination: destination,
300288
Timestamp: deliveryEvent.Delivery.Time,

internal/deliverymq/messagehandler_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -484,7 +484,7 @@ func TestMessageHandler_RetryFlow(t *testing.T) {
484484
assert.Equal(t, 1, publisher.current, "publish should succeed once")
485485
assert.Equal(t, event.ID, eventGetter.lastRetrievedID, "event getter should be called with correct ID")
486486
require.Len(t, logPublisher.deliveries, 1, "should have one delivery")
487-
assert.Equal(t, models.DeliveryStatusOK, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK")
487+
assert.Equal(t, models.DeliveryStatusSuccess, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK")
488488
}
489489

490490
func TestMessageHandler_Idempotency(t *testing.T) {
@@ -882,7 +882,7 @@ func TestManualDelivery_Success(t *testing.T) {
882882
assert.Len(t, retryScheduler.canceled, 1, "should cancel pending retries")
883883
assert.Equal(t, deliveryEvent.GetRetryID(), retryScheduler.canceled[0], "should cancel with correct retry ID")
884884
require.Len(t, logPublisher.deliveries, 1, "should have one delivery")
885-
assert.Equal(t, models.DeliveryStatusOK, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK")
885+
assert.Equal(t, models.DeliveryStatusSuccess, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK")
886886
assertAlertMonitor(t, alertMonitor, true, &destination, nil)
887887
}
888888

@@ -1023,7 +1023,7 @@ func TestManualDelivery_CancelError(t *testing.T) {
10231023
assert.Len(t, retryScheduler.canceled, 1, "should attempt to cancel retry")
10241024
assert.Equal(t, deliveryEvent.GetRetryID(), retryScheduler.canceled[0], "should cancel with correct retry ID")
10251025
require.Len(t, logPublisher.deliveries, 1, "should have one delivery")
1026-
assert.Equal(t, models.DeliveryStatusOK, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK despite cancel error")
1026+
assert.Equal(t, models.DeliveryStatusSuccess, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK despite cancel error")
10271027
assertAlertMonitor(t, alertMonitor, true, &destination, nil)
10281028
}
10291029

@@ -1221,7 +1221,7 @@ func TestMessageHandler_AlertMonitorError(t *testing.T) {
12211221
assert.False(t, mockMsg.nacked, "message should not be nacked despite alert monitor error")
12221222
assert.Equal(t, 1, publisher.current, "should publish once")
12231223
require.Len(t, logPublisher.deliveries, 1, "should have one delivery")
1224-
assert.Equal(t, models.DeliveryStatusOK, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK")
1224+
assert.Equal(t, models.DeliveryStatusSuccess, logPublisher.deliveries[0].Delivery.Status, "delivery status should be OK")
12251225

12261226
// Verify alert monitor was called but error was ignored
12271227
alertMonitor.AssertCalled(t, "HandleAttempt", mock.Anything, mock.Anything)

internal/deliverymq/mock_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"sync"
88
"time"
99

10+
"github.com/google/uuid"
1011
"github.com/hookdeck/outpost/internal/alert"
1112
"github.com/hookdeck/outpost/internal/models"
1213
mqs "github.com/hookdeck/outpost/internal/mqs"
@@ -29,18 +30,48 @@ func newMockPublisher(responses []error) *mockPublisher {
2930
return &mockPublisher{responses: responses}
3031
}
3132

32-
func (m *mockPublisher) PublishEvent(ctx context.Context, destination *models.Destination, event *models.Event) error {
33+
func (m *mockPublisher) PublishEvent(ctx context.Context, destination *models.Destination, event *models.Event) (*models.Delivery, error) {
3334
m.mu.Lock()
3435
defer m.mu.Unlock()
3536

3637
if m.current >= len(m.responses) {
3738
m.current++
38-
return nil
39+
return &models.Delivery{
40+
ID: uuid.New().String(),
41+
DeliveryEventID: uuid.New().String(),
42+
EventID: event.ID,
43+
DestinationID: destination.ID,
44+
Status: models.DeliveryStatusSuccess,
45+
Code: "OK",
46+
ResponseData: map[string]interface{}{},
47+
Time: time.Now(),
48+
}, nil
3949
}
4050

4151
resp := m.responses[m.current]
4252
m.current++
43-
return resp
53+
if resp == nil {
54+
return &models.Delivery{
55+
ID: uuid.New().String(),
56+
DeliveryEventID: uuid.New().String(),
57+
EventID: event.ID,
58+
DestinationID: destination.ID,
59+
Status: models.DeliveryStatusSuccess,
60+
Code: "OK",
61+
ResponseData: map[string]interface{}{},
62+
Time: time.Now(),
63+
}, nil
64+
}
65+
return &models.Delivery{
66+
ID: uuid.New().String(),
67+
DeliveryEventID: uuid.New().String(),
68+
EventID: event.ID,
69+
DestinationID: destination.ID,
70+
Status: models.DeliveryStatusFailed,
71+
Code: "ERR",
72+
ResponseData: map[string]interface{}{},
73+
Time: time.Now(),
74+
}, resp
4475
}
4576

4677
func (m *mockPublisher) Current() int {

internal/destregistry/providers/destawssqs/destawssqs.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -154,24 +154,34 @@ func (p *AWSSQSPublisher) Format(ctx context.Context, event *models.Event) (*sqs
154154
}, nil
155155
}
156156

157-
func (p *AWSSQSPublisher) Publish(ctx context.Context, event *models.Event) error {
157+
func (p *AWSSQSPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) {
158158
if err := p.BasePublisher.StartPublish(); err != nil {
159-
return err
159+
return nil, err
160160
}
161161
defer p.BasePublisher.FinishPublish()
162162

163163
msg, err := p.Format(ctx, event)
164164
if err != nil {
165-
return err
165+
return nil, err
166166
}
167167

168168
if _, err = p.client.SendMessage(ctx, msg); err != nil {
169-
return destregistry.NewErrDestinationPublishAttempt(err, "aws_sqs", map[string]interface{}{
170-
"error": err.Error(),
171-
})
169+
return &destregistry.Delivery{
170+
Status: "failed",
171+
Code: "ERR",
172+
Response: map[string]interface{}{
173+
"error": err.Error(),
174+
},
175+
}, destregistry.NewErrDestinationPublishAttempt(err, "aws_sqs", map[string]interface{}{
176+
"error": err.Error(),
177+
})
172178
}
173179

174-
return nil
180+
return &destregistry.Delivery{
181+
Status: "success",
182+
Code: "OK",
183+
Response: map[string]interface{}{},
184+
}, nil
175185
}
176186

177187
// ParseQueueURL extracts the full URL into baseURL & region

internal/destregistry/providers/destrabbitmq/destrabbitmq.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,23 @@ func (p *RabbitMQPublisher) Close() error {
113113
return nil
114114
}
115115

116-
func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) error {
116+
func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) {
117117
if err := p.BasePublisher.StartPublish(); err != nil {
118-
return err
118+
return nil, err
119119
}
120120
defer p.BasePublisher.FinishPublish()
121121

122122
// Ensure we have a valid connection
123123
if err := p.ensureConnection(ctx); err != nil {
124-
return destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{
124+
return nil, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{
125125
"error": "connection_failed",
126126
"message": err.Error(),
127127
})
128128
}
129129

130130
dataBytes, err := json.Marshal(event.Data)
131131
if err != nil {
132-
return err
132+
return nil, err
133133
}
134134

135135
headers := make(amqp091.Table)
@@ -148,13 +148,23 @@ func (p *RabbitMQPublisher) Publish(ctx context.Context, event *models.Event) er
148148
Body: []byte(dataBytes),
149149
},
150150
); err != nil {
151-
return destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{
152-
"error": "publish_failed",
153-
"message": err.Error(),
154-
})
151+
return &destregistry.Delivery{
152+
Status: "failed",
153+
Code: "ERR",
154+
Response: map[string]interface{}{
155+
"error": err.Error(),
156+
},
157+
}, destregistry.NewErrDestinationPublishAttempt(err, "rabbitmq", map[string]interface{}{
158+
"error": "publish_failed",
159+
"message": err.Error(),
160+
})
155161
}
156162

157-
return nil
163+
return &destregistry.Delivery{
164+
Status: "success",
165+
Code: "OK",
166+
Response: map[string]interface{}{},
167+
}, nil
158168
}
159169

160170
func (p *RabbitMQPublisher) ensureConnection(_ context.Context) error {

0 commit comments

Comments
 (0)