wellhub_reloaded/main/MqttService.cpp

136 lines
3.5 KiB
C++

/// © MiroZ 2024
#include "MqttService.h"

#include <esp_log.h>

#include <string>

#include "TaskMgr.h"
#include "app_config.h"
#include "Buffers.h"
#include "Settings.h"
#include "CommandProcessor.h"
/// Host name of the MQTT broker handed to PubSubClient::setServer().
/// (Previous value: "mqtt-dev-server.westus2-1.ts.eventgrid.azure.net")
static constexpr const char * mqtt_broker = "wellnua.com";

/// Topic on which blocks taken from the application buffer are published.
static constexpr const char * topic = "wellnuotopics/topic1";

/// Broker port handed to PubSubClient::setServer().
static constexpr int mqtt_port = 8883;

/// Tag used for every ESP_LOGx call in this file.
static constexpr const char * TAG = "mqtts";
void MqttService::callback(char* topic, uint8_t * payload, uint32_t length)
{
ESP_LOGI(TAG, "Message arrived in topic: %s\n", topic);
m_app_if.getCommandProcessor()->take();
payload[length] = 0;
ESP_LOGW(TAG, "%s", (char*)payload);
char * buffer = m_app_if.getCommandProcessor()->process((char *)payload, strlen((char *)payload));
// reply to
ESP_LOGI(TAG, "%s", buffer);
m_mqtt_client->publish("/well_hub", buffer, strlen(buffer));
m_app_if.getCommandProcessor()->give();
}
/// Stores the application interface for later use; the constructor itself
/// does no other work.
///
/// @param app_if Application interface used to reach the data buffer and the
///               command processor.
MqttService::MqttService(AppIF & app_if)
	: m_app_if(app_if)
{
	// intentionally empty
}
uint8_t buffer[256];
void MqttService::task()
{
int try_connect_count = 0;
while(true)
{
if(m_app_if.getBuffer()->waitForDataAvailable(1000))
{
uint8_t len = 0;
if(m_app_if.getBuffer()->getBlock(buffer, len))
{
char top[64];
uint8_t mac[6];
WiFi.macAddress(mac);
while (!m_mqtt_client->connected())
{
sprintf(top, "wh_%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
ESP_LOGI(TAG, "connecting to mqtt broker, dev id '%s'...", SETTINGS.mqtt.device_id);
//if (m_mqtt_client->connect(top, SETTINGS.mqtt.device_id, NULL))
if (m_mqtt_client->connect(top, "well_user", "We3l1_best!"))
{
try_connect_count = 0;
ESP_LOGI(TAG, "connected");
sprintf(top, "/%02X%02X%02X%02X%02X%02X", mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]);
ESP_LOGI(TAG, "Subscribing to %s", top);
if(m_mqtt_client->subscribe(top))
ESP_LOGI(TAG, "subscribed");
else
ESP_LOGE(TAG, "subscribe failed");
sprintf(top, "/%08x", SETTINGS.device.group_id);
ESP_LOGI(TAG, "Subscribing to %s", top);
if(m_mqtt_client->subscribe(top))
ESP_LOGI(TAG, "subscribed");
else
ESP_LOGE(TAG, "subscribe failed");
}
else
{
if(++try_connect_count > 100)
esp_restart();
ESP_LOGE(TAG, "failed with state %d", m_mqtt_client->state());
delay(5000);
}
}
while(true)
{
if(!m_mqtt_client->connected())
break;
m_mqtt_client->loop();
ESP_LOGI(TAG, "publishing...");
bool val = m_mqtt_client->publish(topic, buffer, len);
if(val)
{
ESP_LOGI(TAG, "publish ok");
break;
}
else
ESP_LOGE(TAG, "publish failed");
}
}
}
if(m_mqtt_client->connected())
{
m_mqtt_client->loop();
}
}
}
void MqttService::start()
{
ESP_LOGW(TAG, "Starting mqtt service...");
m_esp_client = new WiFiClientSecure();
m_mqtt_client = new PubSubClient(*m_esp_client);
m_esp_client->setCACert((const char*)server_cert);
ESP_LOGI(TAG, "%s", server_cert);
// m_esp_client->setCertificate((const char *)client_cert); // for client verification
// m_esp_client->setPrivateKey((const char *)client_key); // for client verification
m_mqtt_client->setServer(mqtt_broker, mqtt_port);
m_mqtt_client->setCallback(std::bind(&MqttService::callback, this, _1, _2, _3));
m_mqtt_client->setKeepAlive(40);
m_mqtt_client->setSocketTimeout(40);
m_task = TaskMgr::getInstance().createTask(std::bind(&MqttService::task, this), MQTT_TASK_NAME, MQTT_TASK_STACK_SIZE, MQTT_TASK_PRIORITY, MQTT_TASK_CORE);
}