/// © MiroZ 2024 #include "MqttService.h" #include #include "TaskMgr.h" #include "app_config.h" #include "Buffers.h" static const char * mqtt_broker = "mqtt-dev-server.westus2-1.ts.eventgrid.azure.net"; static const char * topic = "wellnuotopics/topic1"; static const int mqtt_port = 8883; static const char * TAG = "mqtts"; void MqttService::callback(char* topic, uint8_t * payload, uint32_t length) { ESP_LOGI(TAG, "Message arrived in topic: %s\n", topic); payload[length] = 0; ESP_LOGW(TAG, "%s", (char*)payload); } MqttService::MqttService(AppIF & app_if) : m_app_if(app_if) { } uint8_t buffer[256]; void MqttService::task() { while(true) { // if(m_app_if.getBuffer()->getSemaphore().take(5000)) // wait for the data to become available if(m_app_if.getBuffer()->waitForDataAvailable(5000)) { uint8_t len = 0; if(m_app_if.getBuffer()->getBlock(buffer, len)) { ESP_LOGI(TAG, "got data, len %d", len); while (!m_mqtt_client->connected()) { ESP_LOGI(TAG, "connecting to mqtt broker..."); if (m_mqtt_client->connect("Esp32 client", "client1-authn-ID", NULL)) { ESP_LOGI(TAG, "connected"); if(m_mqtt_client->subscribe("my_topic")) { ESP_LOGI(TAG, "subscribed"); } else { ESP_LOGE(TAG, "subscribe failed"); } } else { ESP_LOGE(TAG, "failed with state %d", m_mqtt_client->state()); delay(5000); } } while(true) { if(!m_mqtt_client->connected()) break; m_mqtt_client->loop(); ESP_LOGI(TAG, "publishing..."); // bool val = m_mqtt_client->publish(topic, "Hi I'm ESP32 ^^"); bool val = m_mqtt_client->publish(topic, buffer, len); if(val) { ESP_LOGI(TAG, "publish ok"); break; } else ESP_LOGE(TAG, "publish failed"); } } } else m_mqtt_client->loop(); } } void MqttService::start() { ESP_LOGW(TAG, "Starting mqtt service..."); m_esp_client = new WiFiClientSecure(); m_mqtt_client = new PubSubClient(*m_esp_client); m_esp_client->setCACert((const char*)server_cert); m_esp_client->setCertificate((const char *)client_cert); // for client verification m_esp_client->setPrivateKey((const char *)client_key); // for client verification m_mqtt_client->setServer(mqtt_broker, mqtt_port); m_mqtt_client->setCallback(std::bind(&MqttService::callback, this, _1, _2, _3)); m_mqtt_client->setKeepAlive(30); m_mqtt_client->setSocketTimeout(30); m_task = TaskMgr::getInstance().createTask(std::bind(&MqttService::task, this), MQTT_TASK_NAME, MQTT_TASK_STACK_SIZE, MQTT_TASK_PRIORITY, MQTT_TASK_CORE); }