我被分配了一个关于 mqtt 的任务。任务是监听多个通道,获取值并对其进行操作。出现了一个问题:从客户端接收信息时,on_message函数不起作用。有一个发送后触发的函数,但是里面只发送了Client,我没有使用它。告诉我出了什么问题。传输数据的代码:
"""
Файл служит для определения точности вашего алгоритма
Для получения оценки точности, запустите файл на исполнение
"""
import json
from threading import Thread
import time
import paho.mqtt.client as paho
import eval as submission
# import solution as submission
def message_handler(client, userdata, msg):
topic = msg.topic
msg_data = msg.payload.decode()
# print(f"Received {topic}: {msg_data}")
if len(user_test_results) <= current_test_id:
user_test_results.append([])
user_test_results[current_test_id].append([topic, msg_data])
user_test_results = []
current_test_id = 0
def main():
global current_test_id
annot_file = "annotations.json"
with open(annot_file, 'r') as f:
data = json.load(f)
test_cases = data['test_cases']
test_results = data['test_results']
args = submission.setup()
Thread(
target=submission.main_loop,
args=(args,),
daemon=True
).start()
client = paho.Client()
client.on_message = message_handler
if client.connect("localhost", 1883, 60) != 0:
print("Couldn't connect to the mqtt broker")
exit(1)
client.subscribe("odd")
client.subscribe("test_multiply")
client.subscribe("test_addend")
correct = 0
for current_test_id, test_case in enumerate(test_cases):
for topic, msg in test_case:
client.publish(topic, msg, 0)
client.loop_write()
client.loop_read()
for _ in range(10):
client.loop_read()
time.sleep(0.1)
if test_results[current_test_id] == user_test_results[current_test_id]:
correct += 1
total_object = len(test_cases)
print(f"Из {total_object} тестов верны {correct}")
score = correct / total_object
print(f"Точность: {score:.2f}")
if __name__ == '__main__':
main()
我需要写入的文件是:
# -*- coding: utf-8 -*-
import paho.mqtt.client as paho
sum = 0
addend_numbers = 0
def setup():
"""
Функция для инициализации клиента MQTT, подписки на каналы,
создание callback-функций.
Выходные данные: что угодно, всё что вы захотите передать в функцию
main_loop после инициализации MQTT-клиента.
"""
client = paho.Client()
client.on_message = on_message
client.connect("127.0.0.1", 1883, 60)
client.subscribe([("*2", 0), ("*3", 0), ("addend", 0), ("command", 0), ("numbers", 0)])
def main_loop(paho_client):
"""
Функция с циклом для обработки входящих сообщений и отправки исходящих сообщений.
"""
# TODO: Отредактируйте эту функцию по своему усмотрению.
# Код проверки один раз вызовет функцию setup.
# Затем, для каждого теста будет вызывать функцию main_loop.
# Все пользовательские функции должны вызываться из вышеперечисленных.
pass
def on_message(client, userdata, msg):
print("Message received-> " + msg.topic + " " + str(msg.payload))
topic = msg.topic
msg_data = msg.payload.decode("utf-8")
if topic.endswith("2"):
number, channel_name = msg_data.split(";")
result = int(number) * 2
client.publish(channel_name, str(result))
elif topic.endswith("3"):
number, channel_name = msg_data.split(";")
result = int(number) * 3
client.publish(channel_name, str(result))
elif topic == "addend":
addend_numbers.append(int(msg_data))
elif topic == "command":
if addend_numbers:
total_sum = sum(addend_numbers)
client.publish(msg_data, str(total_sum))
addend_numbers.clear() # Очистка после отправки
elif topic == "numbers":
number = int(msg_data)
if number % 2 != 0: # Проверка на нечётность
client.publish("odd", str(number))
您不调用任何
loop函数。至少有 3 个用于不同的任务。例如,对于客户端,您可以使用loop_start()创建一个单独的线程,它将在其中侦听消息。它会自动重新启动。与 结合使用loop_stop()。有一个方法可以开始侦听,但同时会阻止该线程中的执行
loop_forever()。您可以使用
loop()指定锁定时间的特定于客户端的方法。我想这就是你正在寻找的