Skip to content

Commit decc4e0

Browse files
Merge pull request #33 from hellofresh/patch/logger-update
Update logger interface
2 parents b591c0b + 91d2c0f commit decc4e0

17 files changed

+583
-177
lines changed

Gopkg.lock

Lines changed: 4 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

driver/inmemory/matcher.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ func (m *MetadataMatcher) Matches(metadata metadata.Metadata) bool {
8484
valid, err := c.Matches(metadata.Value(c.field))
8585
if err != nil {
8686
if m.logger != nil {
87-
m.logger.
88-
WithError(err).
89-
WithField("field", c.field).
90-
Warn("metadata constraint failed with error")
87+
m.logger.Warn("metadata constraint failed with error", func(e goengine.LoggerEntry) {
88+
e.Error(err)
89+
e.String("field", c.field)
90+
})
9191
}
9292
return false
9393
}

driver/sql/internal/background_processor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ func (b *BackgroundProcessor) startProcessor(ctx context.Context, handler Proces
124124
case notification := <-b.queue:
125125
// Execute the notification
126126
if err := handler(ctx, notification, b.Queue); err != nil {
127-
b.logger.
128-
WithError(err).
129-
WithField("notification", notification).
130-
Error("the ProcessHandler produced an error")
127+
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
128+
e.Error(err)
129+
e.Any("notification", notification)
130+
})
131131
}
132132
}
133133
}

driver/sql/internal/notification_projector.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ func (s *NotificationProjector) Execute(ctx context.Context, notification *drive
8585
}
8686
defer func() {
8787
if err := projectConn.Close(); err != nil {
88-
s.logger.WithError(err).Warn("failed to db close project connection")
88+
s.logger.Warn("failed to db close project connection", func(e goengine.LoggerEntry) {
89+
e.Error(err)
90+
})
8991
}
9092
}()
9193

@@ -95,7 +97,9 @@ func (s *NotificationProjector) Execute(ctx context.Context, notification *drive
9597
}
9698
defer func() {
9799
if err := streamConn.Close(); err != nil {
98-
s.logger.WithError(err).Warn("failed to db close stream connection")
100+
s.logger.Warn("failed to db close stream connection", func(e goengine.LoggerEntry) {
101+
e.Error(err)
102+
})
99103
}
100104
}()
101105

