well-svc-alert/well-alerts.py
2025-06-16 10:31:41 -07:00

4051 lines
167 KiB
Python

#!/usr/bin/env python3
import os
import time
import signal
import sys
import threading
from threading import Timer
import json
import logging
import datetime
from datetime import timezone, timedelta
import time
import psycopg2
import redis
from ast import literal_eval
from kombu import Connection, Exchange #, Queue, Consumer, Message
#from kombu.exceptions import TimeoutError
from dotenv import load_dotenv
import uuid
import subprocess
import shutil
import psutil
import requests
import copy
import re
import random
import traceback
#import math
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from pathlib import Path
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import paho.mqtt.client as mqtt
import inspect
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import queue
import telnyx # For SMS
import pytz
#from datetime import datetime, timedelta
# Executor pools handed to the APScheduler background scheduler created below.
executors = {
'default': ThreadPoolExecutor(20), # Allow up to 20 concurrent tasks
'processpool': ProcessPoolExecutor(5) # 5 parallel processes for CPU-bound tasks
}
# NOTE: the scheduler is created AND started at import time (module-level side effect).
scheduler = BackgroundScheduler(executors=executors)
scheduler.start()
# Store job IDs for later reference
# job_registry: job_id -> {'argument', 'function_name', 'scheduled_time', 'job'} (written by schedule_task)
job_registry = {}
# task_arg_registry: "<function name>_<argument>" -> job_id of the job currently scheduled for that pair
task_arg_registry = {}
time_zones_all = {}
# --- Configuration Loading ---
# Load environment variables from $HOME/.env
# NOTE(review): dotenv_path is computed but not used; the active call below loads
# '.env_rz' relative to the current working directory instead.
dotenv_path = os.path.join(os.environ.get("HOME", "."), '.env')
#load_dotenv(dotenv_path=dotenv_path)
load_dotenv('.env_rz')
# Logging Configuration
# The logger is named after a random UUID generated at import and does not propagate
# to the root logger; records go only to the stream handler attached here.
default_uuid = str(uuid.uuid4())
logger = logging.getLogger(default_uuid)
logger.propagate = False
logger.setLevel(logging.DEBUG) #DEBUG, INFO, WARNING, ERROR, and CRITICAL
handler = logging.StreamHandler()
#formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
formatter = logging.Formatter('%(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
#permament_users = ["node_red_status_brankol", "node_red_status_robster", "node_red_status_bknigge"]
permament_users = []
# Database Configuration
# All values come from the environment; only DB_PORT has a default ('5432').
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT', '5432')
ADMIN_USER = os.getenv('ADMIN_USER')
ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD')
# Mosquitto files
MOSQUITTO_PASSWORD_FILE = os.getenv('MOSQUITTO_PASSWORD_FILE')
MOSQUITTO_ACL_FILE = os.getenv('MOSQUITTO_ACL_FILE')
# Redis Configuration
# REDIS_PORT and REDIS_DB fall back to the stock Redis values (6379, database 0) when the
# variable is missing or empty; int(None) / int('') would otherwise abort the import.
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PORT = int(os.getenv('REDIS_PORT') or '6379')
REDIS_DB = int(os.getenv('REDIS_DB') or '0')
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
# RabbitMQ Configuration
RABBITMQ_URL = os.getenv('RABBITMQ_URL')
RABBITMQ_ALERTS_QNAME = os.getenv('RABBITMQ_ALERTS_QNAME')
# Unnamed direct exchange; SendToQueue publishes through it with the queue name as routing key.
exchange = Exchange("", type='direct')
#MQTT Configuration
# client / connected are rebound by connect_mqtt and ensure_mqtt_connection.
client = None
connected = False
# Received MQTT messages, appended by on_message as (timestamp string, topic, payload).
in_queue = []
MQTTSERVER = os.getenv('MQTTSERVER', "mqtt.eluxnetworks.net") #"mqtt.eluxnetworks.net"
mqtt_port = int(os.getenv('MQTT_PORT', "8883")) # TLS port
mqtt_client_id = "client777"
# TLS is used for every broker except the local addresses listed here.
use_tls = MQTTSERVER not in ["localhost", "127.0.0.1", "192.168.68.70"]
# Alerting Thresholds
DEFAULT_TIMEOUT_MINUTES = 48 * 60 # Default 48 hours in minutes
# Note the lower-case environment variable name 'time_out_threshold' (value in minutes).
TIME_OUT_THRESHOLD_MIN = int(os.getenv('time_out_threshold', DEFAULT_TIMEOUT_MINUTES))
TIME_OUT_THRESHOLD_SEC = TIME_OUT_THRESHOLD_MIN * 60
TIME_TO_GET_OUT_SEC = 5#60
TIME_BETWEEN_ALERTS_SEC = 12 * 60 * 60 #12 hours
# Node-RED directory layout used by the package synchronisation code.
BASE_DIR = "/home/ubuntu/.node-red"
MAIN_INSTANCE_DIR = f"{BASE_DIR}/main_instance"
USERS_DIR = f"{BASE_DIR}/users"
SHARED_NODES_DIR = f"{BASE_DIR}/shared_nodes"
# Telnyx (SMS / voice) settings, all read from the environment with no defaults.
TELNYX_API_KEY = os.environ.get("TELNYX_API_KEY")
TELNYX_MESSAGING_PROFILE_ID = os.environ.get("TELNYX_MESSAGING_PROFILE_ID")
TELNYX_CONNECTION_ID_VOICE = os.environ.get("TELNYX_CONNECTION_ID_VOICE")
TELNYX_WEBHOOK_URL_VOICE = os.environ.get("TELNYX_WEBHOOK_URL_VOICE")
TELNYX_SENDER_ID = os.environ.get("TELNYX_SENDER_ID")# (for numeric SMS sender)
# TELNYX_SENDER_ID_ALPHA is needed if 'alpha' or 'auto' for non-US SMS is used
TELNYX_SENDER_ID_ALPHA = os.environ.get("TELNYX_SENDER_ID_ALPHA")# optional when only the numeric sender is used, or for US destinations in auto mode
TELNYX_VOICE_URL = os.getenv("TELNYX_VOICE_URL")
# Text-to-speech job / result directories (hard-coded absolute paths).
JOBS_DIR = '/home/ubuntu/tts_jobs'
RESULTS_DIR = '/home/ubuntu/tts_results'
RESULTS_SHARE_DIR = '/mnt/data/shared/clips'
# Create the Node-RED working directories at import time when they are missing.
for directory in (MAIN_INSTANCE_DIR, USERS_DIR, SHARED_NODES_DIR):
    if os.path.exists(directory):
        continue
    os.makedirs(directory)
    logger.info(f"Created directory: {directory}")
# Monitoring Interval
CHECK_INTERVAL_SECONDS = 1
local_daily_minute_last = 0
# --- Global Variables ---
db_conn = None
# NOTE(review): REDIS_PASSWORD is read from the environment above but is not passed to
# this client - confirm whether the Redis server is expected to run without auth.
redis_conn = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
rabbit_conn = None
rabbit_channel = None
stop_event = threading.Event()
devices_config = [] # List of device detail rows; rebound by load_device_configurations()
# Numeric location code -> human readable location name.
location_names = {
    -1: "All",
    0: "?",
    5: "Office",
    6: "Hallway",
    7: "Garage",
    8: "Outside",
    9: "Conference Room",
    10: "Room",
    34: "Kitchen",
    56: "Bedroom",
    78: "Living Room",
    102: "Bathroom",
    103: "Dining Room",
    104: "Bathroom Main",
    105: "Bathroom Guest",
    106: "Bedroom Master",
    107: "Bedroom Guest",
    108: "Conference Room",
    109: "Basement",
    110: "Attic",
    200: "Other",
}
# Reverse lookup (name -> code). "Conference Room" is listed twice (9 and 108);
# the later entry wins, so the name resolves to 108.
location_indexes = {label: code for code, label in location_names.items()}
# Module-level caches shared between the loader and monitoring functions.
alarms_settings_all = {}
# device_id -> parsed alert_details JSON (filled in GetVisibleDevicesPerLocation)
device_alerts_all = {}
care_takers_all = {}
times_found_armed = {}
device_to_deployment = {}
deployment_id_list = []
# deployment_id -> list of device_ids (rebuilt by GetVisibleDevicesPerLocation)
deployments_devices = {}
# device MAC -> device_id (filled in load_device_configurations)
mac_to_device_id = {}
# device_id -> detail row as returned by GetDeviceDetails
devices_details_map = {}
def schedule_task(function, argument, minutes_from_now, job_id=None):
    """
    Schedule a function to run with a specific argument after X minutes.

    If a job with the same function and argument is already scheduled:
    - Only replace it if the new schedule is EARLIER than the existing one
    - Ignore the new schedule if it's later than (or equal to) the existing one

    A registry entry whose job is no longer known to the scheduler (it already ran
    or was removed) is treated as stale and discarded, so the pair can be scheduled
    again instead of being ignored forever.

    Args:
        function: Callable to run. It receives ``argument`` only if its signature
            declares at least one parameter.
        argument: Value passed to ``function``; also part of the de-duplication key.
        minutes_from_now: Delay, in minutes, before the job runs.
        job_id: Optional explicit scheduler job id; generated when omitted.

    Returns:
        The job_id, or None if scheduling was ignored.
    """
    run_time = datetime.datetime.now() + timedelta(minutes=minutes_from_now)
    # Create a key based on function name and argument to track related jobs
    task_key = f"{function.__name__}_{argument}"

    existing_job_id = task_arg_registry.get(task_key)
    if existing_job_id is not None:
        existing_entry = job_registry.get(existing_job_id)
        if existing_entry is None or scheduler.get_job(existing_job_id) is None:
            # Stale bookkeeping: the job has run or vanished. Comparing against its
            # past run time would reject every new schedule, so drop the entry.
            task_arg_registry.pop(task_key, None)
            job_registry.pop(existing_job_id, None)
        elif run_time >= existing_entry['scheduled_time']:
            existing_job_time = existing_entry['scheduled_time']
            logger.info(f"Ignoring new schedule for {task_key} - existing schedule is earlier or same ({existing_job_time})")
            return None
        else:
            # Cancel the existing job because we have an earlier time
            cancel_task(existing_job_id)
            logger.info(f"Replacing existing schedule for {task_key} with earlier time ({run_time})")

    # If no job_id provided, create one based on time and argument
    if job_id is None:
        job_id = f"job_{task_key}_{datetime.datetime.now().timestamp()}"

    # Pass the argument only to functions that declare at least one parameter
    accepts_argument = len(inspect.signature(function).parameters) > 0
    job = scheduler.add_job(
        function,
        'date',
        run_date=run_time,
        args=[argument] if accepts_argument else None,
        id=job_id,
        misfire_grace_time=900
    )

    # Store in both registries
    job_registry[job_id] = {
        'argument': argument,
        'function_name': function.__name__,
        'scheduled_time': run_time,
        'job': job
    }
    # Store the job ID for this function/argument combo
    task_arg_registry[task_key] = job_id
    logger.info(f"Scheduled job '{job_id}' with argument '{argument}' to run at {run_time}")
    return job_id
def cancel_task(job_id):
    """
    Cancel a scheduled task by its job_id.

    The job is removed from the scheduler and from both registries. If the
    scheduler no longer knows the job (for example a one-shot job that already
    ran), the registries are still cleaned up instead of raising.

    Returns:
        True if the job was present in ``job_registry``, False otherwise.
    """
    # Local import: only this function needs the exception type.
    from apscheduler.jobstores.base import JobLookupError

    entry = job_registry.get(job_id)
    if entry is None:
        logger.error(f"Job '{job_id}' not found")
        return False

    task_key = f"{entry['function_name']}_{entry['argument']}"
    # Remove from task registry if this is the current job for this task/arg
    if task_arg_registry.get(task_key) == job_id:
        del task_arg_registry[task_key]

    # Remove the job from the scheduler
    try:
        scheduler.remove_job(job_id)
    except JobLookupError:
        logger.info(f"Job '{job_id}' is no longer in the scheduler (already ran or removed)")

    logger.info(f"Cancelled job '{job_id}' with argument '{entry['argument']}'")
    # Remove from job registry
    del job_registry[job_id]
    return True
def list_scheduled_tasks():
    """
    Log every entry of ``job_registry`` with the minutes left until its run time.

    The value is negative for entries whose scheduled time has already passed.
    """
    logger.info("\nCurrently scheduled tasks:")
    # `datetime` is the module here (see the file's imports), so the class must be
    # addressed as datetime.datetime; datetime.now() raised AttributeError.
    now = datetime.datetime.now()
    for job_id, details in job_registry.items():
        time_remaining = (details['scheduled_time'] - now).total_seconds() / 60
        logger.info(f"Job: {job_id}, Function: {details['function_name']}, Argument: {details['argument']}, "
                    f"Running in: {time_remaining:.1f} minutes")
class perpetualTimer():
    """Call ``hFunction`` every ``t`` seconds by chaining one-shot ``threading.Timer`` objects."""

    def __init__(self, t, hFunction):
        self.t = t
        self.hFunction = hFunction
        # The first timer is prepared here but only armed by start().
        self.thread = self._arm()

    def _arm(self):
        """Build (without starting) a timer that fires handle_function after ``t`` seconds."""
        return Timer(self.t, self.handle_function)

    def handle_function(self):
        """Run the callback, then schedule and start the next tick."""
        self.hFunction()
        next_tick = self._arm()
        self.thread = next_tick
        next_tick.start()

    def start(self):
        """Start the currently prepared timer."""
        self.thread.start()

    def cancel(self):
        """Cancel the currently pending timer."""
        self.thread.cancel()
def on_message(client_, userdata, msg):
    """MQTT message callback: queue the message for later processing.

    Appends ``(receive time as string, topic, raw payload)`` to the module-level
    ``in_queue`` list. ``client_`` and ``userdata`` are not used.
    """
    global in_queue
    received_at = str(time.time())
    queued = (received_at, msg.topic, msg.payload)
    in_queue.append(queued)
def connect_mqtt(client_id):
    """Create an MQTT v5 client, connect to MQTTSERVER and subscribe to every topic.

    Args:
        client_id: Accepted for interface compatibility; a random, time-stamped id
            is generated and used instead.

    Returns:
        The connected paho client (also stored in the global ``client``), or None
        if the connection could not be established in time or an error occurred.
        The global ``connected`` flag reflects the outcome.
    """
    global client, connected
    # Add timeout for the entire operation
    connection_timeout = 5  # 5 seconds max for the whole connection attempt
    start_time = time.time()
    try:
        random_suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=6))
        unique_client_id = f"{random_suffix}-{int(time.time())}"
        logger.info(f"Connecting {unique_client_id}")
        mqtt_client = mqtt.Client(client_id=unique_client_id, protocol=mqtt.MQTTv5, callback_api_version=mqtt.CallbackAPIVersion.VERSION2)

        # Callbacks use the CallbackAPIVersion.VERSION2 signatures
        def on_connect_wrapper(client, userdata, flags, rc, properties=None):
            global connected
            if rc == 0:
                connected = True
                logger.info(f"Connected to {MQTTSERVER} with {unique_client_id}")
                subbedTo = [("#", 1)]
                logger.info(f"subscribing to , {subbedTo}")
                client.subscribe(subbedTo)
            else:
                logger.error(f"Failed to connect, return code {rc}")
                connected = False

        def on_disconnect_wrapper(client, userdata, disconnect_flags, rc, properties=None):
            # VERSION2 passes (client, userdata, disconnect_flags, reason_code, properties).
            # The previous four-parameter signature raised TypeError inside paho, so
            # `connected` was never reset and no reconnect was ever triggered.
            global connected
            connected = False
            logger.info(f"Disconnected with result code {rc}")

        # Set the callbacks
        mqtt_client.on_connect = on_connect_wrapper
        mqtt_client.on_disconnect = on_disconnect_wrapper
        mqtt_client.on_message = on_message

        if use_tls:
            # Prefer the container path for the CA certificate, else the working directory
            certificate_path = '/app/src/new-ca.crt' if os.path.exists('/app/src/new-ca.crt') else 'new-ca.crt'
            mqtt_client.tls_set(ca_certs=certificate_path)

        # Set username and password
        # NOTE(review): credentials are hard-coded in source; consider moving them to the environment.
        mqtt_client.username_pw_set("well_user", "We3l1_best!")
        # Connect with shorter timeout
        mqtt_client.connect(MQTTSERVER, mqtt_port, keepalive=30)
        # Start the loop in a background thread
        mqtt_client.loop_start()

        # Wait for the on_connect callback; effectively at most 3 seconds from here
        wait_end_time = min(start_time + connection_timeout, time.time() + 3)
        while not connected and time.time() < wait_end_time:
            time.sleep(0.1)

        # Check if we connected successfully
        if connected:
            logger.info(f"MQTT connected successfully in {time.time() - start_time:.2f} seconds")
            client = mqtt_client
            return mqtt_client

        logger.info(f"MQTT connection attempt timed out after {time.time() - start_time:.2f} seconds")
        mqtt_client.loop_stop()
        connected = False
        return None
    except Exception as e:
        connection_time = time.time() - start_time
        logger.error(f"Connection error after {connection_time:.2f} seconds: {e}")
        connected = False
        return None
def ensure_mqtt_connection():
    """Make sure an MQTT connection exists, reconnecting if necessary.

    Returns:
        True if already connected, otherwise the value of the global ``connected``
        flag after one reconnect attempt (False on any error).
    """
    global client, connected
    if connected:
        return True  # Already connected

    logger.info("MQTT connection needed, attempting to connect...")
    try:
        if client:
            try:
                client.disconnect()
                client.loop_stop()
            except Exception as e:
                # Best effort only; `except Exception` (not a bare except) lets
                # KeyboardInterrupt / SystemExit propagate during shutdown.
                logger.debug(f"Ignoring error while dropping stale MQTT client: {e}")
        client = connect_mqtt(mqtt_client_id)
        # Give a moment for connection to establish
        time.sleep(2)
        return connected
    except Exception as e:
        logger.error(f"Reconnection error: {e}")
        return False
