Skip to content

Commit 46cdc34

Browse files
author
mgeipel
committed
Addressed #24
by letting DefaultSender ignore multiple invocations of closeStream().
1 parent 90fd52e commit 46cdc34

File tree

2 files changed

+172
-92
lines changed

2 files changed

+172
-92
lines changed

src/main/java/org/culturegraph/mf/framework/DefaultSender.java

Lines changed: 101 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -13,96 +13,105 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package org.culturegraph.mf.framework;
17-
18-
/**
19-
* Default implementation for {@link Sender}s that simply
20-
* stores a reference to the receiver and implements the
21-
* correct behaviour required by the LifeCycle interface.
22-
*
23-
* @param <T> receiver base type of the downstream module
24-
*
25-
* @see DefaultStreamPipe
26-
* @see DefaultObjectPipe
27-
* @see DefaultXmlPipe
28-
*
29-
* @author Christoph Böhme
30-
*
31-
*/
32-
public class DefaultSender<T extends LifeCycle> implements Sender<T> {
33-
16+
package org.culturegraph.mf.framework;
17+
18+
/**
19+
* Default implementation for {@link Sender}s that simply stores a reference to
20+
* the receiver and implements the correct behaviour required by the LifeCycle
21+
* interface.
22+
*
23+
* @param <T>
24+
* receiver base type of the downstream module
25+
*
26+
* @see DefaultStreamPipe
27+
* @see DefaultObjectPipe
28+
* @see DefaultXmlPipe
29+
*
30+
* @author Christoph Böhme
31+
*
32+
*/
33+
public class DefaultSender<T extends LifeCycle> implements Sender<T> {
34+
3435
private T receiver;
35-
36-
37-
@Override
38-
public final <R extends T> R setReceiver(final R receiver) {
39-
this.receiver = receiver;
40-
onSetReceiver();
41-
return receiver;
42-
}
43-
44-
@Override
45-
public final void resetStream() {
46-
onResetStream();
47-
if (receiver != null) {
48-
receiver.resetStream();
49-
}
50-
}
51-
52-
@Override
53-
public final void closeStream() {
54-
onCloseStream();
55-
if (receiver != null) {
56-
receiver.closeStream();
57-
}
58-
}
59-
60-
/**
61-
* Invoked when the sender is connected with a receiver.
62-
* This method is called after the receiver has been updated. Hence,
63-
* {@code getReceiver} will return a reference to the new receiver.
64-
*/
65-
protected void onSetReceiver() {
66-
// Default implementation does nothing
67-
}
68-
69-
/**
70-
* Invoked when the {@code resetStream()} method is called.
71-
* Override this method to perform a reset of the module.
72-
*
73-
* Do not call the {@code resetStream()} method of the next module downstream.
74-
* This is handled by the implementation of {@code resetStream()} in
75-
* {@code DefaultSender}.
76-
*
77-
* {@code onResetStream()} is called before {@code DefaultSender} calls the
78-
* {@code resetStream()} method of the downstream module.
79-
*/
80-
protected void onResetStream() {
81-
// Default implementation does nothing
82-
}
83-
84-
/**
85-
* Invoked when the {@code closeStream()} method is called. Override
86-
* this method to close any resources used by the module.
87-
*
88-
* Do not call the {@code closeStream()} method of the next module
89-
* downstream. This is handled by the implementation of
90-
* {@code closeStream()} in {@code DefaultSender}.
91-
*
92-
* {@code onCloseStream()} is called before {@code DefaultSender} calls
93-
* the {@code closeStream()} method of the downstream module.
94-
*/
95-
protected void onCloseStream() {
96-
// Default implementation does nothing
97-
}
98-
99-
/**
100-
* Returns a reference to the downstream module.
101-
*
102-
* @return reference to the downstream module
103-
*/
104-
protected final T getReceiver() {
105-
return receiver;
106-
}
107-
108-
}
36+
private boolean isClosed;
37+
38+
public boolean isClosed() {
39+
return isClosed;
40+
}
41+
42+
@Override
43+
public final <R extends T> R setReceiver(final R receiver) {
44+
this.receiver = receiver;
45+
onSetReceiver();
46+
return receiver;
47+
}
48+
49+
@Override
50+
public final void resetStream() {
51+
onResetStream();
52+
if (receiver != null) {
53+
receiver.resetStream();
54+
}
55+
isClosed = false;
56+
}
57+
58+
@Override
59+
public final void closeStream() {
60+
if (!isClosed) {
61+
onCloseStream();
62+
if (receiver != null) {
63+
receiver.closeStream();
64+
}
65+
}
66+
isClosed = true;
67+
}
68+
69+
/**
70+
* Invoked when the sender is connected with a receiver. This method is
71+
* called after the receiver has been updated. Hence, {@code getReceiver}
72+
* will return a reference to the new receiver.
73+
*/
74+
protected void onSetReceiver() {
75+
// Default implementation does nothing
76+
}
77+
78+
/**
79+
* Invoked when the {@code resetStream()} method is called. Override this
80+
* method to perform a reset of the module.
81+
*
82+
* Do not call the {@code resetStream()} method of the next module
83+
* downstream. This is handled by the implementation of
84+
* {@code resetStream()} in {@code DefaultSender}.
85+
*
86+
* {@code onResetStream()} is called before {@code DefaultSender} calls the
87+
* {@code resetStream()} method of the downstream module.
88+
*/
89+
protected void onResetStream() {
90+
// Default implementation does nothing
91+
}
92+
93+
/**
94+
* Invoked when the {@code closeStream()} method is called. Override this
95+
* method to close any resources used by the module.
96+
*
97+
* Do not call the {@code closeStream()} method of the next module
98+
* downstream. This is handled by the implementation of
99+
* {@code closeStream()} in {@code DefaultSender}.
100+
*
101+
* {@code onCloseStream()} is called before {@code DefaultSender} calls the
102+
* {@code closeStream()} method of the downstream module.
103+
*/
104+
protected void onCloseStream() {
105+
// Default implementation does nothing
106+
}
107+
108+
/**
109+
* Returns a reference to the downstream module.
110+
*
111+
* @return reference to the downstream module
112+
*/
113+
protected final T getReceiver() {
114+
return receiver;
115+
}
116+
117+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2013 Deutsche Nationalbibliothek
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.culturegraph.mf.framework;
17+
18+
19+
20+
import org.junit.Assert;
21+
import org.junit.Test;
22+
23+
24+
/**
25+
* Tests Contract of {@link DefaultSender}.
26+
* @author Markus M Geipel
27+
*
28+
*/
29+
public final class DefaultSenderTest {
30+
31+
/**
32+
* onCloseStream() must be called only once even if closeSteam is called several times
33+
*/
34+
@Test
35+
public void testMultipleCloseStreamInvocations() {
36+
final CloseCounter closeCounter = new CloseCounter();
37+
38+
Assert.assertEquals(0, closeCounter.getCount());
39+
Assert.assertFalse(closeCounter.isClosed());
40+
41+
closeCounter.closeStream();
42+
Assert.assertEquals(1, closeCounter.getCount());
43+
Assert.assertTrue(closeCounter.isClosed());
44+
45+
closeCounter.closeStream();
46+
Assert.assertEquals(1, closeCounter.getCount());
47+
Assert.assertTrue(closeCounter.isClosed());
48+
49+
closeCounter.closeStream();
50+
Assert.assertEquals(1, closeCounter.getCount());
51+
Assert.assertTrue(closeCounter.isClosed());
52+
53+
}
54+
55+
/**
56+
* counts invocation of onCloseStream()
57+
*/
58+
protected static final class CloseCounter extends DefaultSender<ObjectReceiver<Object>>{
59+
private int count;
60+
61+
@Override
62+
protected void onCloseStream() {
63+
++count;
64+
}
65+
66+
public int getCount() {
67+
return count;
68+
}
69+
}
70+
71+
}

0 commit comments

Comments
 (0)