Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/files/mock_repo_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ func (r *MockRepository) Record(_ context.Context, file AcceptedFile) error {
return r.Err
}

func (r *MockRepository) Cleanup(ctx context.Context, file AcceptedFile) error {
return r.Err
}

func (r *MockRepository) Cancel(_ context.Context, fileID string) error {
return r.Err
}
35 changes: 35 additions & 0 deletions internal/files/repo_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

type Repository interface {
Record(ctx context.Context, file AcceptedFile) error
Cleanup(ctx context.Context, file AcceptedFile) error
Cancel(ctx context.Context, fileID string) error
}

Expand Down Expand Up @@ -56,6 +57,24 @@ func (r *sqlRepository) Record(ctx context.Context, file AcceptedFile) error {
return nil
}

func (r *sqlRepository) Cleanup(ctx context.Context, file AcceptedFile) error {
ctx, span := telemetry.StartSpan(ctx, "files-cleanup", trace.WithAttributes(
attribute.String("achgateway.file_id", file.FileID),
attribute.String("achgateway.database", "sql"),
))
defer span.End()

qry := `DELETE FROM files WHERE file_id = ? AND shard_key = ?;`
_, err := r.db.ExecContext(ctx, qry,
file.FileID,
file.ShardKey,
)
if err != nil {
return fmt.Errorf("cleaning up fileID record: %w", err)
}
return nil
}