class NodePackageHandler(FileSystemEventHandler):
    """Watchdog handler that mirrors Node-RED package installs between instances.

    A package detected in one instance is copied to the shared nodes directory and
    from there to the other side: to every user instance when the source is the
    main instance, or to the main instance when the source is a user instance.
    """

    def __init__(self, source_type, source_path):
        super().__init__()
        self.source_type = source_type  # 'main' or 'user'
        self.source_path = source_path
        if source_type == 'user':
            self.source_name = os.path.basename(source_path)
        else:
            self.source_name = 'main'

    @staticmethod
    def _manifest_version(manifest_path):
        """Return the 'version' field of a package.json file ('unknown' if absent)."""
        with open(manifest_path, 'r') as f:
            manifest = json.load(f)
        return manifest.get('version', 'unknown')

    @staticmethod
    def _apply_permissions(target_dir):
        """Recursively chmod 755 and chown ubuntu:ubuntu the given directory."""
        subprocess.run(['chmod', '-R', '755', target_dir])
        subprocess.run(['chown', '-R', 'ubuntu:ubuntu', target_dir])

    def _replace_package(self, modules_dir, package_name, source_dir, target_label):
        """Replace ``package_name`` inside ``modules_dir`` with a copy of ``source_dir``."""
        destination = os.path.join(modules_dir, package_name)
        # Create node_modules directory if it doesn't exist
        os.makedirs(modules_dir, exist_ok=True)
        # Remove old version if it exists
        if os.path.exists(destination):
            shutil.rmtree(destination)
        shutil.copytree(source_dir, destination)
        logger.info(f"Synchronized {package_name} to {target_label}")
        self._apply_permissions(destination)

    def on_created(self, event):
        """React to a new top-level directory appearing under node_modules."""
        # Only interested in directory creation events in node_modules
        if not event.is_directory or 'node_modules' not in event.src_path:
            return
        created_dir = event.src_path
        name = os.path.basename(created_dir)
        # Ignore hidden directories
        if name.startswith('.'):
            return
        # Nested node_modules content is ignored: the parent must be node_modules itself
        if os.path.basename(os.path.dirname(created_dir)) == 'node_modules':
            self.sync_package(name, created_dir)

    def on_modified(self, event):
        """React to a changed package.json of a top-level package (possible update)."""
        if event.is_directory:
            return
        if os.path.basename(event.src_path) != 'package.json':
            return
        package_root = os.path.dirname(event.src_path)
        if os.path.basename(os.path.dirname(package_root)) == 'node_modules':
            self.sync_package(os.path.basename(package_root), package_root)

    def sync_package(self, package_name, package_dir):
        """Synchronize a Node-RED package when detected.

        Any exception is logged and swallowed so a failing package never stops
        the watcher.
        """
        try:
            # Skip node core modules
            if package_name in ('npm', 'node-red', '@node-red'):
                return
            logger.info(f"New or updated package detected: {package_name} in {self.source_type} instance ({self.source_name})")

            manifest_path = os.path.join(package_dir, 'package.json')
            if not os.path.exists(manifest_path):
                logger.warning(f"No package.json found for {package_name}, skipping")
                return
            version = self._manifest_version(manifest_path)

            # Destination in shared nodes directory
            shared_package_dir = os.path.join(SHARED_NODES_DIR, 'node_modules', package_name)
            if not os.path.exists(shared_package_dir):
                logger.info(f"Adding new package {package_name}@{version} to shared directory")
            else:
                shared_manifest = os.path.join(shared_package_dir, 'package.json')
                if os.path.exists(shared_manifest):
                    shared_version = self._manifest_version(shared_manifest)
                    # Nothing to do when the shared copy already has this version
                    if shared_version == version:
                        logger.info(f"Package {package_name}@{version} already exists in shared directory")
                        return
                    logger.info(f"Updating {package_name} from {shared_version} to {version} in shared directory")
                # Remove old version
                shutil.rmtree(shared_package_dir)

            # Copy package to shared directory and fix permissions
            os.makedirs(os.path.dirname(shared_package_dir), exist_ok=True)
            shutil.copytree(package_dir, shared_package_dir)
            self._apply_permissions(shared_package_dir)

            # Fan out: main -> all users, user -> main
            if self.source_type == 'main':
                self._sync_to_users(package_name, shared_package_dir)
            else:
                self._sync_to_main(package_name, shared_package_dir)
            logger.info(f"Successfully synchronized {package_name}@{version}")
        except Exception as e:
            logger.error(f"Error synchronizing package {package_name}: {str(e)}")

    def _sync_to_users(self, package_name, source_dir):
        """Synchronize a package from shared nodes to all user instances."""
        for user_dir in os.listdir(USERS_DIR):
            user_path = os.path.join(USERS_DIR, user_dir)
            if not os.path.isdir(user_path):
                continue
            self._replace_package(
                os.path.join(user_path, 'node_modules'),
                package_name,
                source_dir,
                f"user {user_dir}",
            )

    def _sync_to_main(self, package_name, source_dir):
        """Synchronize a package from shared nodes to main instance."""
        self._replace_package(
            os.path.join(MAIN_INSTANCE_DIR, 'node_modules'),
            package_name,
            source_dir,
            "main instance",
        )
def setup_watchers():
    """Set up directory watchers for main and user instances.

    One recursive watchdog observer is created per existing ``node_modules``
    directory. All observers are started before being returned.

    Returns:
        List of started Observer objects.
    """
    def _watch(source_type, source_path, modules_dir):
        # One observer per instance, watching its node_modules tree recursively.
        watcher = Observer()
        watcher.schedule(NodePackageHandler(source_type, source_path), modules_dir, recursive=True)
        return watcher

    observers = []

    # Main instance
    main_modules_dir = os.path.join(MAIN_INSTANCE_DIR, 'node_modules')
    if os.path.exists(main_modules_dir):
        observers.append(_watch('main', MAIN_INSTANCE_DIR, main_modules_dir))
        logger.info(f"Watching main instance node_modules directory at {main_modules_dir}")

    # User instances
    for user_dir in os.listdir(USERS_DIR):
        user_path = os.path.join(USERS_DIR, user_dir)
        if not os.path.isdir(user_path):
            continue
        user_modules_dir = os.path.join(user_path, 'node_modules')
        if not os.path.exists(user_modules_dir):
            continue
        observers.append(_watch('user', user_path, user_modules_dir))
        logger.info(f"Watching user instance node_modules directory for {user_dir}")

    # Start all observers
    for watcher in observers:
        watcher.start()
    return observers
def _version_key(version):
    """Turn a version string such as '1.10.2' into a tuple of ints for ordering.

    Pre-release / build suffixes ('-beta.1', '+build') are ignored. Strings with no
    digits (e.g. 'unknown') map to the empty tuple, which sorts below every real version.
    """
    core = re.split(r'[-+]', str(version), maxsplit=1)[0]
    return tuple(int(part) for part in re.findall(r'\d+', core))


def _read_package_version(package_dir, warn_on_invalid=False):
    """Return the version from package_dir/package.json, or None if missing or invalid JSON."""
    manifest_path = os.path.join(package_dir, 'package.json')
    if not os.path.exists(manifest_path):
        return None
    with open(manifest_path, 'r') as f:
        try:
            data = json.load(f)
        except json.JSONDecodeError:
            if warn_on_invalid:
                logger.warning(f"Invalid package.json in {package_dir}")
            return None
    return data.get('version', 'unknown')


def _iter_installed_packages(modules_dir):
    """Yield (name, directory, version) for each visible package with a readable package.json."""
    for package_name in os.listdir(modules_dir):
        package_dir = os.path.join(modules_dir, package_name)
        if package_name.startswith('.') or not os.path.isdir(package_dir):
            continue
        version = _read_package_version(package_dir, warn_on_invalid=True)
        if version is not None:
            yield package_name, package_dir, version


def _set_package_permissions(package_dir):
    """Recursively chmod 755 and chown ubuntu:ubuntu a package directory."""
    subprocess.run(['chmod', '-R', '755', package_dir])
    subprocess.run(['chown', '-R', 'ubuntu:ubuntu', package_dir])


def sync_all_packages():
    """Perform a full synchronization of all packages.

    1. Collect the packages installed in the main and user instances, keeping the
       newest version of each.
    2. Copy each one into the shared nodes directory unless the shared copy is
       already the same version or newer.
    3. Copy every shared package into each instance that does not have it yet.

    Versions are compared numerically component by component, so '10.0.0' is
    newer than '9.0.0' (a plain string comparison gets this wrong).
    """
    logger.info("Starting full package synchronization")

    # First, collect all unique packages from main and user instances
    all_packages = {}

    main_modules_dir = os.path.join(MAIN_INSTANCE_DIR, 'node_modules')
    if os.path.exists(main_modules_dir):
        for package_name, package_dir, version in _iter_installed_packages(main_modules_dir):
            all_packages[package_name] = {'dir': package_dir, 'version': version, 'source': 'main'}

    for user_dir in os.listdir(USERS_DIR):
        user_path = os.path.join(USERS_DIR, user_dir)
        if not os.path.isdir(user_path):
            continue
        user_modules_dir = os.path.join(user_path, 'node_modules')
        if not os.path.exists(user_modules_dir):
            continue
        for package_name, package_dir, version in _iter_installed_packages(user_modules_dir):
            known = all_packages.get(package_name)
            # Only add if not already found or if version is newer
            if known is None or _version_key(known['version']) < _version_key(version):
                all_packages[package_name] = {'dir': package_dir, 'version': version, 'source': f'user-{user_dir}'}

    # Ensure shared nodes directory and node_modules exist
    shared_modules_dir = os.path.join(SHARED_NODES_DIR, 'node_modules')
    os.makedirs(shared_modules_dir, exist_ok=True)

    # Synchronize all packages to shared directory
    for package_name, info in all_packages.items():
        # Skip node core modules
        if package_name in ('npm', 'node-red', '@node-red'):
            continue
        shared_package_dir = os.path.join(shared_modules_dir, package_name)
        if os.path.exists(shared_package_dir):
            shared_version = _read_package_version(shared_package_dir)
            # Skip if shared version is newer or equal; an unreadable shared copy is replaced
            if shared_version is not None and _version_key(shared_version) >= _version_key(info['version']):
                logger.info(f"Keeping existing version {shared_version} of {package_name} in shared directory")
                continue
            shutil.rmtree(shared_package_dir)
        shutil.copytree(info['dir'], shared_package_dir)
        logger.info(f"Copied {package_name}@{info['version']} from {info['source']} to shared directory")
        _set_package_permissions(shared_package_dir)

    # Now synchronize from shared directory to all instances
    shared_packages = {}
    for package_name in os.listdir(shared_modules_dir):
        package_dir = os.path.join(shared_modules_dir, package_name)
        if os.path.isdir(package_dir) and not package_name.startswith('.'):
            shared_packages[package_name] = package_dir

    # Sync to main instance (only packages it does not have yet)
    for package_name, package_dir in shared_packages.items():
        main_package_dir = os.path.join(main_modules_dir, package_name)
        if not os.path.exists(main_package_dir):
            shutil.copytree(package_dir, main_package_dir)
            logger.info(f"Synchronized {package_name} to main instance")
            _set_package_permissions(main_package_dir)

    # Sync to user instances (only packages they do not have yet)
    for user_dir in os.listdir(USERS_DIR):
        user_path = os.path.join(USERS_DIR, user_dir)
        if not os.path.isdir(user_path):
            continue
        user_modules_dir = os.path.join(user_path, 'node_modules')
        os.makedirs(user_modules_dir, exist_ok=True)
        for package_name, package_dir in shared_packages.items():
            user_package_dir = os.path.join(user_modules_dir, package_name)
            if not os.path.exists(user_package_dir):
                shutil.copytree(package_dir, user_package_dir)
                logger.info(f"Synchronized {package_name} to user {user_dir}")
                _set_package_permissions(user_package_dir)

    logger.info("Full package synchronization completed")
def sync_installs(npm_package=None, instance_path=None):
    """
    Entry point to be triggered on npm installation events.

    With both arguments given, only that package is synchronized from the given
    instance; otherwise a full synchronization of all packages is performed.

    Args:
        npm_package: Name of the installed npm package (optional)
        instance_path: Path to the instance where the package was installed (optional)
    """
    logger.info(f"SyncInstalls triggered for {npm_package or 'all packages'} in {instance_path or 'unknown'}")

    if not (npm_package and instance_path):
        # Perform full synchronization
        sync_all_packages()
        return

    # Handle specific package installation
    source_type = 'main' if MAIN_INSTANCE_DIR in instance_path else 'user'
    package_dir = os.path.join(instance_path, 'node_modules', npm_package)
    if not os.path.exists(package_dir):
        logger.warning(f"Package directory not found: {package_dir}")
        return
    NodePackageHandler(source_type, instance_path).sync_package(npm_package, package_dir)
def initialize_sync_system():
    """Run an initial full package sync, then start the file-system watchers.

    Returns:
        The list of started observers, so the caller can stop them if needed.
    """
    logger.info("Initializing Node-RED package synchronization system")
    # Bring every instance up to date first, then watch for future changes
    sync_all_packages()
    return setup_watchers()
# --- Utility Functions ---
def SendToQueue(queue_name, message_dict):
    """Publish ``message_dict`` as UTF-8 encoded JSON to the RabbitMQ queue ``queue_name``.

    A fresh connection to RABBITMQ_URL is opened for each call; the message goes
    through the module-level ``exchange`` with the queue name as routing key.
    """
    with Connection(RABBITMQ_URL) as connection:
        publisher = connection.Producer()
        payload = json.dumps(message_dict).encode('utf-8')
        publisher.publish(
            payload,
            exchange=exchange,
            routing_key=queue_name,
            serializer='json',
            content_type='application/json',
        )
def get_db_connection():
    """Open and return a new psycopg2 connection built from the DB_* settings (SSL disabled)."""
    return psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT,
        sslmode='disable',
    )
def publish_alert(message_body):
    """Publishes an alert message to the configured RabbitMQ queue.

    The message is JSON-encoded and sent as a persistent message to
    RABBITMQ_ALERTS_QNAME. On any publishing error the RabbitMQ connection is
    closed so that a reconnect is attempted on the next call.
    """
    # NOTE(review): this function relies on `pika`, `setup_rabbitmq` and
    # `close_rabbitmq_connection`, none of which are defined or imported in the
    # visible part of the file - confirm they exist further down.
    if not setup_rabbitmq():
        logger.error("Cannot publish alert: RabbitMQ not connected.")
        return

    try:
        encoded = json.dumps(message_body)
        persistent = pika.BasicProperties(
            delivery_mode=2,  # make message persistent
        )
        rabbit_channel.basic_publish(
            exchange='',
            routing_key=RABBITMQ_ALERTS_QNAME,
            body=encoded,
            properties=persistent,
        )
        logger.info(f"Published alert to RabbitMQ queue '{RABBITMQ_ALERTS_QNAME}': {message_body}")
    except (pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker, pika.exceptions.StreamLostError) as e:
        logger.error(f"Failed to publish alert due to connection issue: {e}. Attempting reconnect on next cycle.")
        # Force close to trigger reconnection attempt next time
        close_rabbitmq_connection()
    except Exception as e:
        logger.error(f"An unexpected error occurred while publishing alert: {e}")
        close_rabbitmq_connection()  # Attempt reconnect
def GetRedisInt(key_name):
    """Return the Redis value stored at ``key_name`` as an int.

    Returns None when the key is missing, the value is not an integer, or Redis
    cannot be reached (best-effort read).
    """
    try:
        return int(redis_conn.get(key_name).decode('utf-8'))
    except Exception:
        # Deliberately broad, but not a bare `except:` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return None
def GetRedisFloat(key_name):
    """Return the Redis value stored at ``key_name`` as a float.

    Returns None when the key is missing, the value is not numeric, or Redis
    cannot be reached (best-effort read).
    """
    try:
        return float(redis_conn.get(key_name).decode('utf-8'))
    except Exception:
        # Deliberately broad, but not a bare `except:` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return None
def GetRedisString(key_name):
    """Return the Redis value stored at ``key_name`` decoded as UTF-8 text.

    Returns None when the key is missing or Redis cannot be reached
    (best-effort read).
    """
    try:
        return redis_conn.get(key_name).decode('utf-8')
    except Exception:
        # Deliberately broad, but not a bare `except:` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return None
def GetRedisMap(key_name):
    """Return the Redis hash stored at ``key_name`` as a ``{str: str}`` dict.

    Returns an empty dict when the hash cannot be read or decoded
    (best-effort read).
    """
    try:
        raw = redis_conn.hgetall(key_name)
        return {k.decode('utf-8'): v.decode('utf-8') for k, v in raw.items()}
    except Exception:
        # Deliberately broad, but not a bare `except:` so that
        # KeyboardInterrupt / SystemExit are not swallowed.
        return {}
def GetDeviceDetails(cur, deployment_ids, location_id):
    """Load detail rows for the devices identified by the MACs in ``deployment_ids``.

    Args:
        cur: Open database cursor used for both queries.
        deployment_ids: List of ``(deployment_id, mac)`` tuples. Rows are returned
            in the order of this list.
        location_id: Location code to filter on, or -1 for all locations.

    Returns:
        A list of rows
        ``[device_id, well_id, mac, last_message_epoch, location_name, description,
        deployment_id, alert_details, last_seen]``.
        An empty list is returned when there are no MACs or no matching devices.
    """
    #ID, Well id, MAC, Last_Message, Location, Description, Deployment
    macs = [mac for _, mac in deployment_ids]
    if not macs:
        # Nothing to look up; the queries below would be invalid for empty lists.
        return []

    macs_csv = ",".join(macs)
    # Values are passed as query parameters instead of being formatted into the SQL text.
    sql = """
        WITH ordered_macs AS (
            SELECT unnest(string_to_array(%s, ',')) as mac,
                   generate_series(1, array_length(string_to_array(%s, ','), 1)) as position
        )
        SELECT d.*
        FROM public.devices d
        JOIN ordered_macs om ON d.device_mac = om.mac::text
        WHERE device_mac IN %s
    """
    params = [macs_csv, macs_csv, tuple(macs)]
    if location_id != -1:
        sql += " AND location = %s"
        params.append(location_id)
    sql += " ORDER BY om.position;"
    cur.execute(sql, params)
    logger.info(f"{sql} params={params}")
    devices_ids_records = cur.fetchall()
    if not devices_ids_records:
        # An empty id list would produce "IN ()", which is a SQL syntax error.
        return []

    devices_ids_list = tuple(record[0] for record in devices_ids_records)
    #"SELECT device_id, MAX(time) ... GROUP BY device_id" was too slow
    sql = ("SELECT DISTINCT ON (device_id) device_id, time as last_reading_time FROM sensor_readings "
           "WHERE device_id IN %s AND time > now() - INTERVAL '1 day' ORDER BY device_id, time DESC")
    logger.info(f"{sql} params={devices_ids_list}")
    cur.execute(sql, (devices_ids_list,))
    devices_times = cur.fetchall()  # times of last readings within the last day
    found_device_details = {dev_id: last_time for dev_id, last_time in devices_times}

    # Deployment ids per MAC in list order. Looking the deployment up by MAC keeps
    # rows and deployments aligned even when some MACs have no device row or are
    # filtered out by location (a positional index would drift in that case).
    deployments_by_mac = {}
    for dep_id, mac in deployment_ids:
        deployments_by_mac.setdefault(mac, []).append(dep_id)

    all_details = []
    for device_table_record in devices_ids_records:
        # Resolve the id of THIS row before using it for the last-reading lookup.
        device_id = device_table_record[0]
        mac = device_table_record[1]
        well_id = device_table_record[2]

        if devices_times:
            if device_id in found_device_details:
                last_message_epoch = int(found_device_details[device_id].timestamp())
            else:
                # Fall back to the value stored in the devices row (column 14)
                try:
                    last_message_epoch = int(device_table_record[14])
                except (TypeError, ValueError, IndexError):
                    last_message_epoch = 0
        else:
            last_message_epoch = 0

        description = device_table_record[3]
        if description is None:
            description = ""
        close_to = device_table_record[5]
        if close_to is not None and close_to != "":
            description = description + " Close to " + close_to

        device_location = device_table_record[4]
        if device_location is None:
            device_location = 0

        alert_details = device_table_record[-1]
        if alert_details is None:
            alert_details = {}  #{'red_sec': 28801, 'yellow_sec': 14401}
        else:
            print(alert_details)

        last_seen = GetRedisFloat(f'lastseen_{device_id}')
        if last_seen is None:
            last_seen = 0

        # A MAC listed under several deployments yields one row per listing, in order.
        pending = deployments_by_mac.get(mac)
        if pending:
            deployment_id = pending.pop(0) if len(pending) > 1 else pending[0]
        else:
            deployment_id = None

        all_details.append([device_id, well_id, mac, last_message_epoch, location_names[device_location],
                            description, deployment_id, alert_details, last_seen])
    return all_details
def GetVisibleDevicesPerLocation(deployments, location):
    """Return detail rows of all devices in the given deployments at a location.

    Side effects: rebuilds the global ``deployments_devices`` map, and updates
    ``devices_details_map`` and ``device_alerts_all`` for every returned device.

    Args:
        deployments: "-1" or "0" for all deployments, otherwise a comma-separated
            string of deployment ids.
        location: Location name; must be a key of ``location_indexes``.

    Returns:
        List of device detail rows as produced by GetDeviceDetails.
    """
    global deployments_devices, device_alerts_all
    devices_details = []
    deployments_devices = {}

    conn = get_db_connection()
    try:
        # psycopg2's `with conn` only scopes a transaction; the connection itself
        # has to be closed explicitly (see finally) or it leaks.
        with conn:
            with conn.cursor() as cur:
                #list all devices that user has access to
                if deployments == "-1" or deployments == "0":
                    cur.execute("SELECT deployment_id, devices FROM public.deployment_details")
                else:
                    # Parse the ids and pass them as a parameter instead of formatting them into the SQL
                    wanted_ids = [int(part) for part in str(deployments).split(',') if part.strip()]
                    cur.execute(
                        "SELECT deployment_id, devices FROM public.deployment_details WHERE deployment_id = ANY(%s)",
                        (wanted_ids,),
                    )
                devices_groups = cur.fetchall()

                deployment_ids = []
                for deployment_id, dev_group in devices_groups:
                    # Skip empty or too-short device lists
                    if dev_group is None or len(dev_group) <= 10:
                        continue
                    # Device lists are stored either as a Python-style list literal or as CSV
                    if dev_group[0] == "[":
                        macs_group = literal_eval(dev_group)
                    else:
                        macs_group = dev_group.split(',')
                    deployment_ids.extend((deployment_id, mac) for mac in macs_group)

                if deployment_ids:
                    devices_details = GetDeviceDetails(cur, deployment_ids, location_indexes[location])
    finally:
        conn.close()

    for device_details in devices_details:
        deployment_id = device_details[6]
        device_id = device_details[0]
        devices_details_map[device_id] = device_details
        if len(device_details[7]) > 2:
            print(device_details[7])
            try:
                device_alerts_all[device_id] = json.loads(device_details[7])
            except (TypeError, ValueError):
                # Not a JSON string (e.g. already a dict or malformed); keep the old entry
                pass
        deployments_devices.setdefault(deployment_id, []).append(device_id)
    return devices_details
