/** * MQTT Service for WellNuo Backend * * Connects to mqtt.eluxnetworks.net and listens for alerts * from WellNuo IoT devices (sensors). * * Topic format: /well_{deployment_id} * Message format: { Command: "REPORT", body: "alert text", time: unix_timestamp } * * Auto-subscribes to ALL active deployments from database * Sends push notifications to users with access to each deployment */ const mqtt = require('mqtt'); const { pool } = require('../config/database'); const { sendPushNotifications: sendNotificationsWithSettings, NotificationType } = require('./notifications'); // MQTT Configuration const MQTT_BROKER = process.env.MQTT_BROKER || 'mqtt://mqtt.eluxnetworks.net:1883'; const MQTT_USER = process.env.MQTT_USER || process.env.LEGACY_API_USERNAME || 'robster'; const MQTT_PASSWORD = process.env.MQTT_PASSWORD || process.env.LEGACY_API_PASSWORD || 'rob2'; // Store for received alerts (in-memory, last 100) const alertsCache = []; const MAX_ALERTS_CACHE = 100; // MQTT Client let client = null; let isConnected = false; let subscribedTopics = new Set(); /** * Initialize MQTT connection */ function init() { if (client) { console.log('[MQTT] Already initialized'); return; } console.log(`[MQTT] Connecting to ${MQTT_BROKER}...`); client = mqtt.connect(MQTT_BROKER, { username: MQTT_USER, password: MQTT_PASSWORD, clientId: `wellnuo-backend-${Date.now()}`, reconnectPeriod: 5000, // Reconnect every 5 seconds keepalive: 60, }); client.on('connect', () => { console.log('[MQTT] ✅ Connected to broker'); isConnected = true; // Resubscribe to all topics on reconnect subscribedTopics.forEach(topic => { client.subscribe(topic, (err) => { if (!err) { console.log(`[MQTT] Resubscribed to ${topic}`); } }); }); }); client.on('message', async (topic, payload) => { const timestamp = new Date().toISOString(); const messageStr = payload.toString(); console.log(`[MQTT] 📨 Message on ${topic}: ${messageStr}`); try { const message = JSON.parse(messageStr); // Extract deployment_id from topic (/well_21 -> 21) const deploymentId = parseInt(topic.replace('/well_', ''), 10); const alert = { id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, topic, deploymentId, command: message.Command || 'UNKNOWN', body: message.body || messageStr, messageTime: message.time ? new Date(message.time * 1000).toISOString() : null, receivedAt: timestamp, raw: message, }; // Add to cache alertsCache.unshift(alert); if (alertsCache.length > MAX_ALERTS_CACHE) { alertsCache.pop(); } // Process alert based on command await processAlert(alert); } catch (e) { console.log(`[MQTT] ⚠️ Non-JSON message: ${messageStr}`); // Still cache raw messages alertsCache.unshift({ id: `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`, topic, command: 'RAW', body: messageStr, receivedAt: timestamp, }); } }); client.on('error', (err) => { console.error('[MQTT] ❌ Error:', err.message); }); client.on('close', () => { console.log('[MQTT] 🔌 Connection closed'); isConnected = false; }); client.on('reconnect', () => { console.log('[MQTT] 🔄 Reconnecting...'); }); } /** * Process incoming alert */ async function processAlert(alert) { console.log(`[MQTT] Processing alert: ${alert.command} for deployment ${alert.deploymentId}`); // Handle different command types switch (alert.command) { case 'REPORT': // This is a sensor alert - could be emergency, activity, etc. await saveAlertToDatabase(alert); // Send push notification to users subscribed to this deployment await sendPushNotifications(alert); break; case 'CREDS': // Credential/device setup message - ignore for now console.log(`[MQTT] Ignoring CREDS message`); break; default: console.log(`[MQTT] Unknown command: ${alert.command}`); } } /** * Get all active deployments from database */ async function getAllActiveDeployments() { try { const result = await pool.query(` SELECT DISTINCT legacy_deployment_id FROM beneficiary_deployments WHERE legacy_deployment_id IS NOT NULL `); return result.rows.map(r => r.legacy_deployment_id); } catch (e) { console.error('[MQTT] Failed to get deployments from DB:', e.message); return []; } } /** * Subscribe to all active deployments from database */ async function subscribeToAllDeployments() { const deployments = await getAllActiveDeployments(); console.log(`[MQTT] Found ${deployments.length} active deployments:`, deployments); for (const deploymentId of deployments) { subscribeToDeployment(deploymentId); } return deployments; } /** * Get users with push tokens for a deployment */ async function getUsersForDeployment(deploymentId) { try { // Find all users who have access to beneficiaries linked to this deployment const result = await pool.query(` SELECT DISTINCT u.id as user_id, u.email, pt.token as push_token, b.name as beneficiary_name, bd.beneficiary_id, ua.role FROM beneficiary_deployments bd JOIN user_access ua ON ua.beneficiary_id = bd.beneficiary_id JOIN users u ON u.id = ua.accessor_id JOIN beneficiaries b ON b.id = bd.beneficiary_id LEFT JOIN push_tokens pt ON pt.user_id = u.id WHERE bd.legacy_deployment_id = $1 AND pt.token IS NOT NULL `, [deploymentId]); return result.rows; } catch (e) { console.error('[MQTT] Failed to get users for deployment:', e.message); return []; } } /** * Send push notifications for an alert (uses notifications.js with settings check) */ async function sendPushNotifications(alert) { const users = await getUsersForDeployment(alert.deploymentId); if (users.length === 0) { console.log(`[MQTT] No users found for deployment ${alert.deploymentId}`); return; } // Get unique user IDs const userIds = [...new Set(users.map(u => u.user_id))]; // Get first beneficiary info for the notification const beneficiaryName = users[0]?.beneficiary_name || 'Beneficiary'; const beneficiaryId = users[0]?.beneficiary_id || null; // Determine notification type based on alert content let notificationType = NotificationType.ACTIVITY; const bodyLower = (alert.body || '').toLowerCase(); if (bodyLower.includes('emergency') || bodyLower.includes('fall') || bodyLower.includes('sos')) { notificationType = NotificationType.EMERGENCY; } else if (bodyLower.includes('battery') || bodyLower.includes('low power')) { notificationType = NotificationType.LOW_BATTERY; } console.log(`[MQTT] Sending ${notificationType} notification to ${userIds.length} users for deployment ${alert.deploymentId}`); // Use the new notifications service with settings check const result = await sendNotificationsWithSettings({ userIds, title: `Alert: ${beneficiaryName}`, body: alert.body || 'New sensor alert', type: notificationType, beneficiaryId, data: { source: 'mqtt_alert', deploymentId: alert.deploymentId, alertId: alert.id, command: alert.command, }, channelId: notificationType === NotificationType.EMERGENCY ? 'emergency' : 'default', }); console.log(`[MQTT] Notification result: ${result.sent} sent, ${result.skipped} skipped, ${result.failed} failed`); } /** * Save alert to database */ async function saveAlertToDatabase(alert) { try { await pool.query(` INSERT INTO mqtt_alerts (deployment_id, command, body, message_time, received_at, raw_payload) VALUES ($1, $2, $3, $4, $5, $6) `, [ alert.deploymentId, alert.command, alert.body, alert.messageTime, alert.receivedAt, JSON.stringify(alert.raw) ]); console.log('[MQTT] ✅ Alert saved to database'); } catch (e) { // Table might not exist yet - that's ok if (e.code === '42P01') { console.log('[MQTT] mqtt_alerts table does not exist - skipping DB save'); } else { console.error('[MQTT] DB save error:', e.message); } } } /** * Subscribe to deployment alerts * @param {number} deploymentId - The deployment ID to subscribe to */ function subscribeToDeployment(deploymentId) { if (!client || !isConnected) { console.error('[MQTT] Not connected'); return false; } const topic = `/well_${deploymentId}`; if (subscribedTopics.has(topic)) { console.log(`[MQTT] Already subscribed to ${topic}`); return true; } client.subscribe(topic, (err) => { if (err) { console.error(`[MQTT] Failed to subscribe to ${topic}:`, err.message); return false; } console.log(`[MQTT] ✅ Subscribed to ${topic}`); subscribedTopics.add(topic); }); return true; } /** * Unsubscribe from deployment alerts */ function unsubscribeFromDeployment(deploymentId) { if (!client) return; const topic = `/well_${deploymentId}`; client.unsubscribe(topic); subscribedTopics.delete(topic); console.log(`[MQTT] Unsubscribed from ${topic}`); } /** * Subscribe to multiple deployments */ function subscribeToDeployments(deploymentIds) { deploymentIds.forEach(id => subscribeToDeployment(id)); } /** * Get recent alerts from cache */ function getRecentAlerts(limit = 50, deploymentId = null) { let alerts = alertsCache; if (deploymentId) { alerts = alerts.filter(a => a.deploymentId === deploymentId); } return alerts.slice(0, limit); } /** * Get connection status */ function getStatus() { return { connected: isConnected, broker: MQTT_BROKER, subscribedTopics: Array.from(subscribedTopics), cachedAlerts: alertsCache.length, }; } /** * Publish a test message (for testing) */ function publishTest(deploymentId, message) { if (!client || !isConnected) { console.error('[MQTT] Not connected'); return false; } const topic = `/well_${deploymentId}`; const payload = JSON.stringify({ Command: 'REPORT', body: message, time: Math.floor(Date.now() / 1000), }); client.publish(topic, payload); console.log(`[MQTT] 📤 Published to ${topic}: ${payload}`); return true; } /** * Graceful shutdown */ function shutdown() { if (client) { console.log('[MQTT] Shutting down...'); client.end(); client = null; isConnected = false; } } module.exports = { init, subscribeToDeployment, unsubscribeFromDeployment, subscribeToDeployments, subscribeToAllDeployments, getRecentAlerts, getStatus, publishTest, shutdown, };