From 5fe44ccd9236cf4a489fe47d73bc201fe41d3da2 Mon Sep 17 00:00:00 2001 From: Sergei Date: Mon, 26 Jan 2026 19:17:18 -0800 Subject: [PATCH] Integrate MQTT with notification settings service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Integrate mqtt.js with notifications.js for push notification sending - Add notification type detection (emergency, activity, low_battery) - Check user notification settings before sending pushes - Add beneficiary_id to getUsersForDeployment SQL query - Fix express-rate-limit IPv6 validation error - Remove unused Expo SDK import from mqtt.js 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- backend/src/routes/auth.js | 4 +- backend/src/services/mqtt.js | 76 +++++++++++++++--------------------- 2 files changed, 34 insertions(+), 46 deletions(-) diff --git a/backend/src/routes/auth.js b/backend/src/routes/auth.js index 856de9a..8003c5e 100644 --- a/backend/src/routes/auth.js +++ b/backend/src/routes/auth.js @@ -22,7 +22,7 @@ const verifyOtpLimiter = rateLimit({ message: { error: 'Too many verification attempts. Please try again in 15 minutes.' }, standardHeaders: true, legacyHeaders: false, - validate: { xForwardedForHeader: false }, // Disable IPv6 validation warning + validate: { ip: false, xForwardedForHeader: false }, // Disable IP validation }); // Rate limiter for OTP request: 3 attempts per 15 minutes per email/IP @@ -40,7 +40,7 @@ const requestOtpLimiter = rateLimit({ message: { error: 'Too many OTP requests. Please try again in 15 minutes.' }, standardHeaders: true, legacyHeaders: false, - validate: { xForwardedForHeader: false }, // Disable IPv6 validation warning + validate: { ip: false, xForwardedForHeader: false }, // Disable IP validation }); /** diff --git a/backend/src/services/mqtt.js b/backend/src/services/mqtt.js index 5e2a8e4..eb1229f 100644 --- a/backend/src/services/mqtt.js +++ b/backend/src/services/mqtt.js @@ -13,7 +13,6 @@ const mqtt = require('mqtt'); const { pool } = require('../config/database'); -const { Expo } = require('expo-server-sdk'); const { sendPushNotifications: sendNotificationsWithSettings, NotificationType } = require('./notifications'); // MQTT Configuration @@ -21,9 +20,6 @@ const MQTT_BROKER = process.env.MQTT_BROKER || 'mqtt://mqtt.eluxnetworks.net:188 const MQTT_USER = process.env.MQTT_USER || 'anandk'; const MQTT_PASSWORD = process.env.MQTT_PASSWORD || 'anandk_8'; -// Expo Push Client -const expo = new Expo(); - // Store for received alerts (in-memory, last 100) const alertsCache = []; const MAX_ALERTS_CACHE = 100; @@ -194,6 +190,7 @@ async function getUsersForDeployment(deploymentId) { u.email, pt.token as push_token, b.name as beneficiary_name, + bd.beneficiary_id, ua.role FROM beneficiary_deployments bd JOIN user_access ua ON ua.beneficiary_id = bd.beneficiary_id @@ -212,61 +209,52 @@ async function getUsersForDeployment(deploymentId) { } /** - * Send push notifications for an alert + * Send push notifications for an alert (uses notifications.js with settings check) */ async function sendPushNotifications(alert) { const users = await getUsersForDeployment(alert.deploymentId); if (users.length === 0) { - console.log(`[MQTT] No push tokens found for deployment ${alert.deploymentId}`); + console.log(`[MQTT] No users found for deployment ${alert.deploymentId}`); return; } - console.log(`[MQTT] Sending push to ${users.length} users for deployment ${alert.deploymentId}`); + // Get unique user IDs + const userIds = [...new Set(users.map(u => u.user_id))]; - const messages = []; + // Get first beneficiary info for the notification + const beneficiaryName = users[0]?.beneficiary_name || 'Beneficiary'; + const beneficiaryId = users[0]?.beneficiary_id || null; - for (const user of users) { - // Validate token format - if (!Expo.isExpoPushToken(user.push_token)) { - console.log(`[MQTT] Invalid push token for user ${user.email}: ${user.push_token}`); - continue; - } + // Determine notification type based on alert content + let notificationType = NotificationType.ACTIVITY; + const bodyLower = (alert.body || '').toLowerCase(); - // Build notification message - const beneficiaryName = user.beneficiary_name || 'Beneficiary'; - - messages.push({ - to: user.push_token, - sound: 'default', - title: `Alert: ${beneficiaryName}`, - body: alert.body || 'New sensor alert', - data: { - type: 'mqtt_alert', - deploymentId: alert.deploymentId, - alertId: alert.id, - command: alert.command, - }, - priority: 'high', - }); + if (bodyLower.includes('emergency') || bodyLower.includes('fall') || bodyLower.includes('sos')) { + notificationType = NotificationType.EMERGENCY; + } else if (bodyLower.includes('battery') || bodyLower.includes('low power')) { + notificationType = NotificationType.LOW_BATTERY; } - if (messages.length === 0) { - console.log('[MQTT] No valid push tokens to send to'); - return; - } + console.log(`[MQTT] Sending ${notificationType} notification to ${userIds.length} users for deployment ${alert.deploymentId}`); - // Send in chunks (Expo limit) - const chunks = expo.chunkPushNotifications(messages); + // Use the new notifications service with settings check + const result = await sendNotificationsWithSettings({ + userIds, + title: `Alert: ${beneficiaryName}`, + body: alert.body || 'New sensor alert', + type: notificationType, + beneficiaryId, + data: { + source: 'mqtt_alert', + deploymentId: alert.deploymentId, + alertId: alert.id, + command: alert.command, + }, + channelId: notificationType === NotificationType.EMERGENCY ? 'emergency' : 'default', + }); - for (const chunk of chunks) { - try { - const receipts = await expo.sendPushNotificationsAsync(chunk); - console.log(`[MQTT] ✅ Push sent:`, receipts); - } catch (e) { - console.error('[MQTT] ❌ Push failed:', e.message); - } - } + console.log(`[MQTT] Notification result: ${result.sent} sent, ${result.skipped} skipped, ${result.failed} failed`); } /**