2626import java .io .InputStream ;
2727import java .util .Arrays ;
2828import java .util .Comparator ;
29+ import java .util .concurrent .ConcurrentHashMap ;
30+ import java .util .concurrent .ConcurrentMap ;
2931
3032import org .junit .Test ;
3133import org .junit .runner .RunWith ;
4345import org .springframework .integration .file .FileHeaders ;
4446import org .springframework .integration .file .filters .AcceptAllFileListFilter ;
4547import org .springframework .integration .file .remote .session .SessionFactory ;
48+ import org .springframework .integration .metadata .SimpleMetadataStore ;
4649import org .springframework .integration .scheduling .PollerMetadata ;
4750import org .springframework .integration .sftp .SftpTestSupport ;
51+ import org .springframework .integration .sftp .filters .SftpPersistentAcceptOnceFileListFilter ;
4852import org .springframework .integration .sftp .session .SftpFileInfo ;
4953import org .springframework .integration .sftp .session .SftpRemoteFileTemplate ;
5054import org .springframework .integration .transformer .StreamTransformer ;
6367 *
6468 */
6569@ RunWith (SpringRunner .class )
66- @ DirtiesContext
70+ @ DirtiesContext ( classMode = DirtiesContext . ClassMode . AFTER_EACH_TEST_METHOD )
6771public class SftpStreamingMessageSourceTests extends SftpTestSupport {
6872
6973 @ Autowired
@@ -81,6 +85,9 @@ public class SftpStreamingMessageSourceTests extends SftpTestSupport {
8185 @ Autowired
8286 private ApplicationContext context ;
8387
88+ @ Autowired
89+ private ConcurrentMap <String , String > metadataMap ;
90+
8491 @ SuppressWarnings ("unchecked" )
8592 @ Test
8693 public void testAllContents () {
@@ -111,6 +118,7 @@ public void testAllContents() {
111118 this .adapter .stop ();
112119 this .source .setFileInfoJson (false );
113120 this .data .purge (null );
121+ this .metadataMap .clear ();
114122 this .adapter .start ();
115123 received = (Message <byte []>) this .data .receive (10000 );
116124 assertNotNull (received );
@@ -186,12 +194,20 @@ public PollerMetadata defaultPoller() {
186194 return pollerMetadata ;
187195 }
188196
197+ @ Bean
198+ public ConcurrentMap <String , String > metadataMap () {
199+ return new ConcurrentHashMap <>();
200+ }
201+
202+
189203 @ Bean
190204 @ InboundChannelAdapter (channel = "stream" , autoStartup = "false" )
191205 public MessageSource <InputStream > sftpMessageSource () {
192206 SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource (template (),
193207 Comparator .comparing (LsEntry ::getFilename ));
194- messageSource .setFilter (new AcceptAllFileListFilter <>());
208+ messageSource .setFilter (
209+ new SftpPersistentAcceptOnceFileListFilter (
210+ new SimpleMetadataStore (metadataMap ()), "testStreaming" ));
195211 messageSource .setRemoteDirectory ("sftpSource/" );
196212 return messageSource ;
197213 }
0 commit comments