Skip to content

Commit 0b00a81

Browse files
Rewrite Logger interface
1 parent b591c0b commit 0b00a81

File tree

14 files changed

+186
-124
lines changed

14 files changed

+186
-124
lines changed

driver/inmemory/matcher.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,10 @@ func (m *MetadataMatcher) Matches(metadata metadata.Metadata) bool {
8484
valid, err := c.Matches(metadata.Value(c.field))
8585
if err != nil {
8686
if m.logger != nil {
87-
m.logger.
88-
WithError(err).
89-
WithField("field", c.field).
90-
Warn("metadata constraint failed with error")
87+
m.logger.Warn("metadata constraint failed with error", func(e goengine.LoggerEntry) {
88+
e.Error(err)
89+
e.String("field", c.field)
90+
})
9191
}
9292
return false
9393
}

driver/sql/internal/background_processor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ func (b *BackgroundProcessor) startProcessor(ctx context.Context, handler Proces
124124
case notification := <-b.queue:
125125
// Execute the notification
126126
if err := handler(ctx, notification, b.Queue); err != nil {
127-
b.logger.
128-
WithError(err).
129-
WithField("notification", notification).
130-
Error("the ProcessHandler produced an error")
127+
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
128+
e.Error(err)
129+
e.Any("notification", notification)
130+
})
131131
}
132132
}
133133
}

driver/sql/internal/notification_projector.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ func (s *NotificationProjector) Execute(ctx context.Context, notification *drive
8585
}
8686
defer func() {
8787
if err := projectConn.Close(); err != nil {
88-
s.logger.WithError(err).Warn("failed to db close project connection")
88+
s.logger.Warn("failed to db close project connection", func(e goengine.LoggerEntry) {
89+
e.Error(err)
90+
})
8991
}
9092
}()
9193

@@ -95,7 +97,9 @@ func (s *NotificationProjector) Execute(ctx context.Context, notification *drive
9597
}
9698
defer func() {
9799
if err := streamConn.Close(); err != nil {
98-
s.logger.WithError(err).Warn("failed to db close stream connection")
100+
s.logger.Warn("failed to db close stream connection", func(e goengine.LoggerEntry) {
101+
e.Error(err)
102+
})
99103
}
100104
}()
101105