def GetLastDetected(device_id, field_name, threshold_value):
    """Return the epoch time of the last matching reading, or None if there is none.

    The SQL comes from ``GetLastPointQuery(device_id, field_name, threshold_value)``;
    the first column of the first result row is expected to be a datetime.
    """
    query = GetLastPointQuery(device_id, field_name, threshold_value)
    print(query)
    last_detected = None
    conn = get_db_connection()
    try:
        # `with conn` only wraps a transaction in psycopg2; close explicitly to avoid a leak.
        with conn:
            with conn.cursor() as cur:
                cur.execute(query)
                result = cur.fetchone()
                if result is not None:
                    last_detected = result[0].timestamp()
    finally:
        conn.close()
    return last_detected
def GetMatchingDevices(privileges, group, deployment, location):
    """Resolve which deployments the caller may see and return their devices.

    Args:
        privileges: "-1" for unrestricted access, otherwise a comma-separated list
            of deployment ids the caller may access.
        group: Not used by this function.
        deployment: Requested deployment id(s); "" or "0" means "all I may see".
        location: Location name forwarded to GetVisibleDevicesPerLocation.

    Returns:
        Device detail rows, or an empty list when a single requested deployment
        is not among the caller's privileges.
    """
    global LocationsMap

    if privileges == "-1":
        # Unrestricted caller: "0" is translated to the "all deployments" marker
        if deployment == "0":
            deployment = "-1"
    else:
        if deployment in ("", "0"):
            deployment = privileges
        privileges_list = privileges.split(',')
        if deployment != "0":
            if "," in deployment:
                deployment = FilterList(deployment, privileges)
            elif deployment not in privileges_list:
                return []

    return GetVisibleDevicesPerLocation(deployment, location)
def GetAlarmSettings(deployment_id):
    """Return the ``alarm_details`` value stored for a deployment.

    Parameters:
        deployment_id: id of the row to read from the ``deployments`` table.

    Returns:
        The stored ``alarm_details`` value, or the string "{}" when the
        column is NULL or no row exists for ``deployment_id``.
    """
    # Parameterized so the id is never spliced into the SQL text.
    sql = 'SELECT "alarm_details" FROM deployments WHERE deployment_id = %s;'
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, (deployment_id,))
            row = cur.fetchone()
    # fetchone() yields None for an unknown deployment; treat it like NULL
    # instead of failing on row[0].
    if row is None or row[0] is None:
        alarm_armed_settings = "{}"
    else:
        alarm_armed_settings = row[0]
    print(deployment_id, alarm_armed_settings)
    return alarm_armed_settings
def GetDeviceAlarmSettings(device_id):
    """Return the ``alert_details`` value stored for a device.

    Parameters:
        device_id: id of the row to read from the ``devices`` table.

    Returns:
        The stored ``alert_details`` value, or "" when the column is NULL or
        no row exists for ``device_id``.
    """
    sql = 'SELECT "alert_details" FROM devices WHERE device_id = %s;'
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, (device_id,))
            row = cur.fetchone()
    # A missing device row used to raise TypeError on row[0]; report it the
    # same way as an empty setting.
    if row is None or row[0] is None:
        return ""
    return row[0]
def GetUsers():
    """Return every row of public.person_details as
    (user_id, access_to_deployments, email, phone_number) tuples."""
    query = "SELECT user_id, access_to_deployments, email, phone_number FROM public.person_details"
    with get_db_connection() as conn, conn.cursor() as cur:
        cur.execute(query)
        return cur.fetchall()
def GetCaretakers(deployment_id, users):
    """Return the users that have access to ``deployment_id``.

    Parameters:
        deployment_id: deployment to look for; compared as a string.
        users: rows shaped like GetUsers() output, i.e.
            (user_id, access_to_deployments, email, phone_number), where
            access_to_deployments is a comma separated id list or None.

    Returns:
        List of (user_id, email, phone_number) tuples, in input order.
    """
    wanted = str(deployment_id)
    caretakers = []
    for user in users:
        access_to = user[1]
        if access_to is None:
            continue
        # Strip each entry so lists written as "5, 21" match as well.
        granted = {entry.strip() for entry in str(access_to).split(",")}
        if wanted in granted:
            caretakers.append((user[0], user[2], user[3]))
    return caretakers
def load_device_configurations():
    # Populates the module-level caches (devices_config, mac_to_device_id,
    # device_to_deployment, deployment_id_list, alarms_settings_all,
    # care_takers_all) and several Redis keys from the database.
    # Always returns True; any exception raised by a helper propagates.
    # NOTE(review): the nesting below was reconstructed from a listing that
    # had lost its indentation - confirm against the real file.
    global devices_config
    """Loads device configurations including alert thresholds from the database."""
    #Load id's of all deployed devices
    deployment_id = "0"
    group_id = "All"
    location = "All"
    # "-1" privileges together with deployment "0" selects every deployment.
    devices_config = GetMatchingDevices("-1", group_id, deployment_id, location)
    for device_detail in devices_config:
        # Indexes used from each row: [0] device id, [2] MAC, [6] deployment
        # id, [-1] last-seen value.
        device_id = device_detail[0]
        mac_to_device_id[device_detail[2]] = device_detail[0]
        alarm_device_settings_str = GetDeviceAlarmSettings(device_id)
        #if alarm_device_settings != "":
        #    print(alarm_device_settings)
        deployment_id = device_detail[6]
        #if device_id == 540:
        #    print(device_id)
        device_to_deployment[device_id] = deployment_id
        if deployment_id not in deployment_id_list:
            deployment_id_list.append(deployment_id)
        device_id_s = str(device_id)
        last_seen = device_detail[-1] #this one comes from REDIS!
        if last_seen == 0:
            # Defaults used when neither Redis nor the DB holds a threshold.
            radar_threshold_signal = "s3_max"
            radar_threshold_value = 10
            threshold_details = GetRedisString('radar_threshold'+device_id_s)
            try:
                # The Redis value is expected to be the str() of a
                # [signal, value] list (that is what is written below).
                radar_threshold_list = literal_eval(threshold_details)
                radar_threshold_signal = radar_threshold_list[0]
                radar_threshold_value = radar_threshold_list[1]
            except:
                #key not found so read from DB, and store to key
                sql = f"""
SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = {device_id_s};
"""
                with get_db_connection() as conn:
                    with conn.cursor() as cur:
                        cur.execute(sql)
                        # NOTE(review): fetchone() returns None when the
                        # device row is missing; [0] would then raise.
                        threshold_details = cur.fetchone()[0]
                        print(device_id_s, threshold_details)
                        radar_threshold_signal = "s3_max"
                        radar_threshold_value = 10
                        if threshold_details != None:
                            threshold_details_list = literal_eval(threshold_details)
                            radar_threshold_signal = threshold_details_list[0]
                            radar_threshold_value = threshold_details_list[1]
                        # Cache the resolved threshold so the next start can
                        # take the Redis path.
                        redis_conn.set('radar_threshold'+device_id_s, str([radar_threshold_signal, radar_threshold_value]))
            last_seen = GetRedisFloat('lastseen_'+device_id_s)
            if last_seen == None:
                # Nothing cached: derive last-seen from radar_readings.
                last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value)
                if last_seen == None:
                    last_seen = 0
                redis_conn.set('lastseen_'+device_id_s, last_seen)
        #if alarm_device_settings != "":
        #do this only if REDIS has no key present! Why? DB reflects all changes!
        redis_conn.set('alarm_device_settings_'+device_id_s, alarm_device_settings_str)
        print(device_id_s, last_seen)
    users = GetUsers()
    for deployment_id in deployment_id_list:
        # The two "Stop" prints look like debugger anchors for specific
        # deployments - presumably safe to remove; confirm with the author.
        if deployment_id == 21:
            print("Stop")
        if deployment_id == 38:
            print("Stop")
        alarm_settings_str = GetAlarmSettings(deployment_id)
        # GetAlarmSettings returns "{}" for missing settings, hence "> 2".
        if len(alarm_settings_str) > 2:
            alarm_settings = json.loads(alarm_settings_str)
            alarm_armed_settings = alarm_settings['enabled']
            # "armed" starts out as a copy of the stored "enabled" value.
            alarm_settings["armed"] = alarm_armed_settings
            alarms_settings_all[deployment_id] = alarm_settings
            #lets reset bit 1, so armed alarm is recognized on start if present... start of what???
            #alarm_armed_settings = ClearBit(alarm_armed_settings, 1)
            #alarms_settings_shadows[deployment_id] = alarm_armed_settings
            redis_conn.set(f'alarm_settings_{deployment_id}', alarm_settings_str)
        #lets create map deployment_id > person details
        care_takers_all[deployment_id] = GetCaretakers(deployment_id, users)
    for user in users:
        print(user)
    logger.info(f"Successfully loaded configuration for {len(devices_config)} devices.")
    # Optional: Close cursor and commit if any changes were made (unlikely here)
    # cur.close()
    # conn.commit() # Not needed for SELECT
    return True # Indicate success
def GetLastPointQuery(device_id, field_name, threshold_value):
    """
    Build the SQL text that selects the most recent radar_readings row whose
    value crosses a threshold.

    Parameters:
    device_id: device to filter by (interpolated into the statement)
    field_name (str): "<field>_<op>", e.g. "s2_max", "s28_min"; the pseudo
        fields "s28" and "m08" expand to the averages of s2..s8 and m0..m8
    threshold_value: value the field expression is compared against

    Returns:
    str: SQL query string; "max" compares with ">", anything else with "<"
    """
    base_field, *suffix = field_name.split('_')
    operation = suffix[0] if suffix else None

    # Pseudo fields that stand for an average over several columns.
    averaged_fields = {
        's28': "(s2+s3+s4+s5+s6+s7+s8)/7",
        'm08': "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9",
    }
    field_expr = averaged_fields.get(base_field, base_field)

    if operation == "max":
        comparison = ">"
    else:
        comparison = "<"

    return f"""SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id} AND {field_expr} {comparison} {threshold_value} ORDER BY "time" DESC LIMIT 1;"""
def GetBit(alarm_armed_settings, bit_nr):
    """Tell whether bit ``bit_nr`` (0..5) is "1" in a six character flag string.

    Bit 0 is the right-most character.  Out-of-range bit numbers give False.
    """
    if not (0 <= bit_nr <= 5):
        return False
    position = 5 - bit_nr
    return alarm_armed_settings[position] == "1"
def SetBit(alarm_armed_settings, bit_nr):
    """Return a copy of the flag string with bit ``bit_nr`` (0..5) set to "1".

    Bit 0 is the right-most character.  Out-of-range bit numbers give False.
    """
    if not (0 <= bit_nr <= 5):
        return False
    flags = [*alarm_armed_settings]
    flags[5 - bit_nr] = "1"
    return ''.join(flags)
def ClearBit(alarm_armed_settings, bit_nr):
    """Return a copy of the flag string with bit ``bit_nr`` (0..5) set to "0".

    Bit 0 is the right-most character.  Out-of-range bit numbers give False.
    """
    if not (0 <= bit_nr <= 5):
        return False
    flags = [*alarm_armed_settings]
    flags[5 - bit_nr] = "0"
    return ''.join(flags)
def replace_and_save(template_file, output_file, replacements):
    """Copy ``template_file`` to ``output_file`` applying text substitutions.

    ``replacements`` is an iterable of (placeholder, replacement) pairs that
    are applied in order with str.replace; an empty iterable gives a plain
    copy.  The output file is overwritten.
    """
    with open(template_file, 'r') as source:
        text = source.read()

    for needle, substitute in replacements:
        text = text.replace(needle, substitute)

    with open(output_file, 'w') as target:
        target.write(text)
def update_acl_file():
    """Regenerate the Mosquitto ACL file from the user_topics table.

    Writes MOSQUITTO_ACL_FILE and then rewrites
    /etc/mosquitto/.restart_trigger with the current time.  Returns True on
    success and False when no connection is available or any step raises.
    """
    conn = get_db_connection()
    if not conn:
        return False
    try:
        with conn.cursor() as cur:
            # One row per user: parallel arrays of topics and permissions.
            cur.execute("""
SELECT user_name, array_agg(topic) as topics, array_agg(permission) as permissions
FROM user_topics
GROUP BY user_name
ORDER BY user_name
""")
            grants = cur.fetchall()

        # File header.
        lines = []
        lines.append("# Mosquitto ACL file - Auto-generated by ACL Manager")
        lines.append("# Last updated: " + time.strftime("%Y-%m-%d %H:%M:%S"))
        lines.append("")
        lines.append("# Default rule - deny all access")
        lines.append("pattern read $SYS/#")
        lines.append("")

        # One "user" section per account, closed by a blank line.
        for username, topics, permissions in grants:
            lines.append(f"user {username}")
            for topic, perm in zip(topics, permissions):
                if 'r' in perm:
                    lines.append(f"topic read {topic}")
                if 'w' in perm:
                    lines.append(f"topic write {topic}")
            lines.append("")

        with open(MOSQUITTO_ACL_FILE, "w") as acl_file:
            acl_file.write("\n".join(lines))

        # Touch the trigger file to restart mosquitto
        with open("/etc/mosquitto/.restart_trigger", "w") as trigger:
            trigger.write(time.strftime("%Y-%m-%d %H:%M:%S"))

        print("ACL file updated. Mosquitto will restart automatically.")
        return True
    except Exception as e:
        print(f"Error updating ACL file: {e}")
        return False
    finally:
        conn.close()
def GetUserPriviledges(username):
    """Return the (access_to_deployments, key) rows stored for ``username``.

    Parameters:
        username: value matched against person_details.user_name.

    Returns:
        A list of 2-tuples; empty when the user does not exist.
    """
    # The user name is bound as a parameter, never spliced into the SQL.
    sql = "SELECT access_to_deployments, key FROM public.person_details WHERE user_name = %s"
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, (username,))
            result = cur.fetchall()
    return result if result is not None else []
def GetUserDetails(username):
    """Return every person_details row stored for ``username``.

    Parameters:
        username: value matched against person_details.user_name.

    Returns:
        A list of row tuples (all columns); empty when the user does not exist.
    """
    # The user name is bound as a parameter, never spliced into the SQL.
    sql = "SELECT * FROM public.person_details WHERE user_name = %s"
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, (username,))
            result = cur.fetchall()
    return result if result is not None else []
def GetBeneficiaryFromDeployment(deployment_id):
    """Return "<first_name> <last_name>" of the beneficiary of a deployment.

    Parameters:
        deployment_id: id looked up in public.deployment_details.

    Returns:
        The beneficiary's name, or 0 when the deployment or the referenced
        person row does not exist.
    """
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT beneficiary_id FROM public.deployment_details WHERE deployment_id = %s",
                (deployment_id,),
            )
            deployment_row = cur.fetchone()
            if deployment_row is None:
                return 0
            cur.execute(
                "SELECT first_name, last_name FROM public.person_details WHERE user_id = %s",
                (deployment_row[0],),
            )
            person_row = cur.fetchone()
    # A NULL beneficiary_id or a dangling reference yields no person row.
    if person_row is None:
        return 0
    # Missing name parts are treated as empty rather than raising.
    first_name = person_row[0] or ""
    last_name = person_row[1] or ""
    return (first_name + " " + last_name).strip()
def DeleteUserTopics(user_name):
    """Delete every user_topics row that belongs to ``user_name``."""
    statement = "DELETE FROM user_topics WHERE user_name = %s"
    with get_db_connection() as conn, conn.cursor() as cur:
        cur.execute(statement, (user_name,))
def GetVisibleDevices(deployments):
    """Return device-detail rows for the given deployments.

    ``deployments`` is "-1" for every deployment, otherwise a string that is
    placed verbatim inside ``IN (...)``.  The ``devices`` column of each
    deployment is parsed into individual entries and the resulting
    (deployment_id, mac) pairs are handed to GetDeviceDetails with -1 as its
    third argument; its result is returned.
    """
    # NOTE(review): nesting was reconstructed from a listing without
    # indentation; in particular the pairing of the ``else`` below with
    # ``if dev_group != None`` is an assumption - confirm against the file.
    devices_details = []
    stt = time.time()  # start time; not read again in this function
    with get_db_connection() as conn:
        #list all devices that user has access to
        if deployments == "-1":
            sql = "SELECT deployment_id, devices FROM public.deployment_details"
        else:
            # NOTE(review): ``deployments`` is interpolated unescaped.
            sql = f"SELECT deployment_id, devices FROM public.deployment_details WHERE deployment_id IN ({deployments})"
        with conn.cursor() as cur:
            print(sql)
            cur.execute(sql)
            devices_groups = cur.fetchall()#cur.fetchone()
            deployment_ids = []
            for deployment_id, dev_group in devices_groups:
                if dev_group != None:
                    # Values of 10 characters or fewer are skipped.
                    if len(dev_group) > 10:
                        if "[" not in dev_group:
                            # Plain "a,b,c" (or single) value: rewrite it as
                            # a Python list literal so literal_eval accepts it.
                            if "," not in dev_group:
                                dev_group = '["' + dev_group + '"]'
                            else:
                                dev_group = dev_group.replace(" ", "")
                                dev_group = dev_group.replace(",", '","')
                                dev_group = '["' + dev_group + '"]'
                        macs_group = literal_eval(dev_group)
                        for mac in macs_group:
                            deployment_ids.append((deployment_id, mac))
                else:
                    print(f"Deployment {deployment_id} has dev_group empty")
            devices_details = GetDeviceDetails(cur, deployment_ids, -1)
            #devices_details.append(devices_detail)
    return devices_details
def add_user(username, password):
    """Add a user to the Mosquitto password file.

    Runs /home/ubuntu/mosquitto_pass_simple.sh with the password file, user
    name and password as arguments.

    Returns:
        True when the script exits with status 0, False when it fails or
        cannot be started.
    """
    command = ["/home/ubuntu/mosquitto_pass_simple.sh", MOSQUITTO_PASSWORD_FILE, username, password]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable script, which would
        # otherwise escape the True/False contract of this function.
        print(f"Error adding user to Mosquitto password file: {e}")
        return False
    print(f"User {username} added successfully to Mosquitto password file.")
    return True
def delete_user(username):
    """Delete a user from the Mosquitto password file.

    Runs ``sudo mosquitto_passwd -D`` on MOSQUITTO_PASSWORD_FILE.

    Returns:
        True when the command exits with status 0, False when it fails or
        cannot be started.
    """
    command = ["sudo", "mosquitto_passwd", "-D", MOSQUITTO_PASSWORD_FILE, username]
    try:
        subprocess.run(command, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers an executable that cannot be found or started.
        print(f"Error deleting user from Mosquitto password file: {e}")
        return False
    print(f"User {username} deleted successfully from Mosquitto password file.")
    return True
def add_topic_permission(username, topic, permission):
    """Insert or update one (user, topic) permission row in user_topics.

    Returns True after a successful commit, False when no connection is
    available or the statement fails (the transaction is rolled back).
    The ACL file itself is not regenerated here.
    """
    conn = get_db_connection()
    if not conn:
        return False
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT id FROM user_topics WHERE user_name = %s AND topic = %s",
                        (username, topic))
            existing = cur.fetchone()
            if not existing:
                # No row yet for this user/topic pair.
                cur.execute("INSERT INTO user_topics (user_name, topic, permission) VALUES (%s, %s, %s)",
                            (username, topic, permission))
                print(f"Added permission for {username} on topic {topic}: {permission}")
            else:
                # Row exists: overwrite its permission.
                cur.execute("UPDATE user_topics SET permission = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s",
                            (permission, existing[0]))
                print(f"Updated permission for {username} on topic {topic} to {permission}")
        conn.commit()
        return True
    except Exception as e:
        print(f"Database error: {e}")
        conn.rollback()
        return False
    finally:
        conn.close()
