Skip to content

Commit 65b3bdc

Browse files
artembilangaryrussell
authored andcommitted
GH-3271: Close session on error in stream source
Fixes #3271 When exception happens at `.withPayload(session.readRaw(remotePath))` in the `AbstractRemoteFileStreamingMessageSource` we don't close session. The resource leaking happens in the caching session factory * Add `session.close();` into the `catch (IOException e) {` in the `AbstractRemoteFileStreamingMessageSource.doReceive()` to clean up resources properly **Cherry-pick to 5.2.x, 5.1.x & 4.3.x**
1 parent c931c2b commit 65b3bdc

File tree

2 files changed

+146
-0
lines changed

2 files changed

+146
-0
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ protected Object doReceive() {
194194
this.fileInfoJson ? file.toJson() : file);
195195
}
196196
catch (IOException e) {
197+
session.close();
197198
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
198199
}
199200
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2015-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.file.remote;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.mockito.ArgumentMatchers.anyString;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import java.io.IOException;
26+
import java.io.UncheckedIOException;
27+
import java.util.Collection;
28+
import java.util.Comparator;
29+
import java.util.List;
30+
import java.util.stream.Collectors;
31+
32+
import org.junit.jupiter.api.Test;
33+
34+
import org.springframework.beans.factory.BeanFactory;
35+
import org.springframework.integration.file.remote.session.CachingSessionFactory;
36+
import org.springframework.integration.file.remote.session.Session;
37+
import org.springframework.integration.file.remote.session.SessionFactory;
38+
39+
/**
40+
* @author Lukas Gemela
41+
* @author Artem Bilan
42+
*
43+
* @since 5.2.2
44+
*
45+
*/
46+
public class RemoteFileStreamingMessageSourceTests {
47+
48+
@Test
49+
@SuppressWarnings("unchecked")
50+
public void sessionReturnedToCacheProperlyOnDoReceive() throws IOException {
51+
Session<String> session = mock(Session.class);
52+
when(session.readRaw(anyString())).thenThrow(IOException.class);
53+
when(session.list("remoteDirectory")).thenReturn(new String[] { "file1" });
54+
55+
SessionFactory<String> sessionFactory = mock(SessionFactory.class);
56+
when(sessionFactory.getSession()).thenReturn(session);
57+
58+
CachingSessionFactory<String> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
59+
RemoteFileTemplate<String> remoteFileTemplate = new RemoteFileTemplate<>(cachingSessionFactory);
60+
61+
TestRemoteFileStreamingMessageSource testRemoteFileStreamingMessageSource =
62+
new TestRemoteFileStreamingMessageSource(remoteFileTemplate, null);
63+
64+
testRemoteFileStreamingMessageSource.setRemoteDirectory("remoteDirectory");
65+
testRemoteFileStreamingMessageSource.setBeanFactory(mock(BeanFactory.class));
66+
testRemoteFileStreamingMessageSource.start();
67+
68+
assertThatExceptionOfType(UncheckedIOException.class)
69+
.isThrownBy(testRemoteFileStreamingMessageSource::doReceive);
70+
71+
assertThat(cachingSessionFactory.getSession()).isNotNull();
72+
}
73+
74+
static class TestRemoteFileStreamingMessageSource extends AbstractRemoteFileStreamingMessageSource<String> {
75+
76+
TestRemoteFileStreamingMessageSource(RemoteFileTemplate<String> template, Comparator<String> comparator) {
77+
super(template, comparator);
78+
}
79+
80+
@Override
81+
protected List<AbstractFileInfo<String>> asFileInfoList(Collection<String> files) {
82+
return files
83+
.stream()
84+
.map(TestFileInfo::new)
85+
.collect(Collectors.toList());
86+
}
87+
88+
@Override
89+
protected boolean isDirectory(String file) {
90+
return false;
91+
}
92+
93+
@Override
94+
public String getComponentType() {
95+
return null;
96+
}
97+
98+
}
99+
100+
static class TestFileInfo extends AbstractFileInfo<String> {
101+
102+
TestFileInfo(String fileName) {
103+
this.fileName = fileName;
104+
}
105+
106+
private final String fileName;
107+
108+
@Override
109+
public boolean isDirectory() {
110+
return false;
111+
}
112+
113+
@Override
114+
public boolean isLink() {
115+
return false;
116+
}
117+
118+
@Override
119+
public long getSize() {
120+
return 0;
121+
}
122+
123+
@Override
124+
public long getModified() {
125+
return 0;
126+
}
127+
128+
@Override
129+
public String getFilename() {
130+
return fileName;
131+
}
132+
133+
@Override
134+
public String getPermissions() {
135+
return null;
136+
}
137+
138+
@Override
139+
public String getFileInfo() {
140+
return null;
141+
}
142+
143+
}
144+
145+
}

0 commit comments

Comments
 (0)