fixed a mqtt connecting/sending issue

This commit is contained in:
Miro Zmrzli 2024-08-19 15:37:38 -07:00
parent 060a831abd
commit 58d034173e
2 changed files with 65 additions and 32 deletions

View File

@ -48,40 +48,40 @@ void MqttService::task()
uint8_t len = 0; uint8_t len = 0;
if(m_app_if.getBuffer()->getBlock(buffer, len)) if(m_app_if.getBuffer()->getBlock(buffer, len))
{ {
char buffer[64]; char top[64];
uint8_t mac[6]; uint8_t mac[6];
WiFi.macAddress(mac); WiFi.macAddress(mac);
while (!m_mqtt_client->connected()) while (!m_mqtt_client->connected())
{ {
sprintf(buffer, "wh_%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); sprintf(top, "wh_%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
ESP_LOGI(TAG, "connecting to mqtt broker, dev id '%s'...", SETTINGS.mqtt.device_id); ESP_LOGI(TAG, "connecting to mqtt broker, dev id '%s'...", SETTINGS.mqtt.device_id);
if (m_mqtt_client->connect(buffer, SETTINGS.mqtt.device_id, NULL)) if (m_mqtt_client->connect(top, SETTINGS.mqtt.device_id, NULL))
{ {
try_connect_count = 0; try_connect_count = 0;
ESP_LOGI(TAG, "connected"); ESP_LOGI(TAG, "connected");
sprintf(buffer, "/%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); sprintf(top, "/%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
ESP_LOGI(TAG, "Subscribing to %s", buffer); ESP_LOGI(TAG, "Subscribing to %s", top);
if(m_mqtt_client->subscribe(buffer)) if(m_mqtt_client->subscribe(top))
ESP_LOGI(TAG, "subscribed"); ESP_LOGI(TAG, "subscribed");
else else
ESP_LOGE(TAG, "subscribe failed"); ESP_LOGE(TAG, "subscribe failed");
sprintf(buffer, "/%08x", SETTINGS.device.group_id); sprintf(top, "/%08x", SETTINGS.device.group_id);
ESP_LOGI(TAG, "Subscribing to %s", buffer); ESP_LOGI(TAG, "Subscribing to %s", top);
if(m_mqtt_client->subscribe(buffer)) if(m_mqtt_client->subscribe(top))
ESP_LOGI(TAG, "subscribed"); ESP_LOGI(TAG, "subscribed");
else else
ESP_LOGE(TAG, "subscribe failed"); ESP_LOGE(TAG, "subscribe failed");
} }
else else
{ {
try_connect_count++; if(++try_connect_count > 100)
if(try_connect_count > 100)
esp_restart(); esp_restart();
ESP_LOGE(TAG, "failed with state %d", m_mqtt_client->state()); ESP_LOGE(TAG, "failed with state %d", m_mqtt_client->state());
delay(5000); delay(5000);
} }
@ -127,8 +127,8 @@ void MqttService::start()
m_mqtt_client->setServer(mqtt_broker, mqtt_port); m_mqtt_client->setServer(mqtt_broker, mqtt_port);
m_mqtt_client->setCallback(std::bind(&MqttService::callback, this, _1, _2, _3)); m_mqtt_client->setCallback(std::bind(&MqttService::callback, this, _1, _2, _3));
m_mqtt_client->setKeepAlive(50); m_mqtt_client->setKeepAlive(40);
m_mqtt_client->setSocketTimeout(50); m_mqtt_client->setSocketTimeout(40);
m_task = TaskMgr::getInstance().createTask(std::bind(&MqttService::task, this), MQTT_TASK_NAME, MQTT_TASK_STACK_SIZE, MQTT_TASK_PRIORITY, MQTT_TASK_CORE); m_task = TaskMgr::getInstance().createTask(std::bind(&MqttService::task, this), MQTT_TASK_NAME, MQTT_TASK_STACK_SIZE, MQTT_TASK_PRIORITY, MQTT_TASK_CORE);
} }

View File

@ -7,11 +7,16 @@ mqtt_broker = "mqtt-dev-server.westus2-1.ts.eventgrid.azure.net"
mqtt_port = 8883 mqtt_port = 8883
mqtt_topic = "wellnuotopics/topic1" mqtt_topic = "wellnuotopics/topic1"
mqtt_client_id = "client1-authn-ID" mqtt_client_id = "client1-authn-ID"
mqtt_client_id = "292" #mqtt_client_id = "292"
connected = False connected = False
def connect_mqtt(client_id): monitor = ["64B70889043C", "64B70888F8C0", "64B70888FAD0", "64B70888FAC8"]
macs = []
hist = {}
def connect_mqtt():
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
global connected global connected
if rc == 0: if rc == 0:
@ -27,31 +32,59 @@ def connect_mqtt(client_id):
keyfile='../certs/client1-authn-ID.key' keyfile='../certs/client1-authn-ID.key'
) )
client.username_pw_set(client_id, "") def on_message(client, userdata, message):
bin = message.payload
secs = bin[0] + bin[1]*256 + bin[2]*256*256 + bin[3]*256*256*256
mac = ''.join('{:02x}'.format(x) for x in bin[8:14])
grp_id = bin[14] + bin[15]*256 + bin[16]*256*256 + bin[17]*256*256*256
if not mac in macs:
macs.append(mac)
print (f"{len(macs)} {mac} {grp_id}")
if not mac in hist:
hist[mac] = 0
else:
hist[mac] += 1
if mac in monitor:
print(f"*** {mac} ****")
client.username_pw_set(mqtt_client_id, None)
client.on_connect = on_connect client.on_connect = on_connect
client.on_message = on_message
client.connect(mqtt_broker, mqtt_port) client.connect(mqtt_broker, mqtt_port)
client.subscribe(mqtt_topic)
return client return client
def publish(client, topic, msg): def publish(client, topic, msg):
result = client.publish(topic, msg) result = client.publish(topic, msg)
# result: [0, 1] # result: [0, 1]
status = result[0] # status = result[0]
if status == 0: # if status == 0:
print(f"Sent `{msg}` to topic `{topic}`") # print(f"Sent `{msg}` to topic `{topic}`")
else: # else:
print(f"Failed to send message to topic {topic}") # print(f"Failed to send message to topic {topic}")
def main() -> None: def main() -> None:
client = connect_mqtt(mqtt_client_id) client = connect_mqtt()
client.loop_start() client.loop_start()
while(connected == False): while(connected == False):
time.sleep(1) time.sleep(1)
publish(client, "/000013ed", "s") # for n in range(100000000):
time.sleep(2) # publish(client, f"/{monitor[0]}", "pin|7856")
client.disconnect() # publish(client, f"/{monitor[0]}", "s")
time.sleep(300)
# for key in hist:
# print(f"{key} : {hist[key]}")
client.disconnect()
if __name__ == "__main__": if __name__ == "__main__":
main() main()