Another/라즈베리파이

[Raspberry Pi] MQTT Pub/Sub Python

Brad_Heo 2021. 4. 11. 16:13

오늘은 MQTT에 대해서 알아보겠습니다.

MQTT를 간단하게 소개하자면 IoT(Internet of Things)를 위한 프로토콜입니다.
MQTT가 왜 특별히 IoT에 특화된 프로토콜인지 알아보고, python을 통해 mqtt 통신을 직접 구현해보겠습니다.

IoT의 특징

사물인터넷이라고 하는 IoT장비를 떠올려보겠습니다. 웨어러블디바이스를 생각해보면 매우 작은 크기 입니다. 또한 우리는 이동하면서 기기를 사용합니다. 이를 조금 정리해보겠습니다.

  • 소형화된 장비에서 사용
  • 여러곳으로 이동하면서 동작하는 경우가 있음
  • 성능이 PC처럼 좋지 않음

위와 같은 경우들을 생각해보면 의문이 생깁니다.

  • 이동 = IP address가 변경됨 ▶︎ IoT장비에 데이터를 어떻게 보내지?
  • 좋지 않은 성능 ▶︎ 데이터를 많이 보내거나 많이 받기가 힘들다

이런 문제들을 어떻게 해결할까요? 이런 문제들을 해결하기 위해서 MQTT라는 것을 사용하게 됩니다.

MQTT?

위에서 간략하게 설명했듯이 MQTT(Message Queue Telemetry Transport)는 IoT에서 사용되는 대표적인 프로토콜입니다. TCP/IP 스택 위에 구축되었습니다. 또한 SSL/TLS에서 실행되어 장치 간의 모든 데이터 통신이 암호화되고 안전한지 확인합니다. 또한 Pub/Sub 구조인 만큼 불안정한 네트워크 환경에서 확장이 가능합니다.

  • IP가 바뀔 때, 사설 IP이더라도 통신이 가능하도록
  • 간단한 프로토콜(가벼움)
  • 구조는 클라이언트-서버 구조가 아닌, Pub/Sub, Broker 구조

왜 다른 프로토콜은 IoT에 적합하지 않을까?

웹개발자들은 HTTP에 익숙합니다. 그렇다면 왜 HTTP를 IoT에 적용하지 않았을까요?

  • HTTP는 동기 프로토콜 입니다. HTTP는 클라이언트는 서버가 응답 할 때까지 기다립니다. IoT는 많은 수의 장치와 불안정하고 지연 시간이 긴 네트워크로 인해 동기식 통신은 문제를 발생시킵니다.
  • HTTP는 단방향입니다. 쉽게 이야기해서, 일반적으로 웹서버는 별도의 요청을 받지 않는 이상 자기가 능동적으로 웹페이지를 제공하지 않습니다. IoT 애플리키이션에서 장치 또는 센서는 일반적으로 클라이언트이므로 네트워크에서 수동으로 명령을 받을 수 없습니다.
  • HTTP는 많은 헤더를 가지고 있는 무거운 프로토콜입니다.

고성능을 제공하는 웹서버는 네트워크 대기시간이 문제가되지 않습니다. 하지만 IoT 장치 또는 센서는 고성능을 제공하지 않습니다. AMQP를 포함한 다른 프로토콜 또한 리소스가 제한된 IoT 애플리케이션에는 MQTT보다 적합하지 않습니다.

MQTT 구조 가볍게 살펴보기

IoT장비는 일반적으로 메세지를 보내기도 하고 받기도한다. pub이자 sub이여야한다.

Client
클라이어트-서버 구조는 아니지만 MQTT Broker에 연결된 모든 것을 이야기 합니다.

Broker
Broker는 모든 메세지를 수신, 필터링, 메시지를 구독하는 사람 결정, 클라이언트에게 메세지를 보내는 역할을 한다.
즉, 특정 publisher가 메세지를 broker에게 전달하게 되면 subscriber에게 메세지를 전달한다.
publisher와 subscriber는 모두 broker와 통신하게 된다.