def delete_topic_permission(username, topic=None):
    """Remove one topic permission, or all of them, for ``username``.

    With a truthy ``topic`` only that row is deleted; otherwise every row of
    the user goes.  After the commit the ACL file is regenerated.  Returns
    True on success, False when no connection is available or the statement
    fails (the transaction is rolled back).
    """
    conn = get_db_connection()
    if not conn:
        return False
    try:
        with conn.cursor() as cur:
            if not topic:
                cur.execute("DELETE FROM user_topics WHERE user_name = %s", (username,))
                print(f"Deleted all permissions for {username}")
            else:
                cur.execute("DELETE FROM user_topics WHERE user_name = %s AND topic = %s",
                            (username, topic))
                print(f"Deleted permission for {username} on topic {topic}")
        conn.commit()
        # Bring the on-disk ACL in line with the table.
        update_acl_file()
        return True
    except Exception as e:
        print(f"Database error: {e}")
        conn.rollback()
        return False
    finally:
        conn.close()
def UpdateACL(username, token):
    """Rebuild the MQTT account and topic permissions of one user.

    The user's existing user_topics rows are deleted and recreated from the
    devices visible under the user's access_to_deployments value:
    "-1" grants "#" read/write, anything else grants read on "/<mac>" for
    every visible device.  Finally the ACL file is regenerated.

    Parameters:
        username: account to rebuild.
        token: accepted for interface compatibility; not used here.

    Returns:
        None.  Unknown users are reported and left untouched.
    """
    users_p = GetUserPriviledges(username)
    if not users_p:
        # Previously this raised IndexError on users_p[0].
        print(f"UpdateACL: user {username} not found, ACL left unchanged")
        return
    privileges = users_p[0][0]
    p = users_p[0][1]

    DeleteUserTopics(username)

    if privileges is not None and privileges != 'None':
        devices = GetVisibleDevices(privileges)
        if devices:
            # device[2] is the MAC column of a device-detail row.
            macs_list = [device[2] for device in devices if device[2] is not None]
            if privileges == "-1":
                add_user(username, p)
                add_topic_permission(username, "#", "rw")
            elif macs_list:
                add_user(username, p)
                for mac in macs_list:
                    add_topic_permission(username, "/" + mac, "r")

    # Regenerate last so the file reflects both the deletions above and any
    # permissions just added.
    update_acl_file()
def fetch_device_data(user_name, token, timeout=30):
    """Fetch the ``device_list_4_gui`` payload for a user from the well-api.

    Parameters:
        user_name: account name sent as ``user_name``.
        token: session token sent as ``token``.
        timeout: seconds to wait for the server before giving up
            (default 30).  Without a timeout ``requests`` waits forever.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Exception: when the server answers with a status other than 200.
        requests.exceptions.RequestException: on connection errors or
            when ``timeout`` elapses.
    """
    url = "https://eluxnetworks.net/function/well-api/api"
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    payload = {
        'user_name': user_name,
        'token': token,
        'function': 'device_list_4_gui'
    }
    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    if response.status_code != 200:
        raise Exception(f"API request failed with status code {response.status_code}: {response.text}")
    return response.json()
def generate_config_file(data, output_path, user_name, p):
    """Write ``wellnuo_config.js`` for one user into ``output_path``.

    Parameters:
        data: dict from the device-list API; the optional keys
            'deploymentData' and 'deviceData' hold lists of dicts.
        output_path: directory to write into (created when missing).
        user_name: value written as MQTT_USERNAME.
        p: value written as MQTT_PASSWORD.

    Every value is emitted as a double-quoted JavaScript string literal.
    Quotes, backslashes and control characters inside a value are escaped,
    so a password such as ``p"w`` no longer produces a broken file.
    """
    def _js(value):
        # A JSON string literal is also a valid JavaScript string literal.
        return json.dumps(str(value), ensure_ascii=False)

    wellnuo_configPath = os.path.join(output_path, 'wellnuo_config.js')

    deployment_lines = [
        f" {{ deployment_id: {_js(d.get('deployment_id'))}, name: {_js(d.get('name'))} }}"
        for d in (data.get('deploymentData') or [])
    ]
    device_lines = [
        f" {{ well_id: {_js(d.get('well_id'))}, mac: {_js(d.get('mac'))}, "
        f"room_name: {_js(d.get('room_name'))}, deployment_id: {_js(d.get('deployment_id'))} }}"
        for d in (data.get('deviceData') or [])
    ]

    config_lines = [
        "// User-specific configuration for WellNuo nodes",
        "module.exports = {",
        f"MQTT_USERNAME: {_js(user_name)},",
        f"MQTT_PASSWORD: {_js(p)},",
        "deploymentData: [",
        ",\n".join(deployment_lines),
        "],",
        "deviceData: [",
        ",\n".join(device_lines),
        "]",
        "};",
        "",  # trailing newline
    ]
    config_content = "\n".join(config_lines)

    os.makedirs(output_path, exist_ok=True)
    with open(wellnuo_configPath, 'w') as f:
        f.write(config_content)
    print(f"Configuration file has been created at: {wellnuo_configPath}")
def create_symbolic_link(source_dir_in, target_dir_in):
    """Make ``<target_dir>/node-red-contrib-wellnuo`` a symlink to ``source_dir``.

    Parameters:
        source_dir_in: directory the link should point to ("~" is expanded).
        target_dir_in: directory that will contain the link ("~" is
            expanded; created when missing).

    Returns:
        True when the link exists afterwards (already present or newly
        created), False when the source is missing, the target name is
        occupied by something that is not a symlink, or a filesystem
        operation fails.
    """
    source_dir = os.path.expanduser(source_dir_in)
    target_dir = os.path.expanduser(target_dir_in)
    target_link = os.path.join(target_dir, "node-red-contrib-wellnuo")

    if not os.path.exists(source_dir):
        print(f"Error: Source directory '{source_dir}' does not exist.")
        return False

    if not os.path.exists(target_dir):
        try:
            os.makedirs(target_dir, exist_ok=True)
            print(f"Created directory: {target_dir}")
        except Exception as e:
            print(f"Error creating directory '{target_dir}': {e}")
            return False

    # lexists() also sees a dangling symlink, which exists() reports as
    # absent and which would make os.symlink() fail with FileExistsError.
    if os.path.lexists(target_link):
        if not os.path.islink(target_link):
            print(f"Error: '{target_link}' already exists and is not a symbolic link.")
            return False
        if os.path.exists(target_link):
            # A working link is already in place; leave it alone.
            return True
        try:
            os.unlink(target_link)
            print(f"Removed dangling symbolic link: {target_link}")
        except OSError as e:
            print(f"Error removing existing link: {e}")
            return False

    try:
        os.symlink(source_dir, target_link)
        print(f"Successfully created symbolic link from '{source_dir}' to '{target_link}'")
        return True
    except Exception as e:
        print(f"Error creating symbolic link: {e}")
        return False
# Create ~/.node-red/preload_template.js on first start.  SetupUserEnvironment
# copies this file into each user's directory as preload.js.
preload_template_path = os.path.expanduser("~/.node-red/preload_template.js")
if not os.path.exists(preload_template_path):
    preload_template_content = """// CommonJS wrapper for tslib
try {
global.jsbn = require('jsbn');
const tslib = require('tslib/tslib.js'); // Use the CommonJS version
global.tslib = tslib;
module.exports = { tslib, jsbn: global.jsbn };
console.log("Successfully preloaded tslib and jsbn modules");
} catch (error) {
console.error("Error preloading modules:", error.message);
}
"""
    # The directory may not exist yet on a fresh machine; without this the
    # open() below raises FileNotFoundError while the module is imported.
    os.makedirs(os.path.dirname(preload_template_path), exist_ok=True)
    with open(preload_template_path, "w") as f:
        f.write(preload_template_content)
def SetupUserEnvironment(username, token, ps):
    """Prepare the per-user Node-RED directory under ~/.node-red/users/<username>.

    Steps, in order: refresh the user's MQTT ACL, render settings.js and
    timeout-script.js from their templates, write config/wellnuo_config.js
    from the device-list API, make sure flows.json exists, record the user in
    /tmp/node_red_activity.json, copy preload.js, and populate
    ~/.node-red/shared_nodes when it lacks node-red-contrib-wellnuo.

    Parameters:
        username: account name; used in paths and template placeholders.
        token: replaces ###TOKEN### and is sent to the device-list API.
        ps: replaces ###PS### in the templates.

    Returns:
        True.  Exceptions from any step propagate to the caller.
    """
    # NOTE(review): nesting was reconstructed from a listing without
    # indentation - confirm against the real file, especially step 4.
    if True: #Lets do it always
        UpdateACL(username, token)
        user_dir = os.path.expanduser(f"~/.node-red/users/{username}")
        os.makedirs(user_dir, exist_ok=True)
        # 2. Copy settings template
        settings_template_file = os.path.expanduser("~/.node-red/settings_template.js")
        user_settings_file = os.path.join(user_dir, "settings.js")
        replace_and_save(settings_template_file, user_settings_file, [("###USER_NAME###", username),("###TOKEN###", token),("###PS###", ps)])
        timeout_script_template_file = os.path.expanduser("~/.node-red/timeout-script_template.js")
        user_timeout_script_file = os.path.join(user_dir, "timeout-script.js")
        replace_and_save(timeout_script_template_file, user_timeout_script_file, [("###USER_NAME###", username),("###TOKEN###", token),("###PS###", ps)])
        # 3. Prepare user wellnuo-config file
        userConfigPath = os.path.join(user_dir, 'config')
        users_p = GetUserPriviledges(username)
        # Second column of the privileges row ("key") is written as the MQTT
        # password by generate_config_file.
        p = users_p[0][1]
        api_data = fetch_device_data(username, token)
        generate_config_file(api_data, userConfigPath, username, p)
        # 4. Create initial empty flows file if it doesn't exist
        flows_file = os.path.join(user_dir, "flows.json")
        if not os.path.exists(flows_file):
            with open(flows_file, "w") as f:
                f.write("[]")
        # An existing flows_<username>.json is copied over flows.json
        # (empty replacement list = plain copy).
        # NOTE(review): assumed to run on every call, not only when
        # flows.json was just created - confirm.
        user_flows_file = os.path.join(user_dir, f"flows_{username}.json")
        if os.path.exists(user_flows_file):
            replace_and_save(user_flows_file, flows_file, [])
        # 5. Register user in activity tracking
        activity_file = "/tmp/node_red_activity.json"
        activity_data = {}
        if os.path.exists(activity_file):
            with open(activity_file, 'r') as f:
                try:
                    activity_data = json.load(f)
                except json.JSONDecodeError:
                    # Unreadable file: start over with an empty map.
                    activity_data = {}
        # Local import; ``time`` is also imported at module level.
        import time
        activity_data[username] = time.time()
        with open(activity_file, 'w') as f:
            json.dump(activity_data, f)
        preload_template_file = os.path.expanduser("~/.node-red/preload_template.js")
        user_preload_file = os.path.join(user_dir, "preload.js")
        replace_and_save(preload_template_file, user_preload_file, []) # No replacements needed
        shared_nodes_dir = os.path.expanduser("~/.node-red/shared_nodes")
        if not os.path.exists(os.path.join(shared_nodes_dir, "node-red-contrib-wellnuo")):
            # Copy from the main node_modules if it exists there
            src_path = os.path.expanduser("~/.node-red/node_modules/node-red-contrib-wellnuo")
            if os.path.exists(src_path):
                # Local import; ``shutil`` is also imported at module level.
                import shutil
                os.makedirs(shared_nodes_dir, exist_ok=True)
                shutil.copytree(src_path, os.path.join(shared_nodes_dir, "node-red-contrib-wellnuo"))
                print(f"Copied node-red-contrib-wellnuo to shared_nodes directory")
        # Add to your function after other template processing
        # 6. Create symbolic link to common Wellnuo Nodes (currently disabled)
        #success = create_symbolic_link("~/.node-red/node_modules/node-red-contrib-wellnuo", f"~/.node-red/users/{username}/node_modules")
        #if success:
            #print("Link creation completed successfully.")
        #else:
            #print("Link creation failed.")
        print(f"User environment set up for {username}")
    return True
def StartNodeRed(port, user_name):
    """Start a Node-RED instance for ``user_name`` on ``port``.

    The instance runs from /home/ubuntu/.node-red with the user's directory
    and settings.js; stdout and stderr go to <user_dir>/node_red.log.  The
    command is launched through the shell in its own process group.

    Parameters:
        port: TCP port passed to ``node-red -p``.
        user_name: account whose directory under
            /home/ubuntu/.node-red/users is used.

    Returns:
        The PID of the launched shell process (process.pid).
    """
    # Function-scope import: shlex is not in the file-level import block.
    import shlex

    user_dir = f"/home/ubuntu/.node-red/users/{user_name}"
    settings_file = f"{user_dir}/settings.js"
    log_file = f"{user_dir}/node_red.log"

    os.makedirs(user_dir, exist_ok=True)

    # Every interpolated value is shell-quoted so a user name containing
    # spaces or shell metacharacters cannot alter the command line.  Values
    # made of safe characters are left unchanged by shlex.quote.
    # NOTE(review): quoting does not stop ".." path segments in user_name;
    # validate the name at the caller if it can come from outside.
    cmd = (
        "cd /home/ubuntu/.node-red && node-red -v"
        f" -p {shlex.quote(str(port))}"
        f" -u {shlex.quote(user_dir)}"
        f" -s {shlex.quote(settings_file)}"
        f" > {shlex.quote(log_file)} 2>&1"
    )
    print(f"Starting Node-RED with command: {cmd}")

    process = subprocess.Popen(
        cmd,
        shell=True,
        preexec_fn=os.setpgrp  # Create process group
    )
    print(f"Node-RED started with PID: {process.pid}")
    return process.pid
def StopNodeRed(pid):
    """Send SIGTERM to process group ``pid`` and return a status message.

    A process group that no longer exists is reported in the message, not
    raised.
    """
    try:
        os.killpg(pid, signal.SIGTERM)
    except ProcessLookupError:
        return "Process not found, may have already terminated"
    return f"Node-RED process group {pid} stopped"
def GetAvailablePort(user_name):
    """Return the lowest port in 1900..1999 not used by an active node_reds row.

    Rows with status = 1 are read and the second column of each row is taken
    as its port.  ``user_name`` is accepted but not used.
    """
    # NOTE(review): when every port in the range is taken the loop falls
    # through and 1999 is returned although it is in use - callers may want
    # an explicit "none available" signal.
    sql = "SELECT * FROM node_reds WHERE status = 1"
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            print(sql)
            cur.execute(sql)
            active_rows = cur.fetchall()

    ports_in_use = {row[1] for row in active_rows}
    candidate = 0
    for candidate in range(1900, 2000):
        if candidate not in ports_in_use:
            break
    return candidate
