Skip to content

Commit 9cd8a4a

Browse files
garyrussellartembilan
authored andcommitted
GH-3315: Fix (S)FTP Stream with Fair Rotation
Resolves #3315 `maxFetchSize` ignored with filters supporting single file filtering (default). This breaks "fair" rotation with the `RotatingServerAdvice`. Honor `maxFetchSize`, even with filters that support single file filtering. **cherry-pick to 5.3.x, 5.2.x**
1 parent 434817f commit 9cd8a4a

File tree

3 files changed

+131
-15
lines changed

3 files changed

+131
-15
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.BlockingQueue;
2828
import java.util.concurrent.LinkedBlockingQueue;
2929
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicInteger;
3031

3132
import org.springframework.context.Lifecycle;
3233
import org.springframework.expression.Expression;
@@ -65,6 +66,8 @@ public abstract class AbstractRemoteFileStreamingMessageSource<F>
6566

6667
private final AtomicBoolean running = new AtomicBoolean();
6768

69+
private final AtomicInteger fetched = new AtomicInteger();
70+
6871
private boolean fileInfoJson = true;
6972

7073
/**
@@ -183,12 +186,11 @@ public boolean isRunning() {
183186

184187
@Override
185188
protected Object doReceive(int maxFetchSize) {
186-
return doReceive();
187-
}
188-
189-
@Override
190-
protected Object doReceive() {
191189
Assert.state(this.running.get(), () -> getComponentName() + " is not running");
190+
if (maxFetchSize > 0 && this.fetched.get() >= maxFetchSize) {
191+
this.toBeReceived.clear();
192+
this.fetched.set(0);
193+
}
192194
AbstractFileInfo<F> file = poll();
193195
while (file != null) {
194196
if (this.filter != null && this.filter.supportsSingleFileFiltering()
@@ -205,6 +207,9 @@ protected Object doReceive() {
205207
try {
206208
String remotePath = remotePath(file);
207209
Session<?> session = this.remoteFileTemplate.getSession();
210+
if (maxFetchSize > 0) {
211+
this.fetched.incrementAndGet();
212+
}
208213
try {
209214
return getMessageBuilderFactory()
210215
.withPayload(session.readRaw(remotePath))

spring-integration-file/src/test/java/org/springframework/integration/file/remote/RemoteFileStreamingMessageSourceTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.mockito.ArgumentMatchers.anyInt;
2122
import static org.mockito.ArgumentMatchers.anyString;
2223
import static org.mockito.Mockito.mock;
2324
import static org.mockito.Mockito.when;
@@ -70,7 +71,7 @@ public void filterOutFilesNotAcceptedByFilter() throws IOException {
7071
testRemoteFileStreamingMessageSource.setBeanFactory(mock(BeanFactory.class));
7172
testRemoteFileStreamingMessageSource.start();
7273

73-
assertThat(testRemoteFileStreamingMessageSource.doReceive()).isNull();
74+
assertThat(testRemoteFileStreamingMessageSource.doReceive(-1)).isNull();
7475
}
7576

7677
@Test
@@ -94,7 +95,7 @@ public void sessionReturnedToCacheProperlyOnDoReceive() throws IOException {
9495
testRemoteFileStreamingMessageSource.start();
9596

9697
assertThatExceptionOfType(UncheckedIOException.class)
97-
.isThrownBy(testRemoteFileStreamingMessageSource::doReceive);
98+
.isThrownBy(() -> testRemoteFileStreamingMessageSource.doReceive(anyInt()));
9899

99100
assertThat(cachingSessionFactory.getSession()).isNotNull();
100101
}

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java

Lines changed: 116 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@
3737
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3838
import org.springframework.context.annotation.Bean;
3939
import org.springframework.context.annotation.Configuration;
40+
import org.springframework.integration.StaticMessageHeaderAccessor;
4041
import org.springframework.integration.channel.QueueChannel;
4142
import org.springframework.integration.config.EnableIntegration;
4243
import org.springframework.integration.dsl.IntegrationFlow;
4344
import org.springframework.integration.dsl.IntegrationFlows;
4445
import org.springframework.integration.dsl.MessageChannels;
4546
import org.springframework.integration.dsl.Pollers;
46-
import org.springframework.integration.dsl.StandardIntegrationFlow;
47+
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
48+
import org.springframework.integration.file.FileHeaders;
4749
import org.springframework.integration.file.remote.aop.RotatingServerAdvice;
4850
import org.springframework.integration.file.remote.aop.RotationPolicy;
4951
import org.springframework.integration.file.remote.session.CachingSessionFactory;
@@ -58,6 +60,7 @@
5860
import org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter;
5961
import org.springframework.integration.ftp.session.FtpRemoteFileTemplate;
6062
import org.springframework.integration.metadata.SimpleMetadataStore;
63+
import org.springframework.messaging.Message;
6164

6265
/**
6366
* @author Gary Russell
@@ -88,18 +91,49 @@ public static void setup() {
8891
});
8992
}
9093

94+
@BeforeEach
95+
public void extraSetup(TestInfo info) {
96+
if (info.getTestMethod().get().getName().equals("testFairStreaming")) {
97+
FtpRemoteFileTemplate rft = new FtpRemoteFileTemplate(sessionFactory());
98+
rft.execute(s -> {
99+
ByteArrayInputStream bais = new ByteArrayInputStream("foo".getBytes());
100+
// 2 files per server, remove empty dirs
101+
s.write(bais, "foo/f4");
102+
s.write(bais, "baz/f5");
103+
s.write(bais, "fiz/f6");
104+
s.rmdir("bar");
105+
s.rmdir("qux");
106+
s.rmdir("buz");
107+
return null;
108+
});
109+
}
110+
}
111+
91112
@BeforeEach
92113
@AfterEach
93114
public void clean(TestInfo info) {
94115
recursiveDelete(new File(tmpDir), info);
95116
}
96117

118+
@AfterEach
119+
public void extraCleanUp(TestInfo info) {
120+
if (info.getTestMethod().get().getName().equals("testFairStreaming")) {
121+
FtpRemoteFileTemplate rft = new FtpRemoteFileTemplate(sessionFactory());
122+
rft.execute(s -> {
123+
s.remove("foo/f4");
124+
s.remove("baz/f5");
125+
s.remove("fiz/f6");
126+
return null;
127+
});
128+
}
129+
}
130+
97131
@Test
98132
public void testStandard() throws Exception {
99133
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(StandardConfig.class);
100134
StandardConfig config = ctx.getBean(StandardConfig.class);
101-
ctx.getBean(StandardIntegrationFlow.class).stop();
102135
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
136+
ctx.getBean(SourcePollingChannelAdapter.class).stop();
103137
List<Integer> sfCalls = config.sessionSources.stream().limit(17).collect(Collectors.toList());
104138
assertThat(sfCalls).containsExactly(1, 1, 1, 2, 2, 2, 3, 3, 3, 1, 1, 2, 2, 3, 3, 1, 1);
105139
File f1 = new File(tmpDir + File.separator + "standard" + File.separator + "f1");
@@ -116,8 +150,8 @@ public void testStandard() throws Exception {
116150
public void testFair() throws Exception {
117151
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(FairConfig.class);
118152
StandardConfig config = ctx.getBean(StandardConfig.class);
119-
ctx.getBean(StandardIntegrationFlow.class).stop();
120153
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
154+
ctx.getBean(SourcePollingChannelAdapter.class).stop();
121155
List<Integer> sfCalls = config.sessionSources.stream().limit(17).collect(Collectors.toList());
122156
assertThat(sfCalls).containsExactly(1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3);
123157
File f1 = new File(tmpDir + File.separator + "fair" + File.separator + "f1");
@@ -135,7 +169,7 @@ public void testVariableLocalDir() throws Exception {
135169
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(VariableLocalConfig.class);
136170
StandardConfig config = ctx.getBean(StandardConfig.class);
137171
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
138-
ctx.getBean(StandardIntegrationFlow.class).stop();
172+
ctx.getBean(SourcePollingChannelAdapter.class).stop();
139173
List<Integer> sfCalls = config.sessionSources.stream().limit(17).collect(Collectors.toList());
140174
assertThat(sfCalls).containsExactly(1, 1, 1, 2, 2, 2, 3, 3, 3, 1, 1, 2, 2, 3, 3, 1, 1);
141175
File f1 = new File(tmpDir + File.separator + "variable" + File.separator + "foo" + File.separator + "f1");
@@ -152,15 +186,59 @@ public void testVariableLocalDir() throws Exception {
152186
public void testStreaming() throws Exception {
153187
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(StreamingConfig.class);
154188
StandardConfig config = ctx.getBean(StandardConfig.class);
155-
ctx.getBean(StandardIntegrationFlow.class).stop();
156189
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
190+
ctx.getBean(SourcePollingChannelAdapter.class).stop();
157191
List<Integer> sfCalls = config.sessionSources.stream().limit(17).collect(Collectors.toList());
158192
// there's an extra getSession() with this adapter in listFiles
159193
assertThat(sfCalls).containsExactly(1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 1, 1, 2, 2, 3);
160-
assertThat(ctx.getBean("files", QueueChannel.class).getQueueSize()).isEqualTo(3);
194+
QueueChannel files = ctx.getBean("files", QueueChannel.class);
195+
assertThat(files.getQueueSize()).isEqualTo(3);
196+
Message<?> received = files.receive(0);
197+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
198+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f1");
199+
received = files.receive(0);
200+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
201+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f2");
202+
received = files.receive(0);
203+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
204+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f3");
161205
ctx.close();
162206
}
163207

208+
@Test
209+
public void testFairStreaming() throws Exception {
210+
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(FairStreamingConfig.class);
211+
try {
212+
StandardConfig config = ctx.getBean(StandardConfig.class);
213+
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
214+
ctx.getBean(SourcePollingChannelAdapter.class).stop();
215+
List<Integer> sfCalls = config.sessionSources.stream().limit(17).collect(Collectors.toList());
216+
assertThat(sfCalls).containsExactly(1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3, 1, 2, 3, 1, 2);
217+
QueueChannel files = ctx.getBean("files", QueueChannel.class);
218+
assertThat(files.getQueueSize()).isEqualTo(6);
219+
Message<?> received = files.receive(0);
220+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
221+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f1");
222+
received = files.receive(0);
223+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
224+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f2");
225+
received = files.receive(0);
226+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
227+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f3");
228+
received = files.receive(0);
229+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
230+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f4");
231+
received = files.receive(0);
232+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
233+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f5");
234+
received = files.receive(0);
235+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
236+
assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE, String.class)).isEqualTo("f6");
237+
}
238+
finally {
239+
ctx.close();
240+
}
241+
}
164242

165243
@Configuration
166244
@EnableIntegration
@@ -307,4 +385,36 @@ public IntegrationFlow flow() {
307385

308386
}
309387

388+
@Configuration
389+
public static class FairStreamingConfig extends StandardConfig {
390+
391+
@Override
392+
@Bean
393+
public RotatingServerAdvice advice() {
394+
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
395+
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
396+
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
397+
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
398+
return theAdvice(keyDirectories);
399+
}
400+
401+
@Override
402+
@Bean
403+
public IntegrationFlow flow() {
404+
return IntegrationFlows.from(Ftp.inboundStreamingAdapter(new FtpRemoteFileTemplate(sf()))
405+
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
406+
.remoteDirectory(".")
407+
.maxFetchSize(1),
408+
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
409+
.channel(MessageChannels.queue("files"))
410+
.get();
411+
}
412+
413+
@Override
414+
protected RotatingServerAdvice theAdvice(List<RotationPolicy.KeyDirectory> keyDirectories) {
415+
return new RotatingServerAdvice(sf(), keyDirectories, true);
416+
}
417+
418+
}
419+
310420
}

0 commit comments

Comments
 (0)