使用python连接mqtt进行通讯
# docker启动emqx
各种端口及连接信息见
物联网iot搭建MQTT对接/01.基于EMQX平台自建MQTT服务器.md
使用python进行对接
import paho.mqtt.client as mqtt
import time
import json
# MQTT服务器的配置
MQTT_BROKER = "localhost" # EMQX服务器地址
MQTT_PORT = 1883 # MQTT默认端口
MQTT_TOPIC = "testtopic/#" # 订阅的主题
# 连接回调函数
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("成功连接到MQTT服务器")
client.subscribe(MQTT_TOPIC) # 订阅主题
else:
print(f"连接失败,返回码:{rc}")
# 接收消息的回调函数
def on_message(client, userdata, msg):
print(f"收到消息 主题:{msg.topic} 消息:{msg.payload.decode()}")
# 订阅消息的方法
def subscribe_message():
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
try:
client.connect(MQTT_BROKER, MQTT_PORT, 60)
client.loop_start() # 启动循环,保持订阅状态
return client
except Exception as e:
print(f"连接错误:{e}")
return None
# 发布消息的方法
def publish_message(client, topic, message):
try:
# 发布消息
result = client.publish(topic, message)
status = result[0]
if status == 0:
print(f"成功发送消息到主题 {topic}")
else:
print(f"发送消息失败")
except Exception as e:
print(f"发布消息错误:{e}")
def main():
# 创建并启动订阅客户端
client = subscribe_message()
if client:
try:
# 等待连接建立
time.sleep(1)
# 发布几条测试消息
test_message = {
"timestamp": time.time(),
"value": "测试消息"
}
# 发布到不同的子主题
publish_message(client, "testtopic/1", json.dumps(test_message))
publish_message(client, "testtopic/2", "Hello EMQX!")
# 保持程序运行
while True:
time.sleep(1)
except KeyboardInterrupt:
print("程序终止")
client.loop_stop()
client.disconnect()
if __name__ == "__main__":
main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
编辑 (opens new window)
上次更新: 2024/12/18, 17:45:13
- 01
- 免费的在线logo设计,uugai找了我好就12-24
- 02
- Untitled12-24