def StoreNodeRedUserPortOn(port, user_name, pid, time_s):
    """Record that a Node-RED instance was started for ``user_name``.

    Upserts the user's node_reds row with status 1 and appends a
    node_reds_usage row carrying ``time_on``.

    Parameters:
        port: port the instance listens on.
        user_name: owner of the instance (conflict key of node_reds).
        pid: process id stored with the row.
        time_s: start time stored as last_activity / time_on.

    Returns:
        1 on success, 0 when any database step fails.
    """
    status = 1
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO public.node_reds (user_name, port, last_activity, status, pid)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (user_name)
                    DO UPDATE SET
                        port = EXCLUDED.port,
                        last_activity = EXCLUDED.last_activity,
                        pid = EXCLUDED.pid,
                        status = EXCLUDED.status;
                """, (user_name, port, time_s, status, pid))
                cur.execute("""
                    INSERT INTO public.node_reds_usage (user_name, port, time_on, pid)
                    VALUES (%s, %s, %s, %s)
                """, (user_name, port, time_s, pid))
        return 1
    except Exception as e:
        # Report the failure; a bare except used to hide it completely.
        print(f"StoreNodeRedUserPortOn failed for {user_name}: {e}")
        return 0
def StoreNodeRedUserPortOff(port, user_name, time_s):
    """Record that the Node-RED instance of ``user_name`` was stopped.

    Upserts the user's node_reds row with status 0 and pid 0 and appends a
    node_reds_usage row carrying ``time_off``.

    Parameters:
        port: port the instance was listening on.
        user_name: owner of the instance (conflict key of node_reds).
        time_s: accepted but not used; ``time_off`` is taken from
            time.time().  NOTE(review): StoreNodeRedUserPortOn stores its
            ``time_s`` argument - confirm which is intended here.

    Returns:
        1 on success, 0 when any database step fails.
    """
    pid = 0
    status = 0
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                time_off = time.time()
                cur.execute("""
                    INSERT INTO public.node_reds (user_name, port, status, pid)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (user_name)
                    DO UPDATE SET
                        port = EXCLUDED.port,
                        pid = EXCLUDED.pid,
                        status = EXCLUDED.status;
                """, (user_name, port, status, pid))
                cur.execute("""
                    INSERT INTO public.node_reds_usage (user_name, port, time_off, pid)
                    VALUES (%s, %s, %s, %s)
                """, (user_name, port, time_off, pid))
        return 1
    except Exception as e:
        # Report the failure; a bare except used to hide it completely.
        print(f"StoreNodeRedUserPortOff failed for {user_name}: {e}")
        return 0
def ClearNodeRedUserPort(port, user_name):
    """Reset the node_reds row of ``user_name`` to the inactive state.

    Upserts the row with last_activity 0, status 0 and pid 0, keeping
    ``port``.

    Returns:
        1 on success, 0 when the database step fails.
    """
    last_activity = 0
    status = 0
    pid = 0
    try:
        with get_db_connection() as conn:
            with conn.cursor() as cur:
                cur.execute("""
                    INSERT INTO public.node_reds (user_name, port, last_activity, status, pid)
                    VALUES (%s, %s, %s, %s, %s)
                    ON CONFLICT (user_name)
                    DO UPDATE SET
                        port = EXCLUDED.port,
                        last_activity = EXCLUDED.last_activity,
                        pid = EXCLUDED.pid,
                        status = EXCLUDED.status;
                """, (user_name, port, last_activity, status, pid))
        return 1
    except Exception as e:
        # Report the failure; a bare except used to hide it completely.
        print(f"ClearNodeRedUserPort failed for {user_name}: {e}")
        return 0
def IsNRRunning(user_name):
    """Return (port, pid) of the active Node-RED instance of ``user_name``.

    Reads the node_reds row with status = 1 and pid > 0 and checks with
    psutil whether that process still exists.

    Returns:
        (port, pid); pid is 0 when no active row exists or the recorded
        process is gone, port is 0 when no active row exists.
    """
    port = 0
    pid = 0
    # The user name is bound as a parameter, never spliced into the SQL.
    sql = "SELECT port, pid FROM node_reds WHERE user_name = %s and status = 1 and pid > 0"
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            print(sql, (user_name,))
            cur.execute(sql, (user_name,))
            result = cur.fetchone()
    if result is not None:
        port = result[0]
        pid = result[1]

    if pid > 0:
        # NOTE(review): this only proves that some process owns the pid; it
        # does not verify that the process is Node-RED.
        try:
            if not psutil.Process(pid).is_running():
                pid = 0
        except psutil.NoSuchProcess:
            pid = 0
    return port, pid
def GetNodeRedDetails(user_name):
    """Return the (port, pid) recorded in node_reds for ``user_name``.

    Returns:
        (port, pid) from the user's row, or (0, 0) when the user has no row.
    """
    port = 0
    # pid had no default, so a missing row raised UnboundLocalError.
    pid = 0
    # The user name is bound as a parameter, never spliced into the SQL.
    sql = "SELECT port, pid FROM node_reds WHERE user_name = %s"
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            print(sql, (user_name,))
            cur.execute(sql, (user_name,))
            result = cur.fetchone()
    if result is not None:
        port = result[0]
        pid = result[1]
    return port, pid
def combine_user_flows(user_directories, output_dir='/home/ubuntu/.node-red',
main_instance_port=1999):
"""
Combine multiple users' Node-RED configurations into a single deployment
with each user's flows contained in their own subflow.
"""
# Initialize combined data structures
combined_flows = []
combined_nodes_config = {}
combined_users_config = {"_": {}}
combined_runtime_config = {}
combined_credentials = {}
# Set margins to prevent cutting off
left_margin = 100
top_margin = 100
# First pass: collect all user flows
user_flow_data = []
for user_dir in user_directories:
username = os.path.basename(user_dir)
print(f"Processing user: {username}")
# Load user's flow file
flow_file = os.path.join(user_dir, f"flows_{username}.json")
try:
with open(flow_file, 'r') as f:
user_flows = json.load(f)
except Exception as e:
print(f"Error loading flow file for {username}: {e}")
continue
# Prepare storage for tabs and flows analysis
user_tabs = {}
nodes_by_tab = {}
global_config_nodes = []
wellplug_nodes = [] # Store wellplug sensor nodes specifically
# First identify tabs and collect global/special nodes
for node in user_flows:
if not isinstance(node, dict):
continue
if node.get('type') == 'tab':
# This is a flow tab
tab_id = node.get('id')
if tab_id:
user_tabs[tab_id] = node
nodes_by_tab[tab_id] = []
elif not node.get('z'):
# This is likely a global config node
global_config_nodes.append(node)
# Collect WellPlug sensor nodes for special handling
if node.get('type') == 'device-sensor-in':
wellplug_nodes.append(node)
# Now collect nodes by tab
for node in user_flows:
if not isinstance(node, dict):
continue
tab_id = node.get('z')
if tab_id and tab_id in nodes_by_tab:
nodes_by_tab[tab_id].append(node)
# Calculate dimensions for each tab's content
tab_dimensions = {}
for tab_id, nodes in nodes_by_tab.items():
if not nodes:
continue
# Find boundaries of the flow
min_x = min((node.get('x', 0) for node in nodes if 'x' in node), default=0)
max_x = max((node.get('x', 0) +
(node.get('w', 150) if 'w' in node else 150)
for node in nodes if 'x' in node), default=0)
min_y = min((node.get('y', 0) for node in nodes if 'y' in node), default=0)
max_y = max((node.get('y', 0) +
(node.get('h', 30) if 'h' in node else 30)
for node in nodes if 'y' in node), default=0)
width = max_x - min_x + 100 # Add extra margin
height = max_y - min_y + 100 # Add extra margin
tab_dimensions[tab_id] = {
'width': width,
'height': height,
'min_x': min_x,
'min_y': min_y,
'original_tab': user_tabs[tab_id]
}
# Load user configuration files
user_config = {}
wellnuo_config = {}
# Try to load user's WellNuo configuration - FIXED to handle JS format
wellnuo_config_path = os.path.join(user_dir, 'config', 'wellnuo_config.js')
try:
if os.path.exists(wellnuo_config_path):
# Read the JavaScript config file as text
with open(wellnuo_config_path, 'r') as f:
config_content = f.read()
# Extract configuration using regex (simple approach)
mqtt_username = re.search(r'MQTT_USERNAME\s*[=:]\s*["\']([^"\']+)["\']', config_content)
mqtt_password = re.search(r'MQTT_PASSWORD\s*[=:]\s*["\']([^"\']+)["\']', config_content)
mqtt_server = re.search(r'MQTT_SERVER\s*[=:]\s*["\']([^"\']+)["\']', config_content)
mqtt_port = re.search(r'MQTT_PORT\s*[=:]\s*["\']?(\d+)["\']?', config_content)
if mqtt_username:
wellnuo_config['MQTT_USERNAME'] = mqtt_username.group(1)
if mqtt_password:
wellnuo_config['MQTT_PASSWORD'] = mqtt_password.group(1)
if mqtt_server:
wellnuo_config['MQTT_SERVER'] = mqtt_server.group(1)
if mqtt_port:
wellnuo_config['MQTT_PORT'] = int(mqtt_port.group(1))
# Extract deployment data using a different approach
deploy_start = config_content.find('deploymentData')
deploy_end = -1
device_start = -1
device_end = -1
if deploy_start > 0:
# Find the closing bracket of the array
bracket_level = 0
in_array = False
for i in range(deploy_start, len(config_content)):
if config_content[i] == '[':
bracket_level += 1
in_array = True
elif config_content[i] == ']':
bracket_level -= 1
if in_array and bracket_level == 0:
deploy_end = i + 1
break
device_start = config_content.find('deviceData')
if device_start > 0:
# Find the closing bracket of the array
bracket_level = 0
in_array = False
for i in range(device_start, len(config_content)):
if config_content[i] == '[':
bracket_level += 1
in_array = True
elif config_content[i] == ']':
bracket_level -= 1
if in_array and bracket_level == 0:
device_end = i + 1
break
# Parse deployment data
if deploy_start > 0 and deploy_end > deploy_start:
# Extract the array part
deploy_text = config_content[deploy_start:deploy_end]
# Find the actual array start
array_start = deploy_text.find('[')
if array_start > 0:
deploy_array = deploy_text[array_start:].strip()
# Convert JS to JSON format
deploy_array = deploy_array.replace("'", '"')
# Fix unquoted property names
deploy_array = re.sub(r'(\s*)(\w+)(\s*):(\s*)', r'\1"\2"\3:\4', deploy_array)
# Parse the array
try:
wellnuo_config['deploymentData'] = json.loads(deploy_array)
print(f"Successfully parsed deploymentData for {username}")
except json.JSONDecodeError as e:
print(f"Error parsing deploymentData for {username}: {e}")
# Parse device data
if device_start > 0 and device_end > device_start:
# Extract the array part
device_text = config_content[device_start:device_end]
# Find the actual array start
array_start = device_text.find('[')
if array_start > 0:
device_array = device_text[array_start:].strip()
# Convert JS to JSON format
device_array = device_array.replace("'", '"')
# Fix unquoted property names
device_array = re.sub(r'(\s*)(\w+)(\s*):(\s*)', r'\1"\2"\3:\4', device_array)
# Parse the array
try:
wellnuo_config['deviceData'] = json.loads(device_array)
print(f"Successfully parsed deviceData for {username}")
except json.JSONDecodeError as e:
print(f"Error parsing deviceData for {username}: {e}")
except Exception as e:
print(f"Error processing WellNuo config for {username}: {e}")
# Check for timeout-script.js to extract token
timeout_script_path = os.path.join(user_dir, 'timeout-script.js')
if os.path.exists(timeout_script_path):
try:
with open(timeout_script_path, 'r') as f:
script_content = f.read()
# Extract token
token_match = re.search(r'var\s+token\s*=\s*["\']([^"\']+)["\']', script_content)
if token_match:
wellnuo_config['token'] = token_match.group(1)
print(f"Found token for user {username}")
# Extract username if present
user_match = re.search(r'USER\s*:\s*([a-zA-Z0-9_-]+)', script_content)
if user_match:
wellnuo_config['user_name'] = user_match.group(1)
print(f"Found explicit username in timeout script: {wellnuo_config['user_name']}")
else:
# Default to directory name as username
wellnuo_config['user_name'] = username
print(f"Using directory name as username: {username}")
except Exception as e:
print(f"Error extracting token for {username}: {e}")
# Default to directory name as username
wellnuo_config['user_name'] = username
else:
# Default to directory name as username if timeout script not found
wellnuo_config['user_name'] = username
# Update wellplug nodes with user information
for node in wellplug_nodes:
node['_wellnuo_user_config'] = wellnuo_config
print(f"Updated wellplug node {node.get('id')} with user config")
# Load other configuration files
try:
# Node configuration
nodes_config_file = os.path.join(user_dir, ".config.nodes.json")
if os.path.exists(nodes_config_file):
with open(nodes_config_file, 'r') as f:
user_nodes_config = json.load(f)
user_config['nodes_config'] = user_nodes_config
# Users configuration
users_config_file = os.path.join(user_dir, ".config.users.json")
if os.path.exists(users_config_file):
with open(users_config_file, 'r') as f:
user_users_config = json.load(f)
user_config['users_config'] = user_users_config
# Runtime configuration
runtime_config_file = os.path.join(user_dir, ".config.runtime.json")
if os.path.exists(runtime_config_file):
with open(runtime_config_file, 'r') as f:
user_runtime_config = json.load(f)
user_config['runtime_config'] = user_runtime_config
# Credentials file
cred_file = os.path.join(user_dir, f"flows_{username}_cred.json")
if os.path.exists(cred_file):
with open(cred_file, 'r') as f:
user_cred = json.load(f)
user_config['credentials'] = user_cred
except Exception as e:
print(f"Error processing configuration for {username}: {e}")
# Add to the collection
user_flow_data.append({
'username': username,
'tabs': user_tabs,
'nodes_by_tab': nodes_by_tab,
'dimensions': tab_dimensions,
'global_config_nodes': global_config_nodes,
'config': user_config,
'wellnuo_config': wellnuo_config,
'wellplug_nodes': wellplug_nodes
})
# Initialize the combined flow with first tab
main_tab_id = str(uuid.uuid4())
main_tab = {
"id": main_tab_id,
"type": "tab",
"label": "Combined Flow",
"disabled": False, # Correct Python boolean
"info": "Main tab for combined user flows"
}
combined_flows.append(main_tab)
# Create subflow templates for each user
subflow_templates = {}
current_position_x = left_margin # Start with left margin
current_position_y = top_margin # Start with top margin
# Track the maximum height in current row
row_max_height = 0
max_row_width = 1800 # Maximum width for a row before wrapping
# First create all the subflow templates for users
for user_data in user_flow_data:
username = user_data['username']
print(f"Creating subflow for user: {username}")
# Create a subflow template for this user
subflow_id = f"subflow_{username}_{str(uuid.uuid4()).replace('-', '')}"
# Calculate total height and width needed for all tabs
total_width = sum(dim['width'] for dim in user_data['dimensions'].values()) + 100
max_height = max((dim['height'] for dim in user_data['dimensions'].values()), default=200) + 100
# Create the subflow template
subflow_template = {
"id": subflow_id,
"type": "subflow",
"name": f"User: {username}",
"info": f"Flows from user {username}",
"category": "Imported Flows",
"in": [],
"out": [],
"color": "#DDAA99",
"icon": "font-awesome/fa-user",
"status": {
"x": 100,
"y": 100,
"wires": []
}
}
combined_flows.append(subflow_template)
subflow_templates[username] = {
'id': subflow_id,
'template': subflow_template,
'position': (current_position_x, current_position_y),
'width': 300, # Default width for subflow instances
'height': 100 # Default height for subflow instances
}
# Update position for next subflow
current_position_x += 350 # Space between subflows
row_max_height = max(row_max_height, 150) # Track max height
# Check if we need to wrap to a new row
if current_position_x > max_row_width:
current_position_x = left_margin
current_position_y += row_max_height + 50
row_max_height = 0
# For each tab in the user's flows, create nodes inside the subflow
x_offset = 50 # Starting position inside subflow
y_offset = 50
for tab_id, nodes_list in user_data['nodes_by_tab'].items():
if not nodes_list:
continue
# Get the original tab info
original_tab = user_data['tabs'][tab_id]
dim = user_data['dimensions'][tab_id]
min_x = dim['min_x']
min_y = dim['min_y']
# Add a comment node to label this tab
tab_comment_id = f"tab_label_{username}_{tab_id}"
tab_comment = {
"id": tab_comment_id,
"type": "comment",
"name": f"User: {username} - Flow: {original_tab.get('label', 'Unnamed Flow')}",
"info": "",
"x": x_offset + 30,
"y": y_offset - 30,
"z": subflow_id,
"wires": []
}
combined_flows.append(tab_comment)
# Create ID mapping for this tab's nodes
id_mapping = {}
for node in nodes_list:
if isinstance(node, dict) and 'id' in node:
id_mapping[node['id']] = f"{username}_{node['id']}"
# Add the tab's nodes to the subflow
for node in nodes_list:
if not isinstance(node, dict):
continue
node_copy = copy.deepcopy(node)
# Assign new ID
if 'id' in node_copy:
node_copy['id'] = id_mapping.get(node_copy['id'], node_copy['id'])
# Assign to the subflow
node_copy['z'] = subflow_id
# Adjust position relative to flow origin and placement position
# Add extra margin to prevent cutting off
if 'x' in node_copy and 'y' in node_copy:
node_copy['x'] = (node_copy['x'] - min_x) + x_offset + left_margin
node_copy['y'] = (node_copy['y'] - min_y) + y_offset + top_margin
# Update wire references
if 'wires' in node_copy:
new_wires = []
for wire_set in node_copy['wires']:
new_wire_set = []
for wire in wire_set:
if wire in id_mapping:
new_wire_set.append(id_mapping[wire])
else:
new_wire_set.append(wire)
new_wires.append(new_wire_set)
node_copy['wires'] = new_wires
# Special handling for WellPlug sensor nodes to preserve user credentials
# Special handling for WellPlug sensor nodes to preserve user credentials
if node_copy.get('type') == 'device-sensor-in':
# Transfer user-specific wellnuo configuration
if '_wellnuo_user_config' in node_copy:
user_config = node_copy['_wellnuo_user_config']
# Store credentials in a way that's accessible to the editor
# Use properties without underscore prefix
node_copy['username'] = user_config.get('user_name', username)
node_copy['mqtt_password'] = user_config.get('MQTT_PASSWORD', '')
# Keep internal version for backward compatibility
node_copy['_username'] = user_config.get('user_name', username)
node_copy['_mqtt_password'] = user_config.get('MQTT_PASSWORD', '')
# Also preserve device and deployment selections
if 'selectedDevices' in node:
node_copy['selectedDevices'] = node['selectedDevices']
if 'selectedDeployments' in node:
node_copy['selectedDeployments'] = node['selectedDeployments']
# Remove any token reference - we're moving away from tokens
if '_token' in node_copy:
del node_copy['_token']
# Remove the temporary field
del node_copy['_wellnuo_user_config']
# Add to combined flows
combined_flows.append(node_copy)
print(f"WellPlug node {node_copy['id']} credentials: Username={node_copy.get('_username', 'None')}")
# Update offset for next tab in this user's subflow
y_offset += dim['height'] + 50
# Process global config nodes from this user
for node in user_data['global_config_nodes']:
if not isinstance(node, dict):
continue
node_copy = copy.deepcopy(node)
# Assign new ID with username prefix for uniqueness
if 'id' in node_copy:
node_copy['id'] = f"{username}_{node_copy['id']}"
# Update any references to other nodes
# This is more complex as different node types have different reference fields
# For subflow instances:
if 'type' in node_copy and node_copy['type'].startswith('subflow:'):
# The subflow template ID is in the type field after 'subflow:'
old_subflow_id = node_copy['type'].split(':')[1]
node_copy['type'] = f"subflow:{username}_{old_subflow_id}"
combined_flows.append(node_copy)
# Process configuration nodes
if 'config' in user_data and 'nodes_config' in user_data['config']:
for node_id, node_config in user_data['config']['nodes_config'].items():
# Add username prefix to ID
new_id = f"{username}_{node_id}"
combined_nodes_config[new_id] = node_config
# Process credentials
if 'config' in user_data and 'credentials' in user_data['config']:
user_creds = user_data['config']['credentials']
if 'credentials' in user_creds:
for node_id, cred_data in user_creds['credentials'].items():
# Add username prefix to ID
new_id = f"{username}_{node_id}"
combined_credentials[new_id] = cred_data
# Now create instances of each user's subflow on the main tab
current_position_x = left_margin
current_position_y = top_margin
row_max_height = 0
for username, subflow_info in subflow_templates.items():
subflow_id = subflow_info['id']
subflow_width = subflow_info['width']
subflow_height = subflow_info['height']
# Create a subflow instance on the main tab
instance_id = f"instance_{username}_{str(uuid.uuid4()).replace('-', '')}"
instance = {
"id": instance_id,
"type": f"subflow:{subflow_id}",
"name": f"User: {username}",
"x": current_position_x,
"y": current_position_y,
"z": main_tab_id,
"wires": []
}
combined_flows.append(instance)
# Update position for next instance
current_position_x += subflow_width + 50
row_max_height = max(row_max_height, subflow_height)
# Check if we need to wrap to a new row
if current_position_x > max_row_width:
current_position_x = left_margin
current_position_y += row_max_height + 50
row_max_height = 0
# Create a directory for WellNuo user configurations
wellnuo_config_dir = os.path.join(output_dir, 'config')
os.makedirs(wellnuo_config_dir, exist_ok=True)
# Create a combined WellNuo config file with all user credentials
combined_wellnuo_config = "// Combined WellNuo configuration\n\n"
combined_wellnuo_config += "module.exports = {\n"
combined_wellnuo_config += " // Default MQTT configuration\n"
combined_wellnuo_config += " MQTT_SERVER: \"mqtt.eluxnetworks.net\",\n"
combined_wellnuo_config += " MQTT_PORT: 8883,\n"
#combined_wellnuo_config += " MQTT_USERNAME: \"artib\",\n"
#combined_wellnuo_config += " MQTT_PASSWORD: \"well_arti_2027\",\n\n"
combined_wellnuo_config += " // User credentials mapping - used by custom node handler\n"
combined_wellnuo_config += " userCredentials: {\n"
# Add credentials for each user
for user_data in user_flow_data:
username = user_data['username']
wellnuo_config = user_data.get('wellnuo_config', {})
if 'user_name' in wellnuo_config and 'token' in wellnuo_config:
combined_wellnuo_config += f" \"{username}\": {{\n"
combined_wellnuo_config += f" username: \"{wellnuo_config['user_name']}\",\n"
combined_wellnuo_config += f" mqtt_password: \"{wellnuo_config['MQTT_PASSWORD']}\",\n"
combined_wellnuo_config += f" token: \"{wellnuo_config['token']}\"\n"
combined_wellnuo_config += " },\n"
combined_wellnuo_config += " },\n\n"
# Combine all users' deployment and device data
all_deployments = []
all_devices = []
deployment_ids_seen = set()
device_ids_seen = set()
for user_data in user_flow_data:
wellnuo_config = user_data.get('wellnuo_config', {})
# Add deployments from this user
if 'deploymentData' in wellnuo_config:
for deployment in wellnuo_config['deploymentData']:
# Check if we've already added this deployment ID
dep_id = str(deployment.get('deployment_id', ''))
if dep_id and dep_id not in deployment_ids_seen:
all_deployments.append(deployment)
deployment_ids_seen.add(dep_id)
# Add devices from this user
if 'deviceData' in wellnuo_config:
for device in wellnuo_config['deviceData']:
# Check if we've already added this device ID
dev_id = str(device.get('well_id', ''))
if dev_id and dev_id not in device_ids_seen:
all_devices.append(device)
device_ids_seen.add(dev_id)
# Add the combined deployment data
combined_wellnuo_config += " // Combined deployment data from all users\n"
combined_wellnuo_config += " deploymentData: "
combined_wellnuo_config += json.dumps(all_deployments, indent=4).replace('"', "'") + ",\n\n"
# Add the combined device data
combined_wellnuo_config += " // Combined device data from all users\n"
combined_wellnuo_config += " deviceData: "
combined_wellnuo_config += json.dumps(all_devices, indent=4).replace('"', "'") + "\n"
combined_wellnuo_config += "};\n"
# Debug the combined WellNuo config
print("=" * 50)
print("COMBINED WELLNUO CONFIG PREVIEW:")
print("-" * 50)
print(combined_wellnuo_config[:500] + "..." if len(combined_wellnuo_config) > 500 else combined_wellnuo_config)
print("-" * 50)
print(f"Config contains credentials for {len([u for u in user_flow_data if 'wellnuo_config' in u and 'user_name' in u['wellnuo_config'] and 'token' in u['wellnuo_config']])} users")
print(f"Combined configuration contains {len(all_deployments)} deployments and {len(all_devices)} devices")
print("=" * 50)
# Write the combined WellNuo config
with open(os.path.join(wellnuo_config_dir, 'wellnuo_config.js'), 'w') as f:
f.write(combined_wellnuo_config)
# Write other combined configuration files
with open(os.path.join(output_dir, "common_flow.json"), 'w') as f:
json.dump(combined_flows, f, indent=4)
with open(os.path.join(output_dir, ".config.nodes.json"), 'w') as f:
json.dump(combined_nodes_config, f, indent=4)
with open(os.path.join(output_dir, ".config.users.json"), 'w') as f:
json.dump(combined_users_config, f, indent=4)
with open(os.path.join(output_dir, ".config.runtime.json"), 'w') as f:
json.dump(combined_runtime_config, f, indent=4)
# Prepare credentials file
creds_output = {
"credentials": combined_credentials,
"$": "creds" # Standard Node-RED credential format
}
with open(os.path.join(output_dir, "common_flow_cred.json"), 'w') as f:
json.dump(creds_output, f, indent=4)
# Copy the pre-created patched device-sensor-in.js for multi-user support
main_instance_node_path = os.path.join(output_dir, 'node_modules', 'node-red-contrib-wellnuo')
os.makedirs(main_instance_node_path, exist_ok=True)
## Copy the pre-created patched file
#patched_file_path = '/home/ubuntu/.node-red/patched-device-sensor-in.js'
#if os.path.exists(patched_file_path):
#with open(patched_file_path, 'r') as src, \
#open(os.path.join(main_instance_node_path, 'device-sensor-in.js'), 'w') as dst:
#dst.write(src.read())
#print("Copied pre-created patched device-sensor-in.js file")
#else:
#print("Warning: Pre-created patched file not found at", patched_file_path)
# Copy HTML file for the node
orig_html_path = '/home/ubuntu/.node-red/node_modules/node-red-contrib-wellnuo/device-sensor-in.html'
if os.path.exists(orig_html_path):
with open(orig_html_path, 'r') as src, \
open(os.path.join(main_instance_node_path, 'device-sensor-in.html'), 'w') as dst:
dst.write(src.read())
# Create package.json for the node module
package_json = {
"name": "node-red-contrib-wellnuo",
"version": "1.0.0",
"description": "Modified WellNuo nodes for multi-user deployment",
"node-red": {
"nodes": {
"device-sensor-in": "device-sensor-in.js"
}
}
}
with open(os.path.join(main_instance_node_path, 'package.json'), 'w') as f:
json.dump(package_json, f, indent=4)
return os.path.join(output_dir, "common_flow.json")
# --- Per-user Node-RED package restoration ---
# Uses subprocess and pathlib.Path, both already imported at the top of this file.
def install_package_in_user(username, package_name):
    """
    Restore one package inside a user's ``node_modules`` directory.

    A package that exists as a directory with a ``package.json`` under
    CUSTOM_NODES_DIR is restored with ``npm link``; any other package is
    installed from the registry with ``npm install``. Whatever currently sits
    at the target path (directory, file or symlink) is removed first.

    Args:
        username: Name of the user directory under USERS_DIR.
        package_name: Name of the npm package to restore.

    Returns:
        True if the npm command completed successfully, False otherwise.
    """
    user_dir = USERS_DIR / username
    target_path = user_dir / 'node_modules' / package_name

    logger.info(f"Attempting to restore package '{package_name}' for user '{username}'.")
    print(f"Restoring package '{package_name}' for user '{username}'.")

    # Custom nodes are recognised by a package.json inside CUSTOM_NODES_DIR/<name>.
    custom_node_path = CUSTOM_NODES_DIR / package_name
    is_custom = custom_node_path.is_dir() and (custom_node_path / 'package.json').exists()
    if is_custom:
        logger.info(f"'{package_name}' identified as a custom node.")

    # Clear leftovers (e.g. a broken link or partial directory) before restoring.
    if target_path.exists() or target_path.is_symlink():
        logger.warning(f"Removing existing item at {target_path} before restoring.")
        try:
            if not target_path.is_dir() or target_path.is_symlink():
                target_path.unlink(missing_ok=True)  # plain file or symlink
            else:
                shutil.rmtree(target_path)
        except OSError as rm_err:
            # Not fatal: the link/install below is still attempted.
            logger.error(f"Failed to remove existing item {target_path} for user {username}: {rm_err}")

    if not is_custom:
        # Registry install pulls the latest version, which may differ from
        # the version the user originally had.
        logger.warning(f"Restoring non-custom package '{package_name}' for user '{username}' from registry (latest).")
        cmd = ['npm', 'install', package_name, '--no-save', '--no-audit', '--no-fund', '--omit=dev']
        operation_desc = f"install latest {package_name} for user {username}"
    else:
        # Assumes a global link was already created via 'npm link' in the custom node dir.
        cmd = ['npm', 'link', package_name]
        operation_desc = f"link custom node {package_name} for user {username}"

    logger.info(f"Running command: {' '.join(cmd)} in {user_dir}")
    try:
        # Run npm directly in the user's own directory.
        result = subprocess.run(
            cmd,
            cwd=user_dir,
            check=True,
            capture_output=True,
            text=True,
            timeout=300,
        )
        logger.info(f"Successfully executed {operation_desc}: {result.stdout[:200]}...")
        return True
    except subprocess.CalledProcessError as e:
        logger.error(f"FAILED to execute {operation_desc}. Error: {e.stderr}")
        print(f"ERROR restoring package {package_name} for user {username}.")
    except Exception as e:
        logger.error(f"Unexpected error during {operation_desc}: {e}", exc_info=True)
        print(f"ERROR restoring package {package_name} for user {username}.")
    return False
def deploy_combined_flow(flow_file, node_red_url="http://localhost:1999",
                         credentials_file=None, admin_user=None, admin_password=None):
    """
    Deploy the combined flow to the main Node-RED instance via its Admin API.

    The flow is POSTed to ``/flows`` (with HTTP Basic auth when both admin
    credentials are supplied). If that attempt is answered with 401 and
    credentials are available, an access token is requested from
    ``/auth/token`` and the POST is retried with a Bearer token.

    Args:
        flow_file: Path to the combined flow file
        node_red_url: URL of the main Node-RED instance
        credentials_file: Path to the credentials file (kept for interface
            compatibility; this function does not read it)
        admin_user: Admin username
        admin_password: Admin password
    Returns:
        True when a deployment request was answered with 200 or 204,
        False otherwise (including any error raised while talking to Node-RED).
    Raises:
        OSError, json.JSONDecodeError: if ``flow_file`` cannot be read or
            parsed; this happens before any request is made.
    """
    import requests
    from requests.auth import HTTPBasicAuth

    request_timeout = 60  # seconds; stops a hung Node-RED from blocking the caller forever
    success_codes = (200, 204)
    flows_url = f"{node_red_url}/flows"

    # Load flow file
    with open(flow_file, 'r') as f:
        flows = json.load(f)

    headers = {
        "Content-Type": "application/json",
        "Node-RED-Deployment-Type": "full"
    }
    # With the default None credentials the request is sent unauthenticated
    # instead of failing before anything is attempted.
    has_credentials = admin_user is not None and admin_password is not None

    try:
        print(f"Deploying flow to {flows_url}")
        # Never echo any part of the password.
        print(f"Admin user: {admin_user}")

        # First attempt: Standard Basic Auth
        response = requests.post(
            flows_url,
            json=flows,
            headers=headers,
            auth=HTTPBasicAuth(admin_user, admin_password) if has_credentials else None,
            timeout=request_timeout,
        )
        if response.status_code in success_codes:
            print("Successfully deployed flow using standard auth")
            return True

        # If that fails, try with token-based auth
        if response.status_code == 401 and has_credentials:
            print("Standard auth failed, trying token auth")
            # client_id / grant_type / scope are the fields the Node-RED Admin
            # API documents for the token exchange.
            auth_response = requests.post(
                f"{node_red_url}/auth/token",
                json={
                    "client_id": "node-red-admin",
                    "grant_type": "password",
                    "scope": "*",
                    "username": admin_user,
                    "password": admin_password,
                },
                headers={"Content-Type": "application/json"},
                timeout=request_timeout,
            )
            if auth_response.status_code == 200:
                token_data = auth_response.json()
                # Node-RED returns "access_token"; "token" is kept as a fallback
                # for compatibility with the previous implementation.
                token = token_data.get("access_token") or token_data.get("token")
                if token:
                    token_headers = dict(headers, Authorization=f"Bearer {token}")
                    token_response = requests.post(
                        flows_url,
                        json=flows,
                        headers=token_headers,
                        timeout=request_timeout,
                    )
                    if token_response.status_code in success_codes:
                        print("Successfully deployed flow using token auth")
                        return True
                    print(f"Token auth failed with status {token_response.status_code}: {token_response.text}")
                else:
                    print("Failed to get auth token: response did not contain an access token")
            else:
                print(f"Failed to get auth token: {auth_response.status_code}: {auth_response.text}")

        # If all methods fail, report the error
        print(f"All authentication methods failed. Status code: {response.status_code}")
        print(f"Response: {response.text}")
        return False
    except Exception as e:
        print(f"Error during deployment: {e}")
        return False
def restart_node_red_service(service_name="node-red.service"):
    """
    Restart a systemd service through ``sudo systemctl`` and verify it came back.

    After a successful restart command the function sleeps 10 seconds and then
    queries ``systemctl is-active``.

    Args:
        service_name: systemd unit to restart.

    Returns:
        True if the unit reports "active" after the restart, False if it does
        not or if the restart command failed for any reason.
    """
    print(f"Attempting to restart Node-RED service: {service_name}")
    try:
        # systemctl needs elevated rights; passwordless sudo for this command
        # (or running the script as root) is expected to be configured.
        subprocess.run(
            ["sudo", "systemctl", "restart", service_name],
            check=True,
            capture_output=True,
            text=True,
            timeout=60,  # 1 minute timeout for restart command
        )
        logger.info(f"systemctl restart {service_name} successful.")
        print(f"Successfully signaled {service_name} to restart.")

        # Give the service time to stop and start again before probing it.
        time.sleep(10)

        probe = subprocess.run(
            ["sudo", "systemctl", "is-active", service_name],
            capture_output=True,
            text=True,
        )
        state = probe.stdout.strip()
        if state != "active":
            print(f"WARNING: {service_name} not reported active after restart attempt. Status: {state}")
            logger.warning(f"{service_name} not reported active after restart attempt. Status: {state}")
            return False  # Indicate potential issue
        print(f"{service_name} is active after restart.")
        return True
    except subprocess.CalledProcessError as e:
        logger.error(f"Failed to restart {service_name}: {e.stderr}")
        print(f"ERROR: Failed to restart {service_name}. Check logs and permissions.")
    except FileNotFoundError:
        logger.error(f"Failed to restart {service_name}: 'sudo' or 'systemctl' command not found.")
        print(f"ERROR: Failed to restart {service_name}: command not found.")
    except Exception as e_restart:
        logger.error(f"An unexpected error occurred during {service_name} restart: {e_restart}")
        print(f"ERROR: An unexpected error occurred during {service_name} restart.")
    return False
def task_a(arg):
    """Print ``arg`` prefixed with the task name "task_a"."""
    print("task_a {}".format(arg))
def task_b(arg):
    """Print ``arg`` prefixed with the task name "task_b"."""
    print("task_b {}".format(arg))
def string_to_uuid(input_string, namespace=uuid.NAMESPACE_DNS):
    """
    Derive a deterministic UUID from a string using uuid5 (SHA-1 based).

    Args:
        input_string: The text to hash into a UUID
        namespace: UUID namespace to hash within (default: NAMESPACE_DNS)
    Returns:
        A uuid.UUID object; the same inputs always give the same UUID
    """
    derived = uuid.uuid5(namespace=namespace, name=input_string)
    return derived
class EmailSender:
    """Queue-backed Gmail sender whose messages are delivered by worker threads."""

    def __init__(self, gmail_user, gmail_password, max_workers=5):
        """
        Initialize the email sender with your Gmail credentials.
        Args:
            gmail_user (str): Your Gmail email address
            gmail_password (str): Your Gmail app password (not your regular password)
            max_workers (int): Maximum number of worker threads
        """
        self.gmail_user = gmail_user
        self.gmail_password = gmail_password
        self.max_workers = max_workers
        self.email_queue = queue.Queue()
        self.workers = []
        self.running = False

    def start_workers(self):
        """Start the worker threads to process the email queue."""
        self.running = True
        for _ in range(self.max_workers):
            worker = threading.Thread(target=self._email_worker)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def stop_workers(self):
        """Stop all worker threads, waiting up to 2 seconds for each to exit."""
        self.running = False
        for worker in self.workers:
            if worker.is_alive():
                worker.join(timeout=2.0)
        self.workers = []

    def _email_worker(self):
        """Worker thread that processes emails from the queue."""
        while self.running:
            try:
                # Wake up every second so a cleared `running` flag is noticed.
                email_job = self.email_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            try:
                if email_job:
                    recipient, message, subject = email_job
                    self._send_single_email(recipient, message, subject)
            except Exception as e:
                print(f"Error in email worker: {e}")
            finally:
                # Exactly one task_done() per successful get(), whether or
                # not the job itself could be processed.
                self.email_queue.task_done()

    def _send_single_email(self, to_email, text_content, subject):
        """
        Send a single email using Gmail SMTP.

        Failures are printed (with traceback) and not raised.

        Args:
            to_email (str): Recipient email address
            text_content (str): Email message content
            subject (str): Email subject line
        """
        try:
            # Create the email message
            msg = MIMEMultipart()
            msg['From'] = self.gmail_user
            msg['To'] = to_email
            msg['Subject'] = subject
            # Attach the text content
            msg.attach(MIMEText(text_content, 'plain'))
            print("Connecting to SMTP server...")
            # The context manager closes the connection even when login or
            # sending fails, so a failed delivery does not leak a socket.
            with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
                print("Connected to server")
                print("Attempting login...")
                server.login(self.gmail_user, self.gmail_password)
                print("Login successful")
                print("Sending message...")
                server.send_message(msg)
                print("Message sent")
            print(f"Email sent successfully to {to_email}")
        except Exception as e:
            print(f"Failed to send email to {to_email}: {e}")
            traceback.print_exc()  # This will print the detailed error stack

    def SendEmailTo(self, email_address, text_str, subject):
        """
        Queue an email to be sent by a worker thread.

        Nothing is queued when the address is empty/None or starts with
        "_" or "-".

        Args:
            email_address (str): Recipient email address
            text_str (str): Email message content
            subject (str): Email subject line
        """
        if not email_address:
            return
        if email_address[0] in ("_", "-"):
            return
        self.email_queue.put((email_address, text_str, subject))
def SendMessageTo(user_id, text_str):
    """Print the recipient id and the message text on one line, space-separated."""
    print(user_id, text_str, sep=" ")
def normalize_phone_number(phone_number_str: str) -> str:
    """
    Normalize a phone number to ``+<digits>`` form.

    All characters other than digits are dropped. A number written with a
    leading "+" keeps its digits as given; a number without "+" that has
    exactly 10 digits is assumed to be a US number and gets "+1"; any other
    number just gets "+" prepended.

    Args:
        phone_number_str: Raw phone number text (may be empty or None).

    Returns:
        The normalized number, or "" when the input is empty or contains
        no digits at all.
    """
    if not phone_number_str:
        return ""
    kept = "".join(ch for ch in phone_number_str if ch.isdigit() or ch == '+')
    # Only a leading "+" is meaningful; any other "+" is stray punctuation.
    digits = kept.replace('+', '')
    if not digits:
        # Nothing dialable: do not hand back a bare "+".
        return ""
    if kept.startswith('+'):
        return '+' + digits
    if len(digits) == 10:  # assume it is US number if exactly 10 digits long
        return '+1' + digits
    return '+' + digits
# --- Telnyx SMS Sending Function ---
def setup_telnyx_sms_client():
    """
    Assign TELNYX_API_KEY to the telnyx module so SMS calls can authenticate.

    Returns:
        True when the key was assigned, False if doing so raised.
    """
    configured = False
    try:
        telnyx.api_key = TELNYX_API_KEY
        logger.info("Telnyx client for SMS configured.")
        configured = True
    except Exception as exc:
        logger.error(f"Failed to configure Telnyx client for SMS: {exc}")
    return configured
def send_telnyx_sms(recipient_phone: str, message_body: str, caller_id_type: str = "auto") -> bool:
    """
    Send one SMS through the Telnyx API.

    The sender id is chosen from ``caller_id_type``:
      * "alpha"   - TELNYX_SENDER_ID_ALPHA when set, otherwise numeric.
      * "numeric" - TELNYX_SENDER_ID.
      * "auto"    - numeric for "+1" recipients, otherwise alpha when set,
                    otherwise numeric.
      * any other value falls back to numeric with a warning.

    Args:
        recipient_phone: Destination number; expected to be normalized by the caller.
        message_body: Text of the message.
        caller_id_type: "alpha", "numeric" or "auto".

    Returns:
        True when Telnyx accepted the message, False on any validation or API failure.
    """
    if not setup_telnyx_sms_client():
        return False
    if not (recipient_phone and message_body):
        logger.error("Cannot send Telnyx SMS: Recipient phone and message are required.")
        return False

    # The numeric sender is the fallback for every path below.
    from_id = TELNYX_SENDER_ID
    if caller_id_type == "alpha":
        if TELNYX_SENDER_ID_ALPHA:
            from_id = TELNYX_SENDER_ID_ALPHA
        else:
            logger.warning("SMS Caller ID type 'alpha' requested, but TELNYX_SENDER_ID_ALPHA is not set. Falling back to numeric.")
    elif caller_id_type == "auto":
        # "+1" (US/Canada) keeps the numeric sender; elsewhere prefer alpha.
        if not recipient_phone.startswith("+1"):
            if TELNYX_SENDER_ID_ALPHA:
                from_id = TELNYX_SENDER_ID_ALPHA
            else:
                logger.warning("SMS Caller ID type 'auto' for non-US/Canada destination, but TELNYX_SENDER_ID_ALPHA not set. Using numeric.")
    elif caller_id_type != "numeric":
        logger.warning(f"Invalid caller_id_type '{caller_id_type}' for SMS. Defaulting to numeric.")

    logger.info(f"Attempting to send Telnyx SMS from '{from_id}' to '{recipient_phone}'")
    try:
        message_create_params = {
            "from_": from_id,
            "to": recipient_phone,
            "text": message_body,
            "type": "sms",
        }
        # The messaging profile is only passed along when one is configured.
        if TELNYX_MESSAGING_PROFILE_ID:
            message_create_params["messaging_profile_id"] = TELNYX_MESSAGING_PROFILE_ID
        response = telnyx.Message.create(**message_create_params)
        logger.info(f"SMS submitted successfully. Message ID: {response.id}")
        return True
    except telnyx.error.TelnyxError as e:
        logger.error(f"Telnyx API Error sending SMS to {recipient_phone} from '{from_id}': {e}")
        error_body = getattr(e, 'json_body', None)
        if error_body and 'errors' in error_body:
            for err in error_body['errors']:
                logger.error(f" - Code: {err.get('code')}, Title: {err.get('title')}, Detail: {err.get('detail')}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error sending Telnyx SMS to {recipient_phone}: {e}")
        return False
def SendSMSTo(phone_nr, text_str):
    """
    Send an SMS to ``phone_nr`` unless the number or text rules it out.

    Nothing is sent when the number is None, the string 'None', empty,
    starts with "-" or "_", or when the text is empty.

    Args:
        phone_nr: Raw phone number; normalized here before sending.
        text_str: Message text.

    Returns:
        True if the SMS was submitted, False if submission failed, and
        None when the message was skipped without attempting to send.
    """
    if phone_nr is None or phone_nr == 'None' or not phone_nr:
        return None
    if phone_nr[0] in ("-", "_"):
        return None
    if not text_str:
        return None
    phone_nr = normalize_phone_number(phone_nr)
    # Require "+" followed by at least one digit; a bare "+" or empty
    # result means there is nothing to dial.
    if len(phone_nr) < 2 or phone_nr[0] != "+":
        return None
    if send_telnyx_sms(recipient_phone=phone_nr, message_body=text_str, caller_id_type="auto"):
        logger.info(f"Telnyx SMS sent.")
        return True
    logger.error(f"Failed to send SMS.")
    return False
def text2mp3(voice_library, text_to_speak, file_to_save, wait=True, timeout=60):
    """
    Drop a TTS job file into JOBS_DIR and optionally wait for its result file.

    Args:
        voice_library: Model name to use
        text_to_speak: Text to convert to speech
        file_to_save: Target filename written into the job description
        wait: When False, return right after the job file is written
        timeout: Maximum number of seconds to wait for the result file
    Returns:
        dict: ``{'job_id', 'pending': True}`` when not waiting;
              ``{'job_id', 'success': False, 'error': 'Job timed out'}`` on timeout;
              otherwise the JSON content of the result file.
    """
    job_id = str(uuid.uuid4())

    # Write the job description for the TTS worker to pick up.
    job_path = os.path.join(JOBS_DIR, f"{job_id}.job")
    with open(job_path, 'w') as job_fh:
        json.dump(
            {
                'job_id': job_id,
                'voice_library': voice_library,
                'text_to_speak': text_to_speak,
                'file_to_save': file_to_save,
            },
            job_fh,
        )

    if not wait:
        return {'job_id': job_id, 'pending': True}

    # Poll every half second until the result file shows up or time runs out.
    result_path = os.path.join(RESULTS_DIR, f"{job_id}.result")
    started = time.time()
    while True:
        if os.path.exists(result_path):
            break
        if time.time() - started > timeout:
            return {
                'job_id': job_id,
                'success': False,
                'error': 'Job timed out',
            }
        time.sleep(0.5)

    with open(result_path, 'r') as result_fh:
        outcome = json.load(result_fh)
    # The result file is consumed once; remove it after reading.
    os.remove(result_path)
    return outcome
def PrepareMP3(text_to_speak):
    """
    Make sure an MP3 rendering of ``text_to_speak`` exists in RESULTS_SHARE_DIR.

    The clip name is a uuid5 derived from the text plus the TTS model name,
    so identical text reuses an existing clip. When the clip is missing, a
    TTS job is run and the file is copied from RESULTS_DIR to RESULTS_SHARE_DIR.

    Returns:
        The clip's UUID string (file name without ".mp3"), or "" when TTS
        generation failed.
    """
    # TODO: add multi-language support
    # alternative model: "tts_models/en/ljspeech/tacotron2-DDC"
    tts_model = "tts_models/en/vctk/vits"
    my_uuid = str(string_to_uuid(text_to_speak + tts_model))
    mp3_name = f"{my_uuid}.mp3"
    generated_path = os.path.join(RESULTS_DIR, mp3_name)
    shared_path = os.path.join(RESULTS_SHARE_DIR, mp3_name)

    # Clip already published: nothing to generate.
    if os.path.exists(shared_path):
        return my_uuid

    result = text2mp3(tts_model, text_to_speak, mp3_name)
    if not result.get('success'):
        print(f"TTS generation failed: {result.get('error')}")
        return ""

    print(f"TTS generated successfully: {result.get('file_path')}")
    shutil.copy2(generated_path, RESULTS_SHARE_DIR)
    return my_uuid
def SendPhoneCall(phone_nr, text_str):
    """
    Place a Telnyx voice call that plays a TTS rendering of ``text_str``.

    The call is skipped, consistently with SendSMSTo, when the number is
    None, the string 'None', empty, starts with "-" or "_", or when the text
    is empty. It is also skipped when no audio clip could be prepared.

    Args:
        phone_nr: Raw phone number; normalized here before dialing.
        text_str: Text to convert to speech and play to the callee.

    Returns:
        None. Request failures are logged instead of raised.
    """
    if phone_nr is None or phone_nr == 'None' or not phone_nr:
        return
    # A leading "-" or "_" marks the number as disabled. The check must come
    # before normalization, which strips those characters.
    if phone_nr[0] in ("-", "_"):
        return
    if not text_str:
        return
    phone_nr = normalize_phone_number(phone_nr)
    if len(phone_nr) < 2:  # "" or a bare "+": nothing to dial
        return

    uuid_str = PrepareMP3(text_str)
    if uuid_str == "":
        return

    headers = {
        "Authorization": f"Bearer {TELNYX_API_KEY}",
        "Content-Type": "application/json"
    }
    payload = {
        "to": phone_nr,
        "from": TELNYX_SENDER_ID,
        "connection_id": TELNYX_CONNECTION_ID_VOICE,
        "webhook_url": TELNYX_WEBHOOK_URL_VOICE,
        "webhook_url_method": "POST",
        "answering_machine_detection": "disabled",
        # The audio clip URL is passed along in a custom header.
        "custom_headers": [
            {
                "name": "X-Audio-Url",
                "value": "https://eluxnetworks.net/shared/clips/" + uuid_str + ".mp3"
            }
        ]
    }

    print(f"posting call:\n {payload}")
    try:
        # Bounded wait so one stuck request cannot stall alert delivery.
        response = requests.post(TELNYX_VOICE_URL, headers=headers, json=payload, timeout=30)
    except requests.RequestException as e:
        logger.error(f"Failed to submit voice call to {phone_nr}: {e}")
        return

    print(response.status_code)
    try:
        print(response.json())
    except ValueError:
        # Body was not JSON (e.g. a gateway error page); show it verbatim.
        print(response.text)
    print("Call voice")
def SendAlerts(deployment_id, method, text_str, subject, user_name):
    """
    Dispatch an alert over one channel.

    With an empty/None ``user_name`` the alert goes to every caretaker listed
    in ``care_takers_all`` for the deployment; otherwise it goes only to the
    named user looked up through GetUserDetails.

    Args:
        deployment_id: Deployment whose caretakers are notified (converted with int()).
        method: Channel name, case-insensitive: "MSG", "EMAIL", "SMS" or "PHONE".
        text_str: Alert text.
        subject: Subject line, used for EMAIL only.
        user_name: Single recipient, or ""/None to notify all caretakers.
    """
    global sender
    notify_all = user_name is None or user_name == ""

    if not notify_all:
        # Send to the logged-in user only. This record uses different column
        # positions than the caretaker tuples below (0, 3 and 14).
        user_details = GetUserDetails(user_name)[0]
        channel = method.upper()
        if channel == "MSG":
            destination = user_details[0]
            SendMessageTo(destination, text_str)
        elif channel == "EMAIL":
            destination = user_details[3]
            sender.SendEmailTo(destination, text_str, subject)
        elif channel == "SMS":
            destination = user_details[14]
            SendSMSTo(destination, text_str)  # outcome is logged inside
        elif channel == "PHONE":
            destination = user_details[14]
            SendPhoneCall(destination, text_str)
        return

    # Real request: fan out to all caretakers of the deployment.
    for user_details in care_takers_all[int(deployment_id)]:
        channel = method.upper()
        if channel == "MSG":
            destination = user_details[0]
            SendMessageTo(destination, text_str)
        elif channel == "EMAIL":
            destination = user_details[1]
            sender.SendEmailTo(destination, text_str, subject)
        elif channel == "SMS":
            destination = user_details[2]
            SendSMSTo(destination, text_str)  # outcome is logged inside
        elif channel == "PHONE":
            destination = user_details[2]
            SendPhoneCall(destination, text_str)
        # Queue publishing is currently switched off.
        if False:
            logger.info(f"Sending Alert to {destination} via {method} Content:{text_str}")
            message_dict = {}
            message_dict["deployment_id"] = deployment_id
            message_dict["method"] = method
            message_dict["receipient"] = destination
            message_dict["text_str"] = text_str
            SendToQueue(RABBITMQ_ALERTS_QNAME, message_dict)
def set_character(some_string, bit_nr, new_char):
    """
    Return some_string with the character bit_nr places from the right replaced.

    Parameters:
    some_string (str): The input string
    bit_nr (int): Position counted from the right (0 = rightmost)
    new_char (str): The replacement text for that position

    Returns:
    str: The modified string, or some_string unchanged when bit_nr is
    negative or not smaller than the string length
    """
    length = len(some_string)
    if not 0 <= bit_nr < length:
        return some_string  # Invalid position

    # Translate the right-based position into a normal left-based index
    index = length - 1 - bit_nr
    return some_string[:index] + new_char + some_string[index + 1:]
def ReadCleanStringDB(cur, sql):
    """Run sql on cur and return the first column of the first row as stripped text.

    Returns "" when the query yields no row. The value is passed through
    str(), so a NULL column comes back as the text "None".
    """
    cur.execute(sql)
    row = cur.fetchone()
    return "" if row is None else str(row[0]).strip()
def GetTimeZoneOfDeployment(deployment_id):
    """Return the time zone name of a deployment, cached in time_zones_all.

    The first request for a deployment reads public.deployments.time_zone_s;
    later requests are answered from the cache. When the database has no row
    for the deployment, or the column is NULL or empty, 'America/Los_Angeles'
    is returned (and cached) instead of an empty or "None" name that
    pytz.timezone() would reject.
    """
    global time_zones_all

    if deployment_id in time_zones_all:
        return time_zones_all[deployment_id]

    default_time_zone = 'America/Los_Angeles'
    with get_db_connection() as conn:
        with conn.cursor() as cur:
            # deployment_id is interpolated into the SQL text; callers must pass trusted ids only.
            sqlr = f"SELECT time_zone_s from public.deployments WHERE deployment_id ={deployment_id}"
            time_zone_st = ReadCleanStringDB(cur, sqlr)

    # ReadCleanStringDB yields "" for "no row" and "None" for a NULL column.
    if time_zone_st in ("", "None"):
        time_zone_st = default_time_zone

    time_zones_all[deployment_id] = time_zone_st
    return time_zone_st
def StoreLastSentToRedis(deployment_id):
    """Record "an alert was just sent" for a deployment and persist it to Redis.

    The timestamp is stored as epoch seconds (a plain number): a datetime
    object is not JSON serializable, and the re-arm check in SetupTasks adds
    seconds to this value (and defaults it to 0), so it must be numeric.
    The in-memory entry in alarms_settings_all is updated as well.
    """
    alarms_settings = alarms_settings_all[deployment_id]
    alarms_settings["last_triggered_utc"] = time.time()
    alarm_deployment_settings_str = json.dumps(alarms_settings)
    redis_conn.set('alarm_deployment_settings_'+str(deployment_id), alarm_deployment_settings_str)
def StoreDeviceAlarmsToRedis(device_id, device_alarm_settings):
    """Serialize a device's alarm settings to JSON and store them in Redis.

    The value is written under the key 'alarm_device_settings_<device_id>'.
    """
    payload = json.dumps(device_alarm_settings)
    redis_key = 'alarm_device_settings_' + str(device_id)
    redis_conn.set(redis_key, payload)
# Hours after the last sent alert at which each time-based re-arm policy fires.
_REARM_POLICY_HOURS = {"1H": 1, "6H": 6, "12H": 12, "1D": 24, "2D": 48}

# Per-device presence rules, evaluated in order (alarm before warning):
# (enabled_alarms bit, limit key, method key, bits cleared when sent,
#  message for text channels, message for PHONE)
_STUCK_RULES = (
    (1, "stuck_minutes_alarm", "stuck_alarm_method_1", (0, 1),
     "Alarm: {name} way too long ({minutes} minutes) in {location}",
     "Alarm: {name} way too long in {location}"),
    (0, "stuck_minutes_warning", "stuck_warning_method_0", (0,),
     "Warning: {name} too long ({minutes} minutes) in {location}",
     "Warning: {name} too long in {location}"),
)
_ABSENT_RULES = (
    (3, "absent_minutes_alarm", "absent_alarm_method_3", (3, 2),
     "Alarm: Way too long since {name} visited {location} ({minutes} minutes)",
     "Alarm: Way too long since {name} visited {location}"),
    (2, "absent_minutes_warning", "absent_warning_method_2", (2,),
     "Warning: Too long since {name} visited {location} ({minutes} minutes)",
     "Warning: Too long since {name} visited {location}"),
)


def _alarm_bit_set(bits, bit_nr):
    """Return True when the flag bit_nr places from the right of bits is "1".

    Anything that is not a string, or is too short to hold the position,
    counts as "not set" instead of raising.
    """
    return isinstance(bits, str) and 0 <= bit_nr < len(bits) and bits[-1 - bit_nr] == "1"


def _as_number(value):
    """Return a threshold as a number; settings hold it as a number or a numeric string."""
    if isinstance(value, (int, float)):
        return value
    return float(value)


def _epoch_or_zero(value):
    """Coerce a stored last-trigger time to epoch seconds; 0 when missing or unparsable."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def _load_deployment_alarm_settings(deployment_id):
    """Return the deployment alarm settings from Redis, seeding Redis from memory if absent."""
    # Redis is authoritative here because the user can change settings at any time.
    redis_key = 'alarm_deployment_settings_' + str(deployment_id)
    settings_json = GetRedisString(redis_key)
    if settings_json is None:
        alarms_settings = alarms_settings_all[deployment_id]
        redis_conn.set(redis_key, json.dumps(alarms_settings))
        return alarms_settings
    return json.loads(settings_json)


