diff --git a/main/MqttService.cpp b/main/MqttService.cpp index 31590f2..4f3b4c1 100644 --- a/main/MqttService.cpp +++ b/main/MqttService.cpp @@ -48,40 +48,40 @@ void MqttService::task() uint8_t len = 0; if(m_app_if.getBuffer()->getBlock(buffer, len)) { - char buffer[64]; + char top[64]; uint8_t mac[6]; WiFi.macAddress(mac); while (!m_mqtt_client->connected()) { - sprintf(buffer, "wh_%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); + sprintf(top, "wh_%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); ESP_LOGI(TAG, "connecting to mqtt broker, dev id '%s'...", SETTINGS.mqtt.device_id); - if (m_mqtt_client->connect(buffer, SETTINGS.mqtt.device_id, NULL)) + if (m_mqtt_client->connect(top, SETTINGS.mqtt.device_id, NULL)) { try_connect_count = 0; ESP_LOGI(TAG, "connected"); - sprintf(buffer, "/%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); + sprintf(top, "/%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]); - ESP_LOGI(TAG, "Subscribing to %s", buffer); - if(m_mqtt_client->subscribe(buffer)) + ESP_LOGI(TAG, "Subscribing to %s", top); + if(m_mqtt_client->subscribe(top)) ESP_LOGI(TAG, "subscribed"); else ESP_LOGE(TAG, "subscribe failed"); - sprintf(buffer, "/%08x", SETTINGS.device.group_id); - ESP_LOGI(TAG, "Subscribing to %s", buffer); - if(m_mqtt_client->subscribe(buffer)) + sprintf(top, "/%08x", SETTINGS.device.group_id); + ESP_LOGI(TAG, "Subscribing to %s", top); + if(m_mqtt_client->subscribe(top)) ESP_LOGI(TAG, "subscribed"); else ESP_LOGE(TAG, "subscribe failed"); } else { - try_connect_count++; - if(try_connect_count > 100) + if(++try_connect_count > 100) esp_restart(); + ESP_LOGE(TAG, "failed with state %d", m_mqtt_client->state()); delay(5000); } @@ -127,8 +127,8 @@ void MqttService::start() m_mqtt_client->setServer(mqtt_broker, mqtt_port); m_mqtt_client->setCallback(std::bind(&MqttService::callback, this, _1, _2, _3)); - m_mqtt_client->setKeepAlive(50); - m_mqtt_client->setSocketTimeout(50); + m_mqtt_client->setKeepAlive(40); + m_mqtt_client->setSocketTimeout(40); m_task = TaskMgr::getInstance().createTask(std::bind(&MqttService::task, this), MQTT_TASK_NAME, MQTT_TASK_STACK_SIZE, MQTT_TASK_PRIORITY, MQTT_TASK_CORE); } \ No newline at end of file diff --git a/utils/mqtt.py b/utils/mqtt.py index cd7acf4..88a242f 100755 --- a/utils/mqtt.py +++ b/utils/mqtt.py @@ -7,11 +7,16 @@ mqtt_broker = "mqtt-dev-server.westus2-1.ts.eventgrid.azure.net" mqtt_port = 8883 mqtt_topic = "wellnuotopics/topic1" mqtt_client_id = "client1-authn-ID" -mqtt_client_id = "292" +#mqtt_client_id = "292" connected = False -def connect_mqtt(client_id): +monitor = ["64B70889043C", "64B70888F8C0", "64B70888FAD0", "64B70888FAC8"] + +macs = [] +hist = {} + +def connect_mqtt(): def on_connect(client, userdata, flags, rc): global connected if rc == 0: @@ -26,32 +31,60 @@ def connect_mqtt(client_id): certfile='../certs/client1-authn-ID.pem', keyfile='../certs/client1-authn-ID.key' ) + + def on_message(client, userdata, message): + bin = message.payload + secs = bin[0] + bin[1]*256 + bin[2]*256*256 + bin[3]*256*256*256 + mac = ''.join('{:02x}'.format(x) for x in bin[8:14]) - client.username_pw_set(client_id, "") + grp_id = bin[14] + bin[15]*256 + bin[16]*256*256 + bin[17]*256*256*256 + + if not mac in macs: + macs.append(mac) + print (f"{len(macs)} {mac} {grp_id}") + + if not mac in hist: + hist[mac] = 0 + else: + hist[mac] += 1 + + if mac in monitor: + print(f"*** {mac} ****") + + client.username_pw_set(mqtt_client_id, None) client.on_connect = on_connect + client.on_message = on_message client.connect(mqtt_broker, mqtt_port) + client.subscribe(mqtt_topic) return client def publish(client, topic, msg): - result = client.publish(topic, msg) - # result: [0, 1] - status = result[0] - if status == 0: - print(f"Sent `{msg}` to topic `{topic}`") - else: - print(f"Failed to send message to topic {topic}") + result = client.publish(topic, msg) + # result: [0, 1] + # status = result[0] + # if status == 0: + # print(f"Sent `{msg}` to topic `{topic}`") + # else: + # print(f"Failed to send message to topic {topic}") def main() -> None: - client = connect_mqtt(mqtt_client_id) - client.loop_start() - - while(connected == False): - time.sleep(1) + client = connect_mqtt() + client.loop_start() + + while(connected == False): + time.sleep(1) - publish(client, "/000013ed", "s") - time.sleep(2) - client.disconnect() + # for n in range(100000000): + # publish(client, f"/{monitor[0]}", "pin|7856") + # publish(client, f"/{monitor[0]}", "s") + + time.sleep(300) + + # for key in hist: + # print(f"{key} : {hist[key]}") + + client.disconnect() if __name__ == "__main__": - main() + main()