Publish
topic을 지정하여 topic을 subscribeㅏ고 있는 클라이언트(sub)에게 메세지를 보낸다.

Subscribe
topic을 구독하여 topic으로 publish된 메세지를 받는다.

Topic
전달하려는 신호 및 data의 종류를 topic으로 구분한다.
MQTT의 topic은 슬래쉬(/)를 분리 문자로 사용하여 폴더 및 파일과 유사한 계층 구조를 가진다.

ex)
house
house/room
house/room/main-light
house/room/left-light

MQTT가 동작하는 방식

위의 단어들이 의미하는 바를 파악했다면 MQTT가 어떻게 동작하는지 살펴봅시다.

  1. 클라이언트가 브로커에 연결됩니다. 클라이언트는 브로커의 모든 메세지의 topic을 구독 할 수 있습니다.
  2. 클라이언트는 메세지와 topic을 브로커에게 전송하여 topic과 연결된 메세지를 발행합니다.
  3. 그런다음 브로커는 해당 주제를 구독하는 모든 클라이언트에게 메세지를 전달합니다.

MQTT with python(feat.RabbitMQ)

이렇게 알아봤자 무슨 의미가 있을까요? 결국 코딩해봐야합니다.
python으로 sub/pub을 작성하고, broker로는 RabbitMQ를 사용하겠습니다.
RabbitMQ는 docker를 통해서 띄우도록 하겠습니다!

Running broker on docker

아래 명령어를 순서대로 입력하겠습니다. 도커에 대한 자세한 설명은 넘어가도록 하겠습니다.

$ docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 -p 1883:1883 rabbitmq:3 $ docker exec -it myrabbitmq /bin/bash root@container_id:/# rabbitmq-plugins enable rabbitmq_management root@container_id:/# rabbitmq-plugins enable rabbitmq_mqtt root@container_id:/# rabbitmq-plugins enable rabbitmq_web_mqtt root@container_id:/# rabbitmq-plugins enable rabbitmq_amqp1_0

$ docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 -p 1883:1883 rabbitmq:3 
$ docker exec -it myrabbitmq /bin/bash root@container_id:/# 

rabbitmq-plugins enable rabbitmq_management root@container_id:/# 
rabbitmq-plugins enable rabbitmq_mqtt root@container_id:/# 
rabbitmq-plugins enable rabbitmq_web_mqtt root@container_id:/# r
abbitmq-plugins enable rabbitmq_amqp1_0

Installing The Client

pip를 통해서 MQTT client를 설치합니다.

pip install paho-mqtt

paho-mqtt의 source code를 보면 paho mqtt client class의 method를 확인할 수 있습니다.
궁금한 내용이 있다면 소스코드에 설명이 자세하게 나와있으니 소스코드 확인을 강력 추천드립니다!

class Client(object): """MQTT version 3.1/3.1.1/5.0 client class. This is the main class for use communicating with an MQTT broker. General usage flow: * Use connect()/connect_async() to connect to a broker * Call loop() frequently to maintain network traffic flow with the broker * Or use loop_start() to set a thread running to call loop() for you. * Or use loop_forever() to handle calling loop() for you in a blocking * function. * Use subscribe() to subscribe to a topic and receive messages * Use publish() to send messages * Use disconnect() to disconnect from the broker ...

class Client(object): 
	"""MQTT version 3.1/3.1.1/5.0 client class. 
    
    This is the main class for use communicating with an MQTT broker. 
    
    General usage flow: 
    
    * Use connect()/connect_async() to connect to a broker 
    * Call loop() frequently to maintain network traffic flow with the broker 
    * Or use loop_start() to set a thread running to call loop() for you. 
    * Or use loop_forever() to handle calling loop() for you in a blocking 
    * function. 
    * Use subscribe() to subscribe to a topic and receive messages 
    * Use publish() to send messages 
    * Use disconnect() to disconnect from the broker
...
  

publisher.py

