107 lines
2.6 KiB
C++
107 lines
2.6 KiB
C++
/// © MiroZ 2024
|
|
|
|
#include "MqttService.h"
|
|
|
|
#include <esp_log.h>
|
|
#include "TaskMgr.h"
|
|
#include "app_config.h"
|
|
#include "Buffers.h"
|
|
|
|
/// Host name of the MQTT broker passed to PubSubClient::setServer() in start().
static constexpr const char * mqtt_broker = "mqtt-dev-server.westus2-1.ts.eventgrid.azure.net";

/// Topic that task() publishes data blocks to.
static constexpr const char * topic = "wellnuotopics/topic1";

/// TCP port of the MQTT broker passed to PubSubClient::setServer() in start().
static constexpr int mqtt_port = 8883;

/// Tag used for every ESP_LOGx message emitted from this file.
static constexpr const char * TAG = "mqtts";
|
|
|
|
void MqttService::callback(char* topic, uint8_t * payload, uint32_t length)
|
|
{
|
|
ESP_LOGI(TAG, "Message arrived in topic: %s\n", topic);
|
|
payload[length] = 0;
|
|
ESP_LOGW(TAG, "%s", (char*)payload);
|
|
}
|
|
|
|
/// Constructs the service and stores the application interface reference.
///
/// No network objects are created here; that happens in start().
///
/// @param app_if Application interface used by task() to obtain the data buffer.
MqttService::MqttService(AppIF & app_if)
    : m_app_if(app_if)
{
    // intentionally empty
}
|
|
|
|
// Scratch buffer: MqttService::task() passes it to getBlock() and then to
// publish(). The block length is carried in a uint8_t there, so it cannot
// exceed the 256 bytes available here.
// NOTE(review): declared without `static`, so it has external linkage under a
// very generic name — confirm no other translation unit relies on it before
// making it file-local.
uint8_t buffer[256];
|
|
|
|
void MqttService::task()
|
|
{
|
|
while(true)
|
|
{
|
|
// if(m_app_if.getBuffer()->getSemaphore().take(5000)) // wait for the data to become available
|
|
if(m_app_if.getBuffer()->waitForDataAvailable(5000))
|
|
{
|
|
uint8_t len = 0;
|
|
if(m_app_if.getBuffer()->getBlock(buffer, len))
|
|
{
|
|
ESP_LOGI(TAG, "got data, len %d", len);
|
|
|
|
while (!m_mqtt_client->connected())
|
|
{
|
|
ESP_LOGI(TAG, "connecting to mqtt broker...");
|
|
if (m_mqtt_client->connect("Esp32 client", "client1-authn-ID", NULL))
|
|
{
|
|
ESP_LOGI(TAG, "connected");
|
|
if(m_mqtt_client->subscribe("my_topic"))
|
|
{
|
|
ESP_LOGI(TAG, "subscribed");
|
|
}
|
|
else
|
|
{
|
|
ESP_LOGE(TAG, "subscribe failed");
|
|
}
|
|
}
|
|
else
|
|
{
|
|
ESP_LOGE(TAG, "failed with state %d", m_mqtt_client->state());
|
|
delay(5000);
|
|
}
|
|
}
|
|
|
|
while(true)
|
|
{
|
|
if(!m_mqtt_client->connected())
|
|
break;
|
|
|
|
m_mqtt_client->loop();
|
|
|
|
ESP_LOGI(TAG, "publishing...");
|
|
// bool val = m_mqtt_client->publish(topic, "Hi I'm ESP32 ^^");
|
|
bool val = m_mqtt_client->publish(topic, buffer, len);
|
|
|
|
if(val)
|
|
{
|
|
ESP_LOGI(TAG, "publish ok");
|
|
break;
|
|
}
|
|
else
|
|
ESP_LOGE(TAG, "publish failed");
|
|
}
|
|
}
|
|
}
|
|
else
|
|
m_mqtt_client->loop();
|
|
}
|
|
}
|
|
|
|
void MqttService::start()
|
|
{
|
|
ESP_LOGW(TAG, "Starting mqtt service...");
|
|
|
|
m_esp_client = new WiFiClientSecure();
|
|
m_mqtt_client = new PubSubClient(*m_esp_client);
|
|
|
|
m_esp_client->setCACert((const char*)server_cert);
|
|
m_esp_client->setCertificate((const char *)client_cert); // for client verification
|
|
m_esp_client->setPrivateKey((const char *)client_key); // for client verification
|
|
|
|
m_mqtt_client->setServer(mqtt_broker, mqtt_port);
|
|
m_mqtt_client->setCallback(std::bind(&MqttService::callback, this, _1, _2, _3));
|
|
m_mqtt_client->setKeepAlive(30);
|
|
m_mqtt_client->setSocketTimeout(30);
|
|
|
|
m_task = TaskMgr::getInstance().createTask(std::bind(&MqttService::task, this), MQTT_TASK_NAME, MQTT_TASK_STACK_SIZE, MQTT_TASK_PRIORITY, MQTT_TASK_CORE);
|
|
} |