diff --git a/internal/files/mock_repo_files.go b/internal/files/mock_repo_files.go index 8ebbb8f..efaa928 100644 --- a/internal/files/mock_repo_files.go +++ b/internal/files/mock_repo_files.go @@ -14,6 +14,10 @@ func (r *MockRepository) Record(_ context.Context, file AcceptedFile) error { return r.Err } +func (r *MockRepository) Cleanup(ctx context.Context, file AcceptedFile) error { + return r.Err +} + func (r *MockRepository) Cancel(_ context.Context, fileID string) error { return r.Err } diff --git a/internal/files/repo_files.go b/internal/files/repo_files.go index dec2d5e..1c3f5d3 100644 --- a/internal/files/repo_files.go +++ b/internal/files/repo_files.go @@ -15,6 +15,7 @@ import ( type Repository interface { Record(ctx context.Context, file AcceptedFile) error + Cleanup(ctx context.Context, file AcceptedFile) error Cancel(ctx context.Context, fileID string) error } @@ -56,6 +57,24 @@ func (r *sqlRepository) Record(ctx context.Context, file AcceptedFile) error { return nil } +func (r *sqlRepository) Cleanup(ctx context.Context, file AcceptedFile) error { + ctx, span := telemetry.StartSpan(ctx, "files-cleanup", trace.WithAttributes( + attribute.String("achgateway.file_id", file.FileID), + attribute.String("achgateway.database", "sql"), + )) + defer span.End() + + qry := `DELETE FROM files WHERE file_id = ? AND shard_key = ?;` + _, err := r.db.ExecContext(ctx, qry, + file.FileID, + file.ShardKey, + ) + if err != nil { + return fmt.Errorf("cleaning up fileID record: %w", err) + } + return nil +} + func (r *sqlRepository) Cancel(ctx context.Context, fileID string) error { ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes( attribute.String("achgateway.file_id", fileID), @@ -98,6 +117,22 @@ func (r *spannerRepository) Record(ctx context.Context, file AcceptedFile) error return nil } +func (r *spannerRepository) Cleanup(ctx context.Context, file AcceptedFile) error { + ctx, span := telemetry.StartSpan(ctx, "files-cleanup", trace.WithAttributes( + attribute.String("achgateway.file_id", file.FileID), + attribute.String("achgateway.database", "spanner"), + )) + defer span.End() + + m := spanner.Delete("files", spanner.Key{file.FileID}) + + _, err := r.client.Apply(ctx, []*spanner.Mutation{m}) + if err != nil { + return fmt.Errorf("cleanup of files record failed: %w", err) + } + return nil +} + func (r *spannerRepository) Cancel(ctx context.Context, fileID string) error { ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes( attribute.String("achgateway.file_id", fileID), diff --git a/internal/pipeline/file_receiver.go b/internal/pipeline/file_receiver.go index 70eaf5a..7469174 100644 --- a/internal/pipeline/file_receiver.go +++ b/internal/pipeline/file_receiver.go @@ -36,6 +36,7 @@ import ( "github.com/moov-io/achgateway/pkg/compliance" "github.com/moov-io/achgateway/pkg/models" "github.com/moov-io/base/admin" + "github.com/moov-io/base/database" "github.com/moov-io/base/log" "github.com/moov-io/base/telemetry" @@ -464,27 +465,38 @@ func (fr *FileReceiver) processACHFile(ctx context.Context, file incoming.ACHFil }) // We only want to handle files once, so become the winner by saving the record. - hostname, _ := os.Hostname() - err = fr.fileRepository.Record(ctx, files.AcceptedFile{ + acceptanceData := files.AcceptedFile{ FileID: file.FileID, ShardKey: file.ShardKey, - Hostname: hostname, AcceptedAt: time.Now().In(time.UTC), - }) - if err != nil { - logger.Warn().LogErrorf("not handling received ACH file: %v", err) - return nil } - logger.Log("begin handling of received ACH file") + acceptanceData.Hostname, _ = os.Hostname() - err = agg.acceptFile(ctx, file) - if err != nil { - return logger.Error().LogErrorf("problem accepting file under shardName=%s", agg.shard.Name).Err() + // TODO(adam): add defer here instead? errgroup.Group instead? + + fileRecordErr := fr.fileRepository.Record(ctx, acceptanceData) + if fileRecordErr != nil { + if database.UniqueViolation(err) { + logger.Debug().Log("already handled file -- skipping") + return nil + } + return logger.Error().LogErrorf("not handling received ACH file: %v", fileRecordErr).Err() + } + + acceptFileErr := agg.acceptFile(ctx, file) + if acceptFileErr != nil { + // Delete the record from files table + deleteErr := fr.fileRepository.Cleanup(ctx, acceptanceData) + if deleteErr != nil { + logger.Error().LogErrorf("unable to cleanup files table: %v", err) + } + return logger.Error().LogErrorf("problem accepting file: %v", err).Err() } // Record the file as accepted pendingFiles.With("shard", agg.shard.Name).Add(1) - logger.Log("finished handling ACH file") + + logger.Log("accepted ACH file") return nil } diff --git a/internal/pipeline/file_receiver_test.go b/internal/pipeline/file_receiver_test.go index 3e89527..f1648b1 100644 --- a/internal/pipeline/file_receiver_test.go +++ b/internal/pipeline/file_receiver_test.go @@ -229,3 +229,36 @@ func TestFileReceiver__contains(t *testing.T) { require.False(t, contains(err, "connect: ")) require.False(t, contains(err, "EOF")) } + +func TestFileReceiver_AcceptFileErr(t *testing.T) { + fr := testFileReceiver(t) + + m, ok := fr.shardAggregators["testing"].merger.(*filesystemMerging) + require.True(t, ok) + + ms := &MockStorage{ + WriteFileErr: errors.New("bad thing"), + } + m.storage = ms + + // queue a file + file, err := ach.ReadFile(filepath.Join("..", "..", "testdata", "ppd-debit.ach")) + require.NoError(t, err) + + bs, err := compliance.Protect(nil, models.Event{ + Event: models.QueueACHFile{ + FileID: base.ID(), + ShardKey: "testing", + File: file, + }, + }) + require.NoError(t, err) + + err = fr.Publisher.Send(context.Background(), &pubsub.Message{ + Body: bs, + }) + require.NoError(t, err) + + // We should see an error, but can clear ms.WriteFileErr and retry + // TODO(adam): +} diff --git a/internal/storage/mock.go b/internal/storage/mock.go new file mode 100644 index 0000000..812eabc --- /dev/null +++ b/internal/storage/mock.go @@ -0,0 +1,78 @@ +package storage + +import ( + "io/fs" +) + +type MockStorage struct { + Chest + + OpenErr error + GlobErr error + + ReadDirErr error + + ReplaceFileErr error + ReplaceDirErr error + + MkdirAllErr error + RmdirAllErr error + + WriteFileErr error +} + +func (m *MockStorage) Open(path string) (fs.File, error) { + if m.OpenErr != nil { + return nil, m.OpenErr + } + return m.Chest.Open(path) +} + +func (m *MockStorage) Glob(pattern string) ([]FileStat, error) { + if m.GlobErr != nil { + return nil, m.GlobErr + } + return m.Chest.Glob(path) +} + +func (m *MockStorage) ReadDir(name string) ([]fs.DirEntry, error) { + if m.ReadDirErr != nil { + return nil, m.ReadDirErr + } + return m.Chest.ReadDir(path) +} + +func (m *MockStorage) ReplaceFile(oldpath, newpath string) error { + if m.ReplaceFileErr != nil { + return nil, m.ReplaceFileErr + } + return m.Chest.ReplaceFile(path) +} + +func (m *MockStorage) ReplaceDir(oldpath, newpath string) error { + if m.ReplaceDirErr != nil { + return nil, m.ReplaceDirErr + } + return m.Chest.ReplaceDir(path) +} + +func (m *MockStorage) MkdirAll(path string) error { + if m.MkdirErr != nil { + return nil, m.MkdirErr + } + return m.Chest.Mkdir(path) +} + +func (m *MockStorage) RmdirAll(path string) error { + if m.RmdirErr != nil { + return nil, m.RmdirErr + } + return m.Chest.Rmdir(path) +} + +func (m *MockStorage) WriteFile(path string, contents []byte) error { + if m.WriteFileErr != nil { + return nil, m.WriteFileErr + } + return m.Chest.WriteFile(path) +}