MQTT 初体验 – umqtt.simple@MicroPython

目录 Content
[hide]

前段时间 LT 入手了 NodeMCU,见《NodeMCU 开发板》,于是就琢磨着怎么能搞一个具体的应用,之后发现了这篇《MicroPython使用MQTT协议接入OneNET云平台》,按照文中介绍,体验了一把 MQTT。

一、oneNet 平台

感谢中移物联网提供了这么一个平台 https://open.iot.10086.cn/,官方关于MQTT的介绍见:https://open.iot.10086.cn/doc/art253.html#68

《MQTT基础入门(一) 》提到 “ OneNET平台对Mqtt的实现兼容Mqtt标准协议V3.1.1的大部分内容,同时也有一定的差异性,官网文档“设备终端接入协议4-MQTT.docx”对差异性作了详细说明。 文档下载地址:http://open.iot.10086.cn/doc/art194.html#43 ”。

1.MQTT 连接要素

对于 MQTT 协议,需要关注几个概念,那么使用就清晰了:

  • oneNet 的 MQTT服务器:183.230.40.39:6002
  • 产品 ID : 一个产品下面有若干设备。对应MQTT客户端里的用户名。
  • 主题 TOPIC:各个设备订阅和发布的主题。
  • 设备 ID:设备,订阅者或发布者。
  • 鉴权信息:用于识别设备身份。也可用设备的APIKey。对应MQTT客户端里的密码。

2. oneNet 连接超时

oneNet 服务器会在客户端无数据交换超时300秒后关闭连接。这时候就要用到 ping 了。

二、MQTT 测试工具

有一款得心应手的测试工具那将是事半功倍,最初尝试使用 MQTT Lens,但是发现无法成功连接 oneNet。后来遇到 eclipse paho 发现挺好用的。

1. eclipse paho

2. iOS Mqttt

Chenhsin Wong 的这款 App,可以连接oneNet. 可以在 Appstore 里搜索 Mqtttt

三、代码

以下代码用MQTT协议连接oneNet后,订阅 test 主题,并且每隔4秒,发布 test 主题内容 toggle , ESP8266 收到后点亮或熄灭 GPIO02。

1. 连接 WiFi :wifi_lt.py

def connect():
        import network
        sta_if = network.WLAN(network.STA_IF)
        ap_if = network.WLAN(network.AP_IF)
        if ap_if.active():
                ap_if.active(False)
        if not sta_if.isconnected():
                print('connecting to LT_AF0F_2 WiFi network...')
        sta_if.active(True)
        
        #WiFi SSID & PASSWORD
        sta_if.connect('SSID', 'PASSWORD')
        
        while not sta_if.isconnected():
                pass
        print('network config:', sta_if.ifconfig())

2.MQTT 客户端:mqtt_lt.py

from umqtt.simple import MQTTClient
from machine import Pin
import machine
import micropython
from machine import Timer

#LED
pin = Pin(2, Pin.OUT, value=0)

#oneNet Server & Port
server = '183.230.40.39'
port = 6002

#product ID
username='XXXXX'

#topic
TOPIC = 'XXXXX'

#client ID
clientID = 'XXXXX'

#client APIKey:
password='XXXXX'

#ping  unit: seconds
livebit_timeout = 4000


state = 0
def sub_cb(topic, msg):
    global state
    print((topic, msg))
    if msg == b"on":
        pin.value(0)
        state = 1
    elif msg == b"off":
        pin.value(1)
        state = 0
    elif msg == b"toggle":
        state = 1 - state
        pin.value(state)  
       

def main():
    c = MQTTClient(clientID,server,port,username,password)
    c.set_callback(sub_cb)
    c.connect()
    c.subscribe(TOPIC)
    print("Connected to oneNet server: %s, subscribed to topic: %s " % (server, TOPIC))
    
    livebit = Timer(-1)
    livebit.init(period=livebit_timeout, mode=Timer.PERIODIC, callback=lambda t : c.publish(TOPIC,"toggle"))
    
    try:
        while 1:
            c.wait_msg()
    finally:                        
        c.disconnect()
        print("something wrong, disconnected!")
        pin.value(1)

3. 调用上面两个程序:lt.py

def do():
    print('staring wifi_lt.py...')
    import wifi_lt
    wifi_lt.connect()
    print('staring mqtt_lt.py...')
    import mqtt_lt
    mqtt_lt.main()

4. 通过main调用 lt.py:main.py

import ki
ki.do()

5. umqtt.simple 库

来自 https://github.com/micropython/micropython-lib/tree/master/umqtt.simple

import usocket as socket
import ustruct as struct
from ubinascii import hexlify

class MQTTException(Exception):
    pass

class MQTTClient:

    def __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0,
                 ssl=False, ssl_params={}):
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False

    def _send_str(self, s):
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)

    def _recv_len(self):
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7f) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True):
        self.sock = socket.socket()
        addr = socket.getaddrinfo(self.server, self.port)[0][-1]
        self.sock.connect(addr)
        if self.ssl:
            import ussl
            self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\x04MQTT\x04\x02\0\0")

        sz = 10 + 2 + len(self.client_id)
        msg[6] = clean_session << 1
        if self.user is not None:
            sz += 2 + len(self.user) + 2 + len(self.pswd)
            msg[6] |= 0xC0
        if self.keepalive:
            assert self.keepalive < 65536
            msg[7] |= self.keepalive >> 8
            msg[8] |= self.keepalive & 0x00FF
        if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[6] |= self.lw_retain << 5

        i = 1
        while sz > 0x7f:
            premsg[i] = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
        premsg[i] = sz

        self.sock.write(premsg, i + 2)
        self.sock.write(msg)
        #print(hex(len(msg)), hexlify(msg, ":"))
        self._send_str(self.client_id)
        if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
        if self.user is not None:
            self._send_str(self.user)
            self._send_str(self.pswd)
        resp = self.sock.read(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        return resp[2] & 1

    def disconnect(self):
        self.sock.write(b"\xe0\0")
        self.sock.close()

    def ping(self):
        self.sock.write(b"\xc0\0")

    def publish(self, topic, msg, retain=False, qos=0):
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7f:
            pkt[i] = (sz & 0x7f) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt, i + 1)
        self._send_str(topic)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        self.sock.write(msg)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        #print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt)
        self._send_str(topic)
        self.sock.write(qos.to_bytes(1, "little"))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                #print(resp)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        res = self.sock.read(1)
        self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xf0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
        elif op & 6 == 4:
            assert 0

    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
        self.sock.setblocking(False)
        return self.wait_msg()

 

四、延伸阅读

 

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.