func (r *sqlRepository) Cancel(ctx context.Context, fileID string) error {
ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes(
attribute.String("achgateway.file_id", fileID),
Expand Down Expand Up @@ -98,6 +117,22 @@ func (r *spannerRepository) Record(ctx context.Context, file AcceptedFile) error
return nil
}

func (r *spannerRepository) Cleanup(ctx context.Context, file AcceptedFile) error {
ctx, span := telemetry.StartSpan(ctx, "files-cleanup", trace.WithAttributes(
attribute.String("achgateway.file_id", file.FileID),
attribute.String("achgateway.database", "spanner"),
))
defer span.End()

m := spanner.Delete("files", spanner.Key{file.FileID})

_, err := r.client.Apply(ctx, []*spanner.Mutation{m})
if err != nil {
return fmt.Errorf("cleanup of files record failed: %w", err)
}
return nil
}

func (r *spannerRepository) Cancel(ctx context.Context, fileID string) error {
ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes(
attribute.String("achgateway.file_id", fileID),
Expand Down
36 changes: 24 additions & 12 deletions internal/pipeline/file_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/moov-io/achgateway/pkg/compliance"
"github.com/moov-io/achgateway/pkg/models"
"github.com/moov-io/base/admin"
"github.com/moov-io/base/database"
"github.com/moov-io/base/log"
"github.com/moov-io/base/telemetry"

Expand Down Expand Up @@ -464,27 +465,38 @@ func (fr *FileReceiver) processACHFile(ctx context.Context, file incoming.ACHFil
})

// We only want to handle files once, so become the winner by saving the record.
hostname, _ := os.Hostname()
err = fr.fileRepository.Record(ctx, files.AcceptedFile{
acceptanceData := files.AcceptedFile{
FileID: file.FileID,
ShardKey: file.ShardKey,
Hostname: hostname,
AcceptedAt: time.Now().In(time.UTC),
})
if err != nil {
logger.Warn().LogErrorf("not handling received ACH file: %v", err)
return nil
}
logger.Log("begin handling of received ACH file")
acceptanceData.Hostname, _ = os.Hostname()

err = agg.acceptFile(ctx, file)
if err != nil {
return logger.Error().LogErrorf("problem accepting file under shardName=%s", agg.shard.Name).Err()
// TODO(adam): add defer here instead? errgroup.Group instead?

fileRecordErr := fr.fileRepository.Record(ctx, acceptanceData)
if fileRecordErr != nil {
if database.UniqueViolation(err) {
logger.Debug().Log("already handled file -- skipping")
return nil
}
return logger.Error().LogErrorf("not handling received ACH file: %v", fileRecordErr).Err()
}

acceptFileErr := agg.acceptFile(ctx, file)
if acceptFileErr != nil {
// Delete the record from files table
deleteErr := fr.fileRepository.Cleanup(ctx, acceptanceData)
if deleteErr != nil {
logger.Error().LogErrorf("unable to cleanup files table: %v", err)
}
return logger.Error().LogErrorf("problem accepting file: %v", err).Err()
}

// Record the file as accepted
pendingFiles.With("shard", agg.shard.Name).Add(1)
logger.Log("finished handling ACH file")

logger.Log("accepted ACH file")

return nil
}
Expand Down
33 changes: 33 additions & 0 deletions internal/pipeline/file_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,36 @@ func TestFileReceiver__contains(t *testing.T) {
require.False(t, contains(err, "connect: "))
require.False(t, contains(err, "EOF"))
}

func TestFileReceiver_AcceptFileErr(t *testing.T) {
fr := testFileReceiver(t)

m, ok := fr.shardAggregators["testing"].merger.(*filesystemMerging)
require.True(t, ok)

ms := &MockStorage{
WriteFileErr: errors.New("bad thing"),
}
m.storage = ms

// queue a file
file, err := ach.ReadFile(filepath.Join("..", "..", "testdata", "ppd-debit.ach"))
require.NoError(t, err)

bs, err := compliance.Protect(nil, models.Event{
Event: models.QueueACHFile{
FileID: base.ID(),
ShardKey: "testing",
File: file,
},
})
require.NoError(t, err)

err = fr.Publisher.Send(context.Background(), &pubsub.Message{
Body: bs,
})
require.NoError(t, err)

// We should see an error, but can clear ms.WriteFileErr and retry
// TODO(adam):
}
78 changes: 78 additions & 0 deletions internal/storage/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package storage

import (
"io/fs"
)

type MockStorage struct {
Chest

OpenErr error
GlobErr error

ReadDirErr error

ReplaceFileErr error
ReplaceDirErr error

MkdirAllErr error
RmdirAllErr error

WriteFileErr error
}

func (m *MockStorage) Open(path string) (fs.File, error) {
if m.OpenErr != nil {
return nil, m.OpenErr
}
return m.Chest.Open(path)
}

func (m *MockStorage) Glob(pattern string) ([]FileStat, error) {
if m.GlobErr != nil {
return nil, m.GlobErr
}
return m.Chest.Glob(path)

Check failure on line 35 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

undefined: path

Check failure on line 35 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

undefined: path

Check failure on line 35 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path

Check failure on line 35 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path
}

func (m *MockStorage) ReadDir(name string) ([]fs.DirEntry, error) {
if m.ReadDirErr != nil {
return nil, m.ReadDirErr
}
return m.Chest.ReadDir(path)

Check failure on line 42 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

undefined: path

Check failure on line 42 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

undefined: path

Check failure on line 42 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path

Check failure on line 42 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path
}

func (m *MockStorage) ReplaceFile(oldpath, newpath string) error {
if m.ReplaceFileErr != nil {
return nil, m.ReplaceFileErr

Check failure on line 47 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

too many return values

Check failure on line 47 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

too many return values

Check failure on line 47 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

too many return values

Check failure on line 47 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

too many return values
}
return m.Chest.ReplaceFile(path)

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

not enough arguments in call to m.Chest.ReplaceFile

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

undefined: path

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

not enough arguments in call to m.Chest.ReplaceFile

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

undefined: path

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

not enough arguments in call to m.Chest.ReplaceFile

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

not enough arguments in call to m.Chest.ReplaceFile

Check failure on line 49 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path
}

func (m *MockStorage) ReplaceDir(oldpath, newpath string) error {
if m.ReplaceDirErr != nil {
return nil, m.ReplaceDirErr

Check failure on line 54 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

too many return values

Check failure on line 54 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

too many return values

Check failure on line 54 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

too many return values

Check failure on line 54 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

too many return values
}
return m.Chest.ReplaceDir(path)

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

not enough arguments in call to m.Chest.ReplaceDir

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

undefined: path

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

not enough arguments in call to m.Chest.ReplaceDir

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

undefined: path

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

not enough arguments in call to m.Chest.ReplaceDir

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

not enough arguments in call to m.Chest.ReplaceDir

Check failure on line 56 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

undefined: path
}

func (m *MockStorage) MkdirAll(path string) error {
if m.MkdirErr != nil {

Check failure on line 60 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)

Check failure on line 60 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)

Check failure on line 60 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)

Check failure on line 60 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)
return nil, m.MkdirErr

Check failure on line 61 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (ubuntu-latest)

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)

Check failure on line 61 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / Go Build (macos-latest)

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)

Check failure on line 61 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)

Check failure on line 61 in internal/storage/mock.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

m.MkdirErr undefined (type *MockStorage has no field or method MkdirErr)
}
return m.Chest.Mkdir(path)
}

func (m *MockStorage) RmdirAll(path string) error {
if m.RmdirErr != nil {
return nil, m.RmdirErr
}
return m.Chest.Rmdir(path)
}

func (m *MockStorage) WriteFile(path string, contents []byte) error {
if m.WriteFileErr != nil {
return nil, m.WriteFileErr
}
return m.Chest.WriteFile(path)
}
Loading