1. paho-mqtt 모듈
- MQTT Python Client 모듈
- 설치
- sudo pip3 install paho-mqtt
- api 문서
2. 구독자(Subscriber) 개발 절차
- MQTT 클라이언트 클래스 인스턴스화
- 브로커 연결
- 토픽 구독 신청
- 토픽 수신시 호출할 핸들러 등록
- 토픽 수신 대기
- 토픽 수신시 리 - 핸들러 호출
3. 발행자(Publisher) 개발 절차
- MQTT 클라이언트 클래스 인스턴스화
- 브로커 연결
- 필요시 토픽 발행(전송)
4. 클라이언트 클래스 인스턴스화하기: Client()
Client(client_id="", clean_session=True, userdata=None, protocol=MQTTv311,
transport="tcp"
- client_id
- 브로커에 연결할 때 사용되는 고유 클라이언트 ID 문자열
- 비어있거나 None인 경우, 무작위로 결정
- 이 경우 clean_session은 True여야함.
- clean_session
- 브로커와의 연결이 끊어졌을 때 브로커가 클라이언트의 정보를 지울지 여부
- userdata
- protocol
- MQTT 버전(MQTTv31 or MQTTv311)
- transport
- 전송 프로토콜(”tcp” 또는 “websocket”)
import paho.mqtt.client as mqtt
mqttc = mqtt.Client()
5. 연결: connect()
connect(host, port = 1883, keepalive = 60, bind_address="")
- host
- port
- keepalive
- 브로커와의 통신 사이에 허용되는 최대 기간 (초).
- 다른 메시지가 교환되지 않으면 클라이언트가 브로커에 ping 메시지를 보낼 속도를 제어
- bind_address
- 네트워크 인터페이스(랜카드)가 여러 개인 경우 바인딩할 IP 주소
- 콜백(Callback)
- 연결 성공시 on_connect() 콜백 호출
6. 비동기 연결: connect_async()
connect_async(host, port=1883, keepalive= 60, bind_address="")
- loop_start() 와 함께 사용하여 비 차단 방식으로 연결
7. 재연결: reconnect()
8. 연결 끊기: disconnect()
9. 네트워크 루프(프로그래밍)
- 메시지 수신 처리를 위해서는 스레드로 구현 필요
- loop()
- loop_start()
- 새로운 스레드를 실행하여 loop()를 무한 실행
- loop_forever()
10. 토픽 발간하기: publish()
publish(topic, payload = None, qos = 0, retain = False)
- topic
- payload
- 전송할 메시지
- 문자열 또는 바이트 데이터(struct.pack() 필요)
- qos
- qos(quality of service) 수준
- retain
- True 설정한 경우, 해당 토픽에 대한 가장 최근의 메시지 유지
11. 구독/구독 취소: subscribe()/unsubscribe()
- subscribe(topic, qos = 0)
- unsubscribe(topic)
12. 콜백
- 특정 이벤트(접속 성공, 메시지 수신 등)가 발생했을 때 호출되는 핸들러(함수)
- on_connect(client, userdata, flags, rc)
- 브로커가 연결 요청에 응답 할 때 호출
- client
- userdata
- Client() 또는 user_data_set()에서 설정한 사용자 데이터
- flags
- rc
- 연결 결과
- 0: 연결 성공
- 1~5: 연결 거부
- 6~255; 현재 사용되지 않습니다.
- on_disconnect(client, userdata, rc)
- 브로커와 연결이 끊어질 때 호출
- client
- userdata
- Client() 또는 user_data_set()에서 설정한 사용자 데이터
- rc
- 처리 결과
- 0 : 정상적으로 끊김
- 0이 아닌 값: 예외에 의해 끊어짐
# 콜백: 연결 및 연결 끊기 처리
def on_connect(client, userdata, flags, rc):
print("Connection returned result: " + rc)
def on_disconnect(client,userdata,rc):
if rc != 0:
print("Unexpected disconnection.")
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
- on_message(client, userdata, message)
- subscriber에서 메시지(토픽)가 수신됬을 때 호출
- client
- userdata
- Client() 또는 user_data_set()에서 설정한 사용자 데이터
- message
- MQTTMessage 인스턴스
- 속성: topic, payload, qos, retain
# 콜백: 메시지 수신 처리
def on_message(client, userdata, message):
print("Recieved message'" + str(message.payload) + "'ontopic'" + meeage.topic
+ "'with QoS" + str(message.qos))
mqtt.on_message = on_message
13. Subscriber 구현
import paho.mqtt.client as mqtt
# 브로커 접속 시도 결과 처리 콜백 함수
def on_connect(client, userdata, falgs, rc):
print("Connected with result code " + str(rc))
if rc == 0:
client.subscribe("iot/#") # 연결 성공시 토픽 구독 신청
else:
print('연결 실패 : ', rc)
# 관련 토픽 메시지 수신 콜백 함수
def on_message(client, userdata, msg):
value = float(msg.payload.decode())
print(f" {msg.topic} {value}")
# 1. MQTT 클라이언트 객체 인스턴스화
client = mqtt.Client()
# 2. 관련 이벤트에 대한 콜백 함수 등록
client.on_connect = on_connect
client.on_message = on_message
try:
# 3. 브로커 연결
client.connect("localhost")
# 4. 메시지 루프 - 이벤트 발생 시 해당 콜백 함수 호출 됨
client.loop_forever()
except Exception as err:
print('에러 : %s'%err)
14. Publisher 구현
import paho.mqtt.client as mqtt
# 1. MQTT 클라이언트 객체 인스턴스화
client = mqtt.Client()
try:
# 2. 브로커 연결
client.connect("localhost")
# 3. 토픽 메시지 발행
client.publish("iot/home2/greet", "Hello World!")
client.loop(2)
except Exception as err:
print(f'에러 : {err}')
15. 수신 데이터 DB 저장하기
CREATE TABLE SENSORS(
ID INTEGER PRIBERY KEY AUTOINCREMENT NOT NULL,
USER CHAR(20),
PLACE CHAR(20),
SENSOR CHAR(20),
VALUE REAL,
REG_DATE DATETIME DEFAULT CURRENT_TIMESTAMP
)
import sqlite3
con = sqlite3.connect('iot.db')
cursor = con.cursor()
.
.
.
# 관련 토픽 메시지 수신 콜백 함수
def on_message(client, userdata, msg):
value = float(msg.payload.decode())
(_, user, place, sensor) = msg.topic.split('/')
sql = f'''INSERT INTO sensors(user, place, sensor, value)
VALUES('{user}', '{place}', '{sensor}', {value})'''
cursor.excute(sql)
con.commit()
except Exception as err:
cursor.close()
con.close()