import random
import time
import paho.mqtt.client as mqtt_client


# broker 정보 #1
broker_address = "localhost"
broker_port = 1883

topic = "/python/mqtt"


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker")
        else:
            print(f"Failed to connect, Returned code: {rc}")

    def on_disconnect(client, userdata, flags, rc=0):
        print(f"disconnected result code {str(rc)}")

    def on_log(client, userdata, level, buf):
        print(f"log: {buf}")

    # client 생성 #2
    client_id = f"mqtt_client_{random.randint(0, 1000)}"
    client = mqtt_client.Client(client_id)

    # 콜백 함수 설정 #3
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_log = on_log

    # broker 연결 #4
    client.connect(host=broker_address, port=broker_port)
    return client


def publish(client: mqtt_client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start() #5
    print(f"connect to broker {broker_address}:{broker_port}")
    publish(client) #6

if __name__ == '__main__':
    run()
#1 broker 정보 입력하기
RabbitMQ를 docker로 띄운 상태입니다.
포트 포워딩을 통해서 local의 1883을 docker의 1883으로 연결했습니다.
#4에서 client를 broker로 연결합니다.

#2 client 생성
Client 생성자는 4개의 파라미터를 받습니다. 여기서 client_id는 필수값이고, 또한 유니크한 값이여야합니다.

Client(client_id=””, clean_session=True, userdata=None, protocol=MQTTv311, transport=”tcp”)

#1 broker 정보 입력하기

RabbitMQ를 docker로 띄운 상태입니다.
포트 포워딩을 통해서 local의 1883을 docker의 1883으로 연결했습니다.
#4에서 client를 broker로 연결합니다.

#2 client 생성

Client 생성자는 4개의 파라미터를 받습니다. 여기서 client_id는 필수값이고, 또한 유니크한 값이여야합니다.

Client(client_id=””, clean_session=True, userdata=None, protocol=MQTTv311, transport=”tcp”)

ramdom package를 통해서 random한 client를 생성합니다.

#3 콜백함수 설정

client 클래스를 확인해보면 기본적인 메소드를 제공합니다. print 때문에 오버라이드를 진행했습니다.

#4 broker 연결

client의 connect method를 통해서 broker에 연결이 가능합니다.

#5 loop_start()

클래스를 열어서 확인해보면 다음과 같이 설명합니다

"""This is part of the threaded client interface. Call this once to start a new thread to process network traffic. This provides an alternative to repeatedly calling loop() yourself.
"""

loop_start를 통해서 쓰레드를 생성합니다.

#6 publish(client)

함수를 살펴보시면 알겠지만 client가 message를 publish하는 함수입니다. client.publish method를 통해서 실제 publish가 발생합니다. 궁금한 사항은 publish 함수를 살펴보세요!

subscriber.py

# broker 정보
broker_address = "localhost"
broker_port = 1883

topic = "/python/mqtt"


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker")
        else:
            print(f"Failed to connect, Returned code: {rc}")

    def on_disconnect(client, userdata, flags, rc=0):
        print(f"disconnected result code {str(rc)}")

    def on_log(client, userdata, level, buf):
        print(f"log: {buf}")

    # client 생성
    client_id = f"mqtt_client_{random.randint(0, 1000)}"
    client = mqtt_client.Client(client_id)

    # 콜백 함수 설정
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_log = on_log

    # broker 연결
    client.connect(host=broker_address, port=broker_port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic) #1
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()

실행결과 보기

해당 python파일을 실행해보면 다음과 같은 결과가 나옵니다.

이렇게 MQTT를 알아보고 python으로 pub/sub을 구현해보았습니다.
궁금한 사항이 있으시다면 댓글로 남겨주세요 저도 함께 공부해보겠습니다^^


REF: MQTT 이해하기
REF: Getting to know MQTT
REF: python mqtt tutorial
REF: Comparison of Python MQTT clients
REF: MQTT 소개
REF: MQTT QoS
REF: How to use MQTT in Python (Paho)