๐ INDEX
[SpringBoot] MQTT ๊ธฐ๋ฐ ๋จ๋ง๊ธฐ ํต์ API ์๋ฒ ๊ตฌ์ถํ๊ธฐ
์ด ๊ธ์์๋ Java Spring Boot๋ฅผ ์ฌ์ฉํ์ฌ MQTT ๊ธฐ๋ฐ์ ๋จ๋ง๊ธฐ ํต์ API ์๋ฒ๋ฅผ ๊ตฌ์ถํ๋ ๋ฐฉ๋ฒ์ ์๊ฐํฉ๋๋ค. MQTT ๋ธ๋ก์ปค๋ฅผ ํตํด ๋จ๋ง๊ธฐ์์ ์ค์๊ฐ ๋ฐ์ดํฐ ์ก์์ ๊ธฐ๋ฅ์ ๊ตฌํํ๊ณ , ์์ง๋ ์ ๋ณด๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅํ๋ ๊ณผ์ ์ ๋ค๋ฃน๋๋ค.
MqttConfig.java
์ ๋ฐ์ ์ผ๋ก mqtt Broker์์ ๋ฐํ ๋ฐ ๊ตฌ๋ ๋๋ message๋ฅผ ์ฝ๊ณ , ์ฒ๋ฆฌํ๋ ๊ธฐ๋ฅ๋ค์ MqttConfig ์์ ๋ค๋ฃจ๊ณ ์์ต๋๋ค.
(์ถํ์ ๋ฆฌํํ ๋ง์ด ํ์ํ ๊ฒ์ผ๋ก ๋ณด์ ๋๋ค.)
MqttConfig.java ํ์ผ์ ์์ฑํ๊ณ , @Log4j2, @Configuration ์ด๋ ธํ ์ด์ ์ ์ฌ์ฉํด ์ฃผ์์ต๋๋ค.
@Log4j2
ํด๋์ค์ ์ด ์ด๋ ธํ ์ด์ ์ ๋ถ์ด๋ฉด Log4j 2 ์ ๋ก๊ฑฐ ์ธ์คํด์ค๊ฐ ์์ฑ๋์ด ๋ก๊ทธ๋ฅผ ๊ธฐ๋กํ ์ ์์ต๋๋ค.
์ฝ๋ ๋ด์์ ๋ก๊ทธ๋ฅผ ์ฝ๊ฒ ์์ฑํ ์ ์๋๋ก ๋์์ฃผ๋ฉฐ, ๋ค์ํ ๋ก๊ทธ๋ ๋ฒจ์ ์ง์ํฉ๋๋ค.
@Configuration
ํด๋น ํด๋์ค๊ฐ ํ๋ ์ด์์ Spring Bean์ ์ ์ํ๋ ๊ตฌ์ฑ ํด๋์ค๋ฅผ ๋ํ๋ ๋๋ค.
์ด ํด๋์ค๋ Spring ์ IoC ์ปจํ ์ด๋์ ์ํด ๊ด๋ฆฌ๋๋ฉฐ, ์ ํ๋ฆฌ์ผ์ด์ ์ ์ค์ ์ Java ์ฝ๋๋ก ์ ์ํ ์ ์๊ฒ ํด ์ฃผ๋ฉฐ, @Bean ์ด๋ ธํ ์ด์ ๊ณผ ํจ๊ป ์ฌ์ฉํ์ฌ ์คํ๋ง ์ปจํ ์ด๋์ ๋ฑ๋กํ ๊ฐ์ฒด๋ฅผ ์ ์ํ๊ณ ์ด๋ฅผ ํตํด XML ๊ธฐ๋ฐ ์ค์ ์ ๋์ฒดํ ์ ์์ต๋๋ค.
build.gradle ์ MQTT ๋ผ์ด๋ธ๋ฌ๋ฆฌ ์ํฌํธํ๊ธฐ
Spring Integration MQTT๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด mqtt ์์กด์ฑ์ ์ถ๊ฐํด ์ค๋๋ค.
dependencies {
implementation 'org.springframework.integration:spring-integration-mqtt'
}
Maven, Gradle ๋ฌด๊ดํ๋ฉฐ, ์ฌ์ฉํ๋ ์ค์ ์ ๋ง์ถฐ ์์กด์ฑ์ ์ถ๊ฐํด ์ฃผ๋ฉด ๋ฉ๋๋ค.
application.yml ์ MQTT URL ์ ๋ณด ์ค์ ํ๊ธฐ
MQTT ๋ธ๋ก์ปค URL ์ ๋ณด๋ฅผ ์ค์ ํ์ผ์ ์ถ๊ฐํฉ๋๋ค. application.properties ๋ฑ ์ฌ์ฉํ๋ ์ค์ ์ ๋ง๊ฒ ์ค์ ํด ์ค๋๋ค
spring.mqtt.url=tcp://localhost:1883
spring.mqtt.clientId=myClientId
spring.mqtt.username=myUsername // ํ์ํ ๊ฒฝ์ฐ
spring.mqtt.password=myPassword // ํ์ํ ๊ฒฝ์ฐ
spring.mqtt.subtopic: LIYO/V1/liyo/+/OUTBOX
pubTopicAddDeviceId: LIYO // ํ์ํ ๊ฒฝ์ฐ
spring.mqtt.qos: 1
URL ์ธ์๋ subtopic๊ณผ qos ์ ๋ณด๋ ์ถ๊ฐํด ์ฃผ์์ต๋๋ค.
**Topic
MQTT์์ ํ ํฝ์ ๋ฉ์์ง๊ฐ ์ ์ก๋๋ ๊ฒฝ๋ก๋ฅผ ๋ํ๋ด๋ ๋ฌธ์์ด์ ๋๋ค.
ํด๋ผ์ด์ธํธ๋ ํน์ ํ ํฝ์ ๊ตฌ๋ (subscribe)ํ๊ฑฐ๋, ํด๋น ํ ํฝ์ผ๋ก ๋ฉ์์ง๋ฅผ ๋ฐํ(publish)ํ ์ ์์ต๋๋ค.
ํ ํฝ์ ๊ณ์ธต๊ตฌ์กฐ๋ก ๊ตฌ์ฑ๋์ด ์์ด ๋ฉ์์ง๋ฅผ ์กฐ์ง์ ์ผ๋ก ๊ด๋ฆฌํ๊ณ ํํฐ๋งํ ์ ์์ต๋๋ค.
์์ผ๋์นด๋
+ : ํ ๋ ๋ฒจ์ ๋์ฒดํฉ๋๋ค. (ex: LIYO/+/liyo)
# : ์ฌ๋ฌ ๋ ๋ฒจ์ ๋์ฒดํฉ๋๋ค. (ex: LIYO/#)
QoS ๋ ๋ฒจ
MQTT์ QoS(Quality of Service)๋ ๋ฉ์์ง๊ฐ ์ ์ก๋๋ ๋ฐฉ์๊ณผ ์ ๋ขฐ์ฑ์ ์ ์ํฉ๋๋ค.
QoS 0 : ๋ฉ์์ง๊ฐ ํ ๋ฒ๋ง ์ ์ก๋๋ฉฐ ๊ฐ์ฅ ๋น ๋ฅด์ง๋ง ์ ๋ขฐ์ฑ์ด ๊ฐ์ฅ ๋ฎ์ผ๋ฉฐ ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆฌ๋ฐ ๋ฑ์ ์ฌ์ฉ๋ฉ๋๋ค.
QoS 1 : ๋ฉ์์ง๊ฐ ์ต์ํ ํ ๋ฒ ์ด์ ์ ์ก๋๋ฉฐ ์์ ์๋ ๋ฉ์์ง๋ฅผ ๋ฐ๋์ ๋ฐ์์ผ ํ๋ฉฐ, ์ค๋ณต ์์ ์ด ๋ฐ์ํ ์ ์์ต๋๋ค. ์ํ ์ ๋ฐ์ดํธ์ ๊ฐ์ ์ค์ํ ๋ฉ์์ง์ ์ฃผ๋ก ์ฌ์ฉ๋ฉ๋๋ค.
QoS 2 : ๋ฉ์์ง๊ฐ ์ ํํ ํ ๋ฒ๋ง ์ ์ก๋๋ฉฐ ๊ฐ์ฅ ๋์ ์ ๋ขฐ์ฑ์ ์ ๊ณตํ๋ฉฐ ์ค๋ณต์ด ๋ฐ์ํ์ง ์์ต๋๋ค. ์ ์ก ๊ณผ์ ์ด ๋ณต์กํ์ฌ ์ฑ๋ฅ์ด ๋จ์ด์ง ์ ์์ต๋๋ค. ๊ธ์ต ๊ฑฐ๋์ ๊ฐ์ด ์ค๋ณต์ด ์ ๋ ํ์ฉ๋์ง ์๋ ๊ฒฝ์ฐ์ ์ฃผ๋ก ์ฌ์ฉํฉ๋๋ค.
package com.ebab.beacnd.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Configuration
public class MqttConfig {
@Value("${spring.mqtt.brokerUrl}")
private String brokerUrl;
@Value("${spring.mqtt.qos}")
private int qos;
@Value("${spring.mqtt.subTopic}")
private String subTopic;
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.pubTopicAddDeviceId}")
private String pubTopicAddDeviceId;
ํ๋์ ์ค์ ํ์ผ์์ ๊ฐ์ ์ฃผ์ ๋ฐ์ ํด๋น ๊ฐ๋ค์ ์ฌ์ฉํ ์ ์๋๋ก ํด์ค๋๋ค.
MQTT ํด๋ผ์ด์ธํธ ์ค์
MQTT_CLIENT_ID ์ MQTT_CLIENT_ID2 ๋ฅผ static final ๋ณ์๋ก ์์ฑํด ์ฃผ์์ต๋๋ค.
ํด๋ผ์ด์ธํธ ID๊ฐ ๋์ผํ๋ฉด MQTT ๋ธ๋ก๊ฑฐ๊ฐ ์ฐ๊ฒฐ์ ๋์ผ๋ฏ๋ก ๋ฐํ๊ณผ ๊ตฌ๋ ํด๋ผ์ด์ธํธ ID๋ฅผ ๋ฐ๋ก ์ฌ์ฉํฉ๋๋ค
๋ฉ์์ง ๋ฐํ ๋น(Bean) ์ ์
MQTT ํด๋ผ์ด์ธํธ ์์ฑ - mqttClientFactory
์ถ๊ฐ๋๋ ์ฝ๋๋ค์ mqttConfig ์ฝ๋์ ํ๋จ์ ์ถ๊ฐ๋ฉ๋๋ค. ์ ์ฒด ์ด๋ฏธ์ง๋ ์๋ตํ๊ฒ ์ต๋๋ค.
// ๋ฉ์์ง ๋ฐํ (Publish)
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { brokerUrl });
options.setUserName(username);
options.setPassword(password.toCharArray());
factory.setConnectionOptions(options);
return factory;
}
MqttPahoClientFactory๋ฅผ ์ค์ ํ์ฌ MQTT ํด๋ผ์ด์ธํธ๋ฅผ ์์ฑํฉ๋๋ค.
MqttConnectOptions๋ฅผ ์ฌ์ฉํ์ฌ ๋ธ๋ก์ปค์ URL, ์ฌ์ฉ์ ์ด๋ฆ ๋ฐ ๋น๋ฐ๋ฒํธ๋ฅผ ์ค์ ํฉ๋๋ค.
ํจ์ค์๋๋ toCharArray() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ๋ฌธ์์ด์ ๋ฌธ์ ๋ฐฐ์ด๋ก ๋ณํํ์ฌ ๋ณด์์ ๊ฐํํด์ผ ํฉ๋๋ค.
๋ฌธ์์ด(String)์ ๋ถ๋ณ ๊ฐ์ฒด๋ก, ๋ฉ๋ชจ๋ฆฌ์ ๋ฌธ์์ด์ด ๋จ์์์ ์ ์์ด ์ํธํ ๊ฐ์ ๋ฏผ๊ฐํ ์ ๋ณด๊ฐ ๋ฉ๋ชจ๋ฆฌ์์ ์ฝ๊ฒ ์ ๊ทผ๋ ์ ์์ต๋๋ค. char[] ๋ฐฐ์ด ๊ฐ์ฒด ์ฌ์ฉ์ผ๋ก ์ฌ์ฉ ํ ์ฆ์ ๋ด์ฉ์ ์ง์ ๋ฉ๋ชจ๋ฆฌ์์ ๋น๋ฐ๋ฒํธ๊ฐ ๋จ์ ์์ง ์๋๋ก ํฉ๋๋ค.
MQTT ๋ฉ์์ง ๋ฐํ ์ฑ๋ ์ค์ - mqttOutbound
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(MQTT_CLIENT_ID2, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(pubTopicAddDeviceId);
messageHandler.setCompletionTimeout(1000);
return messageHandler;
}
๋ฐํ ์ฑ๋์ ์ค์ ํ๋ ๋น์ ๋๋ค. MqttPahoMessageHandler๋ฅผ ์ฌ์ฉํ์ฌ ๋ฉ์์ง๋ฅผ ๋น๋๊ธฐ๋ก ๋ฐํํฉ๋๋ค.
๊ธฐ๋ณธ ๋ฐํ ํ ํฝ์ ๋ฏธ๋ฆฌ ์ค์ ํด ๋ pubTopicAddDevice ๋ก ์ค์ ํ์์ต๋๋ค.
MQTT ๋ฉ์์ง ๋ฐํ ์ฑ๋ - mqttOutboundChannel
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
๋ฉ์์ง๋ฅผ ๋ฐํํ๋ ๋ฐ ์ฌ์ฉ๋๋ ์ฑ๋์ ๋๋ค.
MQTT ๋ฉ์์ง ์ ์ก ์ธํฐํ์ด์ค ์ค์ - MyGateway
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
๋ฉ์์ง๋ฅผ MQTT๋ก ์ ์กํ๊ธฐ ์ํ ์ธํฐํ์ด์ค์ ๋๋ค.
@MessageGatewqy ์ด๋ ธํ ์ด์ ์ ํตํด Spring Integration์ ๋ฉ์์ง ๊ฒ์ดํธ์จ์ด๋ก ์ค์ ๋ฉ๋๋ค.
๋ฉ์์ง ๊ตฌ๋ ๋น(Bean) ์ ์
๋ฉ์์ง ๊ตฌ๋ ์ค์ - mqttInputChannel
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
MQTT ๋ฉ์์ง๋ฅผ ์์ ํ ์ฑ๋์ ์ ์ํฉ๋๋ค. DirectChannel์ ๋ฉ์์ง๋ฅผ ์์ ํ๋ ๋ฐ ์ฌ์ฉ๋๋ ๋๊ธฐ์ ์ฑ๋์ ๋๋ค.
MQTT ๋ฉ์์ง ์์ฐ์ ์ค์ - inboundChannel
@Bean
public MessageProducer inboundChannel() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(brokerUrl, MQTT_CLIENT_ID, subTopic);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(qos);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
MQTT ๋ฉ์์ง๋ฅผ ์์ ํ๋ ์ด๋ํฐ๋ฅผ ์ค์ ํฉ๋๋ค.
- brokerUrl: MQTT ๋ธ๋ก์ปค์ URL.
- MQTT_CLIENT_ID: ํด๋ผ์ด์ธํธ ID.
- subTopic: ๊ตฌ๋ ํ ์ฃผ์ .
- setCompletionTimeout(5000): ๋ฉ์์ง ์ ์ก ์๋ฃ ๋๊ธฐ ์๊ฐ ์ค์ (5์ด).
- setConverter: ๋ฉ์์ง ๋ณํ๊ธฐ๋ฅผ ์ค์ ํฉ๋๋ค.
- setQos(qos): QoS(์๋น์ค ํ์ง) ์์ค ์ค์ .
- setOutputChannel(mqttInputChannel()): ์์ ํ ๋ฉ์์ง๋ฅผ mqttInputChannel๋ก ๋ณด๋ ๋๋ค.
MQTT ํต์ ์ด๋ ฅ DB ์ ์ฅ - mqttServiceProcess
public void mqttServiceProcess(MqttDto mdto, String topic, String inOugGb, String payload) {
mdto.setTopic(topic);
mdto.setDirection_cd(inOugGb);
mdto.setPayload(payload);
mqttService.insertACND0910(mdto);
}
MQTT ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๊ณ , ๊ด๋ จ ์ ๋ณด๋ฅผ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ์ ์ฅํฉ๋๋ค.
๊ด๋ จ ๋ฐ์ดํฐ๋ฅผ ๋ด์ mdto๋ฅผ ์์ฑํด ์ฃผ์์ต๋๋ค.
ํ ํฝ ์ฐ๊ฒฐ ํ์ธ
๋ณธ๋ฌธ๋ด์ฉ ์ฝ์
public boolean isTopicConnection(String topic) {
boolean result = false;
return result;
}
ํน์ ํ ํฝ์ ์ฐ๊ฒฐ ์ํ๋ฅผ ํ์ธํ๋ ๋ฉ์๋์ ๋๋ค. ํ์ฌ๋ ํญ์ false๋ฅผ ๋ฐํํฉ๋๋ค.
๋ฌธ์์ด ๋ฐํ
public String booleanString(String bool) {
String returnStr = "N";
if ("true".equals(bool)) {
returnStr = "Y";
}
return returnStr;
}
๋ฌธ์์ด "ture"๋ฅผ ํ์ธํ์ฌ "Y" ๋๋ "N"์ ๋ฐํํฉ๋๋ค. ์ฃผ๋ก Boolean ๊ฐ์ ๋ฌธ์์ด๋ก ๋ณํํ๋ ์ฉ๋๋ก ์ฌ์ฉํฉ๋๋ค.
MQTT ์์ฒญ ์ฒ๋ฆฌ - callMqttservice
๋ณธ๋ฌธ๋ด์ฉ ์ฝ์
// MQTT ์์ฒญ
public String callMqttService(SubPayLoad subPayLoad, String inboxGb) {
String resMessage = "";
try {
log.info(":::::::::::::::::::::::MQTT ์ ์ก ์ getDeviceId {}", subPayLoad.getDeviceId());
// data์ request data ์กฐํํ์ฌ ๊ฐ์ ธ์ด
log.info("resMessage: {}", resMessage);
myGateway.sendToMqtt(resMessage, pubTopicAddDeviceId);
} catch (Exception e) {
log.info(":::::::::::::::::::::::callMqttService Exception : {}", e.toString());
resMessage = null;
return resMessage;
}
return resMessage;
}
ํด๋น ํ๋ก์ ํธ์์๋ ๋ฐํ์ ๋ฐ๋ก ํ์ง ์๊ณ , ๊ตฌ๋ ํ ๋ฐ์ดํฐ๋ฅผ DB์ ์ ์ฅํ๊ธฐ ๊น์ง๋ง ํ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ ํด๋น ๋ด์ฉ์ ๋ด์ Mqtt ์์ฒญ์ ์ฒ๋ฆฌํ๊ณ ๋ฉ์์ง๋ฅผ ์ ์กํ๋ ๋ฉ์๋์ ๋๋ค.
์ฌ๊ธฐ๊น์ง MQTT ๋ฉ์์ง๋ฅผ ์ฃผ๊ณ ๋ฐ๊ธฐ ์ํ ๊ธฐ๋ณธ ์ค์ ์ด ์๋ฃ๋์์ต๋๋ค.
MQTT ๋ฉ์์ง ๊ตฌ๋ ํ ์ฒ๋ฆฌ - inboundMessageHandler
MQTT ๋ฉ์์ง ์์ ๋ฐ ๋ก๊ทธ ์ถ๋ ฅ
String topic = (String) message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC);
log.info(":::::::::::::::::::::::Topic:{}", topic);
log.info(":::::::::::::::::::::::Payload:{}", message.getPayload());
MQTT ๋ฉ์์ง์ ํ ํฝ๊ณผ Payload๋ฅผ ๋ก๊น ํด ์ค๋๋ค. ๋ธ๋ก์ปค๋ก๋ถํฐ ์์ ํ ๋ฉ์์ง์ ๋ด์ฉ์ ํ์ธํ ์ ์์ต๋๋ค.
Payload JSON ํ์ฑ
String payload = message.getPayload().toString();
ObjectMapper mapper = new ObjectMapper();
SubPayLoad subPayLoad = mapper.readValue(payload, SubPayLoad.class);
ํ์ด๋ก๋๋ฅผ JSON ํ์์ผ๋ก ๊ฐ์ฃผํ๊ณ ์ด๋ฅผ SubPayload ๊ฐ์ฒด๋ก ๋ณํํด ์ค๋๋ค.
ObjectMapper ๋ฅผ ์ฌ์ฉํด์ JSON ๋ฐ์ดํฐ๋ฅผ ๋งคํํด ์ฃผ์์ต๋๋ค.
๋ฉ์์ง ํ์ ์ ๋ฐ๋ฅธ ๋ถ๊ธฐ ์ฒ๋ฆฌ
String messageType = subPayLoad.getMessageType();
switch (messageType) {
case "EVENT_BOOTUP":
// ๋๋ฐ์ด์ค ๋ถํ
์ฒ๋ฆฌ
break;
case "EVENT_SENSOR_REPORT":
// ์ผ์ฑ ๋ฐ์ดํฐ ์ฒ๋ฆฌ
break;
default:
break;
}
subPayLoad์์ MessageType ์ ์ถ์ถํ์ฌ ๋ฉ์์ง ํ์ ๋ณ ๋ถ๊ธฐ๋ฅผ ์ฒ๋ฆฌํฉ๋๋ค.
์ ๋ ๋จ๋ง๊ธฐ์ ๋ถํ ๊ณผ, ์ผ์ฑ ์ ๋ณด๋ฅผ ์์งํ๋ MQTT ์๋ฒ์ด๊ธฐ ๋๋ฌธ์, ๋๋ฐ์ด์ค ๋ถํ , ์ผ์ฑ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ๋ฉ์์ง๋ก ๋ฐ๊ณ ์์ต๋๋ค.
ํด๋น ์ ๋ณด๋ค์ ํตํด ๋จ๋ง๊ธฐ ์ค์ ์ ๋ณด, ํ์ฌ ์ํ, ์ฐ๊ฒฐ ์ด๋ ฅ, ์ผ์ฑ ๋ฐ์ดํฐ ์ด๋ ฅ ๋ฑ์ ์ ์ฅํ์ฌ ๊ด๋ฆฌํ ์ ์์ต๋๋ค.
๊ฐ๋ฐ ํ๊ฒฝ
- Frontend
- Framework: Vue 3
- Build Tool: Vite
- Backend
- Framework: Spring Boot 3.2.2
- Language: Java
- Gateway: Spring Cloud Gateway
- Database
- DBMS: MS SQL Server
- Container: Docker
- Development Tools
- IDE: Visual Studio Code (VSCode)