|
34 | 34 | import org.thingsboard.server.queue.settings.TbQueueCloudEventSettings; |
35 | 35 | import org.thingsboard.server.queue.settings.TbQueueCloudEventTSSettings; |
36 | 36 |
|
37 | | -import java.util.ArrayList; |
38 | 37 | import java.util.List; |
39 | 38 | import java.util.concurrent.ExecutorService; |
40 | 39 | import java.util.concurrent.Executors; |
@@ -118,48 +117,74 @@ protected void onDestroy() throws InterruptedException { |
118 | 117 | } |
119 | 118 | } |
120 | 119 |
|
121 | | - private void processUplinkMessages(List<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> consumer) { |
122 | | - log.trace("[{}] starting processing cloud events", tenantId); |
123 | | - if (initialized && !syncInProgress) { |
124 | | - isGeneralProcessInProgress = true; |
125 | | - processMessages(msgs, consumer, true); |
126 | | - isGeneralProcessInProgress = false; |
127 | | - } else { |
128 | | - sleep(); |
129 | | - } |
| 120 | + private void processUplinkMessages(List<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> msgs, |
| 121 | + TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> consumer) { |
| 122 | + boolean isProcessed = false; |
| 123 | + do { |
| 124 | + log.trace("[{}] Trying to process general uplink messages", tenantId); |
| 125 | + |
| 126 | + if (initialized && !syncInProgress) { |
| 127 | + isGeneralProcessInProgress = true; |
| 128 | + isProcessed = processMessages(msgs, consumer, true); |
| 129 | + isGeneralProcessInProgress = false; |
| 130 | + } else { |
| 131 | + log.debug("[{}] Waiting: initialized={}, syncInProgress={}", tenantId, initialized, syncInProgress); |
| 132 | + } |
| 133 | + |
| 134 | + if (!isProcessed) { |
| 135 | + sleep(); |
| 136 | + } |
| 137 | + } while (!isProcessed); |
130 | 138 | } |
131 | 139 |
|
132 | | - private void processTsUplinkMessages(List<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> consumer) { |
133 | | - if (initialized && !syncInProgress && !isGeneralProcessInProgress) { |
134 | | - processMessages(msgs, consumer, false); |
135 | | - } else { |
136 | | - sleep(); |
137 | | - } |
| 140 | + private void processTsUplinkMessages(List<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> msgs, |
| 141 | + TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> consumer) { |
| 142 | + boolean isProcessed = false; |
| 143 | + |
| 144 | + do { |
| 145 | + log.trace("[{}] Trying to process TS uplink messages", tenantId); |
| 146 | + |
| 147 | + if (initialized && !syncInProgress && !isGeneralProcessInProgress) { |
| 148 | + isProcessed = processMessages(msgs, consumer, false); |
| 149 | + } else { |
| 150 | + log.debug("[{}] Waiting: initialized={}, syncInProgress={}, generalInProgress={}", |
| 151 | + tenantId, initialized, syncInProgress, isGeneralProcessInProgress); |
| 152 | + } |
| 153 | + |
| 154 | + if (!isProcessed) { |
| 155 | + sleep(); |
| 156 | + } |
| 157 | + } while (!isProcessed); |
138 | 158 | } |
139 | 159 |
|
140 | 160 | private void sleep() { |
141 | 161 | try { |
142 | 162 | Thread.sleep(cloudEventStorageSettings.getNoRecordsSleepInterval()); |
143 | 163 | } catch (InterruptedException interruptedException) { |
144 | | - log.trace("Failed to wait until the server has capacity to handle new requests", interruptedException); |
| 164 | + log.trace("Interrupted while waiting to retry uplink processing", interruptedException); |
145 | 165 | } |
146 | 166 | } |
147 | 167 |
|
148 | | - private void processMessages(List<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> consumer, boolean isGeneralMsg) { |
149 | | - List<CloudEvent> cloudEvents = new ArrayList<>(); |
150 | | - for (TbProtoQueueMsg<TransportProtos.ToCloudEventMsg> msg : msgs) { |
151 | | - CloudEvent cloudEvent = ProtoUtils.fromProto(msg.getValue().getCloudEventMsg()); |
152 | | - cloudEvents.add(cloudEvent); |
153 | | - } |
| 168 | + private boolean processMessages(List<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> msgs, |
| 169 | + TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToCloudEventMsg>> consumer, |
| 170 | + boolean isGeneralMsg) { |
| 171 | + List<CloudEvent> cloudEvents = msgs.stream() |
| 172 | + .map(msg -> ProtoUtils.fromProto(msg.getValue().getCloudEventMsg())) |
| 173 | + .toList(); |
| 174 | + |
154 | 175 | try { |
155 | 176 | boolean isInterrupted = processCloudEvents(cloudEvents, isGeneralMsg).get(); |
156 | 177 | if (isInterrupted) { |
157 | | - log.debug("[{}] Send uplink messages task was interrupted", tenantId); |
| 178 | + log.warn("[{}] Send uplink messages task was interrupted", tenantId); |
| 179 | + return false; |
158 | 180 | } else { |
159 | 181 | consumer.commit(); |
| 182 | + log.trace("[{}] Successfully processed {} uplink messages (type={})", tenantId, cloudEvents.size(), isGeneralMsg ? "GENERAL" : "TS"); |
| 183 | + return true; |
160 | 184 | } |
161 | 185 | } catch (Exception e) { |
162 | 186 | log.error("Failed to process all uplink messages", e); |
| 187 | + return false; |
163 | 188 | } |
164 | 189 | } |
165 | 190 |
|
|
0 commit comments