Skip to content

Commit a04e617

Browse files
committed
continue rework network part
1 parent 7ae4fc9 commit a04e617

15 files changed

+74
-23
lines changed

application/src/test/resources/log4j2.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
<Logger name="javasabr.mqtt.network.client.AbstractMqttClient" level="DEBUG" additivity="false">
1010
<AppenderRef ref="BrokerConsoleTest"/>
1111
</Logger>
12+
<Logger name="javasabr.mqtt.service.impl.DefaultConnectionService" level="DEBUG" additivity="false">
13+
<AppenderRef ref="BrokerConsoleTest"/>
14+
</Logger>
1215
<Logger name="javasabr.mqtt.model.network.impl.DefaultMqttSession" level="DEBUG" additivity="false">
1316
<AppenderRef ref="BrokerConsoleTest"/>
1417
</Logger>

network/src/main/java/javasabr/mqtt/network/impl/DefaultMqttSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private static void updatePendingPacket(
6464
}
6565

6666
if (pendingPublish == null) {
67-
log.warning(clientId , response, "Not found pending publish for client:[%s] by received packet:[%]"::formatted);
67+
log.warning(clientId , response, "Not found pending publish for client:[%s] by received packet:[%s]"::formatted);
6868
return;
6969
}
7070

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@NullMarked
2+
package javasabr.mqtt.network.impl;
3+
4+
import org.jspecify.annotations.NullMarked;

service/src/main/java/javasabr/mqtt/service/impl/DefaultConnectionService.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ protected void processReceivedMessage(
5656
return;
5757
}
5858

59+
log.debug(
60+
connection.client().clientId(),
61+
networkPacket.name(),
62+
networkPacket,
63+
"[%s] Received from client message:[%s] %s"::formatted);
64+
5965
try {
6066
//noinspection DataFlowIssue
6167
inMessageHandlers[mrp.packetType()].processReceived(connection, mrp);

service/src/main/java/javasabr/mqtt/service/impl/DefaultPublishDeliveringService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public DefaultPublishDeliveringService(
4848
public PublishHandlingResult startDelivering(PublishInPacket publish, SingleSubscriber subscriber) {
4949
try {
5050
//noinspection DataFlowIssue
51-
return publishOutMessageHandlers[publish.getQos().index()].handle(publish, subscriber);
51+
return publishOutMessageHandlers[subscriber.getQos().index()].handle(publish, subscriber);
5252
} catch (IndexOutOfBoundsException | NullPointerException ex) {
5353
log.warning(publish, "Received not supported publish message:[%s]"::formatted);
5454
return PublishHandlingResult.UNSPECIFIED_ERROR;

service/src/main/java/javasabr/mqtt/service/message/handler/impl/DisconnectMqttInMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ protected void processReceived(
2626
DisconnectInPacket networkPacket) {
2727
DisconnectReasonCode reasonCode = networkPacket.getReasonCode();
2828
if (reasonCode == DisconnectReasonCode.NORMAL_DISCONNECTION) {
29-
log.info(client, "Disconnect client:[%s]"::formatted);
29+
log.info(client.clientId(), "Disconnect client:[%s]"::formatted);
3030
} else {
3131
log.error("Disconnect client:[%s] by error reason:[%s]".formatted(client, reasonCode));
3232
}
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@
66
import javasabr.mqtt.network.packet.HasPacketId;
77
import javasabr.mqtt.network.packet.in.MqttReadablePacket;
88

9-
public abstract class PendingResponseMqttInMessageHandler<P extends MqttReadablePacket & HasPacketId>
9+
public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttReadablePacket & HasPacketId>
1010
extends AbstractMqttInMessageHandler<ExternalMqttClient, P> {
1111

12-
protected PendingResponseMqttInMessageHandler(Class<P> expectedNetworkPacket) {
12+
protected PendingOutResponseMqttInMessageHandler(Class<P> expectedNetworkPacket) {
1313
super(ExternalMqttClient.class, expectedNetworkPacket);
1414
}
1515

service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishAckMqttInMessageHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import javasabr.mqtt.network.packet.MqttPacketType;
44
import javasabr.mqtt.network.packet.in.PublishAckInPacket;
55

6-
public class PublishAckMqttInMessageHandler extends PendingResponseMqttInMessageHandler<PublishAckInPacket> {
6+
public class PublishAckMqttInMessageHandler extends PendingOutResponseMqttInMessageHandler<PublishAckInPacket> {
77

88
public PublishAckMqttInMessageHandler() {
99
super(PublishAckInPacket.class);

service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishCompleteMqttInMessageHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import javasabr.mqtt.network.packet.MqttPacketType;
44
import javasabr.mqtt.network.packet.in.PublishCompleteInPacket;
55

6-
public class PublishCompleteMqttInMessageHandler extends PendingResponseMqttInMessageHandler<PublishCompleteInPacket> {
6+
public class PublishCompleteMqttInMessageHandler extends
7+
PendingOutResponseMqttInMessageHandler<PublishCompleteInPacket> {
78

89
public PublishCompleteMqttInMessageHandler() {
910
super(PublishCompleteInPacket.class);

service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishReceiveMqttInMessageHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
import javasabr.mqtt.network.packet.MqttPacketType;
44
import javasabr.mqtt.network.packet.in.PublishReceivedInPacket;
55

6-
public class PublishReceiveMqttInMessageHandler extends PendingResponseMqttInMessageHandler<PublishReceivedInPacket> {
6+
public class PublishReceiveMqttInMessageHandler extends
7+
PendingOutResponseMqttInMessageHandler<PublishReceivedInPacket> {
78

89
public PublishReceiveMqttInMessageHandler() {
910
super(PublishReceivedInPacket.class);

0 commit comments

Comments
 (0)