@@ -2,6 +2,7 @@ package postgres
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "net/url"
78 "os"
@@ -14,9 +15,21 @@ import (
1415 "github.com/google/uuid"
1516 "github.com/jackc/pgx/v4"
1617 "github.com/jackc/pgx/v4/pgxpool"
18+
1719 "github.com/modernice/goes/codec"
1820 "github.com/modernice/goes/event"
1921 "github.com/modernice/goes/internal/slice"
22+ "github.com/modernice/goes/persistence/model"
23+ )
24+
25+ const (
26+ columnID = "id"
27+ columnName = "name"
28+ columnTime = "time"
29+ columnAggregateID = "aggregate_id"
30+ columnAggregateName = "aggregate_name"
31+ columnAggregateVersion = "aggregate_version"
32+ columnData = "data"
2033)
2134
2235var _ event.Store = & EventStore {}
@@ -217,15 +230,15 @@ func (store *EventStore) createIndexes(ctx context.Context) error {
217230 }{
218231 {
219232 name : "goes_name" ,
220- fields : []string {"name" },
233+ fields : []string {columnName },
221234 },
222235 {
223236 name : "goes_time" ,
224- fields : []string {"time" },
237+ fields : []string {columnTime },
225238 },
226239 {
227240 name : "goes_aggregate" ,
228- fields : []string {"aggregate_id" , "aggregate_name" , "aggregate_version" },
241+ fields : []string {columnAggregateID , columnAggregateName , columnAggregateVersion },
229242 unique : true ,
230243 },
231244 }
@@ -283,11 +296,26 @@ func (store *EventStore) Insert(ctx context.Context, events ...event.Event) erro
283296 aggregateVersionVal = "NULL"
284297 }
285298
286- if _ , err := tx .Exec (ctx , fmt .Sprintf (`INSERT INTO events (
287- id, name, time, aggregate_id, aggregate_name, aggregate_version, data
288- ) VALUES (
289- $1, $2, $3, %s, %s, %s, $4
290- )` , aggregateIDVal , aggregateName , aggregateVersionVal ), evt .ID (), evt .Name (), evt .Time ().UnixNano (), b ); err != nil {
299+ builder := squirrel .
300+ Insert (store .table ).
301+ Columns (
302+ columnID ,
303+ columnName ,
304+ columnTime ,
305+ columnAggregateID ,
306+ columnAggregateName ,
307+ columnAggregateVersion ,
308+ columnData ,
309+ ).
310+ Values (evt .ID (), evt .Name (), evt .Time ().UnixNano (), aggregateIDVal , aggregateName , aggregateVersionVal , b ).
311+ PlaceholderFormat (squirrel .Dollar )
312+
313+ sql , args , err := builder .ToSql ()
314+ if err != nil {
315+ return fmt .Errorf ("build sql: %w" , err )
316+ }
317+
318+ if _ , err := tx .Exec (ctx , sql , args ... ); err != nil {
291319 return fmt .Errorf ("insert %q event: %w" , evt .Name (), err )
292320 }
293321 }
@@ -301,12 +329,26 @@ func (store *EventStore) Find(ctx context.Context, id uuid.UUID) (event.Event, e
301329 return nil , fmt .Errorf ("connect: %w" , err )
302330 }
303331
332+ builder := squirrel .
333+ Select (
334+ columnID ,
335+ columnName ,
336+ columnTime ,
337+ columnAggregateID ,
338+ columnAggregateName ,
339+ columnAggregateVersion ,
340+ columnData ,
341+ ).
342+ From (store .table ).
343+ Where (squirrel.Eq {columnID : id })
344+
345+ sql , args , err := builder .ToSql ()
346+ if err != nil {
347+ return nil , fmt .Errorf ("build sql: %w" , err )
348+ }
349+
304350 var evt dbevent
305- if err := store .pool .QueryRow (
306- ctx ,
307- `SELECT id, name, time, aggregate_id, aggregate_name, aggregate_version, data FROM events WHERE id = $1` ,
308- id ,
309- ).Scan (
351+ if err := store .pool .QueryRow (ctx , sql , args ... ).Scan (
310352 & evt .ID ,
311353 & evt .Name ,
312354 & evt .Time ,
@@ -315,6 +357,10 @@ func (store *EventStore) Find(ctx context.Context, id uuid.UUID) (event.Event, e
315357 & evt .AggregateVersion ,
316358 & evt .Data ,
317359 ); err != nil {
360+ if errors .Is (err , pgx .ErrNoRows ) {
361+ return nil , model .ErrNotFound
362+ }
363+
318364 return nil , fmt .Errorf ("query event: %w" , err )
319365 }
320366
@@ -398,37 +444,37 @@ func (store *EventStore) Query(ctx context.Context, query event.Query) (<-chan e
398444
399445func (store * EventStore ) buildQuery (query event.Query ) (string , []any , error ) {
400446 builder := squirrel .
401- Select ("id" , "name" , "time" , "aggregate_id" , "aggregate_name" , "aggregate_version" , "data" ).
447+ Select (columnID , columnName , columnTime , columnAggregateID , columnAggregateName , columnAggregateVersion , columnData ).
402448 From (store .table ).
403449 PlaceholderFormat (squirrel .Dollar )
404450
405451 if ids := query .AggregateIDs (); len (ids ) > 0 {
406- builder = builder .Where (buildOREq ("aggregate_id" , ids ))
452+ builder = builder .Where (buildOREq (columnAggregateID , ids ))
407453 }
408454
409455 if names := query .AggregateNames (); len (names ) > 0 {
410- builder = builder .Where (squirrel.Eq {"aggregate_name" : names })
456+ builder = builder .Where (squirrel.Eq {columnAggregateName : names })
411457 }
412458
413459 if versions := query .AggregateVersions (); versions != nil {
414460 if exact := versions .Exact (); len (exact ) > 0 {
415- builder = builder .Where (squirrel.Eq {"aggregate_version" : exact })
461+ builder = builder .Where (squirrel.Eq {columnAggregateVersion : exact })
416462 }
417463
418464 if min := versions .Min (); len (min ) > 0 {
419- builder = builder .Where (buildORGte ("aggregate_version" , min ))
465+ builder = builder .Where (buildORGte (columnAggregateVersion , min ))
420466 }
421467
422468 if max := versions .Max (); len (max ) > 0 {
423- builder = builder .Where (buildORLte ("aggregate_version" , max ))
469+ builder = builder .Where (buildORLte (columnAggregateVersion , max ))
424470 }
425471
426472 if ranges := versions .Ranges (); len (ranges ) > 0 {
427473 or := make (squirrel.Or , len (ranges ))
428474 for i , r := range ranges {
429475 or [i ] = squirrel.And {
430- squirrel.GtOrEq {"aggregate_version" : r .Start ()},
431- squirrel.LtOrEq {"aggregate_version" : r .End ()},
476+ squirrel.GtOrEq {columnAggregateVersion : r .Start ()},
477+ squirrel.LtOrEq {columnAggregateVersion : r .End ()},
432478 }
433479 }
434480 builder = builder .Where (or )
@@ -438,10 +484,10 @@ func (store *EventStore) buildQuery(query event.Query) (string, []any, error) {
438484 if refs := query .Aggregates (); len (refs ) > 0 {
439485 or := make (squirrel.Or , len (refs ))
440486 for i , ref := range refs {
441- and := squirrel.And {squirrel.Eq {"aggregate_name" : ref .Name }}
487+ and := squirrel.And {squirrel.Eq {columnAggregateName : ref .Name }}
442488
443489 if ref .ID != uuid .Nil {
444- and = append (and , squirrel.Eq {"aggregate_id" : ref .ID })
490+ and = append (and , squirrel.Eq {columnAggregateID : ref .ID })
445491 }
446492
447493 or [i ] = and
@@ -450,34 +496,34 @@ func (store *EventStore) buildQuery(query event.Query) (string, []any, error) {
450496 }
451497
452498 if ids := query .IDs (); len (ids ) > 0 {
453- builder = builder .Where (buildOREq ("id" , ids ))
499+ builder = builder .Where (buildOREq (columnID , ids ))
454500 }
455501
456502 if names := query .Names (); len (names ) > 0 {
457- builder = builder .Where (squirrel.Eq {"name" : names })
503+ builder = builder .Where (squirrel.Eq {columnName : names })
458504 }
459505
460506 if times := query .Times (); times != nil {
461507 if exact := times .Exact (); len (exact ) > 0 {
462- builder = builder .Where (buildOREq ("time" , slice .Map (exact , func (t time.Time ) int64 {
508+ builder = builder .Where (buildOREq (columnTime , slice .Map (exact , func (t time.Time ) int64 {
463509 return t .UnixNano ()
464510 })))
465511 }
466512
467513 if min := times .Min (); ! min .IsZero () {
468- builder = builder .Where (squirrel.GtOrEq {"time" : min .UnixNano ()})
514+ builder = builder .Where (squirrel.GtOrEq {columnTime : min .UnixNano ()})
469515 }
470516
471517 if max := times .Max (); ! max .IsZero () {
472- builder = builder .Where (squirrel.LtOrEq {"time" : max .UnixNano ()})
518+ builder = builder .Where (squirrel.LtOrEq {columnTime : max .UnixNano ()})
473519 }
474520
475521 if ranges := times .Ranges (); len (ranges ) > 0 {
476522 or := make (squirrel.Or , len (ranges ))
477523 for i , r := range ranges {
478524 or [i ] = squirrel.And {
479- squirrel.GtOrEq {"time" : r .Start ().UnixNano ()},
480- squirrel.LtOrEq {"time" : r .End ().UnixNano ()},
525+ squirrel.GtOrEq {columnTime : r .Start ().UnixNano ()},
526+ squirrel.LtOrEq {columnTime : r .End ().UnixNano ()},
481527 }
482528 }
483529 builder = builder .Where (or )
@@ -495,13 +541,13 @@ func (store *EventStore) buildQuery(query event.Query) (string, []any, error) {
495541 var field string
496542 switch sorting .Sort {
497543 case event .SortAggregateID :
498- field = "aggregate_id"
544+ field = columnAggregateID
499545 case event .SortAggregateName :
500- field = "aggregate_name"
546+ field = columnAggregateName
501547 case event .SortAggregateVersion :
502- field = "aggregate_version"
548+ field = columnAggregateVersion
503549 case event .SortTime :
504- field = "time"
550+ field = columnTime
505551 }
506552
507553 orders [i ] = fmt .Sprintf ("%s %s" , field , dir )
@@ -531,7 +577,7 @@ func (store *EventStore) Delete(ctx context.Context, events ...event.Event) erro
531577 defer tx .Rollback (ctx )
532578
533579 for _ , evt := range events {
534- sql , args , err := squirrel .Delete (store .table ).Where (squirrel.Eq {"id" : evt .ID ()}).PlaceholderFormat (squirrel .Dollar ).ToSql ()
580+ sql , args , err := squirrel .Delete (store .table ).Where (squirrel.Eq {columnID : evt .ID ()}).PlaceholderFormat (squirrel .Dollar ).ToSql ()
535581 if err != nil {
536582 return fmt .Errorf ("delete event: %w [id=%s]" , err , evt .ID ())
537583 }
0 commit comments