H blog H blog
首页
  • 前端文章

    • 错误处理
  • 学习笔记

    • 个人站长
  • Mineadmin
  • Hyperf
  • 图床
  • Golang
  • Python
  • 技术文档
  • 小程序解包
  • Tor浏览器
  • 自建代理池
  • 物联网
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

jrndge

精通各种语言的hello word编写
首页
  • 前端文章

    • 错误处理
  • 学习笔记

    • 个人站长
  • Mineadmin
  • Hyperf
  • 图床
  • Golang
  • Python
  • 技术文档
  • 小程序解包
  • Tor浏览器
  • 自建代理池
  • 物联网
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 图床

  • golang框架

  • python使用

    • 使用python生成复杂token
    • 使用python搭建本地的chatts语音合成
    • 公司电脑显卡的全部信息
    • 谷歌免费的显卡使用
    • 使用Django后台
    • 使用python连接mqtt进行通讯
      • docker启动emqx
  • hyperf之mineadmin的使用

  • 把hyperf项目部署到centos服务器上

  • fastadmin使用thinkphp来搭建已有的项目

  • 物联网iot搭建MQTT对接

  • 代码部署生成秘钥

  • golang学习笔记

  • 后端
  • python使用
jrndge
2024-12-13
目录

使用python连接mqtt进行通讯

# docker启动emqx

各种端口及连接信息见

物联网iot搭建MQTT对接/01.基于EMQX平台自建MQTT服务器.md

使用python进行对接

import paho.mqtt.client as mqtt
import time
import json

# MQTT服务器的配置
MQTT_BROKER = "localhost"  # EMQX服务器地址
MQTT_PORT = 1883          # MQTT默认端口
MQTT_TOPIC = "testtopic/#"  # 订阅的主题

# 连接回调函数
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("成功连接到MQTT服务器")
        client.subscribe(MQTT_TOPIC)  # 订阅主题
    else:
        print(f"连接失败,返回码:{rc}")

# 接收消息的回调函数
def on_message(client, userdata, msg):
    print(f"收到消息 主题:{msg.topic} 消息:{msg.payload.decode()}")

# 订阅消息的方法
def subscribe_message():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    
    try:
        client.connect(MQTT_BROKER, MQTT_PORT, 60)
        client.loop_start()  # 启动循环,保持订阅状态
        return client
    except Exception as e:
        print(f"连接错误:{e}")
        return None

# 发布消息的方法
def publish_message(client, topic, message):
    try:
        # 发布消息
        result = client.publish(topic, message)
        status = result[0]
        if status == 0:
            print(f"成功发送消息到主题 {topic}")
        else:
            print(f"发送消息失败")
    except Exception as e:
        print(f"发布消息错误:{e}")

def main():
    # 创建并启动订阅客户端
    client = subscribe_message()
    
    if client:
        try:
            # 等待连接建立
            time.sleep(1)
            
            # 发布几条测试消息
            test_message = {
                "timestamp": time.time(),
                "value": "测试消息"
            }
            
            # 发布到不同的子主题
            publish_message(client, "testtopic/1", json.dumps(test_message))
            publish_message(client, "testtopic/2", "Hello EMQX!")
            
            # 保持程序运行
            while True:
                time.sleep(1)
                
        except KeyboardInterrupt:
            print("程序终止")
            client.loop_stop()
            client.disconnect()

if __name__ == "__main__":
    main()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
编辑 (opens new window)
上次更新: 2024/12/18, 17:45:13
使用Django后台
使用mineadmin

← 使用Django后台 使用mineadmin→

最近更新
01
免费的在线logo设计,uugai找了我好就
12-24
02
Untitled
12-24
03
使用vdoing搭建个人博客,然后申请一个免费域名,挂载到github后使用cloudflare代理全球加速
12-24
更多文章>
Theme by Vdoing | Copyright © 2008-2024 jrndge | MIT License | 渝ICP备2024034950号 | 渝公网安备45142202000030
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式