1818
1919import java .util .ArrayList ;
2020import java .util .HashMap ;
21+ import java .util .Iterator ;
2122import java .util .List ;
2223import java .util .Map ;
24+ import java .util .Map .Entry ;
2325
26+ import org .springframework .cloud .stream .binder .BinderHeaders ;
2427import org .springframework .messaging .Message ;
2528import org .springframework .messaging .MessageHeaders ;
2629import org .springframework .messaging .support .MessageBuilder ;
30+ import org .springframework .util .Assert ;
2731
2832/**
2933 * @author Oleg Zhurakousky
3034 * @since 4.2
3135 */
32- public class StandardBatchUtils {
36+ public final class StandardBatchUtils {
3337
34- public static String BATCH_HEADERS = "scst_batchHeaders" ;
38+ private StandardBatchUtils () {
39+
40+ }
3541
42+ /**
43+ * Iterates over batch message structure returning {@link Iterable} of individual messages.
44+ *
45+ * @param batchMessage instance of batch {@link Message}
46+ * @return instance of {@link Iterable} representing individual Messages in a batch {@link Message} as {@link Entry}.
47+ */
48+ public static Iterable <Entry <Object , Map <String , Object >>> iterate (Message <List <Object >> batchMessage ) {
49+ return new Iterable <Map .Entry <Object ,Map <String , Object >>>() {
50+ @ Override
51+ public Iterator <Entry <Object , Map <String , Object >>> iterator () {
52+ return new Iterator <Entry <Object , Map <String , Object >>>() {
53+ int index = 0 ;
54+ @ Override
55+ public Entry <Object , Map <String , Object >> next () {
56+ return getMessageByIndex (batchMessage , index ++);
57+ }
58+
59+ @ Override
60+ public boolean hasNext () {
61+ return index < batchMessage .getPayload ().size ();
62+ }
63+ };
64+ }
65+ };
66+ }
67+
68+ /**
69+ * Extracts individual {@link Message} by index from batch {@link Message}
70+ * @param batchMessage instance of batch {@link Message}
71+ * @param index index of individual {@link Message} in a batch
72+ * @return individual {@link Message} in a batch {@link Message}
73+ */
74+ public static Entry <Object , Map <String , Object >> getMessageByIndex (Message <List <Object >> batchMessage , int index ) {
75+ Assert .isTrue (index < batchMessage .getPayload ().size (), "Index " + index + " is out of bounds as there are only "
76+ + batchMessage .getPayload ().size () + " messages in a batch." );
77+ return new Entry <Object , Map <String ,Object >>() {
78+
79+ @ Override
80+ public Map <String , Object > setValue (Map <String , Object > value ) {
81+ throw new UnsupportedOperationException ();
82+ }
83+
84+ @ SuppressWarnings ("unchecked" )
85+ @ Override
86+ public Map <String , Object > getValue () {
87+ return ((List <Map <String , Object >>) batchMessage .getHeaders ().get (BinderHeaders .BATCH_HEADERS )).get (index );
88+ }
89+
90+ @ Override
91+ public Object getKey () {
92+ return batchMessage .getPayload ().get (index );
93+ }
94+ };
95+ }
3696
3797 public static class BatchMessageBuilder {
3898
@@ -48,13 +108,13 @@ public BatchMessageBuilder addMessage(Object payload, Map<String, Object> batchH
48108 return this ;
49109 }
50110
51- public BatchMessageBuilder addHeader (String key , Object value ) {
111+ public BatchMessageBuilder addRootHeader (String key , Object value ) {
52112 this .headers .put (key , value );
53113 return this ;
54114 }
55115
56116 public Message <List <Object >> build () {
57- this .headers .put (BATCH_HEADERS , this .batchHeaders );
117+ this .headers .put (BinderHeaders . BATCH_HEADERS , this .batchHeaders );
58118 return MessageBuilder .createMessage (payloads , new MessageHeaders (headers ));
59119 }
60120 }
0 commit comments