Skip to content

Commit 5a8e280

Browse files
committed
test: remove ReplaceStreamFiles
1 parent 88d87e1 commit 5a8e280

File tree

2 files changed

+1
-43
lines changed

2 files changed

+1
-43
lines changed

internal/pipeline/file_receiver.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -119,24 +119,6 @@ func (fr *FileReceiver) reconnect() error {
119119
return nil
120120
}
121121

122-
func (fr *FileReceiver) ReplaceStreamFiles(ctx context.Context, sub stream.Subscription) {
123-
fr.mu.Lock()
124-
defer fr.mu.Unlock()
125-
126-
fr.logger.Info().Log("replacing stream subscription")
127-
128-
// Shut down receiving messages
129-
fr.cancel()
130-
131-
// Close an existing stream subscription
132-
if fr.streamFiles != nil {
133-
fr.streamFiles.Shutdown(context.Background())
134-
}
135-
fr.streamFiles = sub
136-
137-
go fr.Start(ctx)
138-
}
139-
140122
func (fr *FileReceiver) Start(ctx context.Context) {
141123
for {
142124
// Create a context that will be shutdown by its parent or after a read iteration

internal/test/upload_test.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -171,15 +171,9 @@ func TestUploads(t *testing.T) {
171171
adminServer := admintest.Server(t)
172172
fileReceiver.RegisterAdminRoutes(adminServer)
173173

174-
// Force the stream subscription to fail
175-
flakeySub := streamtest.FailingSubscription(errors.New("write: broken pipe"))
176-
fileReceiver.ReplaceStreamFiles(context.Background(), flakeySub)
177-
require.Contains(t, fmt.Sprintf("%#v", fileReceiver), "streamFiles:(*streamtest.FailedSubscription)")
178-
179174
// Upload our files
180175
createdEntries := 0
181176
canceledEntries := 0
182-
erroredSubscriptions := 0
183177
var createdFileIDs, canceledFileIDs []string
184178

185179
iterations := 500
@@ -205,13 +199,6 @@ func TestUploads(t *testing.T) {
205199
} else {
206200
createdFileIDs = append(createdFileIDs, fileID)
207201
}
208-
209-
// Force the subscription to fail sometimes
210-
if err := causeSubscriptionFailure(t); err != nil {
211-
flakeySub := streamtest.FailingSubscription(err)
212-
fileReceiver.ReplaceStreamFiles(context.Background(), flakeySub)
213-
erroredSubscriptions += 1
214-
}
215202
}
216203
require.NoError(t, g.Wait())
217204

@@ -274,7 +261,7 @@ func TestUploads(t *testing.T) {
274261
expected := createdEntries - canceledEntries
275262
found := countAllEntries(uploadedFiles)
276263

277-
t.Logf("found %d entries of %d expected (%d canceled) (%d errored) from %d uploaded files", found, expected, canceledEntries, erroredSubscriptions, len(uploadedFiles))
264+
t.Logf("found %d entries of %d expected (%d canceled) from %d uploaded files", found, expected, canceledEntries, len(uploadedFiles))
278265

279266
return expected == found
280267
}, wait, tick)
@@ -463,17 +450,6 @@ var subscriptionFailures = []error{
463450
errors.New("contains: pubsub error"),
464451
}
465452

466-
func causeSubscriptionFailure(t *testing.T) error {
467-
t.Helper()
468-
469-
n := randomInt(t, 100)
470-
if n <= 5 { // 5%
471-
idx := (len(subscriptionFailures) - 1) % (int(n) + 1)
472-
return subscriptionFailures[idx]
473-
}
474-
return nil
475-
}
476-
477453
func firstDirectory(t *testing.T, fsys fs.FS, prefix string) string {
478454
t.Helper()
479455

0 commit comments

Comments
 (0)