Skip to content

Commit 82bb065

Browse files
committed
Fix #191: only emit endRecord() after triple was received
Only emit `endRecord()` in `resetStream()` and `closeStream()` if a triple was received and a `startRecord()` event emitted.
1 parent 20d21e1 commit 82bb065

File tree

2 files changed

+60
-26
lines changed

2 files changed

+60
-26
lines changed

src/main/java/org/culturegraph/mf/stream/pipe/sort/TripleCollect.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013, 2014 Deutsche Nationalbibliothek
2+
* Copyright 2013, 2014, 2016 Deutsche Nationalbibliothek
33
*
44
* Licensed under the Apache License, Version 2.0 the "License";
55
* you may not use this file except in compliance with the License.
@@ -27,17 +27,18 @@
2727

2828
/**
2929
* Collects named values to form records.
30-
*
30+
*
3131
* @author markus geipel
32-
*
32+
*
3333
*/
3434
@Description("Collects named values to form records. The name becomes the id, the value is split by 'separator' into name and value")
3535
@In(Triple.class)
3636
@Out(StreamReceiver.class)
3737
public final class TripleCollect extends DefaultObjectPipe<Triple, StreamReceiver> {
38+
3839
private final FormetaParser parser = new FormetaParser();
3940
private final PartialRecordEmitter emitter = new PartialRecordEmitter();
40-
41+
4142
private String currentSubject;
4243

4344
public TripleCollect() {
@@ -50,7 +51,7 @@ public void process(final Triple triple) {
5051
currentSubject = triple.getSubject();
5152
getReceiver().startRecord(currentSubject);
5253
}
53-
54+
5455
if (currentSubject.equals(triple.getSubject())) {
5556
decodeTriple(triple);
5657
} else {
@@ -74,16 +75,20 @@ public void decodeTriple(final Triple triple) {
7475

7576
@Override
7677
protected void onResetStream() {
77-
currentSubject = null;
78-
getReceiver().endRecord();
78+
if (currentSubject != null) {
79+
currentSubject = null;
80+
getReceiver().endRecord();
81+
}
7982
}
8083

8184
@Override
8285
protected void onCloseStream() {
83-
currentSubject = null;
84-
getReceiver().endRecord();
86+
if (currentSubject != null) {
87+
currentSubject = null;
88+
getReceiver().endRecord();
89+
}
8590
}
86-
91+
8792
@Override
8893
protected void onSetReceiver() {
8994
emitter.setReceiver(getReceiver());
Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013, 2014 Deutsche Nationalbibliothek
2+
* Copyright 2013, 2014, 2016 Deutsche Nationalbibliothek
33
*
44
* Licensed under the Apache License, Version 2.0 the "License";
55
* you may not use this file except in compliance with the License.
@@ -15,21 +15,28 @@
1515
*/
1616
package org.culturegraph.mf.stream.pipe.sort;
1717

18+
import static org.mockito.Mockito.never;
19+
import static org.mockito.Mockito.verify;
20+
1821
import org.culturegraph.mf.formeta.Formeta;
1922
import org.culturegraph.mf.framework.StreamReceiver;
2023
import org.culturegraph.mf.types.Triple;
2124
import org.culturegraph.mf.types.Triple.ObjectType;
25+
import org.junit.Before;
2226
import org.junit.Test;
2327
import org.mockito.InOrder;
28+
import org.mockito.Mock;
2429
import org.mockito.Mockito;
30+
import org.mockito.MockitoAnnotations;
2531

2632
/**
2733
* Tests {@link TripleCollect}
28-
*
34+
*
2935
* @author Markus Geipel
30-
*
36+
*
3137
*/
3238
public final class TripleCollectTest {
39+
3340
private static final String VALUE = "value";
3441
private static final String VALUE1 = "value1";
3542
private static final String VALUE2 = "value2";
@@ -38,45 +45,67 @@ public final class TripleCollectTest {
3845
private static final String REC_ID = "id";
3946
private static final String REC_ALT_ID = "altid";
4047

48+
@Mock
49+
private StreamReceiver receiver;
50+
51+
private TripleCollect collect;
52+
53+
@Before
54+
public void initMocks() {
55+
MockitoAnnotations.initMocks(this);
56+
collect = new TripleCollect();
57+
collect.setReceiver(receiver);
58+
}
59+
4160
@Test
4261
public void testShouldBuildRecords() {
43-
final StreamReceiver receiver = Mockito.mock(StreamReceiver.class);
44-
final TripleCollect collect = new TripleCollect();
45-
collect.setReceiver(receiver);
46-
4762
collect.process(new Triple(REC_ID, NAME, VALUE1));
4863
collect.process(new Triple(REC_ID, NAME, VALUE2));
4964
collect.process(new Triple(REC_ALT_ID, NAME, VALUE1));
50-
65+
collect.closeStream();
66+
5167
final InOrder ordered = Mockito.inOrder(receiver);
52-
68+
5369
ordered.verify(receiver).startRecord(REC_ID);
5470
ordered.verify(receiver).literal(NAME, VALUE1);
5571
ordered.verify(receiver).literal(NAME, VALUE2);
5672
ordered.verify(receiver).endRecord();
5773
ordered.verify(receiver).startRecord(REC_ALT_ID);
5874
ordered.verify(receiver).literal(NAME, VALUE1);
75+
ordered.verify(receiver).endRecord();
5976
}
60-
77+
6178
@Test
6279
public void testShouldDecodeEntities() {
63-
final StreamReceiver receiver = Mockito.mock(StreamReceiver.class);
64-
final TripleCollect collect = new TripleCollect();
65-
collect.setReceiver(receiver);
66-
6780
collect.process(new Triple(REC_ID, ENTITY_NAME, Formeta.GROUP_START +NAME + Formeta.NAME_VALUE_SEPARATOR + VALUE
6881
+ Formeta.ITEM_SEPARATOR + ENTITY_NAME + Formeta.GROUP_START + NAME
6982
+ Formeta.NAME_VALUE_SEPARATOR + VALUE + Formeta.GROUP_END + Formeta.GROUP_END,
7083
ObjectType.ENTITY));
71-
84+
collect.closeStream();
85+
7286
final InOrder ordered = Mockito.inOrder(receiver);
73-
87+
7488
ordered.verify(receiver).startRecord(REC_ID);
7589
ordered.verify(receiver).startEntity(ENTITY_NAME);
7690
ordered.verify(receiver).literal(NAME, VALUE);
7791
ordered.verify(receiver).startEntity(ENTITY_NAME);
7892
ordered.verify(receiver).literal(NAME, VALUE);
7993
ordered.verify(receiver, Mockito.times(2)).endEntity();
94+
ordered.verify(receiver).endRecord();
95+
}
8096

97+
@Test
98+
public void shouldNotEmitEndRecordOnCloseStreamIfNoTriplesWereReceived() {
99+
collect.closeStream();
100+
101+
verify(receiver, never()).endRecord();
102+
}
103+
104+
@Test
105+
public void shouldNotEmitEndRecordOnResetStreamIfNoTriplesWereReceived() {
106+
collect.resetStream();
107+
108+
verify(receiver, never()).endRecord();
81109
}
110+
82111
}

0 commit comments

Comments
 (0)