@@ -110,8 +114,6 @@ func (s *NotificationProjector) project(
110114
streamConn *sql.Conn,
111115
notification *driverSQL.ProjectionNotification,
112116
) error {
113-
logger := s.logger.WithField("notification", notification)
114-
115117
// Acquire the projection
116118
releaseLock, rawState, err := s.storage.Acquire(ctx, conn, notification)
117119
if err != nil {
@@ -126,7 +128,10 @@ func (s *NotificationProjector) project(
126128
}
127129
defer func() {
128130
if err := eventStream.Close(); err != nil {
129-
logger.WithError(err).Warn("failed to close the event stream")
131+
s.logger.Warn("failed to close the event stream", func(e goengine.LoggerEntry) {
132+
e.Any("notification", notification)
133+
e.Error(err)
134+
})
130135
}
131136
}()
132137

@@ -167,9 +172,10 @@ func (s *NotificationProjector) projectStream(
167172
// Resolve the payload event name
168173
eventName, err := s.resolver.ResolveName(msg.Payload())
169174
if err != nil {
170-
s.logger.
171-
WithField("payload", msg.Payload()).
172-
Warn("skipping event: unable to resolve payload name")
175+
s.logger.Warn("skipping event: unable to resolve payload name", func(e goengine.LoggerEntry) {
176+
e.Error(err)
177+
e.Any("payload", msg.Payload())
178+
})
173179
continue
174180
}
175181

driver/sql/postgres/conjoined_eventstore.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ func (e *ConjoinedEventStore) AppendTo(ctx context.Context, streamName goengine.
4242
}
4343
defer func() {
4444
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
45-
e.logger.
46-
WithError(err).
47-
Error("could not rollback transaction")
45+
e.logger.Error("could not rollback transaction", func(e goengine.LoggerEntry) {
46+
e.Error(err)
47+
})
4848
}
4949
}()
5050

@@ -58,9 +58,10 @@ func (e *ConjoinedEventStore) AppendTo(ctx context.Context, streamName goengine.
5858
// Resolve the payload event name
5959
eventName, err := e.resolver.ResolveName(msg.Payload())
6060
if err != nil {
61-
e.logger.
62-
WithField("payload", msg.Payload()).
63-
Warn("skipping event: unable to resolve payload name")
61+
e.logger.Warn("skipping event: unable to resolve payload name", func(e goengine.LoggerEntry) {
62+
e.Error(err)
63+
e.Any("payload", msg.Payload())
64+
})
6465
continue
6566
}
6667

driver/sql/postgres/eventstore.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ func (e *EventStore) Create(ctx context.Context, streamName goengine.StreamName)
9797
}
9898

9999
if errRollback := tx.Rollback(); errRollback != nil {
100-
e.logger.
101-
WithError(errRollback).
102-
WithField("query", q).
103-
Error("could not rollback transaction")
100+
e.logger.Error("could not rollback transaction", func(e goengine.LoggerEntry) {
101+
e.Error(errRollback)
102+
e.String("query", q)
103+
})
104104
}
105105

106106
return err
@@ -217,24 +217,21 @@ func (e *EventStore) AppendToWithExecer(ctx context.Context, conn driverSQL.Exec
217217
data...,
218218
)
219219
if err != nil {
220-
e.logger.
221-
WithError(err).
222-
WithFields(goengine.Fields{
223-
"streamName": streamName,
224-
"streamEvents": streamEvents,
225-
}).
226-
Warn("failed to insert messages into the event stream")
220+
e.logger.Warn("failed to insert messages into the event stream", func(e goengine.LoggerEntry) {
221+
e.Error(err)
222+
e.String("streamName", string(streamName))
223+
e.Any("streamEvents", streamEvents)
224+
})
227225

228226
return err
229227
}
230228

231-
e.logger.
232-
WithFields(goengine.Fields{
233-
"streamName": streamName,
234-
"streamEvents": streamEvents,
235-
"result": result,
236-
}).
237-
Debug("inserted messages into the event stream")
229+
e.logger.Debug("inserted messages into the event stream", func(e goengine.LoggerEntry) {
230+
e.Error(err)
231+
e.String("streamName", string(streamName))
232+
e.Any("streamEvents", streamEvents)
233+
e.Any("result", result)
234+
})
238235

239236
return nil
240237
}
@@ -306,10 +303,10 @@ func (e *EventStore) tableExists(ctx context.Context, tableName string) bool {
306303
).Scan(&exists)
307304

308305
if err != nil {
309-
e.logger.
310-
WithError(err).
311-
WithField("table", tableName).
312-
Warn("error on reading from information_schema")
306+
e.logger.Warn("error on reading from information_schema", func(e goengine.LoggerEntry) {
307+
e.Error(err)
308+
e.String("table", tableName)
309+
})
313310

314311
return false
315312
}

driver/sql/postgres/projector_aggregate.go

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ func NewAggregateProjector(
6060
if logger == nil {
6161
logger = goengine.NopLogger
6262
}
63-
logger = logger.WithField("projection", projection)
63+
logger = logger.WithFields(func(e goengine.LoggerEntry) {
64+
e.String("projection", projection.Name())
65+
})
6466

6567
processor, err := internal.NewBackgroundProcessor(10, 32, logger)
6668
if err != nil {
@@ -158,20 +160,24 @@ func (a *AggregateProjector) processNotification(
158160
}
159161

160162
// Resolve the action to take based on the error that occurred
161-
logger := a.logger.WithError(err).WithField("notification", notification)
163+
logFields := func(e goengine.LoggerEntry) {
164+
e.Error(err)
165+
e.Int64("notification.no", notification.No)
166+
e.String("notification.aggregate_id", notification.AggregateID)
167+
}
162168
switch resolveErrorAction(a.projectionErrorHandler, notification, err) {
163169
case errorFail:
164-
logger.Debug("ProcessHandler->ErrorHandler: marking projection as failed")
170+
a.logger.Debug("ProcessHandler->ErrorHandler: marking projection as failed", logFields)
165171
return a.markProjectionAsFailed(ctx, notification)
166172
case errorIgnore:
167-
logger.Debug("ProcessHandler->ErrorHandler: ignoring error")
173+
a.logger.Debug("ProcessHandler->ErrorHandler: ignoring error", logFields)
168174
return nil
169175
case errorRetry:
170-
logger.Debug("ProcessHandler->ErrorHandler: re-queueing notification")
176+
a.logger.Debug("ProcessHandler->ErrorHandler: re-queueing notification", logFields)
171177
return queue(ctx, notification)
172178
}
173179

174-
logger.Debug("ProcessHandler->ErrorHandler: error fallthrough")
180+
a.logger.Debug("ProcessHandler->ErrorHandler: error fallthrough", logFields)
175181
return err
176182
}
177183

@@ -183,7 +189,9 @@ func (a *AggregateProjector) triggerOutOfSyncProjections(ctx context.Context, qu
183189
}
184190
defer func() {
185191
if err := conn.Close(); err != nil {
186-
a.logger.WithError(err).Warn("failed to db close LoadOutOfSync connection")
192+
a.logger.Warn("failed to db close LoadOutOfSync connection", func(e goengine.LoggerEntry) {
193+
e.Error(err)
194+
})
187195
}
188196
}()
189197

@@ -193,7 +201,9 @@ func (a *AggregateProjector) triggerOutOfSyncProjections(ctx context.Context, qu
193201
}
194202
defer func() {
195203
if err := rows.Close(); err != nil {
196-
a.logger.WithError(err).Error("failed to close LoadOutOfSync rows")
204+
a.logger.Error("failed to close LoadOutOfSync rows", func(e goengine.LoggerEntry) {
205+
e.Error(err)
206+
})
197207
}
198208
}()
199209

@@ -219,13 +229,19 @@ func (a *AggregateProjector) triggerOutOfSyncProjections(ctx context.Context, qu
219229
AggregateID: aggregateID,
220230
}
221231

222-
logger := a.logger.WithField("notification", notification)
223232
if err := queue(ctx, notification); err != nil {
224-
logger.WithError(err).Error("failed to queue notification")
233+
a.logger.Error("failed to queue notification", func(e goengine.LoggerEntry) {
234+
e.Error(err)
235+
e.Int64("notification.no", notification.No)
236+
e.String("notification.aggregate_id", notification.AggregateID)
237+
})
225238
return err
226239
}
227240

228-
a.logger.Debug("send catchup")
241+
a.logger.Debug("send catchup", func(e goengine.LoggerEntry) {
242+
e.Int64("notification.no", notification.No)
243+
e.String("notification.aggregate_id", notification.AggregateID)
244+
})
229245
}
230246

231247
return rows.Close()
@@ -239,7 +255,9 @@ func (a *AggregateProjector) markProjectionAsFailed(ctx context.Context, notific
239255

240256
defer func() {
241257
if err := conn.Close(); err != nil {
242-
a.logger.WithError(err).Warn("failed to db close failure connection")
258+
a.logger.Warn("failed to db close failure connection", func(e goengine.LoggerEntry) {
259+
e.Error(err)
260+
})
243261
}
244262
}()
245263

driver/sql/postgres/projector_aggregate_storage.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,11 @@ func (a *aggregateProjectionStorage) PersistState(conn driverSQL.Execer, notific
129129
return err
130130
}
131131

132-
a.logger.WithFields(goengine.Fields{
133-
"notification": notification,
134-
"state": state,
135-
}).Debug("updated projection state")
132+
a.logger.Debug("updated projection state", func(e goengine.LoggerEntry) {
133+
e.Int64("notification.no", notification.No)
134+
e.String("notification.aggregate_id", notification.AggregateID)
135+
e.Any("state", state)
136+
})
136137
return nil
137138
}
138139

@@ -149,7 +150,10 @@ func (a *aggregateProjectionStorage) Acquire(
149150
conn *sql.Conn,
150151
notification *driverSQL.ProjectionNotification,
151152
) (func(), *driverSQL.ProjectionRawState, error) {
152-
logger := a.logger.WithField("notification", notification)
153+
logFields := func(e goengine.LoggerEntry) {
154+
e.Int64("notification.no", notification.No)
155+
e.String("notification.aggregate_id", notification.AggregateID)
156+
}
153157
aggregateID := notification.AggregateID
154158

155159
res := conn.QueryRowContext(ctx, a.queryAcquireLock, aggregateID, notification.No)
@@ -178,9 +182,12 @@ func (a *aggregateProjectionStorage) Acquire(
178182
// The projection was locked by another process that died and for this reason not unlocked
179183
// In this case a application needs to decide what to do to avoid invalid projection states
180184
if err := a.releaseProjectionConnectionLock(conn, aggregateID); err != nil {
181-
logger.WithError(err).Error("failed to release lock for a projection with a locked row")
185+
a.logger.Error("failed to release lock for a projection with a locked row", func(e goengine.LoggerEntry) {
186+
logFields(e)
187+
e.Error(err)
188+
})
182189
} else {
183-
logger.Debug("released connection lock for a locked projection")
190+
a.logger.Debug("released connection lock for a locked projection", logFields)
184191
}
185192

186193
return nil, nil, driverSQL.ErrProjectionPreviouslyLocked
@@ -190,20 +197,26 @@ func (a *aggregateProjectionStorage) Acquire(
190197
_, err := conn.ExecContext(ctx, a.querySetRowLocked, aggregateID, true)
191198
if err != nil {
192199
if releaseErr := a.releaseProjection(conn, aggregateID); releaseErr != nil {
193-
logger.WithError(releaseErr).Error("failed to release lock while setting projection rows as locked")
200+
a.logger.Error("failed to release lock while setting projection rows as locked", func(e goengine.LoggerEntry) {
201+
logFields(e)
202+
e.Error(releaseErr)
203+
})
194204
} else {
195-
logger.Debug("failed to set projection as locked")
205+
a.logger.Debug("failed to set projection as locked", logFields)
196206
}
197207

198208
return nil, nil, err
199209
}
200-
logger.Debug("acquired projection lock")
210+
a.logger.Debug("acquired projection lock", logFields)
201211

202212
return func() {
203213
if err := a.releaseProjection(conn, aggregateID); err != nil {
204-
logger.WithError(err).Error("failed to release projection lock")
214+
a.logger.Error("failed to release projection lock", func(e goengine.LoggerEntry) {
215+
logFields(e)
216+
e.Error(err)
217+
})
205218
} else {
206-
logger.Debug("released projection lock")
219+
a.logger.Debug("released projection lock", logFields)
207220
}
208221
}, &driverSQL.ProjectionRawState{ProjectionState: rawState, Position: position}, nil
209222
}

0 commit comments

Comments
 (0)