Skip to content

Commit 5d33d9c

Browse files
committed
pipeline: record more information when canceling file
1 parent 0f32e91 commit 5d33d9c

File tree

3 files changed

+190
-7
lines changed

3 files changed

+190
-7
lines changed

internal/pipeline/merging.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"encoding/hex"
2525
"encoding/json"
2626
"fmt"
27+
"io/fs"
2728
"path/filepath"
2829
"slices"
2930
"strings"
@@ -131,23 +132,50 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF
131132
}
132133

133134
func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) {
135+
_, span := telemetry.StartSpan(ctx, "handle-cancel", trace.WithAttributes(
136+
attribute.String("achgateway.file_id", cancel.FileID),
137+
attribute.String("achgateway.shard", m.shard.Name),
138+
attribute.String("achgateway.shard_key", cancel.ShardKey),
139+
))
140+
defer span.End()
141+
134142
fileID := strings.TrimSuffix(cancel.FileID, ".ach")
135143
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", fileID))
136144

137145
// Check if the file exists already
138-
file, _ := m.storage.Open(path)
139-
if file != nil {
140-
defer file.Close()
146+
originalFile, _ := m.storage.Open(path)
147+
if originalFile != nil {
148+
defer originalFile.Close()
149+
}
150+
151+
// Check if the canceled file exists already
152+
var canceledFile fs.File
153+
if originalFile == nil {
154+
canceledFile, _ = m.storage.Open(path + ".canceled")
155+
if canceledFile != nil {
156+
defer canceledFile.Close()
157+
}
141158
}
142159

143160
// Write the canceled File
144161
err := m.storage.ReplaceFile(path, path+".canceled")
145162
if err != nil {
146-
telemetry.RecordError(ctx, err)
163+
span.RecordError(err)
147164
}
148165

149-
// File was found and we didn't error during the rename
150-
var successful bool = file != nil && err == nil
166+
originalFileWasFound := originalFile != nil
167+
canceledFileWasFound := canceledFile != nil
168+
successfulReplace := err == nil
169+
170+
span.SetAttributes(
171+
attribute.Bool("achgateway.canceled_file_found", canceledFileWasFound),
172+
attribute.Bool("achgateway.cancel_replacement_written", successfulReplace),
173+
attribute.Bool("achgateway.original_file_found", originalFileWasFound),
174+
attribute.String("achgateway.path", path),
175+
)
176+
177+
// We need a file to be found and we no errors during the rename
178+
var successful bool = (originalFileWasFound || canceledFileWasFound) && successfulReplace
151179

152180
out := incoming.FileCancellationResponse{
153181
FileID: cancel.FileID,

internal/pipeline/merging_test.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"os"
2828
"path/filepath"
2929
"testing"
30+
"time"
3031

3132
"github.com/moov-io/ach"
3233
"github.com/moov-io/achgateway/internal/incoming"
@@ -184,9 +185,19 @@ func TestMerging_mappings(t *testing.T) {
184185
require.True(t, ok)
185186

186187
// Write an invalid ACH file that we cancel (serves both to check that we don't open it and that it's not merged)
187-
err = m.storage.WriteFile(filepath.Join(dir, "mergable", "SD-testing", "foo2.ach"), nil)
188+
foo2Path := filepath.Join("mergable", "SD-testing", "foo2.ach")
189+
err = m.storage.WriteFile(foo2Path, nil)
188190
require.NoError(t, err)
189191

192+
// Verify file is written
193+
require.Eventually(t, func() bool {
194+
fd, err := m.storage.Open(foo2Path)
195+
if fd != nil {
196+
defer fd.Close()
197+
}
198+
return err == nil
199+
}, 10*time.Second, 500*time.Millisecond)
200+
190201
cancelResponse, err := merging.HandleCancel(context.Background(), incoming.CancelACHFile{
191202
FileID: "foo2.ach",
192203
ShardKey: "SD-testing",

internal/test/cancel_test.go

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Licensed to The Moov Authors under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. The Moov Authors licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package test
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"net/http"
25+
"net/http/httptest"
26+
"os"
27+
"path/filepath"
28+
"slices"
29+
"testing"
30+
"time"
31+
32+
"github.com/moov-io/achgateway/internal/files"
33+
"github.com/moov-io/achgateway/internal/incoming/stream/streamtest"
34+
"github.com/moov-io/achgateway/internal/incoming/web"
35+
"github.com/moov-io/achgateway/internal/pipeline"
36+
"github.com/moov-io/achgateway/internal/service"
37+
"github.com/moov-io/achgateway/internal/shards"
38+
"github.com/moov-io/achgateway/internal/storage"
39+
"github.com/moov-io/achgateway/pkg/models"
40+
"github.com/moov-io/base"
41+
"github.com/moov-io/base/database"
42+
"github.com/moov-io/base/log"
43+
44+
"github.com/gorilla/mux"
45+
"github.com/stretchr/testify/require"
46+
)
47+
48+
func TestCancelFileAPI(t *testing.T) {
49+
ctx := context.Background()
50+
logger := log.NewTestLogger()
51+
52+
dir := t.TempDir()
53+
conf := &service.Config{
54+
Sharding: service.Sharding{
55+
Shards: []service.Shard{
56+
{
57+
Name: "testing",
58+
Cutoffs: service.Cutoffs{
59+
Timezone: "America/New_York",
60+
Windows: []string{"12:00"},
61+
},
62+
UploadAgent: "mock",
63+
},
64+
},
65+
Default: "testing",
66+
},
67+
Upload: service.UploadAgents{
68+
Agents: []service.UploadAgent{
69+
{
70+
Mock: &service.MockAgent{},
71+
},
72+
},
73+
Merging: service.Merging{
74+
Storage: storage.Config{
75+
Filesystem: storage.FilesystemConfig{
76+
Directory: dir,
77+
},
78+
},
79+
},
80+
},
81+
}
82+
83+
httpFilesTopic, httpFilesSub := streamtest.InmemStream(t)
84+
85+
shardRepo := shards.NewInMemoryRepository()
86+
shardRepo.Add(service.ShardMapping{ShardKey: "testing", ShardName: "testing"}, database.NopInTx)
87+
88+
fileRepo := &files.MockRepository{}
89+
90+
fileReceiver, err := pipeline.Start(ctx, logger, conf, shardRepo, fileRepo, httpFilesSub)
91+
require.NoError(t, err)
92+
93+
controller := web.NewFilesController(logger, service.HTTPConfig{}, httpFilesTopic, fileReceiver.CancellationResponses)
94+
r := mux.NewRouter()
95+
controller.AppendRoutes(r)
96+
97+
// Accept file from stream
98+
fileID := base.ID()
99+
100+
fd, err := os.Open(filepath.Join("..", "..", "testdata", "ppd-debit.ach"))
101+
require.NoError(t, err)
102+
t.Cleanup(func() { fd.Close() })
103+
104+
req := httptest.NewRequest("POST", fmt.Sprintf("/shards/SD-testing/files/%s.ach", fileID), fd)
105+
w := httptest.NewRecorder()
106+
r.ServeHTTP(w, req)
107+
require.Equal(t, http.StatusOK, w.Code)
108+
109+
// Verify the file is written
110+
where := filepath.Join(dir, "mergable", "testing", "*.ach")
111+
require.Eventually(t, func() bool {
112+
filenames, err := filepath.Glob(where)
113+
require.NoError(t, err)
114+
115+
parent, _ := filepath.Split(where)
116+
return slices.Contains(filenames, filepath.Join(parent, fmt.Sprintf("%s.ach", fileID)))
117+
}, 10*time.Second, 1*time.Second)
118+
119+
// Now cancel that file
120+
req = httptest.NewRequest("DELETE", fmt.Sprintf("/shards/SD-testing/files/%s.ach", fileID), nil)
121+
w = httptest.NewRecorder()
122+
r.ServeHTTP(w, req)
123+
require.Equal(t, http.StatusOK, w.Code)
124+
125+
var response models.FileCancellationResponse
126+
err = json.NewDecoder(w.Body).Decode(&response)
127+
require.NoError(t, err)
128+
require.Equal(t, fileID, response.FileID)
129+
require.Equal(t, "SD-testing", response.ShardKey)
130+
require.True(t, response.Successful)
131+
132+
// Cancel it again (which should be successful)
133+
req = httptest.NewRequest("DELETE", fmt.Sprintf("/shards/SD-testing/files/%s.ach", fileID), nil)
134+
w = httptest.NewRecorder()
135+
r.ServeHTTP(w, req)
136+
require.Equal(t, http.StatusOK, w.Code)
137+
138+
var response2 models.FileCancellationResponse
139+
err = json.NewDecoder(w.Body).Decode(&response2)
140+
require.NoError(t, err)
141+
require.Equal(t, fileID, response2.FileID)
142+
require.Equal(t, "SD-testing", response2.ShardKey)
143+
require.True(t, response2.Successful)
144+
}

0 commit comments

Comments
 (0)