def _rearm_is_due(rearm_policy, last_triggered_epoch, local_daily_minute):
    """Return True when the deployment's alarms should be restored from the database."""
    if rearm_policy == "At midnight":
        # The local minute-of-day dropping below the previous value means midnight passed.
        # NOTE(review): local_daily_minute_last is only read in this function and is a
        # single value shared by all deployments - confirm it is maintained elsewhere.
        return local_daily_minute < local_daily_minute_last
    hours = _REARM_POLICY_HOURS.get(rearm_policy) if isinstance(rearm_policy, str) else None
    if hours is None:
        # "Never" or an unknown policy.
        return False
    # Both sides are epoch seconds; comparing a datetime with a number raises TypeError.
    # NOTE(review): with no recorded trigger (0) this is always true - confirm intended.
    return time.time() > last_triggered_epoch + hours * 3600


def _rearm_deployment(deployment_id, armed_settings_str):
    """Reload alarm settings after a re-arm and return the armed string to use."""
    alarm_armed_settings, alarm_settings_str = GetAlarmSettings(deployment_id)
    if len(alarm_settings_str) > 2:
        alarm_settings = json.loads(alarm_settings_str)
        alarm_settings["armed"] = alarm_armed_settings
        alarms_settings_all[deployment_id] = alarm_settings
        # Use the freshly loaded settings, not the stale copy read before re-arming.
        armed_settings_str = alarm_settings["armed"]

    for device_id in deployments_devices[deployment_id]:
        if device_id in device_alerts_all:
            # NOTE(review): only Redis is refreshed; the in-memory device_alerts_all
            # entry keeps its cleared bits - confirm another path reloads it.
            alarm_device_settings_str = GetDeviceAlarmSettings(device_id)
            redis_conn.set('alarm_device_settings_' + str(device_id), alarm_device_settings_str)
    return armed_settings_str


