Skip to content

Commit b2e5f2d

Browse files
authored
Merge pull request #37 from JavaSaBr/feature-broker-33
Separate mqtt clients to internal/external
2 parents bc1fa49 + ef184b0 commit b2e5f2d

15 files changed

+517
-231
lines changed

src/main/java/com/ss/mqtt/broker/MqttBrokerApplication.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.ss.mqtt.broker;
22

33
import com.ss.mqtt.broker.config.MqttBrokerConfig;
4+
import com.ss.mqtt.broker.config.MqttNetworkConfig;
45
import lombok.RequiredArgsConstructor;
56
import org.jetbrains.annotations.NotNull;
67
import org.springframework.boot.SpringApplication;
@@ -9,7 +10,10 @@
910

1011
@Configuration
1112
@RequiredArgsConstructor
12-
@Import(MqttBrokerConfig.class)
13+
@Import({
14+
MqttBrokerConfig.class,
15+
MqttNetworkConfig.class
16+
})
1317
public class MqttBrokerApplication {
1418

1519
public static void main(@NotNull String[] args) {

src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java

Lines changed: 2 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -11,52 +11,23 @@
1111
import com.ss.mqtt.broker.handler.publish.out.Qos0PublishOutHandler;
1212
import com.ss.mqtt.broker.handler.publish.out.Qos1PublishOutHandler;
1313
import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler;
14-
import com.ss.mqtt.broker.model.MqttPropertyConstants;
15-
import com.ss.mqtt.broker.model.QoS;
16-
import com.ss.mqtt.broker.network.MqttConnection;
17-
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
18-
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
1914
import com.ss.mqtt.broker.network.packet.PacketType;
2015
import com.ss.mqtt.broker.service.*;
2116
import com.ss.mqtt.broker.service.impl.*;
22-
import com.ss.rlib.network.BufferAllocator;
23-
import com.ss.rlib.network.Network;
24-
import com.ss.rlib.network.NetworkFactory;
25-
import com.ss.rlib.network.ServerNetworkConfig;
26-
import com.ss.rlib.network.impl.DefaultBufferAllocator;
27-
import com.ss.rlib.network.server.ServerNetwork;
2817
import lombok.RequiredArgsConstructor;
2918
import lombok.extern.log4j.Log4j2;
3019
import org.jetbrains.annotations.NotNull;
3120
import org.springframework.context.annotation.Bean;
3221
import org.springframework.context.annotation.Configuration;
3322
import org.springframework.core.env.Environment;
3423

35-
import java.net.InetSocketAddress;
36-
import java.nio.channels.AsynchronousSocketChannel;
37-
import java.util.function.BiFunction;
38-
import java.util.function.Consumer;
39-
4024
@Log4j2
4125
@Configuration
4226
@RequiredArgsConstructor
4327
public class MqttBrokerConfig {
4428

45-
private interface ChannelFactory extends
46-
BiFunction<Network<MqttConnection>, AsynchronousSocketChannel, MqttConnection> {}
47-
4829
private final Environment env;
4930

50-
@Bean
51-
@NotNull ServerNetworkConfig networkConfig() {
52-
return ServerNetworkConfig.DEFAULT_SERVER;
53-
}
54-
55-
@Bean
56-
@NotNull BufferAllocator bufferAllocator(@NotNull ServerNetworkConfig networkConfig) {
57-
return new DefaultBufferAllocator(networkConfig);
58-
}
59-
6031
@Bean
6132
@NotNull ClientIdRegistry clientIdRegistry() {
6233
return new InMemoryClientIdRegistry(
@@ -89,7 +60,7 @@ private interface ChannelFactory extends
8960
}
9061

9162
@Bean
92-
PacketInHandler @NotNull [] devicePacketHandlers(
63+
PacketInHandler @NotNull [] packetHandlers(
9364
@NotNull AuthenticationService authenticationService,
9465
@NotNull ClientIdRegistry clientIdRegistry,
9566
@NotNull SubscriptionService subscriptionService,
@@ -117,7 +88,7 @@ private interface ChannelFactory extends
11788
}
11889

11990
@Bean
120-
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
91+
@NotNull MqttClientReleaseHandler mqttClientReleaseHandler(
12192
@NotNull ClientIdRegistry clientIdRegistry,
12293
@NotNull MqttSessionService mqttSessionService,
12394
@NotNull SubscriptionService subscriptionService
@@ -129,39 +100,6 @@ private interface ChannelFactory extends
129100
);
130101
}
131102

132-
@Bean
133-
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork(
134-
@NotNull ServerNetworkConfig networkConfig,
135-
@NotNull BufferAllocator bufferAllocator,
136-
@NotNull MqttConnectionConfig deviceConnectionConfig,
137-
PacketInHandler @NotNull [] devicePacketHandlers,
138-
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler
139-
) {
140-
return NetworkFactory.newServerNetwork(
141-
networkConfig,
142-
deviceConnectionFactory(
143-
bufferAllocator,
144-
deviceConnectionConfig,
145-
devicePacketHandlers,
146-
deviceMqttClientReleaseHandler
147-
)
148-
);
149-
}
150-
151-
@Bean
152-
@NotNull InetSocketAddress deviceNetworkAddress(
153-
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,
154-
@NotNull Consumer<@NotNull MqttConnection> mqttConnectionConsumer
155-
) {
156-
157-
var address = new InetSocketAddress("localhost", 1883);
158-
159-
deviceNetwork.start(address);
160-
deviceNetwork.onAccept(mqttConnectionConsumer);
161-
162-
return address;
163-
}
164-
165103
@Bean
166104
@NotNull SubscriptionService subscriptionService() {
167105
return new SimpleSubscriptionService();
@@ -192,92 +130,4 @@ private interface ChannelFactory extends
192130
@NotNull PublishingService publishingService(@NotNull PublishInHandler[] publishInHandlers) {
193131
return new DefaultPublishingService(publishInHandlers);
194132
}
195-
196-
@Bean
197-
@NotNull Consumer<@NotNull MqttConnection> mqttConnectionConsumer() {
198-
return mqttConnection -> {
199-
log.info("Accepted connection: {}", mqttConnection);
200-
var client = (UnsafeMqttClient) mqttConnection.getClient();
201-
mqttConnection.onReceive((conn, packet) -> client.handle(packet));
202-
};
203-
}
204-
205-
@Bean
206-
@NotNull MqttConnectionConfig deviceConnectionConfig() {
207-
return new MqttConnectionConfig(
208-
QoS.of(env.getProperty("mqtt.connection.max.qos", int.class, 2)),
209-
env.getProperty(
210-
"mqtt.connection.max.packet.size",
211-
int.class,
212-
MqttPropertyConstants.MAXIMUM_PACKET_SIZE_DEFAULT
213-
),
214-
env.getProperty(
215-
"mqtt.connection.min.keep.alive",
216-
int.class,
217-
MqttPropertyConstants.SERVER_KEEP_ALIVE_DEFAULT
218-
),
219-
env.getProperty(
220-
"mqtt.connection.receive.maximum",
221-
int.class,
222-
MqttPropertyConstants.RECEIVE_MAXIMUM_DEFAULT
223-
),
224-
env.getProperty(
225-
"mqtt.connection.topic.alias.maximum",
226-
int.class,
227-
MqttPropertyConstants.TOPIC_ALIAS_MAXIMUM_DISABLED
228-
),
229-
env.getProperty(
230-
"mqtt.connection.default.session.expiration.time",
231-
long.class,
232-
MqttPropertyConstants.SESSION_EXPIRY_INTERVAL_DEFAULT
233-
),
234-
env.getProperty(
235-
"mqtt.connection.keep.alive.enabled",
236-
boolean.class,
237-
MqttPropertyConstants.KEEP_ALIVE_ENABLED_DEFAULT
238-
),
239-
env.getProperty(
240-
"mqtt.connection.sessions.enabled",
241-
boolean.class,
242-
MqttPropertyConstants.SESSIONS_ENABLED_DEFAULT
243-
),
244-
env.getProperty(
245-
"mqtt.connection.retain.available",
246-
boolean.class,
247-
MqttPropertyConstants.RETAIN_AVAILABLE_DEFAULT
248-
),
249-
env.getProperty(
250-
"mqtt.connection.wildcard.subscription.available",
251-
boolean.class,
252-
MqttPropertyConstants.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT
253-
),
254-
env.getProperty(
255-
"mqtt.connection.subscription.id.available",
256-
boolean.class,
257-
MqttPropertyConstants.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT
258-
),
259-
env.getProperty(
260-
"mqtt.connection.shared.subscription.available",
261-
boolean.class,
262-
MqttPropertyConstants.SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT
263-
)
264-
);
265-
}
266-
267-
private @NotNull ChannelFactory deviceConnectionFactory(
268-
@NotNull BufferAllocator bufferAllocator,
269-
@NotNull MqttConnectionConfig connectionConfig,
270-
PacketInHandler @NotNull [] packetHandlers,
271-
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler
272-
) {
273-
return (network, channel) -> new MqttConnection(
274-
network,
275-
channel,
276-
bufferAllocator,
277-
100,
278-
packetHandlers,
279-
connectionConfig,
280-
mqttConnection -> new DeviceMqttClient(mqttConnection, deviceMqttClientReleaseHandler)
281-
);
282-
}
283133
}

0 commit comments

Comments
 (0)