@@ -110,8 +114,6 @@ func (s *NotificationProjector) project(
110114
streamConn *sql.Conn,
111115
notification *driverSQL.ProjectionNotification,
112116
) error {
113-
logger := s.logger.WithField("notification", notification)
114-
115117
// Acquire the projection
116118
releaseLock, rawState, err := s.storage.Acquire(ctx, conn, notification)
117119
if err != nil {
@@ -126,7 +128,10 @@ func (s *NotificationProjector) project(
126128
}
127129
defer func() {
128130
if err := eventStream.Close(); err != nil {
129-
logger.WithError(err).Warn("failed to close the event stream")
131+
s.logger.Warn("failed to close the event stream", func(e goengine.LoggerEntry) {
132+
e.Any("notification", notification)
133+
e.Error(err)
134+
})
130135
}
131136
}()
132137

@@ -167,9 +172,10 @@ func (s *NotificationProjector) projectStream(
167172
// Resolve the payload event name
168173
eventName, err := s.resolver.ResolveName(msg.Payload())
169174
if err != nil {
170-
s.logger.
171-
WithField("payload", msg.Payload()).
172-
Warn("skipping event: unable to resolve payload name")
175+
s.logger.Warn("skipping event: unable to resolve payload name", func(e goengine.LoggerEntry) {
176+
e.Error(err)
177+
e.Any("payload", msg.Payload())
178+
})
173179
continue
174180
}
175181

driver/sql/postgres/conjoined_eventstore.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ func (e *ConjoinedEventStore) AppendTo(ctx context.Context, streamName goengine.
4242
}
4343
defer func() {
4444
if err := tx.Rollback(); err != nil && err != sql.ErrTxDone {
45-
e.logger.
46-
WithError(err).
47-
Error("could not rollback transaction")
45+
e.logger.Error("could not rollback transaction", func(e goengine.LoggerEntry) {
46+
e.Error(err)
47+
})
4848
}
4949
}()
5050

@@ -58,9 +58,10 @@ func (e *ConjoinedEventStore) AppendTo(ctx context.Context, streamName goengine.
5858
// Resolve the payload event name
5959
eventName, err := e.resolver.ResolveName(msg.Payload())
6060
if err != nil {
61-
e.logger.
62-
WithField("payload", msg.Payload()).
63-
Warn("skipping event: unable to resolve payload name")
61+
e.logger.Warn("skipping event: unable to resolve payload name", func(e goengine.LoggerEntry) {
62+
e.Error(err)
63+
e.Any("payload", msg.Payload())
64+
})
6465
continue
6566
}
6667

driver/sql/postgres/eventstore.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ func (e *EventStore) Create(ctx context.Context, streamName goengine.StreamName)
9797
}
9898

9999
if errRollback := tx.Rollback(); errRollback != nil {
100-
e.logger.
101-
WithError(errRollback).
102-
WithField("query", q).
103-
Error("could not rollback transaction")
100+
e.logger.Error("could not rollback transaction", func(e goengine.LoggerEntry) {
101+
e.Error(errRollback)
102+
e.String("query", q)
103+
})
104104
}
105105

106106
return err
@@ -217,24 +217,21 @@ func (e *EventStore) AppendToWithExecer(ctx context.Context, conn driverSQL.Exec
217217
data...,
218218
)
219219
if err != nil {
220-
e.logger.
221-
WithError(err).
222-
WithFields(goengine.Fields{
223-
"streamName": streamName,
224-
"streamEvents": streamEvents,
225-
}).
226-
Warn("failed to insert messages into the event stream")
220+
e.logger.Warn("failed to insert messages into the event stream", func(e goengine.LoggerEntry) {
221+
e.Error(err)
222+
e.String("streamName", string(streamName))
223+
e.Any("streamEvents", streamEvents)
224+
})
227225

228226
return err
229227
}
230228

231-
e.logger.
232-
WithFields(goengine.Fields{
233-
"streamName": streamName,
234-
"streamEvents": streamEvents,
235-
"result": result,
236-
}).
237-
Debug("inserted messages into the event stream")
229+
e.logger.Debug("inserted messages into the event stream", func(e goengine.LoggerEntry) {
230+
e.Error(err)
231+
e.String("streamName", string(streamName))
232+
e.Any("streamEvents", streamEvents)
233+
e.Any("result", result)
234+
})
238235

239236
return nil
240237
}
@@ -306,10 +303,10 @@ func (e *EventStore) tableExists(ctx context.Context, tableName string) bool {
306303
).Scan(&exists)
307304

308305
if err != nil {
309-
e.logger.
310-
WithError(err).
311-
WithField("table", tableName).
312-
Warn("error on reading from information_schema")
306+
e.logger.Warn("error on reading from information_schema", func(e goengine.LoggerEntry) {
307+
e.Error(err)
308+
e.String("table", tableName)
309+
})
313310

314311
return false
315312
}

driver/sql/postgres/projector_aggregate.go

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ func NewAggregateProjector(
6060
if logger == nil {
6161
logger = goengine.NopLogger
6262
}
63-
logger = logger.WithField("projection", projection)
63+
logger = logger.WithFields(func(e goengine.LoggerEntry) {
64+
e.String("projection", projection.Name())
65+
})
6466

6567
processor, err := internal.NewBackgroundProcessor(10, 32, logger)
6668
if err != nil {
@@ -158,20 +160,23 @@ func (a *AggregateProjector) processNotification(
158160
}
159161

160162
// Resolve the action to take based on the error that occurred
161-
logger := a.logger.WithError(err).WithField("notification", notification)
163+
logFields := func(e goengine.LoggerEntry) {
164+
e.Error(err)
165+
e.Any("notification", notification)
166+
}
162167
switch resolveErrorAction(a.projectionErrorHandler, notification, err) {
163168
case errorFail:
164-
logger.Debug("ProcessHandler->ErrorHandler: marking projection as failed")
169+
a.logger.Debug("ProcessHandler->ErrorHandler: marking projection as failed", logFields)
165170
return a.markProjectionAsFailed(ctx, notification)
166171
case errorIgnore:
167-
logger.Debug("ProcessHandler->ErrorHandler: ignoring error")
172+
a.logger.Debug("ProcessHandler->ErrorHandler: ignoring error", logFields)
168173
return nil
169174
case errorRetry:
170-
logger.Debug("ProcessHandler->ErrorHandler: re-queueing notification")
175+
a.logger.Debug("ProcessHandler->ErrorHandler: re-queueing notification", logFields)
171176
return queue(ctx, notification)
172177
}
173178

174-
logger.Debug("ProcessHandler->ErrorHandler: error fallthrough")
179+
a.logger.Debug("ProcessHandler->ErrorHandler: error fallthrough", logFields)
175180
return err
176181
}
177182

@@ -183,7 +188,9 @@ func (a *AggregateProjector) triggerOutOfSyncProjections(ctx context.Context, qu
183188
}
184189
defer func() {
185190
if err := conn.Close(); err != nil {
186-
a.logger.WithError(err).Warn("failed to db close LoadOutOfSync connection")
191+
a.logger.Warn("failed to db close LoadOutOfSync connection", func(e goengine.LoggerEntry) {
192+
e.Error(err)
193+
})
187194
}
188195
}()
189196

@@ -193,7 +200,9 @@ func (a *AggregateProjector) triggerOutOfSyncProjections(ctx context.Context, qu
193200
}
194201
defer func() {
195202
if err := rows.Close(); err != nil {
196-
a.logger.WithError(err).Error("failed to close LoadOutOfSync rows")
203+
a.logger.Error("failed to close LoadOutOfSync rows", func(e goengine.LoggerEntry) {
204+
e.Error(err)
205+
})
197206
}
198207
}()
199208

@@ -219,13 +228,17 @@ func (a *AggregateProjector) triggerOutOfSyncProjections(ctx context.Context, qu
219228
AggregateID: aggregateID,
220229
}
221230

222-
logger := a.logger.WithField("notification", notification)
223231
if err := queue(ctx, notification); err != nil {
224-
logger.WithError(err).Error("failed to queue notification")
232+
a.logger.Error("failed to queue notification", func(e goengine.LoggerEntry) {
233+
e.Error(err)
234+
e.Any("notification", notification)
235+
})
225236
return err
226237
}
227238

228-
a.logger.Debug("send catchup")
239+
a.logger.Debug("send catchup", func(e goengine.LoggerEntry) {
240+
e.Any("notification", notification)
241+
})
229242
}
230243

231244
return rows.Close()
@@ -239,7 +252,9 @@ func (a *AggregateProjector) markProjectionAsFailed(ctx context.Context, notific
239252

240253
defer func() {
241254
if err := conn.Close(); err != nil {
242-
a.logger.WithError(err).Warn("failed to db close failure connection")
255+
a.logger.Warn("failed to db close failure connection", func(e goengine.LoggerEntry) {
256+
e.Error(err)
257+
})
243258
}
244259
}()
245260

driver/sql/postgres/projector_aggregate_storage.go

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,10 @@ func (a *aggregateProjectionStorage) PersistState(conn driverSQL.Execer, notific
129129
return err
130130
}
131131

132-
a.logger.WithFields(goengine.Fields{
133-
"notification": notification,
134-
"state": state,
135-
}).Debug("updated projection state")
132+
a.logger.Debug("updated projection state", func(e goengine.LoggerEntry) {
133+
e.Any("notification", notification)
134+
e.Any("state", state)
135+
})
136136
return nil
137137
}
138138

@@ -149,7 +149,9 @@ func (a *aggregateProjectionStorage) Acquire(
149149
conn *sql.Conn,
150150
notification *driverSQL.ProjectionNotification,
151151
) (func(), *driverSQL.ProjectionRawState, error) {
152-
logger := a.logger.WithField("notification", notification)
152+
logFields := func(e goengine.LoggerEntry) {
153+
e.Any("notification", notification)
154+
}
153155
aggregateID := notification.AggregateID
154156

155157
res := conn.QueryRowContext(ctx, a.queryAcquireLock, aggregateID, notification.No)
@@ -178,9 +180,12 @@ func (a *aggregateProjectionStorage) Acquire(
178180
// The projection was locked by another process that died and for this reason not unlocked
179181
// In this case a application needs to decide what to do to avoid invalid projection states
180182
if err := a.releaseProjectionConnectionLock(conn, aggregateID); err != nil {
181-
logger.WithError(err).Error("failed to release lock for a projection with a locked row")
183+
a.logger.Error("failed to release lock for a projection with a locked row", func(e goengine.LoggerEntry) {
184+
logFields(e)
185+
e.Error(err)
186+
})
182187
} else {
183-
logger.Debug("released connection lock for a locked projection")
188+
a.logger.Debug("released connection lock for a locked projection", logFields)
184189
}
185190

186191
return nil, nil, driverSQL.ErrProjectionPreviouslyLocked
@@ -190,20 +195,26 @@ func (a *aggregateProjectionStorage) Acquire(
190195
_, err := conn.ExecContext(ctx, a.querySetRowLocked, aggregateID, true)
191196
if err != nil {
192197
if releaseErr := a.releaseProjection(conn, aggregateID); releaseErr != nil {
193-
logger.WithError(releaseErr).Error("failed to release lock while setting projection rows as locked")
198+
a.logger.Error("failed to release lock while setting projection rows as locked", func(e goengine.LoggerEntry) {
199+
logFields(e)
200+
e.Error(releaseErr)
201+
})
194202
} else {
195-
logger.Debug("failed to set projection as locked")
203+
a.logger.Debug("failed to set projection as locked", logFields)
196204
}
197205

198206
return nil, nil, err
199207
}
200-
logger.Debug("acquired projection lock")
208+
a.logger.Debug("acquired projection lock", logFields)
201209

202210
return func() {
203211
if err := a.releaseProjection(conn, aggregateID); err != nil {
204-
logger.WithError(err).Error("failed to release projection lock")
212+
a.logger.Error("failed to release projection lock", func(e goengine.LoggerEntry) {
213+
logFields(e)
214+
e.Error(err)
215+
})
205216
} else {
206-
logger.Debug("released projection lock")
217+
a.logger.Debug("released projection lock", logFields)
207218
}
208219
}, &driverSQL.ProjectionRawState{ProjectionState: rawState, Position: position}, nil
209220
}

driver/sql/postgres/projector_stream.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ func NewStreamProjector(
5454
if logger == nil {
5555
logger = goengine.NopLogger
5656
}
57-
logger = logger.WithField("projection", projection)
57+
logger = logger.WithFields(func(e goengine.LoggerEntry) {
58+
e.String("projection", projection.Name())
59+
})
5860

5961
var (
6062
stateDecoder driverSQL.ProjectionStateDecoder
@@ -144,16 +146,19 @@ func (s *StreamProjector) processNotification(
144146
}
145147

146148
// Resolve the action to take based on the error that occurred
147-
logger := s.logger.WithError(err).WithField("notification", notification)
149+
logFields := func(e goengine.LoggerEntry) {
150+
e.Error(err)
151+
e.Any("notification", notification)
152+
}
148153
switch resolveErrorAction(s.projectionErrorHandler, notification, err) {
149154
case errorRetry:
150-
logger.Debug("Trigger->ErrorHandler: retrying notification")
155+
s.logger.Debug("Trigger->ErrorHandler: retrying notification", logFields)
151156
continue
152157
case errorIgnore:
153-
logger.Debug("Trigger->ErrorHandler: ignoring error")
158+
s.logger.Debug("Trigger->ErrorHandler: ignoring error", logFields)
154159
return nil
155160
case errorFail, errorFallthrough:
156-
logger.Debug("Trigger->ErrorHandler: error fallthrough")
161+
s.logger.Debug("Trigger->ErrorHandler: error fallthrough", logFields)
157162
return err
158163
}
159164
}

0 commit comments

Comments
 (0)