def _apply_presence_rules(rules, deployment_id, device_id, elapsed_seconds,
                          user_first_last_name, next_run_in_minutes):
    """Evaluate alarm-then-warning rules for one device; return the updated next-run delay.

    A rule whose limit is exceeded sends its alert and clears its bits (an
    alarm also clears the matching warning, so the warning rule that follows
    is skipped). A rule that is enabled but not yet exceeded shortens
    next_run_in_minutes to the time left until its limit.
    """
    device_alarm_settings = device_alerts_all[device_id]
    elapsed_minutes = int(elapsed_seconds / 60)

    for bit_nr, limit_key, method_key, cleared_bits, text_full, text_phone in rules:
        # Re-read every time: a preceding alarm may have cleared this rule's bit.
        enabled_alarms_str = device_alarm_settings["enabled_alarms"]
        if not _alarm_bit_set(enabled_alarms_str, bit_nr):
            continue

        limit_minutes = _as_number(device_alarm_settings[limit_key])
        if elapsed_seconds <= limit_minutes * 60:
            next_run_in_minutes = min(next_run_in_minutes, limit_minutes - elapsed_minutes)
            continue

        # Cancel until re-armed
        for cleared_bit in cleared_bits:
            enabled_alarms_str = set_character(enabled_alarms_str, cleared_bit, "0")

        dev_det = devices_details_map[device_id]
        location = (dev_det[4] + " " + dev_det[5].strip()).strip()
        method = device_alarm_settings[method_key]
        # The spoken (PHONE) variant leaves out the minute count.
        template = text_phone if method.upper() == "PHONE" else text_full
        SendAlerts(deployment_id, method,
                   template.format(name=user_first_last_name, minutes=elapsed_minutes, location=location),
                   "", "")

        device_alarm_settings["enabled_alarms"] = enabled_alarms_str
        device_alerts_all[device_id] = device_alarm_settings
        # Persist the disarmed bits first so a failure while recording the
        # send time cannot make the same alert fire again on the next run.
        StoreDeviceAlarmsToRedis(device_id, device_alarm_settings)
        StoreLastSentToRedis(deployment_id)

    return next_run_in_minutes


def _check_presence_alarms(deployment_id, user_first_last_name, next_run_in_minutes):
    """Run the "too long present/absent" checks for every alert-enabled device."""
    seen = []
    for device_id in deployments_devices[deployment_id]:
        if device_id in device_alerts_all:
            since_seen = time.time() - GetRedisFloat('lastseen_' + str(device_id))
            seen.append((since_seen, device_id))
    seen.sort(key=lambda entry: entry[0])

    # The most recently seen device is where the person is taken to be now;
    # the time spent there is the gap to the second most recently seen device.
    current_device_id = seen[0][1] if seen else None
    stuck_seconds = seen[1][0] - seen[0][0] if len(seen) >= 2 else None

    for last_seen_ago, device_id in seen:
        # "Too long present" applies to the current location only and needs a
        # second device as reference, so it is skipped for single-device deployments.
        if device_id == current_device_id and stuck_seconds is not None:
            next_run_in_minutes = _apply_presence_rules(
                _STUCK_RULES, deployment_id, device_id, stuck_seconds,
                user_first_last_name, next_run_in_minutes)
        next_run_in_minutes = _apply_presence_rules(
            _ABSENT_RULES, deployment_id, device_id, last_seen_ago,
            user_first_last_name, next_run_in_minutes)
    return next_run_in_minutes


def _evaluate_deployment_alarms(deployment_id, next_run_in_minutes):
    """Re-arm (if due) and evaluate the time-based alarms of one deployment."""
    local_timezone = pytz.timezone(GetTimeZoneOfDeployment(deployment_id))
    local_time = datetime.datetime.now(local_timezone)

    alarms_settings = _load_deployment_alarm_settings(deployment_id)
    alarm_armed_settings_str = alarms_settings.get("armed", "000")
    last_triggered_epoch = _epoch_or_zero(alarms_settings.get("last_triggered_utc", 0))
    rearm_policy = alarms_settings.get("rearm_policy", "Never")

    # Minutes since the start of the local day
    local_daily_minute = (local_time.hour * 60) + local_time.minute
    # Sent messages name the beneficiary, not the receivers
    user_first_last_name = GetBeneficiaryFromDeployment(deployment_id)

    if _rearm_is_due(rearm_policy, last_triggered_epoch, local_daily_minute):
        alarm_armed_settings_str = _rearm_deployment(deployment_id, alarm_armed_settings_str)

    if not _alarm_bit_set(alarm_armed_settings_str, 0):
        return next_run_in_minutes  # settings never purposefully set
    if _alarm_bit_set(alarm_armed_settings_str, 1):
        return next_run_in_minutes  # home-security alarm armed: individual conditions are not compared

    if _alarm_bit_set(alarm_armed_settings_str, 4) or _alarm_bit_set(alarm_armed_settings_str, 5):
        next_run_in_minutes = _check_presence_alarms(
            deployment_id, user_first_last_name, next_run_in_minutes)
    # Bit 6 ("alone too long") is not implemented yet.
    return next_run_in_minutes


def SetupTasks():
    """Evaluate the time-based warnings/alarms of all deployments and re-schedule itself.

    For every deployment with alarm settings the settings are read from Redis,
    re-armed from the database when the re-arm policy is due, and the
    "too long present/absent" conditions are checked and sent via SendAlerts.

    Bits of the deployment "armed" string, counted from the right:
    0: 0 = default, 1 = purposefully set
    1: Home security, 1 = alarm armed
    2: Warning level environmental condition (Yellow) Temperatures High/Low
    3: Alarm level environmental condition (Red) Temperatures High/Low
    4: Warning level medical condition (Yellow) Too long present/absent
    5: Alarm level medical condition (Red) Too long present/absent
    6: Alarm if alone too long

    A failure in one deployment is logged and does not stop the others; the
    function always re-schedules itself, at most one minute ahead.
    """
    print("SetupTasks")
    stt = time.time()
    next_run_in_minutes = 1440  # reduced below depending on data

    for deployment_id in deployment_id_list:
        if deployment_id not in alarms_settings_all:
            continue
        try:
            next_run_in_minutes = _evaluate_deployment_alarms(deployment_id, next_run_in_minutes)
        except Exception as e:
            # Keep going: an uncaught error here would skip schedule_task below
            # and silently end all future alarm checks.
            logger.error(f"SetupTasks failed for deployment {deployment_id}: {str(e)} {traceback.format_exc()}")

    if next_run_in_minutes > 1:
        next_run_in_minutes = 1
    schedule_task(SetupTasks, "", next_run_in_minutes)
    print(time.time()-stt)
