Skip to content

Commit 461c196

Browse files
Let the DB check if projection row exists
There is no need to check if a projection exists when we can just ignore an insert.
1 parent f243f96 commit 461c196

File tree

1 file changed

+5
-70
lines changed

1 file changed

+5
-70
lines changed

driver/sql/postgres/projector_stream.go

Lines changed: 5 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,10 @@ import (
88
"strings"
99
"sync"
1010

11-
"github.com/pkg/errors"
12-
1311
"github.com/hellofresh/goengine"
1412
driverSQL "github.com/hellofresh/goengine/driver/sql"
1513
internalSQL "github.com/hellofresh/goengine/driver/sql/internal"
14+
"github.com/pkg/errors"
1615
)
1716

1817
// StreamProjector is a postgres projector used to execute a projection against an event stream.
@@ -172,80 +171,16 @@ func (s *StreamProjector) processNotification(
172171

173172
// setupProjection Creates the projection if none exists
174173
func (s *StreamProjector) setupProjection(ctx context.Context) error {
175-
conn, err := internalSQL.AcquireConn(ctx, s.db)
176-
if err != nil {
177-
return err
178-
}
179-
defer func() {
180-
if err := conn.Close(); err != nil {
181-
s.logger.WithError(err).Warn("failed to db close connection")
182-
}
183-
}()
184-
185-
if s.projectionExists(ctx, conn) {
186-
return nil
187-
}
188-
if err := s.createProjection(ctx, conn); err != nil {
189-
return err
190-
}
191-
192-
return nil
193-
}
194-
195-
func (s *StreamProjector) projectionExists(ctx context.Context, conn *sql.Conn) bool {
196-
rows, err := conn.QueryContext(
197-
ctx,
198-
fmt.Sprintf(
199-
`SELECT 1 FROM %s WHERE name = $1 LIMIT 1`,
200-
QuoteIdentifier(s.projectionTable),
201-
),
202-
s.projectionName,
203-
)
204-
if err != nil {
205-
s.logger.
206-
WithError(err).
207-
WithField("table", s.projectionTable).
208-
Error("failed to query projection table")
209-
return false
210-
}
211-
defer func() {
212-
if err := rows.Close(); err != nil {
213-
s.logger.
214-
WithError(err).
215-
WithField("table", s.projectionTable).
216-
Warn("failed to close rows")
217-
}
218-
}()
219-
220-
if !rows.Next() {
221-
return false
222-
}
223-
224-
var found bool
225-
if err := rows.Scan(&found); err != nil {
226-
s.logger.
227-
WithError(err).
228-
WithField("table", s.projectionTable).
229-
Error("failed to scan projection table")
230-
return false
231-
}
232-
233-
return found
234-
}
235-
236-
func (s *StreamProjector) createProjection(ctx context.Context, conn *sql.Conn) error {
237-
// Ignore duplicate inserts. This can occur when multiple projectors are started at the same time.
238-
_, err := conn.ExecContext(
174+
// Ignore duplicate inserts
175+
_, err := s.db.ExecContext(
239176
ctx,
177+
/* #nosec */
240178
fmt.Sprintf(
241179
`INSERT INTO %s (name) VALUES ($1) ON CONFLICT DO NOTHING`,
242180
QuoteIdentifier(s.projectionTable),
243181
),
244182
s.projectionName,
245183
)
246-
if err != nil {
247-
return err
248-
}
249184

250-
return nil
185+
return err
251186
}

0 commit comments

Comments
 (0)