#!/usr/bin/env python3 #this maintains 2 mqtt connections #1 to wellnua (to talk and listen to devices) #2 to eluxnetworks.net import os import time import signal import sys import threading from threading import Timer, Lock import json import logging import datetime from datetime import timezone, timedelta import time import psycopg2 import redis from ast import literal_eval from kombu import Connection, Exchange #, Queue, Consumer, Message #from kombu.exceptions import TimeoutError from dotenv import load_dotenv import uuid import subprocess import shutil import psutil import requests import copy import re import ast import random import traceback #import math from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler from pathlib import Path from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor import paho.mqtt.client as mqtt import inspect import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart import queue import telnyx # For SMS import pytz import struct #from datetime import datetime, timedelta executors = { 'default': ThreadPoolExecutor(20), # Allow up to 20 concurrent tasks 'processpool': ProcessPoolExecutor(5) # 5 parallel processes for CPU-bound tasks } scheduler = BackgroundScheduler(executors=executors) scheduler.start() # Store job IDs for later reference job_registry = {} task_arg_registry = {} time_zones_all = {} # --- Configuration Loading --- # Load environment variables from $HOME/.env dotenv_path = os.path.join(os.environ.get("HOME", "."), '.env') #load_dotenv(dotenv_path=dotenv_path) load_dotenv('.env_rz') # Logging Configuration default_uuid = str(uuid.uuid4()) logger = logging.getLogger(default_uuid) logger.propagate = False logger.setLevel(logging.DEBUG) #DEBUG, INFO, WARNING, ERROR, and CRITICAL handler = logging.StreamHandler() #formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s') formatter = logging.Formatter('%(levelname)s - %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) #permament_users = ["node_red_status_brankol", "node_red_status_robster", "node_red_status_bknigge"] permament_users = [] filesDir = "/home/app/well_web_storage" # Database Configuration DB_NAME = os.getenv('DB_NAME') DB_USER = os.getenv('DB_USER') DB_PASSWORD = os.getenv('DB_PASSWORD') DB_HOST = os.getenv('DB_HOST') DB_PORT = os.getenv('DB_PORT', '5432') ADMIN_USER = os.getenv('ADMIN_USER') ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD') UNLOCK = os.getenv('UNLOCK') # Mosquitto files MOSQUITTO_PASSWORD_FILE = os.getenv('MOSQUITTO_PASSWORD_FILE') MOSQUITTO_ACL_FILE = os.getenv('MOSQUITTO_ACL_FILE') # Redis Configuration REDIS_HOST = os.getenv('REDIS_HOST') REDIS_PORT = int(os.getenv('REDIS_PORT')) REDIS_DB = int(os.getenv('REDIS_DB')) REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None) # RabbitMQ Configuration RABBITMQ_URL = os.getenv('RABBITMQ_URL') RABBITMQ_ALERTS_QNAME = os.getenv('RABBITMQ_ALERTS_QNAME') exchange = Exchange("", type='direct') #MQTT Configuration client = None connected = False in_queue = [] MQTTSERVER = os.getenv('MQTTSERVER', "") #"mqtt.eluxnetworks.net" mqtt_port = int(os.getenv('MQTT_PORT', "1883")) # TLS port mqtt_client_id = "client777aaa" MQTT_USER=os.getenv('MQTT_USER', "") MQTT_PASSWORD=os.getenv('MQTT_PASSWORD', "") use_tls = MQTTSERVER not in ["localhost", "127.0.0.1", "192.168.68.70"] #MQTT Master Configuration (wellnua mqtt!) RECEIVED_COUNTER = 0 clientM = None connectedM = False in_queueM = [] MQTTSERVERM = os.getenv('MQTTSERVERM', "") mqtt_portM = int(os.getenv('MQTT_PORTM', "8883")) # TLS port mqtt_client_idM = "client90a-authn-ID" MQTT_USERM=os.getenv('MQTT_USERM', "") MQTT_PASSWORDM=os.getenv('MQTT_PASSWORDM', "") use_tlsM = True CONNECTION_STATE = 0 #0=idle, 1=connected, 2=lost connection, 3=connection requested # Alerting Thresholds DEFAULT_TIMEOUT_MINUTES = 48 * 60 # Default 48 hours in minutes TIME_OUT_THRESHOLD_MIN = int(os.getenv('time_out_threshold', DEFAULT_TIMEOUT_MINUTES)) TIME_OUT_THRESHOLD_SEC = TIME_OUT_THRESHOLD_MIN * 60 TIME_TO_GET_OUT_SEC = 5#60 TIME_BETWEEN_ALERTS_SEC = 12 * 60 * 60 #12 hours BASE_DIR = "/home/ubuntu/.node-red" MAIN_INSTANCE_DIR = f"{BASE_DIR}/main_instance" USERS_DIR = f"{BASE_DIR}/users" SHARED_NODES_DIR = f"{BASE_DIR}/shared_nodes" TELNYX_API_KEY = os.environ.get("TELNYX_API_KEY") TELNYX_MESSAGING_PROFILE_ID = os.environ.get("TELNYX_MESSAGING_PROFILE_ID") TELNYX_CONNECTION_ID_VOICE = os.environ.get("TELNYX_CONNECTION_ID_VOICE") TELNYX_WEBHOOK_URL_VOICE = os.environ.get("TELNYX_WEBHOOK_URL_VOICE") TELNYX_SENDER_ID = os.environ.get("TELNYX_SENDER_ID")# (for numeric SMS sender) # TELNYX_SENDER_ID_ALPHA is needed if 'alpha' or 'auto' for non-US SMS is used TELNYX_SENDER_ID_ALPHA = os.environ.get("TELNYX_SENDER_ID_ALPHA")# not set (optional for SMS if only numeric is used or for US destinations in auto mode).") TELNYX_VOICE_URL = os.getenv("TELNYX_VOICE_URL") JOBS_DIR = '/home/ubuntu/tts_jobs' RESULTS_DIR = '/home/ubuntu/tts_results' RESULTS_SHARE_DIR = '/mnt/data/shared/clips' for directory in [MAIN_INSTANCE_DIR, USERS_DIR, SHARED_NODES_DIR]: if not os.path.exists(directory): os.makedirs(directory) logger.info(f"Created directory: {directory}") # Monitoring Interval CHECK_INTERVAL_SECONDS = 1 local_daily_minute_last = 0 reload_needed = False # --- Global Variables --- db_conn = None redis_conn = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB) rabbit_conn = None rabbit_channel = None stop_event = threading.Event() devices_config = [] # Dictionary to store device config {device_id: {config}} location_names = {-1:"All",0:"?",5:"Office",6:"Hallway",7:"Garage",8:"Outside",9:"Conference Room",10:"Room",34:"Kitchen", 56:"Bedroom",78:"Living Room",102:"Bathroom",103:"Dining Room",104:"Bathroom Main",105:"Bathroom Guest", 106:"Bedroom Master", 107:"Bedroom Guest", 108:"Conference Room", 109:"Basement", 110:"Attic", 200:"Other"} location_indexes = {} for i in location_names: location_indexes[location_names[i]] = i alarms_settings_all = {} device_alerts_all = {} care_takers_all = {} times_found_armed = {} device_to_deployment = {} deployment_id_list = [] deployments_devices = {} mac_to_device_id = {} devices_details_map = {} def schedule_task(function, argument, minutes_from_now, job_id=None): """ Schedule a function to run with a specific argument after X minutes If a job with the same function and argument is already scheduled: - Only replace it if the new schedule is EARLIER than the existing one - Ignore the new schedule if it's later than the existing one Returns the job_id or None if scheduling was ignored """ run_time = datetime.datetime.now() + timedelta(minutes=minutes_from_now) # Create a key based on function name and argument to track related jobs task_key = f"{function.__name__}_{argument}" # Check if a job with the same function and argument is already scheduled if task_key in task_arg_registry: existing_job_id = task_arg_registry[task_key] existing_job_time = job_registry[existing_job_id]['scheduled_time'] # Only replace if the new time is earlier than the existing time if run_time >= existing_job_time: logger.info(f"Ignoring new schedule for {task_key} - existing schedule is earlier or same ({existing_job_time})") return None else: # Cancel the existing job because we have an earlier time cancel_task(existing_job_id) logger.info(f"Replacing existing schedule for {task_key} with earlier time ({run_time})") # If no job_id provided, create one based on time and argument if job_id is None: job_id = f"job_{task_key}_{datetime.datetime.now().timestamp()}" # Check if the function accepts arguments sig = inspect.signature(function) param_count = len(sig.parameters) # Schedule the job with or without arguments based on function signature if param_count > 0: # Function accepts parameters, pass the argument job = scheduler.add_job( function, 'date', run_date=run_time, args=[argument], id=job_id, misfire_grace_time=900 ) else: # Function doesn't accept parameters, don't pass any arguments job = scheduler.add_job( function, 'date', run_date=run_time, id=job_id, misfire_grace_time=900 ) # Store in both registries job_registry[job_id] = { 'argument': argument, 'function_name': function.__name__, 'scheduled_time': run_time, 'job': job } # Store the job ID for this function/argument combo task_arg_registry[task_key] = job_id logger.info(f"Scheduled job '{job_id}' with argument '{argument}' to run at {run_time}") return job_id def cancel_task(job_id): """ Cancel a scheduled task by its job_id """ if job_id in job_registry: function_name = job_registry[job_id]['function_name'] argument = job_registry[job_id]['argument'] task_key = f"{function_name}_{argument}" # Remove from task registry if this is the current job for this task/arg if task_key in task_arg_registry and task_arg_registry[task_key] == job_id: del task_arg_registry[task_key] # Remove the job from the scheduler scheduler.remove_job(job_id) logger.info(f"Cancelled job '{job_id}' with argument '{job_registry[job_id]['argument']}'") # Remove from job registry del job_registry[job_id] return True else: logger.error(f"Job '{job_id}' not found") return False def list_scheduled_tasks(): """ List all currently scheduled tasks """ logger.info("\nCurrently scheduled tasks:") for job_id, details in job_registry.items(): time_remaining = (details['scheduled_time'] - datetime.now()).total_seconds() / 60 logger.info(f"Job: {job_id}, Function: {details['function_name']}, Argument: {details['argument']}, " f"Running in: {time_remaining:.1f} minutes") class perpetualTimer(): def __init__(self, t, hFunction): self.t = t self.hFunction = hFunction self.thread = None self.is_running = False self.lock = Lock() def _run(self): with self.lock: if not self.is_running: return self.hFunction() with self.lock: if self.is_running: self.thread = Timer(self.t, self._run) self.thread.start() def start(self): with self.lock: if not self.is_running: self.is_running = True self.thread = Timer(self.t, self._run) self.thread.start() def stop(self): with self.lock: self.is_running = False if self.thread: self.thread.cancel() def pause(self): self.stop() def resume(self): self.start() def is_active(self): with self.lock: return self.is_running def MQSend_worker(topic, content, retain=False): """Worker function that handles MQTT sending in a separate thread""" global client, mqtt_client_id, connected #currentTime = int(time.time()) #print(">", currentTime, topic, content) # Limit connection attempts to prevent flooding max_connection_attempts = 3 connection_attempts = 0 # Use a local semaphore to prevent multiple simultaneous connection attempts if not hasattr(MQSend_worker, 'connection_in_progress'): MQSend_worker.connection_in_progress = False try: # Only attempt connection if not already in progress if client is None or not connected: if not MQSend_worker.connection_in_progress and connection_attempts < max_connection_attempts: MQSend_worker.connection_in_progress = True try: print(f"Thread {threading.current_thread().name} attempting MQTT connection") client = connect_mqtt(mqtt_client_id) connection_attempts += 1 finally: MQSend_worker.connection_in_progress = False # If we have a valid connection, try to send if connected and client is not None: try: if retain: result = client.publish(topic, content, qos=1, retain=retain) # Higher QoS for better throughput NOT! else: result = client.publish(topic, content, qos=1, retain=retain) # Lower QoS for better throughput if hasattr(result, 'rc') and result.rc == mqtt.MQTT_ERR_SUCCESS: pass print(f"Successfully published to {topic}") else: print(f"Failed to publish to {topic}") except Exception as err: print(f"Error in MQSend publish: {str(err)}") else: print("MQTT not connected, message not sent") except Exception as err: print(f"Error in MQSend_worker: {str(err)}") def on_message(clientt, userdata, msg): global in_queue #print(msg.topic+" "+str(msg.payload)) #msga = msg.payload.decode("ascii") #print(msg.timestamp, msg.topic, msg.payload) in_queue.append((str(time.time()), msg.topic, msg.payload)) def MQSendM(topic, content, retain=False): global clientM currentTime = int(time.time()) logger.info(f"> {currentTime} , {topic}, {content}") try: enc_msg = content if retain: clientM.publish(topic, enc_msg, qos=2, retain=retain) else: clientM.publish(topic, enc_msg, qos=1, retain=retain) except Exception as err: print("Err2B:"+ str(err)) # The callback for when the client receives a CONNACK response from the server. def on_connectM(clientt, userdata, flags, rc, properties=None): global CONNECTION_STATE, in_queueM logger.info(f"Connected to {MQTTSERVERM} with result code {str(rc)}") #subbedTo = [("/w",1),("wellnuotopics/topic1",1), ("/well_hub",1)] subbedTo = [("/well_hub",1)] #GLog("subscribing to /w and /w_enc") logger.info(f"MQTTM subscribing to {subbedTo}") clientt.subscribe(subbedTo) CONNECTION_STATE = 1 in_queueM = [] def on_disconnectM(clientt, userdata, rc): global CONNECTION_STATE if rc != 0: if True: #CONNECTION_STATE != 2: logger.info(f"MQTTM Found disconnected.") CONNECTION_STATE = 2 #lost connection def on_messageM(client_, userdata, msg): #message from GUI global in_packets_count, in_queueM, RECEIVED_COUNTER RECEIVED_COUNTER += 1 if len(msg.payload) > 0: logger.info(f"<<<<< {msg.payload}") in_queueM.append([msg.topic,msg.payload]) def connect_mqtt(client_id): global client, connected # Add timeout for the entire operation connection_timeout = 5 # 5 seconds max for the whole connection attempt start_time = time.time() try: random_suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=6)) unique_client_id = f"{random_suffix}-{int(time.time())}" logger.info(f"Connecting {unique_client_id}") # Determine certificate path based on environment certificate_path = 'new-ca.crt' mqtt_client = mqtt.Client(client_id=unique_client_id, protocol=mqtt.MQTTv5, callback_api_version=mqtt.CallbackAPIVersion.VERSION2) # Define callbacks with MQTTv5 signature def on_connect_wrapper(client, userdata, flags, rc, properties=None): global connected if rc == 0: connected = True logger.info(f"Connected to {MQTTSERVER} with {unique_client_id}") subbedTo = [("#",1)] logger.info(f"subscribing to , {subbedTo}") client.subscribe(subbedTo) else: logger.error(f"Failed to connect to {MQTTSERVER} with {unique_client_id} return code {rc}") connected = False def on_disconnect_wrapper(client, userdata, flags, reason_code, properties): global connected connected = False logger.info(f"Disconnected with result code {reason_code}") # Set the callbacks mqtt_client.on_connect = on_connect_wrapper mqtt_client.on_disconnect = on_disconnect_wrapper mqtt_client.on_message = on_message if use_tls: certificate_path = '/app/src/new-ca.crt' if os.path.exists('/app/src/new-ca.crt') else 'new-ca.crt' mqtt_client.tls_set(ca_certs=certificate_path) # Set TLS with your certificate #mqtt_client.tls_set( # ca_certs=certificate_path, #) # Set username and password mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD) # Connect with shorter timeout mqtt_client.connect(MQTTSERVER, mqtt_port, keepalive=30) # Start the loop in a background thread mqtt_client.loop_start() # Wait for connection with timeout wait_end_time = min(start_time + connection_timeout, time.time() + 3) while not connected and time.time() < wait_end_time: time.sleep(0.1) # Check if we connected successfully if connected: logger.info(f"MQTT connected successfully in {time.time() - start_time:.2f} seconds") client = mqtt_client return mqtt_client else: logger.info(f"MQTT connection attempt timed out after {time.time() - start_time:.2f} seconds") mqtt_client.loop_stop() connected = False return None except Exception as e: connection_time = time.time() - start_time logger.error(f"Connection error after {connection_time:.2f} seconds: {e}") connected = False return None def ensure_mqtt_connection(): global client, mqtt_client_id, connected if not connected: logger.info("MQTT connection needed, attempting to connect...") try: if client: try: client.disconnect() client.loop_stop() except: pass # Ignore errors on disconnect client = connect_mqtt(mqtt_client_id) # Give a moment for connection to establish time.sleep(2) return connected except Exception as e: logger.error(f"Reconnection error: {e}") return False return True # Already connected def ensure_mqtt_connectionM(): global clientM, CONNECTION_STATE, RECEIVED_COUNTER random_suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=6)) unique_client_idm = f"{random_suffix}-{int(time.time())}" if CONNECTION_STATE == 0: #clientM = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "client_id8") clientM = mqtt.Client(client_id=unique_client_idm, protocol=mqtt.MQTTv5, callback_api_version=mqtt.CallbackAPIVersion.VERSION2) clientM.tls_set( ca_certs="wellnua.com.crt", # Path to your new CA certificate ) clientM.username_pw_set(MQTT_USERM, MQTT_PASSWORDM) clientM.on_connect = on_connectM clientM.on_disconnect = on_disconnectM clientM.on_message = on_messageM print(f"MQTTM Connecting to {MQTTSERVERM}...") clientM.connect(MQTTSERVERM, mqtt_portM, keepalive=30) try: clientM.loop_forever() print("MQTTM Stopped listening!") except Exception as e: print(e) time.sleep(3) CONNECTION_STATE = 3 pass elif CONNECTION_STATE == 1: #connected if RECEIVED_COUNTER == 0: print("No new data...") CONNECTION_STATE = 2 elif CONNECTION_STATE == 2: #lost connection print("MQTTM Lost connection. Going to idle...") if clientM.is_connected(): clientM.disconnect() time.sleep(2) #clientM.reconnect CONNECTION_STATE = 0 elif CONNECTION_STATE == 3: #connection requested print("MQTTM Waiting for connection...") class NodePackageHandler(FileSystemEventHandler): """Handles file system events related to Node-RED package installations.""" def __init__(self, source_type, source_path): super().__init__() self.source_type = source_type # 'main' or 'user' self.source_path = source_path self.source_name = os.path.basename(source_path) if source_type == 'user' else 'main' def on_created(self, event): # Only interested in directory creation events in node_modules if not event.is_directory or 'node_modules' not in event.src_path: return # Ignore hidden directories and nested node_modules package_dir = event.src_path package_name = os.path.basename(package_dir) if package_name.startswith('.'): return # Check if this is a top-level node module parent_dir = os.path.dirname(package_dir) if os.path.basename(parent_dir) == 'node_modules': self.sync_package(package_name, package_dir) def on_modified(self, event): # Package.json modifications could indicate a package update if not event.is_directory and os.path.basename(event.src_path) == 'package.json': parent_dir = os.path.dirname(event.src_path) if os.path.basename(os.path.dirname(parent_dir)) == 'node_modules': package_name = os.path.basename(parent_dir) self.sync_package(package_name, parent_dir) def sync_package(self, package_name, package_dir): """Synchronize a Node-RED package when detected.""" try: # Skip node core modules if package_name in ['npm', 'node-red', '@node-red']: return logger.info(f"New or updated package detected: {package_name} in {self.source_type} instance ({self.source_name})") # Get package version package_json_path = os.path.join(package_dir, 'package.json') if not os.path.exists(package_json_path): logger.warning(f"No package.json found for {package_name}, skipping") return with open(package_json_path, 'r') as f: package_data = json.load(f) version = package_data.get('version', 'unknown') # Destination in shared nodes directory shared_package_dir = os.path.join(SHARED_NODES_DIR, 'node_modules', package_name) # Check if package already exists in shared directory if os.path.exists(shared_package_dir): shared_json_path = os.path.join(shared_package_dir, 'package.json') if os.path.exists(shared_json_path): with open(shared_json_path, 'r') as f: shared_data = json.load(f) shared_version = shared_data.get('version', 'unknown') # Skip if versions match if shared_version == version: logger.info(f"Package {package_name}@{version} already exists in shared directory") return # Log version change logger.info(f"Updating {package_name} from {shared_version} to {version} in shared directory") # Remove old version shutil.rmtree(shared_package_dir) else: logger.info(f"Adding new package {package_name}@{version} to shared directory") # Copy package to shared directory os.makedirs(os.path.dirname(shared_package_dir), exist_ok=True) shutil.copytree(package_dir, shared_package_dir) # Set proper permissions subprocess.run(['chmod', '-R', '755', shared_package_dir]) subprocess.run(['chown', '-R', 'ubuntu:ubuntu', shared_package_dir]) # Synchronize to all user instances if from main, or to main if from user if self.source_type == 'main': # Sync to all user instances self._sync_to_users(package_name, shared_package_dir) else: # Sync to main instance self._sync_to_main(package_name, shared_package_dir) logger.info(f"Successfully synchronized {package_name}@{version}") except Exception as e: logger.error(f"Error synchronizing package {package_name}: {str(e)}") def _sync_to_users(self, package_name, source_dir): """Synchronize a package from shared nodes to all user instances.""" for user_dir in os.listdir(USERS_DIR): user_path = os.path.join(USERS_DIR, user_dir) if os.path.isdir(user_path): user_modules_dir = os.path.join(user_path, 'node_modules') user_package_dir = os.path.join(user_modules_dir, package_name) # Create node_modules directory if it doesn't exist os.makedirs(user_modules_dir, exist_ok=True) # Remove old version if it exists if os.path.exists(user_package_dir): shutil.rmtree(user_package_dir) # Copy package shutil.copytree(source_dir, user_package_dir) logger.info(f"Synchronized {package_name} to user {user_dir}") # Set proper permissions subprocess.run(['chmod', '-R', '755', user_package_dir]) subprocess.run(['chown', '-R', 'ubuntu:ubuntu', user_package_dir]) def _sync_to_main(self, package_name, source_dir): """Synchronize a package from shared nodes to main instance.""" main_modules_dir = os.path.join(MAIN_INSTANCE_DIR, 'node_modules') main_package_dir = os.path.join(main_modules_dir, package_name) # Create node_modules directory if it doesn't exist os.makedirs(main_modules_dir, exist_ok=True) # Remove old version if it exists if os.path.exists(main_package_dir): shutil.rmtree(main_package_dir) # Copy package shutil.copytree(source_dir, main_package_dir) logger.info(f"Synchronized {package_name} to main instance") # Set proper permissions subprocess.run(['chmod', '-R', '755', main_package_dir]) subprocess.run(['chown', '-R', 'ubuntu:ubuntu', main_package_dir]) def setup_watchers(): """Set up directory watchers for main and user instances.""" observers = [] # Set up watcher for main instance main_modules_dir = os.path.join(MAIN_INSTANCE_DIR, 'node_modules') if os.path.exists(main_modules_dir): observer = Observer() handler = NodePackageHandler('main', MAIN_INSTANCE_DIR) observer.schedule(handler, main_modules_dir, recursive=True) observers.append(observer) logger.info(f"Watching main instance node_modules directory at {main_modules_dir}") # Set up watchers for user instances for user_dir in os.listdir(USERS_DIR): user_path = os.path.join(USERS_DIR, user_dir) if os.path.isdir(user_path): user_modules_dir = os.path.join(user_path, 'node_modules') if os.path.exists(user_modules_dir): observer = Observer() handler = NodePackageHandler('user', user_path) observer.schedule(handler, user_modules_dir, recursive=True) observers.append(observer) logger.info(f"Watching user instance node_modules directory for {user_dir}") # Start all observers for observer in observers: observer.start() return observers def sync_all_packages(): """Perform a full synchronization of all packages.""" logger.info("Starting full package synchronization") # First, collect all unique packages from main and user instances all_packages = {} # Check main instance main_modules_dir = os.path.join(MAIN_INSTANCE_DIR, 'node_modules') if os.path.exists(main_modules_dir): for package_name in os.listdir(main_modules_dir): if not package_name.startswith('.') and os.path.isdir(os.path.join(main_modules_dir, package_name)): package_dir = os.path.join(main_modules_dir, package_name) package_json = os.path.join(package_dir, 'package.json') if os.path.exists(package_json): with open(package_json, 'r') as f: try: data = json.load(f) version = data.get('version', 'unknown') all_packages[package_name] = {'dir': package_dir, 'version': version, 'source': 'main'} except json.JSONDecodeError: logger.warning(f"Invalid package.json in {package_dir}") # Check user instances for user_dir in os.listdir(USERS_DIR): user_path = os.path.join(USERS_DIR, user_dir) if os.path.isdir(user_path): user_modules_dir = os.path.join(user_path, 'node_modules') if os.path.exists(user_modules_dir): for package_name in os.listdir(user_modules_dir): if not package_name.startswith('.') and os.path.isdir(os.path.join(user_modules_dir, package_name)): package_dir = os.path.join(user_modules_dir, package_name) package_json = os.path.join(package_dir, 'package.json') if os.path.exists(package_json): with open(package_json, 'r') as f: try: data = json.load(f) version = data.get('version', 'unknown') # Only add if not already found or if version is newer if package_name not in all_packages or all_packages[package_name]['version'] < version: all_packages[package_name] = {'dir': package_dir, 'version': version, 'source': f'user-{user_dir}'} except json.JSONDecodeError: logger.warning(f"Invalid package.json in {package_dir}") # Ensure shared nodes directory and node_modules exist shared_modules_dir = os.path.join(SHARED_NODES_DIR, 'node_modules') os.makedirs(shared_modules_dir, exist_ok=True) # Synchronize all packages to shared directory for package_name, info in all_packages.items(): # Skip node core modules if package_name in ['npm', 'node-red', '@node-red']: continue shared_package_dir = os.path.join(shared_modules_dir, package_name) # Check if package already exists in shared directory if os.path.exists(shared_package_dir): shared_json_path = os.path.join(shared_package_dir, 'package.json') if os.path.exists(shared_json_path): with open(shared_json_path, 'r') as f: try: shared_data = json.load(f) shared_version = shared_data.get('version', 'unknown') # Skip if shared version is newer or equal if shared_version >= info['version']: logger.info(f"Keeping existing version {shared_version} of {package_name} in shared directory") continue except json.JSONDecodeError: pass # Copy package to shared directory if os.path.exists(shared_package_dir): shutil.rmtree(shared_package_dir) shutil.copytree(info['dir'], shared_package_dir) logger.info(f"Copied {package_name}@{info['version']} from {info['source']} to shared directory") # Set proper permissions subprocess.run(['chmod', '-R', '755', shared_package_dir]) subprocess.run(['chown', '-R', 'ubuntu:ubuntu', shared_package_dir]) # Now synchronize from shared directory to all instances shared_packages = {} for package_name in os.listdir(shared_modules_dir): package_dir = os.path.join(shared_modules_dir, package_name) if os.path.isdir(package_dir) and not package_name.startswith('.'): shared_packages[package_name] = package_dir # Sync to main instance for package_name, package_dir in shared_packages.items(): main_package_dir = os.path.join(main_modules_dir, package_name) if not os.path.exists(main_package_dir): shutil.copytree(package_dir, main_package_dir) logger.info(f"Synchronized {package_name} to main instance") subprocess.run(['chmod', '-R', '755', main_package_dir]) subprocess.run(['chown', '-R', 'ubuntu:ubuntu', main_package_dir]) # Sync to user instances for user_dir in os.listdir(USERS_DIR): user_path = os.path.join(USERS_DIR, user_dir) if os.path.isdir(user_path): user_modules_dir = os.path.join(user_path, 'node_modules') os.makedirs(user_modules_dir, exist_ok=True) for package_name, package_dir in shared_packages.items(): user_package_dir = os.path.join(user_modules_dir, package_name) if not os.path.exists(user_package_dir): shutil.copytree(package_dir, user_package_dir) logger.info(f"Synchronized {package_name} to user {user_dir}") subprocess.run(['chmod', '-R', '755', user_package_dir]) subprocess.run(['chown', '-R', 'ubuntu:ubuntu', user_package_dir]) logger.info("Full package synchronization completed") def sync_installs(npm_package=None, instance_path=None): """ Function to be triggered on npm installation events. Args: npm_package: Name of the installed npm package (optional) instance_path: Path to the instance where the package was installed (optional) """ logger.info(f"SyncInstalls triggered for {npm_package or 'all packages'} in {instance_path or 'unknown'}") if npm_package and instance_path: # Handle specific package installation source_type = 'main' if MAIN_INSTANCE_DIR in instance_path else 'user' modules_dir = os.path.join(instance_path, 'node_modules') package_dir = os.path.join(modules_dir, npm_package) if os.path.exists(package_dir): handler = NodePackageHandler(source_type, instance_path) handler.sync_package(npm_package, package_dir) else: logger.warning(f"Package directory not found: {package_dir}") else: # Perform full synchronization sync_all_packages() def initialize_sync_system(): """Initialize the node synchronization system.""" logger.info("Initializing Node-RED package synchronization system") # Perform initial synchronization of all packages sync_all_packages() # Set up watchers for future changes observers = setup_watchers() # Return observers so they can be stopped if needed return observers # --- Utility Functions --- def SendToQueue(queue_name, message_dict): # Send the message to the queue with Connection(RABBITMQ_URL) as rabbit_conn: producer = rabbit_conn.Producer() message_str = json.dumps(message_dict) producer.publish( message_str.encode('utf-8'), exchange=exchange, routing_key=queue_name, serializer='json', # You can use JSON as the message format content_type='application/json' ) def get_db_connection(): return psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT, sslmode='disable' ) def publish_alert(message_body): """Publishes an alert message to the configured RabbitMQ queue.""" if not setup_rabbitmq(): logger.error("Cannot publish alert: RabbitMQ not connected.") return try: rabbit_channel.basic_publish( exchange='', routing_key=RABBITMQ_ALERTS_QNAME, body=json.dumps(message_body), properties=pika.BasicProperties( delivery_mode=2, # make message persistent )) logger.info(f"Published alert to RabbitMQ queue '{RABBITMQ_ALERTS_QNAME}': {message_body}") except (pika.exceptions.AMQPConnectionError, pika.exceptions.ChannelClosedByBroker, pika.exceptions.StreamLostError) as e: logger.error(f"Failed to publish alert due to connection issue: {e}. Attempting reconnect on next cycle.") # Force close to trigger reconnection attempt next time close_rabbitmq_connection() except Exception as e: logger.error(f"An unexpected error occurred while publishing alert: {e}") close_rabbitmq_connection() # Attempt reconnect def GetRedisInt(key_name): try: result = int(redis_conn.get(key_name).decode('utf-8')) except: result = None return result def GetRedisFloat(key_name): try: result = float(redis_conn.get(key_name).decode('utf-8')) except: result = None return result def GetRedisString(key_name): try: result = redis_conn.get(key_name).decode('utf-8') except: result = None return result def GetRedisMap(key_name): try: result_bytes = redis_conn.hgetall(key_name) result = {k.decode('utf-8'): v.decode('utf-8') for k, v in result_bytes.items()} except: result = {} return result def GetDeviceDetails(cur, deployment_ids, location_id): #ID, Well id, MAC, Last_Message, Location, Description, Deployment macs = [MAC for _, MAC in deployment_ids] #macs = list(deployment_ids.keys()) macs_string_nq = ",".join(macs) macs_string = "'" + "','".join(macs) + "'" if location_id == -1: sql = f""" WITH ordered_macs AS ( SELECT unnest(string_to_array('{macs_string_nq}', ',')) as mac, generate_series(1, array_length(string_to_array('{macs_string_nq}', ','), 1)) as position ) SELECT d.* FROM public.devices d JOIN ordered_macs om ON d.device_mac = om.mac::text WHERE device_mac IN ({macs_string}) ORDER BY om.position; """ else: sql = f""" WITH ordered_macs AS ( SELECT unnest(string_to_array('{macs_string_nq}', ',')) as mac, generate_series(1, array_length(string_to_array('{macs_string_nq}', ','), 1)) as position ) SELECT d.* FROM public.devices d JOIN ordered_macs om ON d.device_mac = om.mac::text WHERE device_mac IN ({macs_string}) AND location = {location_id} ORDER BY om.position; """ cur.execute(sql) logger.info(sql) devices_ids_records = cur.fetchall() all_details = [] devices_ids_list = [x[0] for x in devices_ids_records] device_ids_string = ",".join(map(str, devices_ids_list)) #sql = f"SELECT device_id, MAX(time) as last_reading_time FROM sensor_readings WHERE device_id IN ({device_ids_string}) GROUP BY device_id" #to slow sql1 = f"SELECT DISTINCT ON (device_id) device_id, time as last_reading_time FROM sensor_readings WHERE device_id IN ({device_ids_string}) AND time > now() - INTERVAL '1 day' ORDER BY device_id, time DESC" logger.info(sql1) cur.execute(sql1) devices_times = cur.fetchall()#cur.fetchone() #returns times of last readings found_device_details = {} for device_record in devices_times: device_id, last_message_time = device_record found_device_details[device_id] = last_message_time cnt = 0 for device_table_record in devices_ids_records: if len(devices_times) > 0: if device_id in found_device_details: last_message_time = found_device_details[device_id] last_message_epoch = int(last_message_time.timestamp()) else: try: last_message_time = int(device_table_record[14]) except: last_message_time = 0 last_message_epoch = last_message_time else: last_message_time = 0 last_message_epoch = 0 #print(last_message_epoch) #print(type(last_message_epoch)) device_id = device_table_record[0] MAC = device_table_record[1] well_id = device_table_record[2] if well_id == 230: print(".") description = device_table_record[3] if description == None: description = "" if device_table_record[5] != None: if device_table_record[5] != "": description = description + " Close to " + device_table_record[5] location_id = device_table_record[4] if location_id == None: location_id = 0 alert_details = device_table_record[17] if alert_details == None: alert_details = {}#{'red_sec': 28801, 'yellow_sec': 14401} else: print(alert_details) last_seen = GetRedisFloat(f'lastseen_{device_id}') if last_seen == None: last_seen = 0 if well_id == 230: print("stop") row_data = [device_id, well_id, MAC, last_message_epoch, location_names[location_id], description, deployment_ids[cnt][0], alert_details, last_seen] cnt += 1 all_details.append(row_data) return all_details def GetVisibleDevicesPerLocation(deployments, location): global deployments_devices, device_alerts_all devices_details = [] deployments_devices = {} with get_db_connection() as conn: #list all devices that user has access to if deployments == "-1" or deployments == "0": sql = "SELECT deployment_id, devices FROM public.deployment_details" else: sql = f"SELECT deployment_id, devices FROM public.deployment_details WHERE deployment_id IN ({deployments})" with conn.cursor() as cur: cur.execute(sql) devices_groups = cur.fetchall()#cur.fetchone() deployment_ids = [] for deployment_id, dev_group in devices_groups: if dev_group != None: if len(dev_group) > 10: if dev_group[0] == "[": macs_group = literal_eval(dev_group) else: macs_group = dev_group.split(',') for MAC in macs_group: deployment_ids.append((deployment_id, MAC)) devices_details = GetDeviceDetails(cur, deployment_ids, location_indexes[location]) for device_details in devices_details: deployment_id = device_details[6] device_id = device_details[0] devices_details_map[device_id] = device_details if len(device_details[7]) > 2: print(device_details[7]) try: device_alerts_all[device_id] = json.loads(device_details[7]) except: pass if deployment_id not in deployments_devices: deployments_devices[deployment_id] = [] deployments_devices[deployment_id].append(device_id) #devices_details.append(devices_detail) return devices_details def GetLastDetected(device_id, field_name, threshold_value): # Example usage: query = GetLastPointQuery(device_id, field_name, threshold_value) print(query) last_detected = None with get_db_connection() as conn: with conn.cursor() as cur: cur.execute(query) result = cur.fetchone() if result != None: last_detected = result[0].timestamp() return last_detected def GetMatchingDevices(privileges, group, deployment, location): global LocationsMap results=[] if privileges != "-1": if deployment == "" or deployment == "0": deployment = privileges privileges_list = privileges.split(',') if deployment != "0": if "," in deployment: deployment = FilterList(deployment, privileges) else: if deployment not in privileges_list: return results else: if deployment == "0": deployment = "-1" devices = GetVisibleDevicesPerLocation(deployment, location) return devices def GetAlarmSettings(deployment_id): sql = f"""SELECT "alarm_details" FROM deployments WHERE deployment_id = {deployment_id};""" with get_db_connection() as conn: with conn.cursor() as cur: cur.execute(sql) alarm_armed_settings = cur.fetchone() if alarm_armed_settings[0] == None: alarm_armed_settings = "{}" else: alarm_armed_settings = alarm_armed_settings[0] print(deployment_id, alarm_armed_settings) return alarm_armed_settings def GetDeviceAlarmSettings(device_id): sql = f"""SELECT "alert_details" FROM devices WHERE device_id = {device_id};""" with get_db_connection() as conn: with conn.cursor() as cur: cur.execute(sql) alert_details = cur.fetchone()[0] if alert_details == None: alert_details = "" #print(device_id, alert_details) return alert_details def GetUsers(): with get_db_connection() as conn: sql = "SELECT user_id, access_to_deployments, email, phone_number FROM public.person_details" result = [] with conn.cursor() as cur: cur.execute(sql) result = cur.fetchall()#cur.fetchone() return result def GetCaretakers(deployment_id, users): caretakers = [] deployment_id_str = str(deployment_id) for access_to in users: if access_to[1] != None: if "," in access_to[1]: access_to_list = access_to[1].split(",") else: access_to_list = [access_to[1]] if deployment_id_str in access_to_list or access_to_list == ['-1']: caretakers.append((access_to[0], access_to[2], access_to[3])) return caretakers def load_device_configurations(): global devices_config """Loads device configurations including alert thresholds from the database.""" #Load id's of all deployed devices deployment_id = "0" group_id = "All" location = "All" devices_config = GetMatchingDevices("-1", group_id, deployment_id, location) for device_detail in devices_config: device_id = device_detail[0] mac_to_device_id[device_detail[2]] = device_detail[0] alarm_device_settings_str = GetDeviceAlarmSettings(device_id) #if alarm_device_settings != "": # print(alarm_device_settings) deployment_id = device_detail[6] #if device_id == 540: # print(device_id) device_to_deployment[device_id] = deployment_id if alarm_device_settings_str != None and alarm_device_settings_str != "": device_alerts_all[device_id] = json.loads(alarm_device_settings_str) else: device_alerts_all[device_id] = {} if deployment_id not in deployment_id_list: deployment_id_list.append(deployment_id) device_id_s = str(device_id) last_seen = device_detail[-1] #this one comes from REDIS! if last_seen == 0: radar_threshold_signal = "s3_max" radar_threshold_value = 10 threshold_details = GetRedisString('radar_threshold'+device_id_s) try: radar_threshold_list = literal_eval(threshold_details) radar_threshold_signal = radar_threshold_list[0] radar_threshold_value = radar_threshold_list[1] except: #key not found so read from DB, and store to key sql = f""" SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = {device_id_s}; """ with get_db_connection() as conn: with conn.cursor() as cur: cur.execute(sql) threshold_details = cur.fetchone()[0] print(device_id_s, threshold_details) radar_threshold_signal = "s3_max" radar_threshold_value = 10 if threshold_details != None: threshold_details_list = literal_eval(threshold_details) radar_threshold_signal = threshold_details_list[0] radar_threshold_value = threshold_details_list[1] redis_conn.set('radar_threshold'+device_id_s, str([radar_threshold_signal, radar_threshold_value])) last_seen = GetRedisFloat('lastseen_'+device_id_s) if last_seen == None: #Check from DB last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value) if last_seen == None: last_seen = 0 redis_conn.set('lastseen_'+device_id_s, last_seen) #if alarm_device_settings != "": #do this only if REDIS has no key present! Why? DB reflects all changes! redis_conn.set('alarm_device_settings_'+device_id_s, alarm_device_settings_str) print(device_id_s, last_seen) users = GetUsers() for deployment_id in deployment_id_list: if deployment_id == 21: print("Stop") if deployment_id == 38: print("Stop") alarm_settings_str = GetAlarmSettings(deployment_id) if len(alarm_settings_str) > 2: alarm_settings = json.loads(alarm_settings_str) alarm_armed_settings = alarm_settings['enabled'] alarm_settings["armed_states"] = alarm_armed_settings alarms_settings_all[deployment_id] = alarm_settings #lets reset bit 1, so armed alarm is recognized on start if present... start of what??? #alarm_armed_settings = ClearBit(alarm_armed_settings, 1) #alarms_settings_shadows[deployment_id] = alarm_armed_settings redis_conn.set(f'alarm_settings_{deployment_id}', alarm_settings_str) #lets create map deployment_id > person details care_takers_all[deployment_id] = GetCaretakers(deployment_id, users) for user in users: print(user) logger.info(f"Successfully loaded configuration for {len(devices_config)} devices.") # Optional: Close cursor and commit if any changes were made (unlikely here) # cur.close() # conn.commit() # Not needed for SELECT return True # Indicate success def GetLastPointQuery(device_id, field_name, threshold_value): """ Generate a SQL query to find the last point in radar_readings where the specified field meets the threshold condition. Parameters: device_id (int): The device ID to filter by field_name (str): The field to check, e.g., "s2_max", "s28_min", etc. threshold_value (float): The threshold value to compare against Returns: str: SQL query string """ # Parse the field name to get base field and operation parts = field_name.split('_') base_field = parts[0] operation = parts[1] if len(parts) > 1 else None # Define the field expression based on base_field if base_field == 's28': field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7" elif base_field == 'm08': field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9" else: field_expr = base_field # Define comparison operator based on operation operator = ">" if operation == "max" else "<" # Generate the SQL query query = f"""SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id} AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1;""" return query def GetBit(alarm_armed_settings, bit_nr): if bit_nr <0 or bit_nr >5: return False return alarm_armed_settings[5-bit_nr] == "1" def SetBit(alarm_armed_settings, bit_nr): if bit_nr < 0 or bit_nr > 5: return False alarm_list = list(alarm_armed_settings) alarm_list[5-bit_nr] = "1" result = ''.join(alarm_list) return result def ClearBit(alarm_armed_settings, bit_nr): if bit_nr < 0 or bit_nr > 5: return False alarm_list = list(alarm_armed_settings) alarm_list[5-bit_nr] = "0" result = ''.join(alarm_list) return result def replace_and_save(template_file, output_file, replacements): # Read the template file with open(template_file, 'r') as file: content = file.read() # Perform all replacements for placeholder, replacement in replacements: content = content.replace(placeholder, replacement) # Write the modified content to the output file with open(output_file, 'w') as file: file.write(content) def update_acl_file(): """Update the Mosquitto ACL file based on database permissions.""" conn = get_db_connection() if not conn: return False try: with conn.cursor() as cur: # Get all permissions grouped by username cur.execute(""" SELECT user_name, array_agg(topic) as topics, array_agg(permission) as permissions FROM user_topics GROUP BY user_name ORDER BY user_name """) users = cur.fetchall() # Create ACL file content acl_content = [ "# Mosquitto ACL file - Auto-generated by ACL Manager", "# Last updated: " + time.strftime("%Y-%m-%d %H:%M:%S"), "", "# Default rule - deny all access", "pattern read $SYS/#", "" ] # Add user permissions for username, topics, permissions in users: if username != None and user_name != "": acl_content.append(f"user {username}") for i in range(len(topics)): topic = topics[i] perm = permissions[i] if 'r' in perm: acl_content.append(f"topic read {topic}") if 'w' in perm: acl_content.append(f"topic write {topic}") acl_content.append("") # Write directly to the ACL file with open(MOSQUITTO_ACL_FILE, "w") as f: f.write("\n".join(acl_content)) # Touch the trigger file to restart mosquitto with open("/etc/mosquitto/.restart_trigger", "w") as f: f.write(time.strftime("%Y-%m-%d %H:%M:%S")) print("ACL file updated. Mosquitto will restart automatically.") return True except Exception as e: print(f"Error updating ACL file: {e}") return False finally: conn.close() def GetUserPriviledges(username): with get_db_connection() as conn: sql = f"SELECT access_to_deployments, key FROM public.person_details WHERE user_name='{username}'" with conn.cursor() as cur: cur.execute(sql) result = cur.fetchall()#cur.fetchone() if result != None: return result else: return [] def GetUserDetails(username): with get_db_connection() as conn: sql = f"SELECT * FROM public.person_details WHERE user_name='{username}'" with conn.cursor() as cur: cur.execute(sql) result = cur.fetchall()#cur.fetchone() if result != None: return result else: return [] def GetBeneficiaryFromDeployment(deployment_id): with get_db_connection() as conn: sql = f"SELECT beneficiary_id FROM public.deployment_details WHERE deployment_id='{deployment_id}'" with conn.cursor() as cur: cur.execute(sql) result = cur.fetchone() if result != None: beneficiary_id = result[0] sql = f"SELECT first_name, last_name FROM public.person_details WHERE user_id='{beneficiary_id}'" cur.execute(sql) result1 = cur.fetchone() if result1 != None: return result1[0] + " " + result1[1] else: return "" else: return "" def DeleteUserTopics(user_name): with get_db_connection() as conn: with conn.cursor() as cur: cur.execute("DELETE FROM user_topics WHERE user_name = %s", (user_name,)) def GetVisibleDevices(deployments): devices_details = [] stt = time.time() with get_db_connection() as conn: #list all devices that user has access to if deployments == "-1": sql = "SELECT deployment_id, devices FROM public.deployment_details" else: sql = f"SELECT deployment_id, devices FROM public.deployment_details WHERE deployment_id IN ({deployments})" with conn.cursor() as cur: print(sql) cur.execute(sql) devices_groups = cur.fetchall()#cur.fetchone() deployment_ids = [] for deployment_id, dev_group in devices_groups: if dev_group != None: if len(dev_group) > 10: if "[" not in dev_group: if "," not in dev_group: dev_group = '["' + dev_group + '"]' else: dev_group = dev_group.replace(" ", "") dev_group = dev_group.replace(",", '","') dev_group = '["' + dev_group + '"]' macs_group = literal_eval(dev_group) for MAC in macs_group: deployment_ids.append((deployment_id, MAC)) else: print(f"Deployment {deployment_id} has dev_group empty") devices_details = GetDeviceDetails(cur, deployment_ids, -1) #devices_details.append(devices_detail) return devices_details def add_user(username, password): """Add a user to the Mosquitto password file.""" try: subprocess.run(["/home/ubuntu/mosquitto_pass_simple.sh", MOSQUITTO_PASSWORD_FILE, username, password], check=True) #subprocess.run(["sudo", "/usr/bin/mosquitto_passwd", "-b", MOSQUITTO_PASSWORD_FILE, username, password], check=True) print(f"User {username} added successfully to Mosquitto password file.") return True except subprocess.CalledProcessError as e: print(f"Error adding user to Mosquitto password file: {e}") return False def delete_user(username): """Delete a user from the Mosquitto password file.""" try: subprocess.run(["sudo", "mosquitto_passwd", "-D", MOSQUITTO_PASSWORD_FILE, username], check=True) print(f"User {username} deleted successfully from Mosquitto password file.") return True except subprocess.CalledProcessError as e: print(f"Error deleting user from Mosquitto password file: {e}") return False def add_topic_permission(username, topic, permission): """Add a topic permission for a user in the database.""" conn = get_db_connection() if not conn: return False try: with conn.cursor() as cur: # Check if the permission already exists cur.execute("SELECT id FROM user_topics WHERE user_name = %s AND topic = %s", (username, topic)) result = cur.fetchone() if result: # Update existing permission cur.execute("UPDATE user_topics SET permission = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s", (permission, result[0])) print(f"Updated permission for {username} on topic {topic} to {permission}") else: # Insert new permission cur.execute("INSERT INTO user_topics (user_name, topic, permission) VALUES (%s, %s, %s)", (username, topic, permission)) print(f"Added permission for {username} on topic {topic}: {permission}") conn.commit() # Update the ACL file return True except Exception as e: print(f"Database error: {e}") conn.rollback() return False finally: conn.close() def delete_topic_permission(username, topic=None): """Delete topic permissions for a user in the database.""" conn = get_db_connection() if not conn: return False try: with conn.cursor() as cur: if topic: # Delete specific topic permission cur.execute("DELETE FROM user_topics WHERE user_name = %s AND topic = %s", (username, topic)) print(f"Deleted permission for {username} on topic {topic}") else: # Delete all topic permissions for user cur.execute("DELETE FROM user_topics WHERE user_name = %s", (username,)) print(f"Deleted all permissions for {username}") conn.commit() # Update the ACL file update_acl_file() return True except Exception as e: print(f"Database error: {e}") conn.rollback() return False finally: conn.close() def UpdateACL(username, token): users_p = GetUserPriviledges(username) privileges = users_p[0][0] p = users_p[0][1] DeleteUserTopics(username) if privileges != None and privileges != 'None': devices = GetVisibleDevices(privileges) if len(devices) > 0: macs_list = [] for device in devices: MAC = device[2] if MAC != None: macs_list.append(MAC) if privileges == "-1": add_user(username, p) add_topic_permission(username, "#", "rw") else: if len(macs_list) > 0: add_user(username, p) #add_user(username+"_", token) for MAC in macs_list: add_topic_permission(username, "/"+MAC, "r") #add_topic_permission(username+"_", "/"+MAC, "r") update_acl_file() def fetch_device_data(user_name, token): # API endpoint url = "https://eluxnetworks.net/function/well-api/api" #url = "http://192.168.68.70/function/well-api/api" # Request headers and data headers = { 'Content-Type': 'application/x-www-form-urlencoded' } payload = { 'user_name': user_name, 'token': token, 'function': 'device_list_4_gui' } # Make the API call response = requests.post(url, headers=headers, data=payload) # Check if the request was successful if response.status_code == 200: return response.json() else: raise Exception(f"API request failed with status code {response.status_code}: {response.text}") def generate_config_file(data, output_path, user_name, p): # Extract deployment data deployments = [] devices = [] wellnuo_configPath = os.path.join(output_path, 'wellnuo_config.js') # Process the API response to extract deployments and devices # Note: You may need to adjust this based on the actual structure of the API response if 'deploymentData' in data: for deployment in data['deploymentData']: deployments.append({'deployment_id': deployment.get('deployment_id'),'name': deployment.get('name')}) if 'deviceData' in data: for device in data['deviceData']: devices.append({'well_id': device.get('well_id'),'mac': device.get('mac'),'room_name': device.get('room_name'),'deployment_id': device.get('deployment_id')}) deployment_lines = [] for d in deployments: deployment_lines.append(f" {{ deployment_id: \"{d['deployment_id']}\", name: \"{d['name']}\" }}") deployment_str = ",\n".join(deployment_lines) device_lines = [] for d in devices: device_lines.append(f" {{ well_id: \"{d['well_id']}\", mac: \"{d['mac']}\", room_name: \"{d['room_name']}\", deployment_id: \"{d['deployment_id']}\" }}") device_str = ",\n".join(device_lines) # Create the configuration content config_content = f"""// User-specific configuration for WellNuo nodes module.exports = {{ MQTT_USERNAME: "{user_name}", MQTT_PASSWORD: "{p}", deploymentData: [ {deployment_str} ], deviceData: [ {device_str} ] }}; """ # Write to file os.makedirs(output_path, exist_ok=True) with open(wellnuo_configPath, 'w') as f: f.write(config_content) print(f"Configuration file has been created at: {wellnuo_configPath}") def create_symbolic_link(source_dir_in, target_dir_in): # Define source and target paths source_dir = os.path.expanduser(source_dir_in) target_dir = os.path.expanduser(target_dir_in) target_link = os.path.join(target_dir, "node-red-contrib-wellnuo") # Check if source directory exists if not os.path.exists(source_dir): print(f"Error: Source directory '{source_dir}' does not exist.") return False # Create target directory if it doesn't exist if not os.path.exists(target_dir): try: os.makedirs(target_dir) print(f"Created directory: {target_dir}") except Exception as e: print(f"Error creating directory '{target_dir}': {e}") return False # Check if the link already exists if os.path.exists(target_link): if os.path.islink(target_link): #print(f"Symbolic link already exists at '{target_link}'.") #choice = input("Do you want to remove it and create a new one? (y/n): ") return True #if choice.lower() == 'y': #try: #os.unlink(target_link) #print(f"Removed existing symbolic link: {target_link}") #except Exception as e: #print(f"Error removing existing link: {e}") #return False #else: #print("Operation cancelled.") #return False else: print(f"Error: '{target_link}' already exists and is not a symbolic link.") return False # Create the symbolic link try: os.symlink(source_dir, target_link) print(f"Successfully created symbolic link from '{source_dir}' to '{target_link}'") return True except Exception as e: print(f"Error creating symbolic link: {e}") return False preload_template_path = os.path.expanduser("/home/ubuntu/.node-red/preload_template.js") if not os.path.exists(preload_template_path): preload_template_content = """// CommonJS wrapper for tslib try { global.jsbn = require('jsbn'); const tslib = require('tslib/tslib.js'); // Use the CommonJS version global.tslib = tslib; module.exports = { tslib, jsbn: global.jsbn }; console.log("Successfully preloaded tslib and jsbn modules"); } catch (error) { console.error("Error preloading modules:", error.message); } """ with open(preload_template_path, "w") as f: f.write(preload_template_content) def SetupUserEnvironment(username, token, ps): """If needed, Set up the Node-RED environment for a new user""" if True: #Lets do it always UpdateACL(username, token) user_dir = os.path.expanduser(f"/home/ubuntu/.node-red/users/{username}") os.makedirs(user_dir, exist_ok=True) # 2. Copy settings template settings_template_file = os.path.expanduser("/home/ubuntu/.node-red/settings_template.js") user_settings_file = os.path.join(user_dir, "settings.js") replace_and_save(settings_template_file, user_settings_file, [("###USER_NAME###", username),("###TOKEN###", token),("###PS###", ps)]) timeout_script_template_file = os.path.expanduser("/home/ubuntu/.node-red/timeout-script_template.js") user_timeout_script_file = os.path.join(user_dir, "timeout-script.js") replace_and_save(timeout_script_template_file, user_timeout_script_file, [("###USER_NAME###", username),("###TOKEN###", token),("###PS###", ps)]) # 3. Prepare user wellnuo-config file userConfigPath = os.path.join(user_dir, 'config') users_p = GetUserPriviledges(username) p = users_p[0][1] api_data = fetch_device_data(username, token) generate_config_file(api_data, userConfigPath, username, p) # 4. Create initial empty flows file if it doesn't exist flows_file = os.path.join(user_dir, "flows.json") if not os.path.exists(flows_file): with open(flows_file, "w") as f: f.write("[]") user_flows_file = os.path.join(user_dir, f"flows_{username}.json") if os.path.exists(user_flows_file): replace_and_save(user_flows_file, flows_file, []) # 5. Register user in activity tracking activity_file = "/tmp/node_red_activity.json" activity_data = {} if os.path.exists(activity_file): with open(activity_file, 'r') as f: try: activity_data = json.load(f) except json.JSONDecodeError: activity_data = {} import time activity_data[username] = time.time() with open(activity_file, 'w') as f: json.dump(activity_data, f) preload_template_file = os.path.expanduser("/home/ubuntu/.node-red/preload_template.js") user_preload_file = os.path.join(user_dir, "preload.js") replace_and_save(preload_template_file, user_preload_file, []) # No replacements needed shared_nodes_dir = os.path.expanduser("/home/ubuntu/.node-red/shared_nodes") if not os.path.exists(os.path.join(shared_nodes_dir, "node-red-contrib-wellnuo")): # Copy from the main node_modules if it exists there src_path = os.path.expanduser("/home/ubuntu/.node-red/node_modules/node-red-contrib-wellnuo") if os.path.exists(src_path): import shutil os.makedirs(shared_nodes_dir, exist_ok=True) shutil.copytree(src_path, os.path.join(shared_nodes_dir, "node-red-contrib-wellnuo")) print(f"Copied node-red-contrib-wellnuo to shared_nodes directory") # Add to your function after other template processing # 6. Create symbolic link to common Wellnuo Nodes #success = create_symbolic_link("/home/ubuntu/.node-red/node_modules/node-red-contrib-wellnuo", f"/home/ubuntu/.node-red/users/{username}/node_modules") #if success: #print("Link creation completed successfully.") #else: #print("Link creation failed.") print(f"User environment set up for {username}") return True def StartNodeRed(port, user_name): # Define paths explicitly user_dir = f"/home/ubuntu/.node-red/users/{user_name}" settings_file = f"{user_dir}/settings.js" log_file = f"{user_dir}/node_red.log" # Ensure the user directory exists import os os.makedirs(user_dir, exist_ok=True) # Start Node-RED with explicit paths and verbose mode cmd = f"cd /home/ubuntu/.node-red && node-red -v -p {port} -u {user_dir} -s {settings_file} > {log_file} 2>&1" # Log the command being executed print(f"Starting Node-RED with command: {cmd}") # Start the process process = subprocess.Popen( cmd, shell=True, preexec_fn=os.setpgrp # Create process group ) print(f"Node-RED started with PID: {process.pid}") return process.pid def StopNodeRed(pid): try: os.killpg(pid, signal.SIGTERM) # Kill the process group return f"Node-RED process group {pid} stopped" except ProcessLookupError: return "Process not found, may have already terminated" def GetAvailablePort(user_name): #lets get all active sql = f"SELECT * FROM node_reds WHERE status = 1" port_nr = 0 with get_db_connection() as conn: with conn.cursor() as cur: print(sql) cur.execute(sql) result = cur.fetchall() for port_nr in range(1900,2000): if any(item[1] == port_nr for item in result): next else: break port = port_nr return port def StoreNodeRedUserPortOn(port, user_name, pid, time_s): #lets get all active try: with get_db_connection() as conn: with conn.cursor() as cur: last_activity = time_s status = 1 cur.execute(""" INSERT INTO public.node_reds (user_name, port, last_activity, status, pid) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (user_name) DO UPDATE SET port = EXCLUDED.port, last_activity = EXCLUDED.last_activity, pid = EXCLUDED.pid, status = EXCLUDED.status; """, (user_name, port, last_activity, status, pid)) cur.execute(""" INSERT INTO public.node_reds_usage (user_name, port, time_on, pid) VALUES (%s, %s, %s, %s) """, (user_name, port, last_activity, pid)) return 1 except: return 0 def StoreNodeRedUserPortOff(port, user_name, time_s): pid = 0 try: with get_db_connection() as conn: with conn.cursor() as cur: last_activity = time.time() status = 0 cur.execute(""" INSERT INTO public.node_reds (user_name, port, status, pid) VALUES (%s, %s, %s, %s) ON CONFLICT (user_name) DO UPDATE SET port = EXCLUDED.port, pid = EXCLUDED.pid, status = EXCLUDED.status; """, (user_name, port, status, pid)) cur.execute(""" INSERT INTO public.node_reds_usage (user_name, port, time_off, pid) VALUES (%s, %s, %s, %s) """, (user_name, port, last_activity, pid)) return 1 except: return 0 def ClearNodeRedUserPort(port, user_name): #lets get all active try: with get_db_connection() as conn: with conn.cursor() as cur: last_activity = 0 status = 0 pid = 0 cur.execute(""" INSERT INTO public.node_reds (user_name, port, last_activity, status, pid) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (user_name) DO UPDATE SET port = EXCLUDED.port, last_activity = EXCLUDED.last_activity, pid = EXCLUDED.pid, status = EXCLUDED.status; """, (user_name, port, last_activity, status, pid)) return 1 except: return 0 def IsNRRunning(user_name): port = 0 pid = 0 sql = f"SELECT port, pid FROM node_reds WHERE user_name = '{user_name}' and status = 1 and pid > 0" with get_db_connection() as conn: with conn.cursor() as cur: print(sql) cur.execute(sql) result = cur.fetchone() if result != None: port = result[0] pid = result[1] if pid > 0: try: process = psutil.Process(pid) if not process.is_running(): pid = 0 except psutil.NoSuchProcess: pid = 0 return port, pid def GetNodeRedDetails(user_name): port = 0 sql = f"SELECT port, pid FROM node_reds WHERE user_name = '{user_name}'" with get_db_connection() as conn: with conn.cursor() as cur: print(sql) cur.execute(sql) result = cur.fetchone() if result != None: port = result[0] pid = result[1] return port, pid def combine_user_flows(user_directories, output_dir='/home/ubuntu/.node-red', main_instance_port=1999): """ Combine multiple users' Node-RED configurations into a single deployment with each user's flows contained in their own subflow. """ # Initialize combined data structures combined_flows = [] combined_nodes_config = {} combined_users_config = {"_": {}} combined_runtime_config = {} combined_credentials = {} # Set margins to prevent cutting off left_margin = 100 top_margin = 100 # First pass: collect all user flows user_flow_data = [] for user_dir in user_directories: username = os.path.basename(user_dir) print(f"Processing user: {username}") # Load user's flow file flow_file = os.path.join(user_dir, f"flows_{username}.json") try: with open(flow_file, 'r') as f: user_flows = json.load(f) except Exception as e: print(f"Error loading flow file for {username}: {e}") continue # Prepare storage for tabs and flows analysis user_tabs = {} nodes_by_tab = {} global_config_nodes = [] wellplug_nodes = [] # Store wellplug sensor nodes specifically # First identify tabs and collect global/special nodes for node in user_flows: if not isinstance(node, dict): continue if node.get('type') == 'tab': # This is a flow tab tab_id = node.get('id') if tab_id: user_tabs[tab_id] = node nodes_by_tab[tab_id] = [] elif not node.get('z'): # This is likely a global config node global_config_nodes.append(node) # Collect WellPlug sensor nodes for special handling if node.get('type') == 'device-sensor-in': wellplug_nodes.append(node) # Now collect nodes by tab for node in user_flows: if not isinstance(node, dict): continue tab_id = node.get('z') if tab_id and tab_id in nodes_by_tab: nodes_by_tab[tab_id].append(node) # Calculate dimensions for each tab's content tab_dimensions = {} for tab_id, nodes in nodes_by_tab.items(): if not nodes: continue # Find boundaries of the flow min_x = min((node.get('x', 0) for node in nodes if 'x' in node), default=0) max_x = max((node.get('x', 0) + (node.get('w', 150) if 'w' in node else 150) for node in nodes if 'x' in node), default=0) min_y = min((node.get('y', 0) for node in nodes if 'y' in node), default=0) max_y = max((node.get('y', 0) + (node.get('h', 30) if 'h' in node else 30) for node in nodes if 'y' in node), default=0) width = max_x - min_x + 100 # Add extra margin height = max_y - min_y + 100 # Add extra margin tab_dimensions[tab_id] = { 'width': width, 'height': height, 'min_x': min_x, 'min_y': min_y, 'original_tab': user_tabs[tab_id] } # Load user configuration files user_config = {} wellnuo_config = {} # Try to load user's WellNuo configuration - FIXED to handle JS format wellnuo_config_path = os.path.join(user_dir, 'config', 'wellnuo_config.js') try: if os.path.exists(wellnuo_config_path): # Read the JavaScript config file as text with open(wellnuo_config_path, 'r') as f: config_content = f.read() # Extract configuration using regex (simple approach) mqtt_username = re.search(r'MQTT_USERNAME\s*[=:]\s*["\']([^"\']+)["\']', config_content) mqtt_password = re.search(r'MQTT_PASSWORD\s*[=:]\s*["\']([^"\']+)["\']', config_content) mqtt_server = re.search(r'MQTT_SERVER\s*[=:]\s*["\']([^"\']+)["\']', config_content) mqtt_port_nr = re.search(r'MQTT_PORT\s*[=:]\s*["\']?(\d+)["\']?', config_content) if mqtt_username: wellnuo_config['MQTT_USERNAME'] = mqtt_username.group(1) if mqtt_password: wellnuo_config['MQTT_PASSWORD'] = mqtt_password.group(1) if mqtt_server: wellnuo_config['MQTT_SERVER'] = mqtt_server.group(1) if mqtt_port_nr: wellnuo_config['MQTT_PORT'] = int(mqtt_port_nr.group(1)) # Extract deployment data using a different approach deploy_start = config_content.find('deploymentData') deploy_end = -1 device_start = -1 device_end = -1 if deploy_start > 0: # Find the closing bracket of the array bracket_level = 0 in_array = False for i in range(deploy_start, len(config_content)): if config_content[i] == '[': bracket_level += 1 in_array = True elif config_content[i] == ']': bracket_level -= 1 if in_array and bracket_level == 0: deploy_end = i + 1 break device_start = config_content.find('deviceData') if device_start > 0: # Find the closing bracket of the array bracket_level = 0 in_array = False for i in range(device_start, len(config_content)): if config_content[i] == '[': bracket_level += 1 in_array = True elif config_content[i] == ']': bracket_level -= 1 if in_array and bracket_level == 0: device_end = i + 1 break # Parse deployment data if deploy_start > 0 and deploy_end > deploy_start: # Extract the array part deploy_text = config_content[deploy_start:deploy_end] # Find the actual array start array_start = deploy_text.find('[') if array_start > 0: deploy_array = deploy_text[array_start:].strip() # Convert JS to JSON format deploy_array = deploy_array.replace("'", '"') # Fix unquoted property names deploy_array = re.sub(r'(\s*)(\w+)(\s*):(\s*)', r'\1"\2"\3:\4', deploy_array) # Parse the array try: wellnuo_config['deploymentData'] = json.loads(deploy_array) print(f"Successfully parsed deploymentData for {username}") except json.JSONDecodeError as e: print(f"Error parsing deploymentData for {username}: {e}") # Parse device data if device_start > 0 and device_end > device_start: # Extract the array part device_text = config_content[device_start:device_end] # Find the actual array start array_start = device_text.find('[') if array_start > 0: device_array = device_text[array_start:].strip() # Convert JS to JSON format device_array = device_array.replace("'", '"') # Fix unquoted property names device_array = re.sub(r'(\s*)(\w+)(\s*):(\s*)', r'\1"\2"\3:\4', device_array) # Parse the array try: wellnuo_config['deviceData'] = json.loads(device_array) print(f"Successfully parsed deviceData for {username}") except json.JSONDecodeError as e: print(f"Error parsing deviceData for {username}: {e}") except Exception as e: print(f"Error processing WellNuo config for {username}: {e}") # Check for timeout-script.js to extract token timeout_script_path = os.path.join(user_dir, 'timeout-script.js') if os.path.exists(timeout_script_path): try: with open(timeout_script_path, 'r') as f: script_content = f.read() # Extract token token_match = re.search(r'var\s+token\s*=\s*["\']([^"\']+)["\']', script_content) if token_match: wellnuo_config['token'] = token_match.group(1) print(f"Found token for user {username}") # Extract username if present user_match = re.search(r'USER\s*:\s*([a-zA-Z0-9_-]+)', script_content) if user_match: wellnuo_config['user_name'] = user_match.group(1) print(f"Found explicit username in timeout script: {wellnuo_config['user_name']}") else: # Default to directory name as username wellnuo_config['user_name'] = username print(f"Using directory name as username: {username}") except Exception as e: print(f"Error extracting token for {username}: {e}") # Default to directory name as username wellnuo_config['user_name'] = username else: # Default to directory name as username if timeout script not found wellnuo_config['user_name'] = username # Update wellplug nodes with user information for node in wellplug_nodes: node['_wellnuo_user_config'] = wellnuo_config print(f"Updated wellplug node {node.get('id')} with user config") # Load other configuration files try: # Node configuration nodes_config_file = os.path.join(user_dir, ".config.nodes.json") if os.path.exists(nodes_config_file): with open(nodes_config_file, 'r') as f: user_nodes_config = json.load(f) user_config['nodes_config'] = user_nodes_config # Users configuration users_config_file = os.path.join(user_dir, ".config.users.json") if os.path.exists(users_config_file): with open(users_config_file, 'r') as f: user_users_config = json.load(f) user_config['users_config'] = user_users_config # Runtime configuration runtime_config_file = os.path.join(user_dir, ".config.runtime.json") if os.path.exists(runtime_config_file): with open(runtime_config_file, 'r') as f: user_runtime_config = json.load(f) user_config['runtime_config'] = user_runtime_config # Credentials file cred_file = os.path.join(user_dir, f"flows_{username}_cred.json") if os.path.exists(cred_file): with open(cred_file, 'r') as f: user_cred = json.load(f) user_config['credentials'] = user_cred except Exception as e: print(f"Error processing configuration for {username}: {e}") # Add to the collection user_flow_data.append({ 'username': username, 'tabs': user_tabs, 'nodes_by_tab': nodes_by_tab, 'dimensions': tab_dimensions, 'global_config_nodes': global_config_nodes, 'config': user_config, 'wellnuo_config': wellnuo_config, 'wellplug_nodes': wellplug_nodes }) # Initialize the combined flow with first tab main_tab_id = str(uuid.uuid4()) main_tab = { "id": main_tab_id, "type": "tab", "label": "Combined Flow", "disabled": False, # Correct Python boolean "info": "Main tab for combined user flows" } combined_flows.append(main_tab) # Create subflow templates for each user subflow_templates = {} current_position_x = left_margin # Start with left margin current_position_y = top_margin # Start with top margin # Track the maximum height in current row row_max_height = 0 max_row_width = 1800 # Maximum width for a row before wrapping # First create all the subflow templates for users for user_data in user_flow_data: username = user_data['username'] print(f"Creating subflow for user: {username}") # Create a subflow template for this user subflow_id = f"subflow_{username}_{str(uuid.uuid4()).replace('-', '')}" # Calculate total height and width needed for all tabs total_width = sum(dim['width'] for dim in user_data['dimensions'].values()) + 100 max_height = max((dim['height'] for dim in user_data['dimensions'].values()), default=200) + 100 # Create the subflow template subflow_template = { "id": subflow_id, "type": "subflow", "name": f"User: {username}", "info": f"Flows from user {username}", "category": "Imported Flows", "in": [], "out": [], "color": "#DDAA99", "icon": "font-awesome/fa-user", "status": { "x": 100, "y": 100, "wires": [] } } combined_flows.append(subflow_template) subflow_templates[username] = { 'id': subflow_id, 'template': subflow_template, 'position': (current_position_x, current_position_y), 'width': 300, # Default width for subflow instances 'height': 100 # Default height for subflow instances } # Update position for next subflow current_position_x += 350 # Space between subflows row_max_height = max(row_max_height, 150) # Track max height # Check if we need to wrap to a new row if current_position_x > max_row_width: current_position_x = left_margin current_position_y += row_max_height + 50 row_max_height = 0 # For each tab in the user's flows, create nodes inside the subflow x_offset = 50 # Starting position inside subflow y_offset = 50 for tab_id, nodes_list in user_data['nodes_by_tab'].items(): if not nodes_list: continue # Get the original tab info original_tab = user_data['tabs'][tab_id] dim = user_data['dimensions'][tab_id] min_x = dim['min_x'] min_y = dim['min_y'] # Add a comment node to label this tab tab_comment_id = f"tab_label_{username}_{tab_id}" tab_comment = { "id": tab_comment_id, "type": "comment", "name": f"User: {username} - Flow: {original_tab.get('label', 'Unnamed Flow')}", "info": "", "x": x_offset + 30, "y": y_offset - 30, "z": subflow_id, "wires": [] } combined_flows.append(tab_comment) # Create ID mapping for this tab's nodes id_mapping = {} for node in nodes_list: if isinstance(node, dict) and 'id' in node: id_mapping[node['id']] = f"{username}_{node['id']}" # Add the tab's nodes to the subflow for node in nodes_list: if not isinstance(node, dict): continue node_copy = copy.deepcopy(node) # Assign new ID if 'id' in node_copy: node_copy['id'] = id_mapping.get(node_copy['id'], node_copy['id']) # Assign to the subflow node_copy['z'] = subflow_id # Adjust position relative to flow origin and placement position # Add extra margin to prevent cutting off if 'x' in node_copy and 'y' in node_copy: node_copy['x'] = (node_copy['x'] - min_x) + x_offset + left_margin node_copy['y'] = (node_copy['y'] - min_y) + y_offset + top_margin # Update wire references if 'wires' in node_copy: new_wires = [] for wire_set in node_copy['wires']: new_wire_set = [] for wire in wire_set: if wire in id_mapping: new_wire_set.append(id_mapping[wire]) else: new_wire_set.append(wire) new_wires.append(new_wire_set) node_copy['wires'] = new_wires # Special handling for WellPlug sensor nodes to preserve user credentials # Special handling for WellPlug sensor nodes to preserve user credentials if node_copy.get('type') == 'device-sensor-in': # Transfer user-specific wellnuo configuration if '_wellnuo_user_config' in node_copy: user_config = node_copy['_wellnuo_user_config'] # Store credentials in a way that's accessible to the editor # Use properties without underscore prefix node_copy['username'] = user_config.get('user_name', username) node_copy['mqtt_password'] = user_config.get('MQTT_PASSWORD', '') # Keep internal version for backward compatibility node_copy['_username'] = user_config.get('user_name', username) node_copy['_mqtt_password'] = user_config.get('MQTT_PASSWORD', '') # Also preserve device and deployment selections if 'selectedDevices' in node: node_copy['selectedDevices'] = node['selectedDevices'] if 'selectedDeployments' in node: node_copy['selectedDeployments'] = node['selectedDeployments'] # Remove any token reference - we're moving away from tokens if '_token' in node_copy: del node_copy['_token'] # Remove the temporary field del node_copy['_wellnuo_user_config'] # Add to combined flows combined_flows.append(node_copy) print(f"WellPlug node {node_copy['id']} credentials: Username={node_copy.get('_username', 'None')}") # Update offset for next tab in this user's subflow y_offset += dim['height'] + 50 # Process global config nodes from this user for node in user_data['global_config_nodes']: if not isinstance(node, dict): continue node_copy = copy.deepcopy(node) # Assign new ID with username prefix for uniqueness if 'id' in node_copy: node_copy['id'] = f"{username}_{node_copy['id']}" # Update any references to other nodes # This is more complex as different node types have different reference fields # For subflow instances: if 'type' in node_copy and node_copy['type'].startswith('subflow:'): # The subflow template ID is in the type field after 'subflow:' old_subflow_id = node_copy['type'].split(':')[1] node_copy['type'] = f"subflow:{username}_{old_subflow_id}" combined_flows.append(node_copy) # Process configuration nodes if 'config' in user_data and 'nodes_config' in user_data['config']: for node_id, node_config in user_data['config']['nodes_config'].items(): # Add username prefix to ID new_id = f"{username}_{node_id}" combined_nodes_config[new_id] = node_config # Process credentials if 'config' in user_data and 'credentials' in user_data['config']: user_creds = user_data['config']['credentials'] if 'credentials' in user_creds: for node_id, cred_data in user_creds['credentials'].items(): # Add username prefix to ID new_id = f"{username}_{node_id}" combined_credentials[new_id] = cred_data # Now create instances of each user's subflow on the main tab current_position_x = left_margin current_position_y = top_margin row_max_height = 0 for username, subflow_info in subflow_templates.items(): subflow_id = subflow_info['id'] subflow_width = subflow_info['width'] subflow_height = subflow_info['height'] # Create a subflow instance on the main tab instance_id = f"instance_{username}_{str(uuid.uuid4()).replace('-', '')}" instance = { "id": instance_id, "type": f"subflow:{subflow_id}", "name": f"User: {username}", "x": current_position_x, "y": current_position_y, "z": main_tab_id, "wires": [] } combined_flows.append(instance) # Update position for next instance current_position_x += subflow_width + 50 row_max_height = max(row_max_height, subflow_height) # Check if we need to wrap to a new row if current_position_x > max_row_width: current_position_x = left_margin current_position_y += row_max_height + 50 row_max_height = 0 # Create a directory for WellNuo user configurations wellnuo_config_dir = os.path.join(output_dir, 'config') os.makedirs(wellnuo_config_dir, exist_ok=True) # Create a combined WellNuo config file with all user credentials combined_wellnuo_config = "// Combined WellNuo configuration\n\n" combined_wellnuo_config += "module.exports = {\n" combined_wellnuo_config += " // Default MQTT configuration\n" #combined_wellnuo_config += " MQTT_SERVER: \"mqtt.eluxnetworks.net\",\n" #combined_wellnuo_config += " MQTT_PORT: 8883,\n" combined_wellnuo_config += " MQTT_SERVER: \"127.0.0.1\",\n" combined_wellnuo_config += " MQTT_PORT: 1883,\n" #combined_wellnuo_config += " MQTT_USERNAME: \"artib\",\n" #combined_wellnuo_config += " MQTT_PASSWORD: \"well_arti_2027\",\n\n" combined_wellnuo_config += " // User credentials mapping - used by custom node handler\n" combined_wellnuo_config += " userCredentials: {\n" # Add credentials for each user for user_data in user_flow_data: username = user_data['username'] wellnuo_config = user_data.get('wellnuo_config', {}) if 'user_name' in wellnuo_config and 'token' in wellnuo_config: combined_wellnuo_config += f" \"{username}\": {{\n" combined_wellnuo_config += f" username: \"{wellnuo_config['user_name']}\",\n" combined_wellnuo_config += f" mqtt_password: \"{wellnuo_config['MQTT_PASSWORD']}\",\n" combined_wellnuo_config += f" token: \"{wellnuo_config['token']}\"\n" combined_wellnuo_config += " },\n" combined_wellnuo_config += " },\n\n" # Combine all users' deployment and device data all_deployments = [] all_devices = [] deployment_ids_seen = set() device_ids_seen = set() for user_data in user_flow_data: wellnuo_config = user_data.get('wellnuo_config', {}) # Add deployments from this user if 'deploymentData' in wellnuo_config: for deployment in wellnuo_config['deploymentData']: # Check if we've already added this deployment ID dep_id = str(deployment.get('deployment_id', '')) if dep_id and dep_id not in deployment_ids_seen: all_deployments.append(deployment) deployment_ids_seen.add(dep_id) # Add devices from this user if 'deviceData' in wellnuo_config: for device in wellnuo_config['deviceData']: # Check if we've already added this device ID dev_id = str(device.get('well_id', '')) if dev_id and dev_id not in device_ids_seen: all_devices.append(device) device_ids_seen.add(dev_id) # Add the combined deployment data combined_wellnuo_config += " // Combined deployment data from all users\n" combined_wellnuo_config += " deploymentData: " combined_wellnuo_config += json.dumps(all_deployments, indent=4).replace('"', "'") + ",\n\n" # Add the combined device data combined_wellnuo_config += " // Combined device data from all users\n" combined_wellnuo_config += " deviceData: " combined_wellnuo_config += json.dumps(all_devices, indent=4).replace('"', "'") + "\n" combined_wellnuo_config += "};\n" # Debug the combined WellNuo config print("=" * 50) print("COMBINED WELLNUO CONFIG PREVIEW:") print("-" * 50) print(combined_wellnuo_config[:500] + "..." if len(combined_wellnuo_config) > 500 else combined_wellnuo_config) print("-" * 50) print(f"Config contains credentials for {len([u for u in user_flow_data if 'wellnuo_config' in u and 'user_name' in u['wellnuo_config'] and 'token' in u['wellnuo_config']])} users") print(f"Combined configuration contains {len(all_deployments)} deployments and {len(all_devices)} devices") print("=" * 50) # Write the combined WellNuo config with open(os.path.join(wellnuo_config_dir, 'wellnuo_config.js'), 'w') as f: f.write(combined_wellnuo_config) # Write other combined configuration files with open(os.path.join(output_dir, "common_flow.json"), 'w') as f: json.dump(combined_flows, f, indent=4) with open(os.path.join(output_dir, ".config.nodes.json"), 'w') as f: json.dump(combined_nodes_config, f, indent=4) with open(os.path.join(output_dir, ".config.users.json"), 'w') as f: json.dump(combined_users_config, f, indent=4) with open(os.path.join(output_dir, ".config.runtime.json"), 'w') as f: json.dump(combined_runtime_config, f, indent=4) # Prepare credentials file creds_output = { "credentials": combined_credentials, "$": "creds" # Standard Node-RED credential format } with open(os.path.join(output_dir, "common_flow_cred.json"), 'w') as f: json.dump(creds_output, f, indent=4) # Copy the pre-created patched device-sensor-in.js for multi-user support main_instance_node_path = os.path.join(output_dir, 'node_modules', 'node-red-contrib-wellnuo') os.makedirs(main_instance_node_path, exist_ok=True) ## Copy the pre-created patched file #patched_file_path = '/home/ubuntu/.node-red/patched-device-sensor-in.js' #if os.path.exists(patched_file_path): #with open(patched_file_path, 'r') as src, \ #open(os.path.join(main_instance_node_path, 'device-sensor-in.js'), 'w') as dst: #dst.write(src.read()) #print("Copied pre-created patched device-sensor-in.js file") #else: #print("Warning: Pre-created patched file not found at", patched_file_path) # Copy HTML file for the node orig_html_path = '/home/ubuntu/.node-red/node_modules/node-red-contrib-wellnuo/device-sensor-in.html' if os.path.exists(orig_html_path): with open(orig_html_path, 'r') as src, \ open(os.path.join(main_instance_node_path, 'device-sensor-in.html'), 'w') as dst: dst.write(src.read()) # Create package.json for the node module package_json = { "name": "node-red-contrib-wellnuo", "version": "1.0.0", "description": "Modified WellNuo nodes for multi-user deployment", "node-red": { "nodes": { "device-sensor-in": "device-sensor-in.js" } } } with open(os.path.join(main_instance_node_path, 'package.json'), 'w') as f: json.dump(package_json, f, indent=4) return os.path.join(output_dir, "common_flow.json") # Add near other imports if needed # import subprocess (already imported) # from pathlib import Path (already imported) # --- Add this NEW function --- def install_package_in_user(username, package_name): """ Installs or links a specific package back into a user's node_modules. Prioritizes linking custom nodes, otherwise installs from registry. """ user_dir = USERS_DIR / username user_node_modules = user_dir / 'node_modules' target_path = user_node_modules / package_name logger.info(f"Attempting to restore package '{package_name}' for user '{username}'.") print(f"Restoring package '{package_name}' for user '{username}'.") # Check if it's a custom node first custom_node_path = CUSTOM_NODES_DIR / package_name is_custom = False if custom_node_path.is_dir() and (custom_node_path / 'package.json').exists(): is_custom = True logger.info(f"'{package_name}' identified as a custom node.") # Remove existing remnants in user dir if they exist (e.g., broken link/dir) if target_path.exists() or target_path.is_symlink(): logger.warning(f"Removing existing item at {target_path} before restoring.") try: if target_path.is_dir() and not target_path.is_symlink(): shutil.rmtree(target_path) else: target_path.unlink(missing_ok=True) # Remove file or link except OSError as rm_err: logger.error(f"Failed to remove existing item {target_path} for user {username}: {rm_err}") # Might still proceed with install/link attempt # Perform link for custom node or install for others success = False cmd = [] if is_custom: # Create link within user's node_modules # Assumes global link exists via 'sudo npm link' in custom node dir cmd = ['npm', 'link', package_name] operation_desc = f"link custom node {package_name} for user {username}" else: # Install latest from registry (or specific version if known? Keep simple for now) # Note: This might install a different version than the user originally had! logger.warning(f"Restoring non-custom package '{package_name}' for user '{username}' from registry (latest).") cmd = ['npm', 'install', package_name, '--no-save', '--no-audit', '--no-fund', '--omit=dev'] operation_desc = f"install latest {package_name} for user {username}" # Run the command inside the USER'S directory logger.info(f"Running command: {' '.join(cmd)} in {user_dir}") try: # Don't use the main run_npm_command helper as it works in main_instance and uses temp cache result = subprocess.run( cmd, cwd=user_dir, # *** Run in the specific user's directory *** check=True, capture_output=True, text=True, timeout=300 ) logger.info(f"Successfully executed {operation_desc}: {result.stdout[:200]}...") success = True except subprocess.CalledProcessError as e: logger.error(f"FAILED to execute {operation_desc}. Error: {e.stderr}") print(f"ERROR restoring package {package_name} for user {username}.") except Exception as e: logger.error(f"Unexpected error during {operation_desc}: {e}", exc_info=True) print(f"ERROR restoring package {package_name} for user {username}.") return success def deploy_combined_flow(flow_file, node_red_url="http://localhost:1999", credentials_file=None, admin_user=None, admin_password=None): """ Deploy the combined flow to the main Node-RED instance with proper authentication Args: flow_file: Path to the combined flow file node_red_url: URL of the main Node-RED instance credentials_file: Path to the credentials file admin_user: Admin username admin_password: Admin password Returns: Boolean indicating success """ import requests from requests.auth import HTTPBasicAuth # Load flow file with open(flow_file, 'r') as f: flows = json.load(f) # Use Node-RED Admin API to deploy headers = { "Content-Type": "application/json", "Node-RED-Deployment-Type": "full" } # Note: Try with different authentication methods try: print(f"Deploying flow to {node_red_url}/flows") print(f"Admin credentials: {admin_user} / {admin_password[:2]}***") # First attempt: Standard Basic Auth response = requests.post( f"{node_red_url}/flows", json=flows, headers=headers, auth=HTTPBasicAuth(admin_user, admin_password) ) # Check if successful if response.status_code == 200 or response.status_code == 204: print(f"Successfully deployed flow using standard auth") return True # If that fails, try with token-based auth if response.status_code == 401: print(f"Standard auth failed, trying token auth") # Get a JWT token first auth_response = requests.post( f"{node_red_url}/auth/token", json={"username": admin_user, "password": admin_password}, headers={"Content-Type": "application/json"} ) if auth_response.status_code == 200: token = auth_response.json().get("token") # Use the token for authentication token_headers = { "Content-Type": "application/json", "Node-RED-Deployment-Type": "full", "Authorization": f"Bearer {token}" } token_response = requests.post( f"{node_red_url}/flows", json=flows, headers=token_headers ) if token_response.status_code == 200: print(f"Successfully deployed flow using token auth") return True else: print(f"Token auth failed with status {token_response.status_code}: {token_response.text}") else: print(f"Failed to get auth token: {auth_response.status_code}: {auth_response.text}") # If all methods fail, report the error print(f"All authentication methods failed. Status code: {response.status_code}") print(f"Response: {response.text}") return False except Exception as e: print(f"Error during deployment: {e}") return False def restart_node_red_service(service_name="node-red.service"): """Attempts to restart the Node-RED systemd service.""" print(f"Attempting to restart Node-RED service: {service_name}") try: # Using systemctl requires appropriate permissions. # Consider configuring passwordless sudo for this specific command for the 'ubuntu' user via visudo, # OR run well-alerts.py itself as root (less recommended). cmd = ["sudo", "systemctl", "restart", service_name] result = subprocess.run( cmd, check=True, capture_output=True, text=True, timeout=60 # 1 minute timeout for restart command ) logger.info(f"systemctl restart {service_name} successful.") print(f"Successfully signaled {service_name} to restart.") # Add a small delay to allow the service to fully stop and start time.sleep(10) # Wait 10 seconds (adjust as needed) # Optional: Check status after delay status_cmd = ["sudo", "systemctl", "is-active", service_name] status_result = subprocess.run(status_cmd, capture_output=True, text=True) if status_result.stdout.strip() == "active": print(f"{service_name} is active after restart.") return True else: print(f"WARNING: {service_name} not reported active after restart attempt. Status: {status_result.stdout.strip()}") logger.warning(f"{service_name} not reported active after restart attempt. Status: {status_result.stdout.strip()}") return False # Indicate potential issue except subprocess.CalledProcessError as e: logger.error(f"Failed to restart {service_name}: {e.stderr}") print(f"ERROR: Failed to restart {service_name}. Check logs and permissions.") return False except FileNotFoundError: logger.error(f"Failed to restart {service_name}: 'sudo' or 'systemctl' command not found.") print(f"ERROR: Failed to restart {service_name}: command not found.") return False except Exception as e_restart: logger.error(f"An unexpected error occurred during {service_name} restart: {e_restart}") print(f"ERROR: An unexpected error occurred during {service_name} restart.") return False def task_a(arg): print(f"task_a {arg}") def task_b(arg): print(f"task_b {arg}") def string_to_uuid(input_string, namespace=uuid.NAMESPACE_DNS): """ Convert a string to a UUID using uuid5 (SHA-1 hash) Args: input_string: The string to convert namespace: UUID namespace (default: NAMESPACE_DNS) Returns: A UUID object """ return uuid.uuid5(namespace, input_string) class EmailSender: def __init__(self, gmail_user, gmail_password, max_workers=5): """ Initialize the email sender with your Gmail credentials. Args: gmail_user (str): Your Gmail email address gmail_password (str): Your Gmail app password (not your regular password) max_workers (int): Maximum number of worker threads """ self.gmail_user = gmail_user self.gmail_password = gmail_password self.max_workers = max_workers self.email_queue = queue.Queue() self.workers = [] self.running = False def start_workers(self): """Start the worker threads to process the email queue.""" self.running = True for _ in range(self.max_workers): worker = threading.Thread(target=self._email_worker) worker.daemon = True worker.start() self.workers.append(worker) def stop_workers(self): """Stop all worker threads.""" self.running = False for worker in self.workers: if worker.is_alive(): worker.join(timeout=2.0) self.workers = [] def _email_worker(self): """Worker thread that processes emails from the queue.""" while self.running: try: # Get an email job from the queue with a timeout email_job = self.email_queue.get(timeout=1.0) if email_job: recipient, message, subject = email_job self._send_single_email(recipient, message, subject) self.email_queue.task_done() except queue.Empty: # Queue is empty, just continue and check running status continue except Exception as e: print(f"Error in email worker: {e}") # Still mark the task as done even if it failed try: self.email_queue.task_done() except: pass def _send_single_email(self, to_email, html_content, subject): """ Send a single email using Gmail SMTP. Args: to_email (str): Recipient email address text_content (str): Email message content """ try: # Create the email message msg = MIMEMultipart() msg['From'] = self.gmail_user msg['To'] = to_email msg['Subject'] = subject # Attach the text content #plain_text = "Please view this email in an HTML-compatible client to see its full content." #part1 = MIMEText(plain_text, 'plain') # 2. Create the HTML part part2 = MIMEText(html_content, 'html') #msg.attach(part1) msg.attach(part2) print("Connecting to SMTP server...") server = smtplib.SMTP_SSL('smtp.gmail.com', 465) print("Connected to server") print("Attempting login...") server.login(self.gmail_user, self.gmail_password) print("Login successful") print("Sending email...") server.send_message(msg) print("Message sent") server.quit() print(f"Email sent successfully to {to_email}") except Exception as e: print(f"Failed to send email to {to_email}: {e}") traceback.print_exc() # This will print the detailed error stack def SendEmailTo(self, email_address, text_str, subject): """ Queue an email to be sent by a worker thread. Args: email_address (str): Recipient email address text_str (str): Email message content """ # Add the email to the queue if email_address[0] != "_" and email_address[0] != "-": self.email_queue.put((email_address, text_str, subject)) def GetMQTTid(user_id): with get_db_connection() as conn: with conn.cursor() as cur: sqlr = f"SELECT mqtt_id from public.mobile_clients WHERE user_id ={user_id}" mqtt_id = ReadCleanStringDB(cur, sqlr) return mqtt_id def SendMessageTo(user_id, text_str): #we need mqtt_id for user print(user_id, text_str) mqtt_id = GetMQTTid(user_id) mqtt_message_str = f""" {{ "Command": "REPORT","body":"{text_str}","reflected":"","language":"","time":{time.time() *1000} }} """ print(mqtt_message_str) if mqtt_id != "": MQSendM(mqtt_id, mqtt_message_str) def SendMessageDirectTo(topic, text_str): mqtt_message_str = f""" {{ "Command": "REPORT","body":"{text_str}","reflected":"","language":"","time":{time.time() *1000} }} """ #print(mqtt_message_str) if topic != "": MQSend_worker(topic, mqtt_message_str, True)#(topic, mqtt_message_str, True) def normalize_phone_number(phone_number_str: str) -> str: if not phone_number_str: return "" cleaned_number = "".join(filter(lambda char: char.isdigit() or char == '+', phone_number_str)) if not cleaned_number.startswith('+'): if len(cleaned_number) == 10: #assume it is US number if exactly 10 digits long cleaned_number = '+1' + cleaned_number elif cleaned_number.startswith('1') and len(cleaned_number) >= 11: cleaned_number = '+' + cleaned_number else: cleaned_number = '+' + cleaned_number return cleaned_number # --- Telnyx SMS Sending Function --- def setup_telnyx_sms_client(): try: telnyx.api_key = TELNYX_API_KEY logger.info("Telnyx client for SMS configured.") return True except Exception as e: logger.error(f"Failed to configure Telnyx client for SMS: {e}") return False def send_telnyx_sms(recipient_phone: str, message_body: str, caller_id_type: str = "auto") -> bool: """Sends an SMS using the Telnyx API, with dynamic 'from_' based on caller_id_type.""" if not setup_telnyx_sms_client(): return False if not recipient_phone or not message_body: logger.error("Cannot send Telnyx SMS: Recipient phone and message are required.") return False #recipient_phone = normalize_phone_number(recipient_phone) #done outside already from_id = TELNYX_SENDER_ID # Default to numeric #return if caller_id_type == "alpha": if TELNYX_SENDER_ID_ALPHA: from_id = TELNYX_SENDER_ID_ALPHA else: logger.warning("SMS Caller ID type 'alpha' requested, but TELNYX_SENDER_ID_ALPHA is not set. Falling back to numeric.") elif caller_id_type == "numeric": from_id = TELNYX_SENDER_ID elif caller_id_type == "auto": if recipient_phone.startswith("+1"): # US/Canada from_id = TELNYX_SENDER_ID elif TELNYX_SENDER_ID_ALPHA: # Other international, try Alpha if available from_id = TELNYX_SENDER_ID_ALPHA else: # Fallback to numeric if Alpha not set for international logger.warning("SMS Caller ID type 'auto' for non-US/Canada destination, but TELNYX_SENDER_ID_ALPHA not set. Using numeric.") from_id = TELNYX_SENDER_ID else: # Should not happen with argparse choices logger.warning(f"Invalid caller_id_type '{caller_id_type}' for SMS. Defaulting to numeric.") from_id = TELNYX_SENDER_ID logger.info(f"Attempting to send Telnyx SMS from '{from_id}' to '{recipient_phone}'") try: message_create_params = { "from_": from_id, "to": recipient_phone, "text": message_body, "messaging_profile_id": TELNYX_MESSAGING_PROFILE_ID, "type": "sms" } if not message_create_params["messaging_profile_id"]: del message_create_params["messaging_profile_id"] response = telnyx.Message.create(**message_create_params) logger.info(f"SMS submitted successfully. Message ID: {response.id}") return True except telnyx.error.TelnyxError as e: logger.error(f"Telnyx API Error sending SMS to {recipient_phone} from '{from_id}': {e}") if hasattr(e, 'json_body') and e.json_body and 'errors' in e.json_body: for err in e.json_body['errors']: logger.error(f" - Code: {err.get('code')}, Title: {err.get('title')}, Detail: {err.get('detail')}") return False except Exception as e: logger.error(f"Unexpected error sending Telnyx SMS to {recipient_phone}: {e}") return False def SendSMSTo(phone_nr, text_str): if phone_nr == None or phone_nr == 'None': return if phone_nr[0] == "-" or phone_nr[0] == "_": return if len(text_str) < 1: return phone_nr = normalize_phone_number(phone_nr) if phone_nr[0] == "+": sms_sent = send_telnyx_sms(recipient_phone=phone_nr, message_body=text_str, caller_id_type="auto") if sms_sent: logger.info(f"Telnyx SMS sent.") return True else: logger.error(f"Failed to send SMS.") return False def text2mp3(voice_library, text_to_speak, file_to_save, wait=True, timeout=60): """ Submit a TTS job and optionally wait for the result. Args: voice_library: Model name to use text_to_speak: Text to convert to speech file_to_save: Target filename (will be placed in the results dir) wait: Whether to wait for the job to complete timeout: Maximum time to wait in seconds Returns: dict: Job result information """ # Generate a unique job ID job_id = str(uuid.uuid4()) # Create the job file job = { 'job_id': job_id, 'voice_library': voice_library, 'text_to_speak': text_to_speak, 'file_to_save': file_to_save } job_file = os.path.join(JOBS_DIR, f"{job_id}.job") with open(job_file, 'w') as f: json.dump(job, f) if not wait: return {'job_id': job_id, 'pending': True} # Wait for the result result_file = os.path.join(RESULTS_DIR, f"{job_id}.result") start_time = time.time() while not os.path.exists(result_file): if time.time() - start_time > timeout: return { 'job_id': job_id, 'success': False, 'error': 'Job timed out' } time.sleep(0.5) # Read the result with open(result_file, 'r') as f: result = json.load(f) # Clean up result file os.remove(result_file) return result def PrepareMP3(text_to_speak): #todo add multi-language support #tts_model = "tts_models/en/ljspeech/tacotron2-DDC" tts_model = "tts_models/en/vctk/vits" my_uuid = str(string_to_uuid(text_to_speak+tts_model)) voice_file_long_source = os.path.join(RESULTS_DIR, f"{my_uuid}.mp3") voice_file_long = os.path.join(RESULTS_SHARE_DIR, f"{my_uuid}.mp3") if not os.path.exists(voice_file_long): result = text2mp3( tts_model, text_to_speak, my_uuid+".mp3" ) if result.get('success'): print(f"TTS generated successfully: {result.get('file_path')}") destination_path = RESULTS_SHARE_DIR shutil.copy2(voice_file_long_source, destination_path) else: print(f"TTS generation failed: {result.get('error')}") return "" return my_uuid def SendPhoneCall(phone_nr, text_str): if phone_nr[0] == "-" or phone_nr[0] == "_": return phone_nr = normalize_phone_number(phone_nr) #TELNYX_WEBHOOK_URL_VOICE = "http://eluxnetworks.net:1998/telnyx-webhook" uuid_str = PrepareMP3(text_str) if uuid_str != "": # Headers headers = { "Authorization": f"Bearer {TELNYX_API_KEY}", "Content-Type": "application/json" } # Request payload payload = { "to": phone_nr, "from": TELNYX_SENDER_ID, "connection_id": TELNYX_CONNECTION_ID_VOICE, "webhook_url": TELNYX_WEBHOOK_URL_VOICE, "webhook_url_method": "POST", "answering_machine_detection": "disabled", "custom_headers": [ { "name": "X-Audio-Url", "value": "https://eluxnetworks.net/shared/clips/" + uuid_str + ".mp3" } ] } # Make the API call print(f"posting call:\n {payload}") response = requests.post(TELNYX_VOICE_URL, headers=headers, json=payload) # Print the response print(response.status_code) print(response.json()) print("Call voice") def SendAlerts(deployment_id, method, text_str, subject, user_name): global sender #return #todo remove it in production if user_name == "" or user_name == None: #real request so send to all caretakers care_takers = care_takers_all[int(deployment_id)] for user_details in care_takers: if method.upper() == "MSG": destination = user_details[0] SendMessageTo(user_details[0], text_str) if method.upper() == "EMAIL": destination = user_details[1] sender.SendEmailTo(user_details[1], text_str, subject) if method.upper() == "SMS": destination = user_details[2] success = SendSMSTo(user_details[2], text_str) #result is logged, so for now just ignore if method.upper() == "PHONE": destination = user_details[2] SendPhoneCall(user_details[2], text_str) if False: logger.info(f"Sending Alert to {destination} via {method} Content:{text_str}") message_dict = {} message_dict["deployment_id"] = deployment_id message_dict["method"] = method message_dict["receipient"] = destination message_dict["text_str"] = text_str SendToQueue(RABBITMQ_ALERTS_QNAME, message_dict) else: #send to logged user only user_details = GetUserDetails(user_name)[0] if method.upper() == "MSG": destination = user_details[0] SendMessageTo(destination, text_str) if method.upper() == "EMAIL": destination = user_details[3] sender.SendEmailTo(destination, text_str, subject) if method.upper() == "SMS": destination = user_details[14] success = SendSMSTo(destination, text_str) #result is logged, so for now just ignore if method.upper() == "PHONE": destination = user_details[14] SendPhoneCall(destination, text_str) def set_character(some_string, bit_nr, new_char): """ Replace a character in a string at position bit_nr from the right. Parameters: some_string (str): The input string bit_nr (int): Position from right (0 = rightmost, 1 = second from right, etc.) new_char (str): The replacement character Returns: str: The modified string """ if bit_nr < 0 or bit_nr >= len(some_string): return some_string # Invalid position # Convert string to list for easier manipulation chars = list(some_string) # Replace character at position bit_nr from right chars[len(chars) - 1 - bit_nr] = new_char # Convert back to string return ''.join(chars) def ReadCleanStringDB(cur, sql): cur.execute(sql) temp_string = cur.fetchone() if temp_string == None: return "" else: return str(temp_string[0]).strip() def GetTimeZoneOfDeployment(deployment_id): global time_zones_all if deployment_id not in time_zones_all: time_zone_st = 'America/Los_Angeles' with get_db_connection() as conn: with conn.cursor() as cur: sqlr = f"SELECT time_zone_s from public.deployments WHERE deployment_id ={deployment_id}" time_zone_st = ReadCleanStringDB(cur, sqlr) time_zones_all[deployment_id] = time_zone_st else: time_zone_st = time_zones_all[deployment_id] return time_zone_st def StoreLastSentToRedis(deployment_id): alarms_settings = alarms_settings_all[deployment_id] alarms_settings["last_triggered_utc"] = datetime.datetime.utcnow().isoformat() try: alarm_deployment_settings_str = json.dumps(alarms_settings) redis_conn.set('alarm_deployment_settings_'+str(deployment_id), alarm_deployment_settings_str) except Exception as e: print(f"Error: {e}") def StoreDeviceAlarmsToRedis(device_id, device_alarm_settings): alarm_device_settings_str = json.dumps(device_alarm_settings) redis_conn.set('alarm_device_settings_'+str(device_id), alarm_device_settings_str) def StoreDeviceAlarmsToDB(device_id, device_alarm_settings): alarm_device_settings_str = json.dumps(device_alarm_settings) conn = get_db_connection() if not conn: return False try: with conn.cursor() as cur: cur.execute("UPDATE devices SET alert_details = %s WHERE device_id = %s", (alarm_device_settings_str, device_id)) print(f"Updated alert_details for {device_id} to {alarm_device_settings_str}") conn.commit() # Update the ACL file return True except Exception as e: print(f"Database error: {e}") conn.rollback() return False finally: conn.close() def StoreLastSentToDB(deployment_id): alarms_settings = alarms_settings_all[deployment_id] alarm_deployment_settings_str = json.dumps(alarms_settings) conn = get_db_connection() if not conn: return False try: with conn.cursor() as cur: cur.execute("UPDATE deployments SET alarm_details = %s WHERE deployment_id = %s", (alarm_deployment_settings_str, deployment_id)) print(f"Updated alert_details for {deployment_id} to {alarm_deployment_settings_str}") conn.commit() # Update the ACL file return True except Exception as e: print(f"Database error: {e}") conn.rollback() return False finally: conn.close() def StoreAllToRedisAndDB(deployment_id, device_id): StoreLastSentToRedis(deployment_id) StoreLastSentToDB(deployment_id) device_alarm_settings = device_alerts_all[device_id] StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) StoreDeviceAlarmsToDB(device_id, device_alarm_settings) def SetupTasks(): global local_daily_minute_last print("SetupTasks") stt = time.time() #This will determine current state of the Z-graf based warnings/alarms, and setup future triggers #This warnings/alarms that are sensor based need to be done in next_run_in_minutes = 1440 #this will be reduced depending on data #each time when local daily time becomes smaller than before (passes daily midnight), then all alarm triggeres need to be re-armed as per DB settings. for deployment_id in deployment_id_list: if deployment_id in alarms_settings_all: time_zone_s = GetTimeZoneOfDeployment(deployment_id) local_timezone = pytz.timezone(time_zone_s) local_time = datetime.datetime.now(local_timezone) deployment_id_s = str(deployment_id) #at this point we want to read settings from redis and not local memory, since settings could change by user #redis_conn.set('alarm_device_settings_'+device_id_s, alarm_device_settings_str) alarm_deployment_settings_str = GetRedisString('alarm_deployment_settings_'+deployment_id_s) if alarm_deployment_settings_str == None: alarms_settings = alarms_settings_all[deployment_id] alarm_deployment_settings_str = json.dumps(alarms_settings) redis_conn.set('alarm_deployment_settings_'+deployment_id_s, alarm_deployment_settings_str) else: alarms_settings = json.loads(alarm_deployment_settings_str) if "enabled" in alarms_settings: alarm_armed_settings_str = alarms_settings["enabled"] else: alarm_armed_settings_str = "000" if "last_triggered_utc" in alarms_settings: last_triggered_utc = alarms_settings["last_triggered_utc"] else: last_triggered_utc = 0 if "rearm_policy" in alarms_settings: rearm_policy = alarms_settings["rearm_policy"] else: rearm_policy = "Never" #alarm_armed_settings_str = GetRedisString(f'alarm_armed_settings_{deployment_id}') utc_time = datetime.datetime.utcnow() # Calculate minutes since start of day hours = local_time.hour minutes = local_time.minute local_daily_minute = (hours * 60) + minutes user_first_last_name = GetBeneficiaryFromDeployment(deployment_id) #check if re-arming should happen #Sent message to user is tied to Benficiary, not receivers do_rearm = False if rearm_policy == "At midnight": if local_daily_minute < local_daily_minute_last: do_rearm = True elif rearm_policy == "1H": if utc_time > (last_triggered_utc + 3600): do_rearm = True elif rearm_policy == "6H": if utc_time > (last_triggered_utc + 6 * 3600): do_rearm = True elif rearm_policy == "12H": if utc_time > (last_triggered_utc + 12 * 3600): do_rearm = True elif rearm_policy == "1D": if utc_time > (last_triggered_utc + 24 * 3600): do_rearm = True elif rearm_policy == "2D": if utc_time > (last_triggered_utc + 48 * 3600): do_rearm = True elif rearm_policy == "Never": pass devices_lst = deployments_devices[deployment_id] if do_rearm: #No need to re-arm Burglar alarm it remains armed untill manually disarmed alarm_settings_str = GetAlarmSettings(deployment_id) if alarm_settings_str != "{}": alarm_settings = json.loads(alarm_settings_str) #alarm_settings["enabled"] bits: # bit 2: Burglar Alarm (index 0), bit 1: Time alone Alarm is Set (index 1), bit 0: Time alone Warning is Set (index 2) if alarm_settings["enabled"][1] == "1": #Time alone Alarm alarm_settings["alone_alarm_armed"] = True if alarm_settings["enabled"][2] == "1": #Time alone Warning alarm_settings["alone_warning_armed"] = True alarms_settings_all[deployment_id] = alarm_settings StoreLastSentToRedis(deployment_id) StoreLastSentToDB(deployment_id) alarm_armed_settings_str = alarms_settings["armed_states"] for device_id in devices_lst: if device_id in device_alerts_all: device_id_s = str(device_id) alarm_device_settings_str = GetDeviceAlarmSettings(device_id) StoreDeviceAlarmsToRedis(device_id, alarm_device_settings_str) StoreDeviceAlarmsToDB(device_id, device_alarm_settings) if alarm_armed_settings_str[0] == "1": #"100" Burglar alarm #Burglar alarm is triggerred on sensor read not time so nothing to be done here pass #lets check if alone if "1" in alarm_armed_settings_str[1:]: #"010" alone warning or alarm pass #todo numbers = [] for device_id in devices_lst: if device_id in device_alerts_all: since_seen = time.time() - GetRedisFloat('lastseen_'+str(device_id)) numbers.append((since_seen, device_id)) #In order to understand if somebody is too long in some place, or to long since it visited, we use following logic: #For too long since last visit: we check if since_seen is larger than absent_minutes_alarm and absent_minutes_warning value #For too long (stuck) in same place: we check (time_of_last_read_of_the_device - time_of_last_read_of_any_other_device) larger than stuck_minutes_warning and stuck_minutes_alarm value second_smallest = -1 if len(numbers) <2: #needs at least 2 devices for proper operation pass else: sorted_numbers = sorted(numbers, key=lambda x: x[0]) smallest = sorted_numbers[0] second_smallest = sorted_numbers[1] device_id_to_last_seen = {} for tpl in sorted_numbers: last_seen_ago = tpl[0] device_id = tpl[1] #device_id_to_last_seen[tpl[1]] = tpl[0] device_alarm_settings = device_alerts_all[device_id] if "enabled_alarms" in device_alarm_settings: enabled_alarms_str = device_alarm_settings["enabled_alarms"] armed_states = "00000000000" if "armed_states" in device_alarm_settings: armed_states = device_alarm_settings["armed_states"] #lets check alarms first, because if satisfied, no need to check for warning if enabled_alarms_str[BitIndex(1)] == "1" and armed_states[BitIndex(1)] == "1": #Too long present alarm if enabled and not already triggerred (triggered = 0)! if device_id == smallest[1]: #now present... how long? if (second_smallest[0] - smallest[0]) > float(device_alarm_settings["stuck_minutes_alarm"]) * 60: #cancel alarm and warning, until re-armed armed_states = set_character(armed_states, 0, "0") armed_states = set_character(armed_states, 1, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["stuck_alarm_method_1"] if method.upper() != "PHONE": SendAlerts(deployment_id, method, f"Alarm: {user_first_last_name} way too long ({int((second_smallest[0] - smallest[0]) / 60)} minutes) in {location}", "", "") else: SendAlerts(deployment_id, method, f"Alarm: {user_first_last_name} way too long in {location}", "", "") device_alarm_settings["armed_states"] = armed_states StoreLastSentToRedis(deployment_id) device_alerts_all[device_id] = device_alarm_settings StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: check_after = float(device_alarm_settings["stuck_minutes_alarm"]) - int((second_smallest[0] - smallest[0])/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after elif enabled_alarms_str[BitIndex(0)] == "1" and armed_states[BitIndex(0)] == "1": #Too long present warning if (second_smallest[0] - smallest[0]) > float(device_alarm_settings["stuck_minutes_warning"]) * 60: #cancel warning, until re-armed armed_states = set_character(armed_states, 0, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["stuck_warning_method_0"] if method.upper() != "PHONE": SendAlerts(deployment_id, method, f"Warning: {user_first_last_name} too long ({int((second_smallest[0] - smallest[0]) / 60)} minutes) in {location}", "", "") else: SendAlerts(deployment_id, method, f"Warning: {user_first_last_name} too long in {location}", "", "") StoreLastSentToRedis(deployment_id) device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: check_after = float(device_alarm_settings["stuck_minutes_warning"]) - int((second_smallest[0] - smallest[0])/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after if enabled_alarms_str[BitIndex(3)] == "1" and armed_states[BitIndex(3)] == "1": #Too long absent alarm if abs(last_seen_ago) > abs(float(device_alarm_settings["absent_minutes_alarm"])) * 60: armed_states = set_character(armed_states, 3, "0") armed_states = set_character(armed_states, 2, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["absent_alarm_method_3"] if method.upper() != "PHONE": SendAlerts(deployment_id, method, f"Alarm: Way too long since {user_first_last_name} visited {location} ({int(last_seen_ago / 60)} minutes)", "", "") else: SendAlerts(deployment_id, method, f"Alarm: Way too long since {user_first_last_name} visited {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreLastSentToRedis(deployment_id) StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: check_after = abs(float(device_alarm_settings["absent_minutes_alarm"])) - int(last_seen_ago/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after if enabled_alarms_str[BitIndex(2)] == "1" and armed_states[BitIndex(2)] == "1": #Too long absent alarm if abs(last_seen_ago) > abs(float(device_alarm_settings["absent_minutes_warning"])) * 60: armed_states = set_character(armed_states, 2, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["absent_warning_method_2"] if method.upper() != "PHONE": SendAlerts(deployment_id, method, f"Warning: Too long since {user_first_last_name} visited {location} ({int(last_seen_ago / 60)} minutes)", "", "") else: SendAlerts(deployment_id, method, f"Warning: Too long since {user_first_last_name} visited {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreLastSentToRedis(deployment_id) StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: check_after = abs(float(device_alarm_settings["absent_minutes_warning"])) - int(last_seen_ago/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after #"stuck_warning_method_0": "SMS", #"stuck_alarm_method_1": "PHONE", #"absent_warning_method_2": "SMS", #"absent_alarm_method_3": "PHONE", #"temperature_high_warning_method_4": "SMS", #"temperature_high_alarm_method_5": "PHONE", #"temperature_low_warning_method_6": "SMS", #"temperature_low_alarm_method_7": "PHONE", #"radar_alarm_method_8":"MSG", #"pressure_alarm_method_9":"MSG", #"light_alarm_method_10":"MSG" #how to determine when user arived here (time of any other place!) if next_run_in_minutes > 1: next_run_in_minutes = 1 try: schedule_task(SetupTasks, "", next_run_in_minutes) except OSError as e: logger.warning(f"Could not SetupTasks: {e}") print(time.time()-stt) def create_device_row_html(device_id): """Generates the HTML for a single device row.""" options = [ "?", "Office", "Hallway", "Garage", "Outside", "Conference Room", "Room", "Kitchen", "Bedroom", "Living Room", "Bathroom", "Dining Room", "Bathroom Main", "Bathroom Guest", "Bedroom Master", "Bedroom Guest", "Basement", "Attic", "Other" ] location_combobox = f'' description_textbox = f'' return f"""
Device #{device_id}
{location_combobox} {description_textbox}
""" def populate_welcome_email(template_content, user_data): """ Populates the welcome email template with user-specific data. Args: template_path (str): The file path to the HTML email template. user_data (dict): A dictionary containing user information: 'first_name', 'last_name', 'user_name', 'password', 'devices' (a list of device IDs). Returns: str: The populated HTML content of the email. """ # Use str.replace() for each placeholder to avoid conflicts with CSS/JS content = template_content.replace('{first_name}', user_data.get("first_name", "")) content = content.replace('{last_name}', user_data.get("last_name", "")) content = content.replace('{user_name}', user_data.get("user_name", "")) content = content.replace('{password}', user_data.get("password", "")) content = content.replace('{signature}', user_data.get("signature", "")) return content def read_file(file_name, source = "LOCAL", type_ = "TEXT", bucket_name="daily-maps"): blob_data = "" if source == "MINIO": #blob_data = ReadObjectMinIO(bucket_name, file_name) pass elif source == "LOCAL": login_file = os.path.join(filesDir, file_name) login_file = login_file.replace("\\","/") logger.debug(f"Full file path: {login_file}") logger.debug(f"File exists: {os.path.exists(login_file)}") #print(login_file) if type_ == "TEXT": with open(login_file, encoding="utf8") as f: blob_data = f.read() else: with open(login_file, 'rb') as f: blob_data = f.read() elif source == "AZURE": try: blob_data = ""#container_client.download_blob(file_name).readall() except Exception as err: logger.error("Not reading Azure blob "+str(err)) blob_data = "" return blob_data else: pass return blob_data # --- Main Monitoring Loop --- def monitor_devices(): """Main monitoring loop that checks devices periodically.""" logger.info("Starting monitoring loop...") while not stop_event.is_set(): try: # Add this outer try block start_time = time.monotonic() #current_time_unix = time.time() #======================== Do node-red instances management ================================================== hash_data = GetRedisMap('node-red_deployed') #logger.debug(f"node_red_requests: {hash_data}") requests_count = 0 if hash_data != {}: requests_count = int(hash_data['requests']) if requests_count > 0: logger.debug(f"node-red_deployed: {str(hash_data)}") user_name = hash_data['user_name'] #delete request #this might need switching to queue... todo redis_conn.hset('node-red_deployed', mapping={ 'user_name': user_name, 'token': "", 'time': "", 'requests': 0 }) sync_success = False changes_were_made = False # Flag to track if sync modified files signal_file = Path("/tmp/node-red-sync-changes.signal") # Define path again try: # Use sys.executable to ensure using the same python interpreter print("Starting node synchronization...") sync_script_path = "/home/ubuntu/.node-red/sync_nodes.py" # Add error checking result = subprocess.run( [sys.executable, sync_script_path], check=True, # Raise CalledProcessError if script exits with non-zero code capture_output=True, text=True, timeout=600 # Add a generous timeout (10 minutes?) for npm operations ) print("Node synchronization script output:\n", result.stdout) if result.stderr: print("Node synchronization script stderr:\n", result.stderr) print("Node synchronization completed successfully.") sync_success = True if signal_file.exists(): changes_were_made = True logger.info("Sync script indicated changes were made (signal file found).") try: signal_file.unlink() # Consume the signal logger.debug("Removed signal file.") except OSError as e: logger.warning(f"Could not remove signal file {signal_file}: {e}") else: logger.info("Sync script indicated no changes were made (signal file not found).") print("No node changes detected by sync script.") except FileNotFoundError: print(f"ERROR: Sync script not found at {sync_script_path}") # Decide how to handle: exit, skip deployment, etc. #return # Or raise an exception except subprocess.CalledProcessError as e: print(f"ERROR: Node synchronization script failed with exit code {e.returncode}") print(f"Stderr:\n{e.stderr}") print(f"Stdout:\n{e.stdout}") # Decide how to handle: exit, skip deployment, etc. #return # Or raise an exception except subprocess.TimeoutExpired: print(f"ERROR: Node synchronization script timed out.") # Decide how to handle #return # Or raise an exception except Exception as e_sync: print(f"ERROR: An unexpected error occurred while running sync script: {e_sync}") #return # Or raise # --- 2. Restart Node-RED Service IF Sync Succeeded AND Changes Were Made --- restart_successful = True # Default to true if no restart needed if sync_success and changes_were_made: # Only restart if sync OK AND changes happened print("Node changes detected, restarting Node-RED service...") restart_successful = restart_node_red_service("node-red.service") if not restart_successful: print("ERROR: Node-RED service failed to restart properly after changes. Aborting deployment.") return # Exit processing this request elif sync_success and not changes_were_made: print("No node changes required restart.") else: # Sync failed, don't restart print("Skipping Node-RED restart due to sync script failure.") # --- 3. Combine and Deploy Flows (only if sync didn't fail AND restart was ok/not needed) --- if sync_success and restart_successful: # Proceed if sync ran ok and restart (if needed) was ok print("Proceeding with flow combination and deployment...") # ... (rest of combine and deploy logic) ... else: print("Skipping flow deployment due to previous errors during sync or restart.") # Get list of user directories base_dir = "/home/ubuntu/.node-red/users" user_dirs = [os.path.join(base_dir, d) for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d))] # Combine flows output_dir = "/home/ubuntu/.node-red/main_instance" flow_file = combine_user_flows(user_dirs, output_dir) # Deploy combined flow creds_file = os.path.join(output_dir, "common_flow_cred.json") success = deploy_combined_flow( flow_file, node_red_url="https://eluxnetworks.net:1999", credentials_file=creds_file, admin_user=ADMIN_USER, admin_password=ADMIN_PASSWORD ) #print(f"Combined flow created at: {combined_file}") #lets check if any node-reds are stale pattern = 'node_red_status_*' threshold = time.time() - 10 # Find all matching keys matching_keys = [] cursor = 0 while True: cursor, keys = redis_conn.scan(cursor=cursor, match=pattern, count=100) for key in keys: key_str = key.decode('utf-8') # Convert bytes to string # Get the last_activity value last_activity = redis_conn.hget(key_str, 'last_activity').decode('utf-8') pid = 0 try: pid = int(redis_conn.hget(key_str, 'pid')) if pid > 0: if last_activity: last_activity_value = int(float(last_activity)) # Check if it's below the threshold print(last_activity_value-threshold) if last_activity_value < threshold: #if True: if key_str not in permament_users: matching_keys.append(key_str) except: pass # Exit the loop when scan is complete if cursor == 0: break # Print the result if len(matching_keys) > 0: print(f"Found {len(matching_keys)} keys with last_activity < {threshold}:") for key in matching_keys: print(key) user_name = key[len("node_red_status_"):] time_s = time.time()#datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") port, pid = GetNodeRedDetails(user_name) result = StopNodeRed(pid) #Lets store it to postgres and REDIS StoreNodeRedUserPortOff(port, user_name, time_s) redis_conn.hset(f'node_red_status_{user_name}', mapping={ 'port': 0, 'started_time': time_s, 'last_activity': time_s, 'pid': 0, }) print(result) # --- Cycle Cleanup & Wait --- elapsed_time = time.monotonic() - start_time #logger.info(f"Monitoring cycle finished in {elapsed_time:.2f} seconds.") wait_time = CHECK_INTERVAL_SECONDS - elapsed_time if wait_time > 0: #logger.debug(f"Waiting for {wait_time:.2f} seconds before next cycle.") # Use stop_event.wait() for graceful shutdown during sleep stop_event.wait(wait_time) else: logger.warning("Monitoring cycle took longer than the configured interval.") except Exception as e: logger.exception(f"Critical error in monitor_devices outer loop: {e}") time.sleep(10) # Wait before retrying the whole loop logger.info("Monitoring loop stopped.") def monitor_messages(): """Main monitoring loop that checks devices periodically.""" logger.info("Starting monitoring loop...") last_id = '0' try: while not stop_event.is_set(): try: # Add this outer try block #======================== Process messaging requests ======================================================== # Read from stream #logger.debug("About to read from Redis stream...") messages = redis_conn.xread({'messaging_requests_stream': last_id}) #logger.debug(f"Redis read successful, got {len(messages) if messages else 0} messages") #print(".", end="") if messages: call_acl = False for stream, msgs in messages: for msg_id, fields in msgs: try: logger.debug(f"Processing messaging request: {str(fields)}") function = fields[b'function'].decode('utf-8') if function == "new_caretaker": email = fields[b'email'].decode('utf-8') user_name = fields[b'user_name'].decode('utf-8') first_name = fields[b'first_name'].decode('utf-8') last_name = fields[b'last_name'].decode('utf-8') phone_number = fields[b'phone_number'].decode('utf-8') password = fields[b'password'].decode('utf-8') #devices = fields[b'devices'].decode('utf-8') signature = fields[b'signature'].decode('utf-8') #user_name = hash_data['user_name'] #first_name = hash_data['first_name'] #last_name = hash_data['last_name'] #phone_number = hash_data['phone_number'] #password = hash_data['password'] #devices = hash_data['devices'] #signature = hash_data['signature'] #requests = hash_data['requests'] template_html = read_file("welcome_template_short.html") #devices_lst = devices.split(",") new_user = { "first_name": first_name, "last_name": last_name, "user_name": user_name, "phone_number": phone_number, "signature": signature, "password": password } email_body = populate_welcome_email(template_html, new_user) # 4. Save the populated email to a new HTML file to preview it if "Error" not in email_body: with open('populated_welcome_email.html', 'w') as file: file.write(email_body) print("Successfully generated 'populated_welcome_email.html'.") # In a real application, you would use a library like smtplib to send this 'email_body' text_str = "CREDS," + first_name + "," + last_name + "," + user_name + "," + password + "," + email #SendMessageTo(user_id, text_str) #need to send it to device via signature, since, if device is new, not MQTT_id is known. topic = "/well_" + signature SendMessageDirectTo(topic, text_str) sender.SendEmailTo(email, email_body, "Wellcome to WellNuo!") call_acl = True else: print(email_body) redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later last_id = msg_id elif function == "new_beneficiary": email = fields[b'email'].decode('utf-8') user_name = fields[b'user_name'].decode('utf-8') first_name = fields[b'first_name'].decode('utf-8') last_name = fields[b'last_name'].decode('utf-8') phone_number = fields[b'phone_number'].decode('utf-8') password = fields[b'password'].decode('utf-8') #devices = fields[b'devices'].decode('utf-8') signature = fields[b'signature'].decode('utf-8') #user_name = hash_data['user_name'] #first_name = hash_data['first_name'] #last_name = hash_data['last_name'] #phone_number = hash_data['phone_number'] #password = hash_data['password'] #devices = hash_data['devices'] #signature = hash_data['signature'] #requests = hash_data['requests'] template_html = read_file("new_beneficiary_template_short.html") #devices_lst = devices.split(",") new_user = { "first_name": first_name, "last_name": last_name, "user_name": user_name, "phone_number": phone_number, "signature": signature, "password": password } email_body = populate_welcome_email(template_html, new_user) # 4. Save the populated email to a new HTML file to preview it if "Error" not in email_body: with open('populated_welcome_email.html', 'w') as file: file.write(email_body) print("Successfully generated 'populated_welcome_email.html'.") # In a real application, you would use a library like smtplib to send this 'email_body' text_str = "CREDS," + first_name + "," + last_name + "," + user_name + "," + password + "," + email #SendMessageTo(user_id, text_str) #need to send it to device via signature, since, if device is new, not MQTT_id is known. #lets NOT send message here... because adding new beneficiary/deployment would cause credentials on caretaker's phone to change #topic = "/well_" + signature #SendMessageDirectTo(topic, text_str) sender.SendEmailTo(email, email_body, "Wellcome to WellNuo!") call_acl = True else: print(email_body) redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later last_id = msg_id elif function == "credentials_updated": email = fields[b'email'].decode('utf-8') user_name = fields[b'user_name'].decode('utf-8') first_name = fields[b'first_name'].decode('utf-8') last_name = fields[b'last_name'].decode('utf-8') phone_number = fields[b'phone_number'].decode('utf-8') password = fields[b'password'].decode('utf-8') #devices = fields[b'devices'].decode('utf-8') signature = fields[b'signature'].decode('utf-8') #user_name = hash_data['user_name'] #first_name = hash_data['first_name'] #last_name = hash_data['last_name'] #phone_number = hash_data['phone_number'] #password = hash_data['password'] #devices = hash_data['devices'] #signature = hash_data['signature'] #requests = hash_data['requests'] template_html = read_file("new_credentials_template_short.html") #devices_lst = devices.split(",") new_user = { "first_name": first_name, "last_name": last_name, "user_name": user_name, "phone_number": phone_number, "signature": signature, "password": password } email_body = populate_welcome_email(template_html, new_user) # 4. Save the populated email to a new HTML file to preview it if "Error" not in email_body: with open('populated_welcome_email.html', 'w') as file: file.write(email_body) print("Successfully generated 'populated_welcome_email.html'.") # In a real application, you would use a library like smtplib to send this 'email_body' text_str = "CREDS," + first_name + "," + last_name + "," + user_name + "," + password + "," + email #SendMessageTo(user_id, text_str) #need to send it to device via signature, since, if device is new, not MQTT_id is known. topic = "/well_" + signature SendMessageDirectTo(topic, text_str) sender.SendEmailTo(email, email_body, "Wellcome to WellNuo!") call_acl = True else: print(email_body) redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later last_id = msg_id elif function == "update_acl": call_acl = True redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later last_id = msg_id except Exception as e: print(f"Error in Processing messaging request: {e}") #call acl_manager.py from /home/ubuntu/mqtt-auth-service. New user was added in well-api if call_acl: try: result = subprocess.run( [sys.executable, '/home/ubuntu/mqtt-auth-service/acl_manager.py'], capture_output=True, text=True, timeout=60 # Add timeout to prevent hanging ) if result.returncode == 0: print("ACL manager executed successfully") print("Output:", result.stdout) else: print("ACL manager failed with return code:", result.returncode) print("Error:", result.stderr) except subprocess.TimeoutExpired: print("ACL manager execution timed out") except Exception as e: print(f"Error executing ACL manager: {e}") time.sleep(0.1) hash_data = GetRedisMap('node_red_requests') #logger.debug(f"node_red_requests: {hash_data}") requests_count = 0 if hash_data != {}: requests_count = int(hash_data['requests']) if requests_count > 0: logger.debug(f"node_red_requests: {str(hash_data)}") user_name = hash_data['user_name'] #user needs Node-red. Is his session up and running? port, pid = IsNRRunning(user_name) #delete request #this might need switching to queue... todo redis_conn.hset('node_red_requests', mapping={ 'user_name': user_name, 'token': "", 'time': "", 'requests': 0 }) if port == 0 or pid == 0: #not running token = hash_data['token'] time_s = time.time()#datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") port = GetAvailablePort(user_name) users_p = GetUserPriviledges(user_name) ps = users_p[0][1] SetupUserEnvironment(user_name, token, ps) pid = StartNodeRed(port, user_name) #Lets store it to postgres and REDIS StoreNodeRedUserPortOn(port, user_name, pid, time_s) redis_conn.hset(f'node_red_status_{user_name}', mapping={ 'port': port, 'started_time': time_s, 'last_activity': time_s, 'pid': pid, }) else: #time_s = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") time_s = time.time() redis_conn.hset(f'node_red_status_{user_name}', mapping={ 'port': port, 'started_time': time_s, 'last_activity': time.time(), 'pid': pid }) except redis.RedisError as e: logger.error(f"Redis error in message monitoring: {e}") time.sleep(5) # Wait before retry except Exception as e: logger.error(f"Unexpected error in message processing: {e}") time.sleep(1) except Exception as e: logger.exception(f"Critical error in monitor_messages: {e}") finally: logger.info("monitor_messages loop stopped.") # --- Signal Handling --- def signal_handler(signum, frame): """Handles termination signals gracefully.""" logger.warning(f"Received signal {signal.Signals(signum).name}. Initiating graceful shutdown...") stop_event.set() t1.stop() t2.stop() for job_id in job_registry: function_name = job_registry[job_id]['function_name'] argument = job_registry[job_id]['argument'] task_key = f"{function_name}_{argument}" # Remove from task registry if this is the current job for this task/arg if task_key in task_arg_registry and task_arg_registry[task_key] == job_id: del task_arg_registry[task_key] # Remove the job from the scheduler scheduler.remove_job(job_id) logger.info(f"Cancelled job '{job_id}' with argument '{job_registry[job_id]['argument']}'") def FahrenheitToCelsius(F): if isinstance(F, str): F = float(F) C = (F - 32) * 5/9 return C def BitIndex(bit_nr): return -bit_nr - 1 def RadarLarger(radar, radar_threshold): thr_parts = radar_threshold[0].split("_") field = thr_parts[0] if field == "s28": radar_val = sum(radar[1][3:]) / (7 * radar[1][0]) elif field == "m0": radar_val = radar[0][5] / radar[1][0] elif field == "m1": radar_val = radar[0][6] / radar[1][0] elif field == "m2": radar_val = radar[0][7] / radar[1][0] elif field == "m3": radar_val = radar[0][8] / radar[1][0] elif field == "m4": radar_val = radar[0][9] / radar[1][0] elif field == "m5": radar_val = radar[0][10] / radar[1][0] elif field == "m6": radar_val = radar[0][11] / radar[1][0] elif field == "m7": radar_val = radar[0][12] / radar[1][0] elif field == "m8": radar_val = radar[0][13] / radar[1][0] elif field == "s2": radar_val = radar[1][3] / radar[1][0] elif field == "s3": radar_val = radar[1][4] / radar[1][0] elif field == "s4": radar_val = radar[1][5] / radar[1][0] elif field == "s5": radar_val = radar[1][6] / radar[1][0] elif field == "s6": radar_val = radar[1][7] / radar[1][0] elif field == "s7": radar_val = radar[1][8] / radar[1][0] elif field == "s8": radar_val = radar[1][9] / radar[1][0] if radar_val > radar_threshold[1]: return radar_val, True else: return radar_val, False def ProcessQueue(): #here we are looking for alarm conditions in data global in_queue if in_queue: while in_queue: try: tim, topic, messagein = in_queue.pop(0) #print(".", end="") MAC = topic[1:] if MAC in mac_to_device_id: device_id = mac_to_device_id[MAC] deployment_id = device_to_deployment[device_id] if deployment_id in alarms_settings_all: alarms_settings = alarms_settings_all[deployment_id] alarm_armed_settings_str = alarms_settings["enabled"] # bit 2: Burglar Alarm (index 0), bit 1: Time Alone Alarm is Set (index 1), bit 0: Time alone Warning is Set (index 2) if device_id in device_alerts_all: device_alarm_settings = device_alerts_all[device_id] message_dict = json.loads(messagein.decode('utf-8')) if "enabled_alarms" in device_alarm_settings: #Lets check temperatures temp_offset = -16.0 if "1" in device_alarm_settings["enabled_alarms"][BitIndex(7):BitIndex(3)]: #Temperatures Too High/Low trigger used? enabled_alarms_str = device_alarm_settings["enabled_alarms"] armed_states = device_alarm_settings["armed_states"] #1=armed 0=triggered #print(alarm_armed_settings_str) if "temperature" in message_dict: temperature = message_dict["temperature"] + temp_offset #at this point temperature is in C if temperature > 0 and temperature < 100: #ignore others if device_id in device_alerts_all: ''' { "enabled_alarms":"000000000101", "armed_states":"000000000000", "stuck_minutes_warning":"771.3", "stuck_warning_method_0":"SMS", "stuck_minutes_alarm":600, "stuck_alarm_method_1":"PHONE", "absent_minutes_warning":"-1013.4", "absent_warning_method_2":"SMS", "absent_minutes_alarm":30, "absent_alarm_method_3":"PHONE", "temperature_high_warning":"85", "temperature_high_warning_method_4": "SMS","temperature_high_alarm":"95", "temperature_high_alarm_method_5":"PHONE", "temperature_low_warning":"60", "temperature_low_warning_method_6":"SMS", "temperature_low_alarm":"50", "temperature_low_alarm_method_7":"PHONE", "radar_alarm_method_8":"MSG", "pressure_threshold":"15", "pressure_alarm_method_9":"MSG", "light_threshold":"15", "light_alarm_method_10":"MSG", "smell_alarm_method_11":"EMAIL", "rearm_policy":"At midnight" } ''' #bits in enabled_alarms and armed_states explained #"stuck_warning_method_0": "SMS", -1 #"stuck_alarm_method_1": "PHONE", -2 #"absent_warning_method_2": "SMS", -3 #"absent_alarm_method_3": "PHONE", -4 #"temperature_high_warning_method_4": "SMS", -5 #"temperature_high_alarm_method_5": "PHONE", -6 #"temperature_low_warning_method_6": "SMS", -7 #"temperature_low_alarm_method_7": "PHONE", -8 #"radar_alarm_method_8":"MSG", -9 #"pressure_alarm_method_9":"MSG", -10 #"light_alarm_method_10":"MSG" -11 #"smell_alarm_method_11":"MSG" -12 #hast ot be enabled and not triggerred to continue comparing if enabled_alarms_str[BitIndex(5)] == "1" and armed_states[BitIndex(5)] == "1": #Temperatures Too High Alarm! if temperature > FahrenheitToCelsius(device_alarm_settings["temperature_high_alarm"]): #cancel alarm and warning, until re-armed armed_states = set_character(armed_states, 5, "0") armed_states = set_character(armed_states, 4, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["temperature_high_alarm_method_5"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) SendAlerts(deployment_id, method, f"Alarm @ {first_last_name}: Temperature too high! ({temperature:.1f} C) in {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) #at this point we also need to store it to DB, otherwise when HTML GUI is loaded fact that trigger happened will not be reflected! elif enabled_alarms_str[BitIndex(4)] == "1" and armed_states[BitIndex(4)] == "1": #Temperatures Too High Warning! if temperature > FahrenheitToCelsius(device_alarm_settings["temperature_high_warning"]): #cancel warning, until re-armed armed_states = set_character(armed_states, 4, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["temperature_high_warning_method_4"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) SendAlerts(deployment_id, method, f"Warning @ {first_last_name}: Temperature too high! ({temperature:.1f} C) in {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) if armed_states[BitIndex(7)] == "1" and armed_states[BitIndex(7)] == "1": #Temperatures Too Low Alarm! if temperature < FahrenheitToCelsius(device_alarm_settings["temperature_low_alarm"]): #cancel alarm and warning, until re-armed armed_states = set_character(armed_states, 7, "0") armed_states = set_character(armed_states, 6, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["temperature_low_alarm_method_7"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) SendAlerts(deployment_id, method, f"Alarm @ {first_last_name} Temperature too low! ({temperature:.1f} C) in {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) elif armed_states[BitIndex(6)] == "1" and armed_states[BitIndex(6)] == "1": #Temperatures Too Low Warning! if temperature < FahrenheitToCelsius(device_alarm_settings["temperature_low_warning"]): #cancel warning, until re-armed armed_states = set_character(armed_states, 6, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["temperature_low_warning_method_6"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) SendAlerts(deployment_id, method, f"Warning @ {first_last_name} Temperature too low! ({temperature:.1f} C) in {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) #logger.info(f"{tim}, {mac}, {temperature}") else: #radar packet pass #lets check if alarm condition if alarm_armed_settings_str[BitIndex(2)] == "1": #alarm is armed! if "radar" in message_dict: enabled_alarms_str = device_alarm_settings["enabled_alarms"] armed_states = device_alarm_settings["armed_states"] #1=armed 0=triggered #if device_id == 534: # print("Here") if enabled_alarms_str[BitIndex(8)] == "1" and armed_states[BitIndex(8)] == "1": radar = message_dict["radar"] radar_threshold_str = GetRedisString(f"radar_threshold{device_id}") radar_threshold = ast.literal_eval(radar_threshold_str) radar_value, larger = RadarLarger(radar, radar_threshold) print(device_id, radar_value, larger) if larger: #cancel alarm for this room, until re-armed armed_states = set_character(armed_states, 8, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["radar_alarm_method_8"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) SendAlerts(deployment_id, method, f"At {first_last_name} burglar alarm detected presence in the {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) if "pressure" in message_dict: bit_nr = 9 enabled_alarms_str = device_alarm_settings["enabled_alarms"] armed_states = device_alarm_settings["armed_states"] #1=armed 0=triggered if enabled_alarms_str[BitIndex(bit_nr)] == "1" and armed_states[BitIndex(bit_nr)] == "1": if message_dict["mtype"] == 1: pressure = message_dict["pressure"] pressure_threshold = float(device_alarm_settings["pressure_threshold"]) #GetRedisString(f"radar_threshold{device_id}") print(device_id, pressure, pressure_threshold) if pressure > pressure_threshold: #cancel alarm for this room, until re-armed armed_states = set_character(armed_states, bit_nr, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["pressure_alarm_method_9"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) SendAlerts(deployment_id, method, f"At {first_last_name} burglar alarm detected doors opening in the {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) if "light" in message_dict: bit_nr = 10 enabled_alarms_str = device_alarm_settings["enabled_alarms"] armed_states = device_alarm_settings["armed_states"] #1=armed 0=triggered if enabled_alarms_str[BitIndex(bit_nr)] == "1" and armed_states[BitIndex(bit_nr)] == "1": if message_dict["mtype"] == 4: light = message_dict["light"] light_threshold = float(device_alarm_settings["light_threshold"]) #GetRedisString(f"radar_threshold{device_id}") print(device_id, light, light_threshold) if light > light_threshold: #cancel alarm for this room, until re-armed armed_states = set_character(armed_states, bit_nr, "0") dev_det = devices_details_map[device_id] location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["light_alarm_method_10"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) SendAlerts(deployment_id, method, f"At {first_last_name} burglar alarm detected light change in the {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) else: pass #alarm not setup for this device #print(f"{deployment_id} not in {alarms_settings_all}") else: pass #logger.error(f"MAC: {mac} not part of any deployment") except Exception as e: logger.error(f"Error: {str(e)} {traceback.format_exc()}") def ProcessQueueM(): #global queue global in_queueM, PROCESSED_COUNTER, in_packets_count #print(".") if in_queueM: #queue=qinfo.Open(2,0) # Open a ref to queue #msgm=win32com.client.Dispatch("MSMQ.MSMQMessage") while in_queueM: #if True: try: topic, messagein = in_queueM.pop(0) messagein = messagein.decode('utf-8').upper() logger.info(f"Processing: {topic} {messagein}") if messagein != "PIN|OK": #ignore this one mac_pattern = r'([0-9A-Fa-f]{12})' match = re.search(mac_pattern, messagein) MAC = match.group(1) record = { 'time': time.time(), 'result': messagein } # Convert dictionary to JSON string for storage in Redis list record_json = json.dumps(record) logger.info(f"Pushing to REDIS: from_alerter_{MAC} {record_json}") redis_conn.lpush(f'from_alerter_{MAC}', record_json) # ProcessPacket(new_message_dict) except Exception as err: logger.error(err) def MAC_from_id(device_id): conn = get_db_connection() sql = f"SELECT device_mac FROM public.devices WHERE device_id = {device_id}" with conn.cursor() as cur: cur.execute(sql) result = cur.fetchone() #cur.fetchall() if result != None: return result[0] else: return "" def CheckRedisMessages(): global reload_needed requests_count = 0 #print(f"CheckRedisMessages") # Check if queue exists and has items. These items are from manual GUI interactions in alerts page #Any test messages requested to be sent? queue_length = redis_conn.llen('send_requests') if queue_length > 0: # Process each item for i in range(queue_length): item_json = redis_conn.rpop('send_requests') #logger.info(f"Received in REDIS: send_requests {item_json}") print(f"Received in REDIS: send_requests {item_json}") if item_json is None: break try: record = json.loads(item_json) requests_count += 1 # Print the record action = "" logger.info(f"Request #{requests_count}:") for key, value in record.items(): logger.info(f" {key}: {value}") function = "" if "function" in record: function = record["function"] if function == "set_group": group_id = record["group_id"] if "mac" in record: MAC = record["mac"].upper() elif "device_id" in record: device_id = record["device_id"] MAC = MAC_from_id(device_id).upper() content = UNLOCK MQSendM(f"/{MAC}", content) time.sleep(2) content = f"Z|{group_id}" MQSendM(f"/{MAC}", content) elif function == "reboot": if "mac" in record: MAC = record["mac"].upper() elif "device_id" in record: device_id = record["device_id"] MAC = MAC_from_id(device_id).upper() content = UNLOCK MQSendM(f"/{MAC}", content) time.sleep(2) content = "s" MQSendM(f"/{MAC}", content) record = { 'time': time.time(), 'result': "Reboot sent..." } # Convert dictionary to JSON string for storage in Redis list record_json = json.dumps(record) logger.info(f"Pushing to REDIS: from_alerter_{MAC} {record_json}") redis_conn.lpush(f'from_alerter_{MAC}', record_json) elif function == "set_well_id": if "mac" in record: MAC = record["mac"].upper() else: device_id = record["device_id"] MAC = MAC_from_id(device_id).upper() well_id = record["well_id"] content = UNLOCK MQSendM(f"/{MAC}", content) time.sleep(2) content = f"-|{well_id}" MQSendM(f"/{MAC}", content) elif function == "get_device_live": if "mac" in record: MAC = record["mac"].upper() else: device_id = record["device_id"] MAC = MAC_from_id(device_id).upper() content = UNLOCK MQSendM(f"/{MAC}", content) time.sleep(2) content = "r" MQSendM(f"/{MAC}", content) elif function == "set_network_id": if "mac" in record: MAC = record["mac"].upper() else: device_id = record["device_id"] MAC = MAC_from_id(device_id).upper() network_id = record["network_id"] content = UNLOCK MQSendM(f"/{MAC}", content) time.sleep(2) content = f"0|{network_id}" MQSendM(f"/{MAC}", content) else: if "action" in record: action = record["action"] if action == "reload": reload_needed = True if action != "reload": method = record["method"] location_str = record["location"] location = location_str.split("_")[2] deployment_id = record["deployment_id"] content = record["content"] feature = record["feature"] enabledCellContent = record["enabledCellContent"] currentUnits = record["currentUnits"] currentAlertTableMode = record["currentAlertTableMode"] #Warning/Alarm test_only = record["test_only"] user_name = record["user_name"] user_first_last_name = GetBeneficiaryFromDeployment(deployment_id) if feature == "stuck": msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} is spending more than {enabledCellContent} in {location}" elif feature == "absent": msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} did not visit {location} in more than {enabledCellContent[1:-1]} {currentUnits}" elif feature == "tempLow": msg_ext = f"{currentAlertTableMode}: {content} temperature is lower then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}" elif feature == "tempHigh": msg_ext = f"{currentAlertTableMode}: {content} temperature is higher then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}" elif feature == "pressure": msg_ext = f"{currentAlertTableMode}: {content} door was opened or closed in the {location} at {user_first_last_name}" elif feature == "radar": msg_ext = f"{currentAlertTableMode}: {content} motion detected in the {location} at {user_first_last_name}" else: msg_ext = f"{currentAlertTableMode}: {content} {feature} in {location} at {user_first_last_name}" SendAlerts(deployment_id, method, msg_ext, "Test message", user_name) #these are testing messages, so do not count them as real triggered... so do not update in REDIS #StoreLastSentToRedis(deployment_id) print("-" * 40) except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON from queue item: {e}") continue #Any test messages requested to be sent? queue_length = redis_conn.llen('new_alarms') if queue_length > 0: for i in range(queue_length): print(f"Processing send_requests message from queue...") item_json = redis_conn.rpop('new_alarms') logger.info(f"Received in REDIS: new_alarms {item_json}") if item_json is None: break try: record = json.loads(item_json) deployment_id = int(record["deployment_id"]) device_id = int(record["device_id"]) if device_id == 0: reload_needed = True else: print(record) device_alarms_json = GetRedisString('alarm_device_settings_'+str(device_id)) deployment_alarms_json = GetRedisString('alarm_deployment_settings_'+str(deployment_id)) alarms_settings_all[deployment_id] = json.loads(deployment_alarms_json) device_alerts_all[device_id] = json.loads(device_alarms_json) print(device_alarms_json) print(deployment_alarms_json) #method = record["method"] #location_str = record["location"] except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON from queue item: {e}") continue #print(f"Total requests processed: {requests_count}") return requests_count def ReloadIfNeeded(): global reload_needed, t1, t2 if reload_needed: t1.pause() t2.pause() ok = load_device_configurations() print(f"load_device_configurations in ReloadIfNeeded! {ok}") reload_needed = False t1.resume() t2.resume() # --- Main Execution --- if __name__ == "__main__": logger.info(f"Starting Well Alert Monitoring Service (PID: {os.getpid()})...") logger.info(f"Alert Timeout Threshold: {TIME_OUT_THRESHOLD_MIN} minutes") logger.info(f"Check Interval: {CHECK_INTERVAL_SECONDS} seconds") # Register signal handlers signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) #UpdateACL("artia", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFydGlhIiwiZXhwIjoxNzQ0NzY4NDYzfQ.-DFXbAeXTpKt4a-rGAPzOqcV6HBho0264qIOfZ3QdZM") #SetupUserEnvironment("artia", "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFydGlhIiwiZXhwIjoxNzQ0NzcyNjQ2fQ.cu5CUWBn_6cwj5hidP61vRlr-GZwi08bgxeRFxiU2fI") if not load_device_configurations(): logger.critical("Failed to load initial device configurations. Exiting.") # Clean up potentially established connections sys.exit(1) t1 = perpetualTimer(1, ensure_mqtt_connection) t2 = perpetualTimer(.01, ProcessQueue) t3 = perpetualTimer(1, ReloadIfNeeded) t4 = perpetualTimer(1, ensure_mqtt_connectionM) t5 = perpetualTimer(.01, ProcessQueueM) SetupTasks() t1.start() t2.start() t3.start() t4.start() t5.start() schedule_task(task_a, "first", 1) schedule_task(task_b, "second", 1) musr = os.getenv('GU') #print(musr) sender = EmailSender(os.getenv('GU'), os.getenv('GP'), max_workers=3) sender.start_workers() #SendAlerts(21, "MSG", f"Test: User way too long ({120} minutes) in Bathroom", "") SendAlerts(21, "EMAIL", f"well-alert was started", "Program started", "robster") #SendAlerts(21, "SMS", f"Test: User way too long ({120} minutes) in Bathroom", "", "") #SendAlerts(21, "PHONE", f"Test: User way too long ({120} minutes) in Bathroom", "") SendPhoneCall("-4086462191", "Hi Robert. How are you?") #SendPhoneCall("4086462191", "Hi Robert. How are you?", "") #SendPhoneCall("4085505424", "Hi Fred. How are you? Are you hungry?", "") #SendPhoneCall("4087055709", "Hi Bernhard. How are you? Are you hungry?", "") #SendPhoneCall("4086903883", "Hi Sasha. How are you? Are you hungry?", "") #SendPhoneCall("6507968313", "Hi Julica. How are you? Are you hungry?", "") #NOT WORKING!!! #SendPhoneCall("+385981919229", "Hi Danko. Are you hungry?", "") #SendPhoneCall("4086462191", "Hi Danko. How are you? Your mom seems stable.", "") #success = SendSMSTo("(408)646-2191", "This is only a test") #time.sleep(10) #content = UNLOCK #MQSendM("/901506CA3C7C", content) #time.sleep(2) #content = "Z|1" #MQSendM("/901506CA3C7C", content) template_html = read_file("welcome_template_short.html") try: # Start the main monitoring loop monitor_thread = threading.Thread(target=monitor_devices, daemon=True) monitor_thread.start() monitor_m_thread = threading.Thread(target=monitor_messages, daemon=True) monitor_m_thread.start() # Keep the main thread alive until stop_event is set by signal or error while not stop_event.is_set(): # Can add periodic health checks here if needed CheckRedisMessages() # Optional: Check if threads are still alive and restart if needed if not monitor_thread.is_alive(): logger.error("Device monitor thread died! Restarting...") monitor_thread = threading.Thread(target=monitor_devices, daemon=True) monitor_thread.start() if not monitor_m_thread.is_alive(): logger.error("Message monitor thread died! Restarting...") monitor_m_thread = threading.Thread(target=monitor_messages, daemon=True) monitor_m_thread.start() time.sleep(1) # Check stop_event periodically logger.info("Stop event received, waiting for monitoring threads to finish...") # Wait for both threads to finish monitor_thread.join(timeout=CHECK_INTERVAL_SECONDS + 10) if monitor_thread.is_alive(): logger.warning("Device monitoring thread did not finish cleanly.") monitor_m_thread.join(timeout=10) if monitor_m_thread.is_alive(): logger.warning("Message monitoring thread did not finish cleanly.") except Exception as e: logger.exception(f"An unexpected critical error occurred in main execution: {e}") finally: logger.info("Well Alert Monitoring Service finished.") sys.exit(0) # Explicitly exit with success code after cleanup print("Last line") # This will never execute due to sys.exit(0) above