# --- Main Monitoring Loop ---
def _serve_node_red_start_request():
    """Start, or refresh the status of, a user's Node-RED instance requested via Redis."""
    hash_data = GetRedisMap('node_red_requests')
    #logger.debug(f"node_red_requests: {hash_data}")
    requests_count = 0
    if hash_data != {}:
        requests_count = int(hash_data['requests'])
    if requests_count <= 0:
        return

    logger.debug(f"node_red_requests: {str(hash_data)}")
    user_name = hash_data['user_name']
    #user needs Node-red. Is his session up and running?
    port, pid = IsNRRunning(user_name)
    #delete request #this might need switching to queue... todo
    redis_conn.hset('node_red_requests', mapping={
        'user_name': user_name,
        'token': "",
        'time': "",
        'requests': 0
    })

    if port == 0 or pid == 0: #not running
        token = hash_data['token']
        time_s = time.time()
        port = GetAvailablePort(user_name)
        users_p = GetUserPriviledges(user_name)
        ps = users_p[0][1]
        SetupUserEnvironment(user_name, token, ps)
        pid = StartNodeRed(port, user_name)
        #Lets store it to postgres and REDIS
        StoreNodeRedUserPortOn(port, user_name, pid, time_s)
        redis_conn.hset(f'node_red_status_{user_name}', mapping={
            'port': port,
            'started_time': time_s,
            'last_activity': time_s,
            'pid': pid,
        })
    else:
        time_s = time.time()
        redis_conn.hset(f'node_red_status_{user_name}', mapping={
            'port': port,
            'started_time': time_s,
            'last_activity': time.time(),
            'pid': pid
        })


def _serve_node_red_deploy_request():
    """Handle a 'node-red_deployed' request: sync nodes, restart if needed, deploy flows."""
    hash_data = GetRedisMap('node-red_deployed')
    requests_count = 0
    if hash_data != {}:
        requests_count = int(hash_data['requests'])
    if requests_count <= 0:
        return

    logger.debug(f"node-red_deployed: {str(hash_data)}")
    user_name = hash_data['user_name']
    #delete request #this might need switching to queue... todo
    redis_conn.hset('node-red_deployed', mapping={
        'user_name': user_name,
        'token': "",
        'time': "",
        'requests': 0
    })

    # --- 1. Run the node synchronization script ---
    sync_success = False
    changes_were_made = False # Flag to track if sync modified files
    signal_file = Path("/tmp/node-red-sync-changes.signal")
    sync_script_path = "/home/ubuntu/.node-red/sync_nodes.py"
    try:
        print("Starting node synchronization...")
        # Use sys.executable to ensure using the same python interpreter
        result = subprocess.run(
            [sys.executable, sync_script_path],
            check=True, # Raise CalledProcessError if script exits with non-zero code
            capture_output=True,
            text=True,
            timeout=600 # Generous timeout (10 minutes) for npm operations
        )
        print("Node synchronization script output:\n", result.stdout)
        if result.stderr:
            print("Node synchronization script stderr:\n", result.stderr)
        print("Node synchronization completed successfully.")
        sync_success = True
        if signal_file.exists():
            changes_were_made = True
            logger.info("Sync script indicated changes were made (signal file found).")
            try:
                signal_file.unlink() # Consume the signal
                logger.debug("Removed signal file.")
            except OSError as e:
                logger.warning(f"Could not remove signal file {signal_file}: {e}")
        else:
            logger.info("Sync script indicated no changes were made (signal file not found).")
            print("No node changes detected by sync script.")
    except FileNotFoundError:
        print(f"ERROR: Sync script not found at {sync_script_path}")
    except subprocess.CalledProcessError as e:
        print(f"ERROR: Node synchronization script failed with exit code {e.returncode}")
        print(f"Stderr:\n{e.stderr}")
        print(f"Stdout:\n{e.stdout}")
    except subprocess.TimeoutExpired:
        print(f"ERROR: Node synchronization script timed out.")
    except Exception as e_sync:
        print(f"ERROR: An unexpected error occurred while running sync script: {e_sync}")

    # --- 2. Restart Node-RED Service IF Sync Succeeded AND Changes Were Made ---
    restart_successful = True # Default to true if no restart needed
    if sync_success and changes_were_made:
        print("Node changes detected, restarting Node-RED service...")
        restart_successful = restart_node_red_service("node-red.service")
        if not restart_successful:
            print("ERROR: Node-RED service failed to restart properly after changes. Aborting deployment.")
            # Abandons this request only; the monitoring loop keeps running.
            return
    elif sync_success and not changes_were_made:
        print("No node changes required restart.")
    else:
        # Sync failed, don't restart
        print("Skipping Node-RED restart due to sync script failure.")

    # --- 3. Combine and Deploy Flows ---
    # NOTE(review): the combine/deploy step below also runs when the sync failed,
    # despite the "Skipping flow deployment" message - confirm this is intended.
    if sync_success and restart_successful:
        print("Proceeding with flow combination and deployment...")
    else:
        print("Skipping flow deployment due to previous errors during sync or restart.")

    # Get list of user directories
    base_dir = "/home/ubuntu/.node-red/users"
    user_dirs = [os.path.join(base_dir, d) for d in os.listdir(base_dir)
                 if os.path.isdir(os.path.join(base_dir, d))]
    # Combine flows
    output_dir = "/home/ubuntu/.node-red/main_instance"
    flow_file = combine_user_flows(user_dirs, output_dir)
    # Deploy combined flow
    creds_file = os.path.join(output_dir, "common_flow_cred.json")
    deploy_combined_flow(
        flow_file,
        node_red_url="https://eluxnetworks.net:1999",
        credentials_file=creds_file,
        admin_user=ADMIN_USER,
        admin_password=ADMIN_PASSWORD
    )


def _stop_stale_node_red_instances():
    """Stop Node-RED instances whose last_activity is more than 10 seconds old."""
    pattern = 'node_red_status_*'
    threshold = time.time() - 10

    # Find all matching keys
    matching_keys = []
    cursor = 0
    while True:
        cursor, keys = redis_conn.scan(cursor=cursor, match=pattern, count=100)
        for key in keys:
            key_str = key.decode('utf-8') # Convert bytes to string
            raw_last_activity = redis_conn.hget(key_str, 'last_activity')
            if raw_last_activity is None:
                # Status hash without the field: nothing to compare against.
                continue
            last_activity = raw_last_activity.decode('utf-8')
            try:
                pid = int(redis_conn.hget(key_str, 'pid'))
                if pid > 0 and last_activity:
                    last_activity_value = int(float(last_activity))
                    print(last_activity_value-threshold)
                    # NOTE(review): the full Redis key, not the user name, is compared
                    # with permament_users - confirm what that collection holds.
                    if last_activity_value < threshold and key_str not in permament_users:
                        matching_keys.append(key_str)
            except (TypeError, ValueError) as e:
                # Missing or malformed pid / last_activity: skip this entry.
                logger.debug(f"Skipping {key_str}: {e}")
        # Exit the loop when scan is complete
        if cursor == 0:
            break

    if len(matching_keys) > 0:
        print(f"Found {len(matching_keys)} keys with last_activity < {threshold}:")
    for key in matching_keys:
        print(key)
        user_name = key[len("node_red_status_"):]
        time_s = time.time()
        port, pid = GetNodeRedDetails(user_name)
        result = StopNodeRed(pid)
        #Lets store it to postgres and REDIS
        StoreNodeRedUserPortOff(port, user_name, time_s)
        redis_conn.hset(f'node_red_status_{user_name}', mapping={
            'port': 0,
            'started_time': time_s,
            'last_activity': time_s,
            'pid': 0,
        })
        print(result)


def monitor_devices():
    """Main monitoring loop; runs until stop_event is set.

    Each cycle serves a pending Node-RED start request, serves a pending
    Node-RED deploy request, stops stale Node-RED instances, and then waits
    for the rest of CHECK_INTERVAL_SECONDS.
    """
    logger.info("Starting monitoring loop...")
    while not stop_event.is_set():
        start_time = time.monotonic()

        if not devices_config:
            logger.warning("No device configurations loaded, skipping monitoring cycle.")
            success = load_device_configurations() # Try reloading if empty
            if not success:
                # Wait longer before retrying if config load fails
                stop_event.wait(CHECK_INTERVAL_SECONDS * 5)
                continue # Skip to next cycle attempt

        #======================== Do node-red instances management ========================
        _serve_node_red_start_request()
        _serve_node_red_deploy_request()
        _stop_stale_node_red_instances()

        # --- Cycle Cleanup & Wait ---
        elapsed_time = time.monotonic() - start_time
        wait_time = CHECK_INTERVAL_SECONDS - elapsed_time
        if wait_time > 0:
            # Use stop_event.wait() for graceful shutdown during sleep
            stop_event.wait(wait_time)
        else:
            logger.warning("Monitoring cycle took longer than the configured interval.")
    logger.info("Monitoring loop stopped.")
# --- Signal Handling ---
def signal_handler(signum, frame):
    """Handle a termination signal: stop the loops, timers and scheduled jobs.

    Sets stop_event, cancels the t1/t2 timers if they have been created, and
    removes every job listed in job_registry from the scheduler. A job that
    can no longer be removed (for example because it already ran) is logged
    and skipped so the remaining cleanup still happens.
    """
    logger.warning(f"Received signal {signal.Signals(signum).name}. Initiating graceful shutdown...")
    stop_event.set()

    # The handler is registered before the timers are created, so a signal
    # during startup must not fail on the still-missing names.
    for timer_name in ("t1", "t2"):
        timer = globals().get(timer_name)
        if timer is not None:
            timer.cancel()

    # Iterate over a snapshot: scheduler threads may change the registry meanwhile.
    for job_id in list(job_registry):
        job_info = job_registry.get(job_id)
        if job_info is None:
            continue
        function_name = job_info['function_name']
        argument = job_info['argument']
        task_key = f"{function_name}_{argument}"
        # Remove from task registry if this is the current job for this task/arg
        if task_key in task_arg_registry and task_arg_registry[task_key] == job_id:
            del task_arg_registry[task_key]
        # Remove the job from the scheduler
        try:
            scheduler.remove_job(job_id)
        except Exception as e:
            logger.warning(f"Could not cancel job '{job_id}': {e}")
            continue
        logger.info(f"Cancelled job '{job_id}' with argument '{argument}'")
def FahrenheitToCelsius(F):
    """Convert a Fahrenheit temperature to Celsius.

    F may be a number or a numeric string; strings are parsed with float().
    """
    fahrenheit = float(F) if isinstance(F, str) else F
    return (fahrenheit - 32) * 5/9
# Temperature rules, evaluated in order (alarm before warning for each direction):
# (enabled_alarms bit, limit key, method key, bits cleared when sent,
#  True for "too high" / False for "too low", message template)
#
# enabled_alarms bits, counted from the right, match the method key suffixes:
#  0 stuck warning        1 stuck alarm         2 absent warning      3 absent alarm
#  4 temp high warning    5 temp high alarm     6 temp low warning    7 temp low alarm
#  8 radar alarm          9 pressure alarm     10 light alarm        11 smell alarm
# Example settings:
#  {"enabled_alarms":"000000000101", "temperature_high_warning":"85",
#   "temperature_high_warning_method_4":"SMS", "temperature_high_alarm":"95",
#   "temperature_high_alarm_method_5":"PHONE", "rearm_policy":"At midnight", ...}
_TEMPERATURE_RULES = (
    (5, "temperature_high_alarm", "temperature_high_alarm_method_5", (5, 4), True,
     "Alarm @ {name}: Temperature too high! ({temperature} C) in {location}"),
    (4, "temperature_high_warning", "temperature_high_warning_method_4", (4,), True,
     "Warning @ {name}: Temperature too high! ({temperature} C) in {location}"),
    (7, "temperature_low_alarm", "temperature_low_alarm_method_7", (7, 6), False,
     "Alarm @ {name} Temperature too low! ({temperature} C) in {location}"),
    (6, "temperature_low_warning", "temperature_low_warning_method_6", (6,), False,
     "Warning @ {name} Temperature too low! ({temperature} C) in {location}"),
)


def _check_temperature_limits(deployment_id, device_id, temperature):
    """Send the temperature alarms/warnings of one device whose limits are exceeded.

    temperature is in Celsius; the configured limits are converted with
    FahrenheitToCelsius. An alarm clears its own bit and the matching
    warning bit, so the warning that follows it is skipped; otherwise the
    warning is evaluated on its own limit.
    """
    device_alarm_settings = device_alerts_all[device_id]

    for bit_nr, limit_key, method_key, cleared_bits, is_high, template in _TEMPERATURE_RULES:
        # Re-read every time: a preceding alarm may have cleared this rule's bit.
        enabled_alarms_str = device_alarm_settings["enabled_alarms"]
        if not (bit_nr < len(enabled_alarms_str) and enabled_alarms_str[-1 - bit_nr] == "1"):
            continue

        limit_c = FahrenheitToCelsius(device_alarm_settings[limit_key])
        exceeded = temperature > limit_c if is_high else temperature < limit_c
        if not exceeded:
            continue

        # Cancel until re-armed
        for cleared_bit in cleared_bits:
            enabled_alarms_str = set_character(enabled_alarms_str, cleared_bit, "0")

        dev_det = devices_details_map[device_id]
        location = (dev_det[4] + " " + dev_det[5].strip()).strip()
        # Looked up only when an alert is actually sent, not for every packet.
        first_last_name = GetBeneficiaryFromDeployment(deployment_id)
        method = device_alarm_settings[method_key]
        SendAlerts(deployment_id, method,
                   template.format(name=first_last_name, temperature=temperature, location=location),
                   "", "")

        device_alarm_settings["enabled_alarms"] = enabled_alarms_str
        device_alerts_all[device_id] = device_alarm_settings
        # Persist the disarmed bits first so a failure while recording the
        # send time cannot make the same alert fire again on the next packet.
        StoreDeviceAlarmsToRedis(device_id, device_alarm_settings)
        StoreLastSentToRedis(deployment_id)


def ProcessQueue():
    """Drain in_queue and look for alarm conditions in the received sensor data.

    Each entry is a (time, topic, payload) tuple; the topic without its first
    character is the device MAC. Only temperature conditions are evaluated
    here, and only when the deployment's "armed" string is marked as set
    (bit 0), the home-security alarm is not armed (bit 1) and an
    environmental warning/alarm bit (2 or 3) is on. Errors in one entry are
    logged and do not stop the processing of the remaining entries.
    """
    global in_queue
    temp_offset = -16.0  # added to the reported value before it is compared

    while in_queue:
        try:
            tim, topic, messagein = in_queue.pop(0)
            mac = topic[1:]
            if mac not in mac_to_device_id:
                logger.error(f"MAC: {mac} not part of any deployment")
                continue

            device_id = mac_to_device_id[mac]
            deployment_id = device_to_deployment[device_id]
            if deployment_id not in alarms_settings_all:
                continue  # alarm not set up for this deployment

            alarm_armed_settings_str = alarms_settings_all[deployment_id]["armed"]
            if alarm_armed_settings_str[-1] != "1":
                continue  # settings never purposefully set

            message_dict = json.loads(messagein.decode('utf-8'))
            if alarm_armed_settings_str[-2] != "0":
                # Home-security alarm is armed; radar handling is not implemented yet.
                continue

            #Temperatures Too High/Low
            if alarm_armed_settings_str[-3] == "1" or alarm_armed_settings_str[-4] == "1":
                if "temperature" in message_dict:
                    temperature = message_dict["temperature"] + temp_offset
                    #at this point temperature is in C
                    if temperature > 0 and temperature < 100: #ignore others
                        _check_temperature_limits(deployment_id, device_id, temperature)
                        logger.info(f"{tim}, {mac}, {temperature}")
        except Exception as e:
            logger.error(f"Error: {str(e)} {traceback.format_exc()}")
def CheckMessageSends():
    """Process the manual send requests queued in the Redis list 'send_requests'.

    The requests come from manual GUI interactions on the alerts page. Each
    one is turned into a message and passed to SendAlerts with the subject
    "Test message". A request that cannot be parsed, lacks a field, or fails
    while sending is logged and skipped, so one bad request cannot stop the
    caller's loop.

    Returns:
    int: Number of requests that were parsed from the queue (0 if it was empty).
    """
    requests_count = 0
    # Check if queue exists and has items
    queue_length = redis_conn.llen('send_requests')
    if queue_length == 0:
        return 0

    print(f"Processing {queue_length} messages from queue...")
    # Process at most the number of items present when we started
    for _ in range(queue_length):
        item_json = redis_conn.rpop('send_requests')
        if item_json is None:
            break
        try:
            record = json.loads(item_json)
            requests_count += 1

            # Print the record
            print(f"Request #{requests_count}:")
            for key, value in record.items():
                print(f" {key}: {value}")

            method = record["method"]
            location_str = record["location"]
            location = location_str.split("_")[2]
            deployment_id = record["deployment_id"]
            content = record["content"]
            feature = record["feature"]
            enabledCellContent = record["enabledCellContent"]
            currentUnits = record["currentUnits"]
            currentAlertTableMode = record["currentAlertTableMode"] #Warning/Alarm
            test_only = record["test_only"]
            user_name = record["user_name"]
            user_first_last_name = GetBeneficiaryFromDeployment(deployment_id)

            if feature == "stuck":
                msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} is spending more than {enabledCellContent} in {location}"
            elif feature == "absent":
                msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} did not visit {location} in more than {enabledCellContent[1:-1]} {currentUnits}"
            elif feature == "tempLow":
                msg_ext = f"{currentAlertTableMode}: {content} temperature is lower then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}"
            elif feature == "tempHigh":
                msg_ext = f"{currentAlertTableMode}: {content} temperature is higher then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}"
            elif feature == "pressure":
                msg_ext = f"{currentAlertTableMode}: {content} door was opened or closed in the {location} at {user_first_last_name}"
            elif feature == "radar":
                msg_ext = f"{currentAlertTableMode}: {content} motion detected in the {location} at {user_first_last_name}"
            else:
                msg_ext = f"{currentAlertTableMode}: {content} {feature} in {location} at {user_first_last_name}"

            SendAlerts(deployment_id, method, msg_ext, "Test message", user_name)
            #these are testing messages, so they are not counted as real triggers
            #and the last-sent time in REDIS is not updated
            print("-" * 40)
        except json.JSONDecodeError as e:
            logger.error(f"Failed to parse JSON from queue item: {e}")
            continue
        except Exception as e:
            # Missing field, unexpected location format, unknown user, send failure...
            logger.error(f"Failed to process send request: {e} {traceback.format_exc()}")
            continue

    print(f"Total requests processed: {requests_count}")
    return requests_count
# --- Main Execution ---
if __name__ == "__main__":
    logger.info(f"Starting Well Alert Monitoring Service (PID: {os.getpid()})...")
    logger.info(f"Alert Timeout Threshold: {TIME_OUT_THRESHOLD_MIN} minutes")
    logger.info(f"Check Interval: {CHECK_INTERVAL_SECONDS} seconds")

    # Register signal handlers
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    if not load_device_configurations():
        logger.critical("Failed to load initial device configurations. Exiting.")
        sys.exit(1)

    # The e-mail sender must exist before anything that can raise an alert:
    # SetupTasks() and the ProcessQueue timer both reach SendAlerts, which
    # uses the global `sender` for the EMAIL method.
    sender = EmailSender(os.getenv('GU'), os.getenv('GP'), max_workers=3)
    sender.start_workers()

    t1 = perpetualTimer(1, ensure_mqtt_connection)
    t2 = perpetualTimer(.01, ProcessQueue)
    SetupTasks()
    t1.start()
    t2.start()
    schedule_task(task_a, "first", 1)
    schedule_task(task_b, "second", 1)

    SendAlerts(21, "EMAIL", f"well-alert was started", "Program started", "robster")

    exit_code = 0
    try:
        # Start the main monitoring loop
        monitor_thread = threading.Thread(target=monitor_devices, daemon=True)
        monitor_thread.start()

        # Keep the main thread alive until stop_event is set by signal or error
        while not stop_event.is_set():
            CheckMessageSends()
            time.sleep(1) # Check stop_event periodically

        logger.info("Stop event received, waiting for monitoring thread to finish...")
        monitor_thread.join(timeout=CHECK_INTERVAL_SECONDS + 10) # Wait for thread with timeout
        if monitor_thread.is_alive():
            logger.warning("Monitoring thread did not finish cleanly.")
    except Exception as e:
        logger.exception(f"An unexpected critical error occurred in main execution: {e}")
        # Report the failure to the process supervisor instead of exiting with 0.
        exit_code = 1
    finally:
        logger.info("Well Alert Monitoring Service finished.")
        sys.exit(exit_code)
print("Last line")