diff --git a/well-alerts.py b/well-alerts.py index 72f9f43..6acbf4f 100644 --- a/well-alerts.py +++ b/well-alerts.py @@ -1,11 +1,13 @@ #!/usr/bin/env python3 - +#this maintains 2 mqtt connections +#1 to wellnua (to talk and listen to devices) +#2 to eluxnetworks.net import os import time import signal import sys import threading -from threading import Timer +from threading import Timer, Lock import json import logging import datetime @@ -41,6 +43,7 @@ from email.mime.multipart import MIMEMultipart import queue import telnyx # For SMS import pytz +import struct #from datetime import datetime, timedelta @@ -76,6 +79,8 @@ handler.setFormatter(formatter) logger.addHandler(handler) #permament_users = ["node_red_status_brankol", "node_red_status_robster", "node_red_status_bknigge"] permament_users = [] + +filesDir = "/home/app/well_web_storage" # Database Configuration DB_NAME = os.getenv('DB_NAME') DB_USER = os.getenv('DB_USER') @@ -86,8 +91,9 @@ DB_PORT = os.getenv('DB_PORT', '5432') ADMIN_USER = os.getenv('ADMIN_USER') ADMIN_PASSWORD = os.getenv('ADMIN_PASSWORD') -# Mosquitto files +UNLOCK = os.getenv('UNLOCK') +# Mosquitto files MOSQUITTO_PASSWORD_FILE = os.getenv('MOSQUITTO_PASSWORD_FILE') MOSQUITTO_ACL_FILE = os.getenv('MOSQUITTO_ACL_FILE') @@ -106,11 +112,26 @@ exchange = Exchange("", type='direct') client = None connected = False in_queue = [] -MQTTSERVER = os.getenv('MQTTSERVER', "mqtt.eluxnetworks.net") #"mqtt.eluxnetworks.net" -mqtt_port = int(os.getenv('MQTT_PORT', "8883")) # TLS port -mqtt_client_id = "client777" +MQTTSERVER = os.getenv('MQTTSERVER', "") #"mqtt.eluxnetworks.net" +mqtt_port = int(os.getenv('MQTT_PORT', "1883")) # TLS port +mqtt_client_id = "client777aaa" +MQTT_USER=os.getenv('MQTT_USER', "") +MQTT_PASSWORD=os.getenv('MQTT_PASSWORD', "") use_tls = MQTTSERVER not in ["localhost", "127.0.0.1", "192.168.68.70"] +#MQTT Master Configuration (wellnua mqtt!) +RECEIVED_COUNTER = 0 +clientM = None +connectedM = False +in_queueM = [] +MQTTSERVERM = os.getenv('MQTTSERVERM', "") +mqtt_portM = int(os.getenv('MQTT_PORTM', "8883")) # TLS port +mqtt_client_idM = "client90a-authn-ID" +MQTT_USERM=os.getenv('MQTT_USERM', "") +MQTT_PASSWORDM=os.getenv('MQTT_PASSWORDM', "") +use_tlsM = True +CONNECTION_STATE = 0 #0=idle, 1=connected, 2=lost connection, 3=connection requested + # Alerting Thresholds DEFAULT_TIMEOUT_MINUTES = 48 * 60 # Default 48 hours in minutes TIME_OUT_THRESHOLD_MIN = int(os.getenv('time_out_threshold', DEFAULT_TIMEOUT_MINUTES)) @@ -147,7 +168,7 @@ for directory in [MAIN_INSTANCE_DIR, USERS_DIR, SHARED_NODES_DIR]: # Monitoring Interval CHECK_INTERVAL_SECONDS = 1 local_daily_minute_last = 0 - +reload_needed = False # --- Global Variables --- db_conn = None @@ -283,30 +304,147 @@ def list_scheduled_tasks(): class perpetualTimer(): - def __init__(self,t,hFunction): - self.t=t + def __init__(self, t, hFunction): + self.t = t self.hFunction = hFunction - self.thread = Timer(self.t,self.handle_function) + self.thread = None + self.is_running = False + self.lock = Lock() + + def _run(self): + with self.lock: + if not self.is_running: + return - def handle_function(self): self.hFunction() - self.thread = Timer(self.t,self.handle_function) - self.thread.start() + + with self.lock: + if self.is_running: + self.thread = Timer(self.t, self._run) + self.thread.start() def start(self): - self.thread.start() + with self.lock: + if not self.is_running: + self.is_running = True + self.thread = Timer(self.t, self._run) + self.thread.start() - def cancel(self): - self.thread.cancel() + def stop(self): + with self.lock: + self.is_running = False + if self.thread: + self.thread.cancel() + + def pause(self): + self.stop() + + def resume(self): + self.start() + + def is_active(self): + with self.lock: + return self.is_running + +def MQSend_worker(topic, content, retain=False): + """Worker function that handles MQTT sending in a separate thread""" + global client, mqtt_client_id, connected + + #currentTime = int(time.time()) + #print(">", currentTime, topic, content) + + # Limit connection attempts to prevent flooding + max_connection_attempts = 3 + connection_attempts = 0 + + # Use a local semaphore to prevent multiple simultaneous connection attempts + if not hasattr(MQSend_worker, 'connection_in_progress'): + MQSend_worker.connection_in_progress = False + try: + # Only attempt connection if not already in progress + if client is None or not connected: + if not MQSend_worker.connection_in_progress and connection_attempts < max_connection_attempts: + MQSend_worker.connection_in_progress = True + try: + print(f"Thread {threading.current_thread().name} attempting MQTT connection") + client = connect_mqtt(mqtt_client_id) + connection_attempts += 1 + finally: + MQSend_worker.connection_in_progress = False + + # If we have a valid connection, try to send + if connected and client is not None: + try: + if retain: + result = client.publish(topic, content, qos=1, retain=retain) # Higher QoS for better throughput NOT! + else: + result = client.publish(topic, content, qos=1, retain=retain) # Lower QoS for better throughput + + if hasattr(result, 'rc') and result.rc == mqtt.MQTT_ERR_SUCCESS: + pass + print(f"Successfully published to {topic}") + else: + print(f"Failed to publish to {topic}") + except Exception as err: + print(f"Error in MQSend publish: {str(err)}") + else: + print("MQTT not connected, message not sent") + + except Exception as err: + print(f"Error in MQSend_worker: {str(err)}") -def on_message(client_, userdata, msg): #message from GUI +def on_message(clientt, userdata, msg): global in_queue #print(msg.topic+" "+str(msg.payload)) #msga = msg.payload.decode("ascii") #print(msg.timestamp, msg.topic, msg.payload) in_queue.append((str(time.time()), msg.topic, msg.payload)) +def MQSendM(topic, content, retain=False): + global clientM + + currentTime = int(time.time()) + logger.info(f"> {currentTime} , {topic}, {content}") + try: + enc_msg = content + if retain: + clientM.publish(topic, enc_msg, qos=2, retain=retain) + else: + clientM.publish(topic, enc_msg, qos=1, retain=retain) + except Exception as err: + print("Err2B:"+ str(err)) + +# The callback for when the client receives a CONNACK response from the server. +def on_connectM(clientt, userdata, flags, rc, properties=None): + + global CONNECTION_STATE, in_queueM + + logger.info(f"Connected to {MQTTSERVERM} with result code {str(rc)}") + #subbedTo = [("/w",1),("wellnuotopics/topic1",1), ("/well_hub",1)] + subbedTo = [("/well_hub",1)] + #GLog("subscribing to /w and /w_enc") + logger.info(f"MQTTM subscribing to {subbedTo}") + clientt.subscribe(subbedTo) + CONNECTION_STATE = 1 + in_queueM = [] + +def on_disconnectM(clientt, userdata, rc): + global CONNECTION_STATE + + if rc != 0: + if True: #CONNECTION_STATE != 2: + logger.info(f"MQTTM Found disconnected.") + CONNECTION_STATE = 2 #lost connection + + +def on_messageM(client_, userdata, msg): #message from GUI + global in_packets_count, in_queueM, RECEIVED_COUNTER + RECEIVED_COUNTER += 1 + if len(msg.payload) > 0: + logger.info(f"<<<<< {msg.payload}") + in_queueM.append([msg.topic,msg.payload]) + def connect_mqtt(client_id): global client, connected @@ -334,13 +472,13 @@ def connect_mqtt(client_id): logger.info(f"subscribing to , {subbedTo}") client.subscribe(subbedTo) else: - logger.error(f"Failed to connect, return code {rc}") + logger.error(f"Failed to connect to {MQTTSERVER} with {unique_client_id} return code {rc}") connected = False - def on_disconnect_wrapper(client, userdata, rc, properties=None): + def on_disconnect_wrapper(client, userdata, flags, reason_code, properties): global connected connected = False - logger.info(f"Disconnected with result code {rc}") + logger.info(f"Disconnected with result code {reason_code}") # Set the callbacks mqtt_client.on_connect = on_connect_wrapper @@ -357,7 +495,7 @@ def connect_mqtt(client_id): #) # Set username and password - mqtt_client.username_pw_set("well_user", "We3l1_best!") + mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD) # Connect with shorter timeout mqtt_client.connect(MQTTSERVER, mqtt_port, keepalive=30) @@ -412,6 +550,45 @@ def ensure_mqtt_connection(): return True # Already connected +def ensure_mqtt_connectionM(): + global clientM, CONNECTION_STATE, RECEIVED_COUNTER + random_suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=6)) + unique_client_idm = f"{random_suffix}-{int(time.time())}" + if CONNECTION_STATE == 0: + #clientM = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, "client_id8") + clientM = mqtt.Client(client_id=unique_client_idm, protocol=mqtt.MQTTv5, callback_api_version=mqtt.CallbackAPIVersion.VERSION2) + + clientM.tls_set( + ca_certs="wellnua.com.crt", # Path to your new CA certificate + ) + clientM.username_pw_set(MQTT_USERM, MQTT_PASSWORDM) + clientM.on_connect = on_connectM + clientM.on_disconnect = on_disconnectM + clientM.on_message = on_messageM + print(f"MQTTM Connecting to {MQTTSERVERM}...") + clientM.connect(MQTTSERVERM, mqtt_portM, keepalive=30) + + try: + clientM.loop_forever() + print("MQTTM Stopped listening!") + except Exception as e: + print(e) + time.sleep(3) + CONNECTION_STATE = 3 + pass + elif CONNECTION_STATE == 1: #connected + if RECEIVED_COUNTER == 0: + print("No new data...") + CONNECTION_STATE = 2 + elif CONNECTION_STATE == 2: #lost connection + print("MQTTM Lost connection. Going to idle...") + if clientM.is_connected(): + clientM.disconnect() + time.sleep(2) + #clientM.reconnect + CONNECTION_STATE = 0 + elif CONNECTION_STATE == 3: #connection requested + print("MQTTM Waiting for connection...") class NodePackageHandler(FileSystemEventHandler): @@ -828,7 +1005,7 @@ def GetRedisMap(key_name): def GetDeviceDetails(cur, deployment_ids, location_id): #ID, Well id, MAC, Last_Message, Location, Description, Deployment - macs = [mac for _, mac in deployment_ids] + macs = [MAC for _, MAC in deployment_ids] #macs = list(deployment_ids.keys()) macs_string_nq = ",".join(macs) macs_string = "'" + "','".join(macs) + "'" @@ -867,9 +1044,9 @@ def GetDeviceDetails(cur, deployment_ids, location_id): devices_ids_list = [x[0] for x in devices_ids_records] device_ids_string = ",".join(map(str, devices_ids_list)) #sql = f"SELECT device_id, MAX(time) as last_reading_time FROM sensor_readings WHERE device_id IN ({device_ids_string}) GROUP BY device_id" #to slow - sql = f"SELECT DISTINCT ON (device_id) device_id, time as last_reading_time FROM sensor_readings WHERE device_id IN ({device_ids_string}) AND time > now() - INTERVAL '1 day' ORDER BY device_id, time DESC" - logger.info(sql) - cur.execute(sql) + sql1 = f"SELECT DISTINCT ON (device_id) device_id, time as last_reading_time FROM sensor_readings WHERE device_id IN ({device_ids_string}) AND time > now() - INTERVAL '1 day' ORDER BY device_id, time DESC" + logger.info(sql1) + cur.execute(sql1) devices_times = cur.fetchall()#cur.fetchone() #returns times of last readings found_device_details = {} @@ -896,8 +1073,11 @@ def GetDeviceDetails(cur, deployment_ids, location_id): #print(last_message_epoch) #print(type(last_message_epoch)) device_id = device_table_record[0] - mac = device_table_record[1] + MAC = device_table_record[1] well_id = device_table_record[2] + + if well_id == 230: + print(".") description = device_table_record[3] if description == None: @@ -909,7 +1089,7 @@ def GetDeviceDetails(cur, deployment_ids, location_id): if location_id == None: location_id = 0 - alert_details = device_table_record[-1] + alert_details = device_table_record[17] if alert_details == None: alert_details = {}#{'red_sec': 28801, 'yellow_sec': 14401} else: @@ -918,7 +1098,9 @@ def GetDeviceDetails(cur, deployment_ids, location_id): last_seen = GetRedisFloat(f'lastseen_{device_id}') if last_seen == None: last_seen = 0 - row_data = [device_id, well_id, mac, last_message_epoch, location_names[location_id], description, deployment_ids[cnt][0], alert_details, last_seen] + if well_id == 230: + print("stop") + row_data = [device_id, well_id, MAC, last_message_epoch, location_names[location_id], description, deployment_ids[cnt][0], alert_details, last_seen] cnt += 1 all_details.append(row_data) @@ -949,8 +1131,8 @@ def GetVisibleDevicesPerLocation(deployments, location): macs_group = literal_eval(dev_group) else: macs_group = dev_group.split(',') - for mac in macs_group: - deployment_ids.append((deployment_id, mac)) + for MAC in macs_group: + deployment_ids.append((deployment_id, MAC)) devices_details = GetDeviceDetails(cur, deployment_ids, location_indexes[location]) @@ -1055,7 +1237,7 @@ def GetCaretakers(deployment_id, users): else: access_to_list = [access_to[1]] - if deployment_id_str in access_to_list: + if deployment_id_str in access_to_list or access_to_list == ['-1']: caretakers.append((access_to[0], access_to[2], access_to[3])) @@ -1269,15 +1451,16 @@ def update_acl_file(): # Add user permissions for username, topics, permissions in users: - acl_content.append(f"user {username}") - for i in range(len(topics)): - topic = topics[i] - perm = permissions[i] - if 'r' in perm: - acl_content.append(f"topic read {topic}") - if 'w' in perm: - acl_content.append(f"topic write {topic}") - acl_content.append("") + if username != None and user_name != "": + acl_content.append(f"user {username}") + for i in range(len(topics)): + topic = topics[i] + perm = permissions[i] + if 'r' in perm: + acl_content.append(f"topic read {topic}") + if 'w' in perm: + acl_content.append(f"topic write {topic}") + acl_content.append("") # Write directly to the ACL file with open(MOSQUITTO_ACL_FILE, "w") as f: @@ -1333,9 +1516,12 @@ def GetBeneficiaryFromDeployment(deployment_id): sql = f"SELECT first_name, last_name FROM public.person_details WHERE user_id='{beneficiary_id}'" cur.execute(sql) result1 = cur.fetchone() - return result1[0] + " " + result1[1] + if result1 != None: + return result1[0] + " " + result1[1] + else: + return "" else: - return 0 + return "" def DeleteUserTopics(user_name): with get_db_connection() as conn: @@ -1371,8 +1557,8 @@ def GetVisibleDevices(deployments): macs_group = literal_eval(dev_group) - for mac in macs_group: - deployment_ids.append((deployment_id, mac)) + for MAC in macs_group: + deployment_ids.append((deployment_id, MAC)) else: print(f"Deployment {deployment_id} has dev_group empty") devices_details = GetDeviceDetails(cur, deployment_ids, -1) @@ -1474,9 +1660,9 @@ def UpdateACL(username, token): if len(devices) > 0: macs_list = [] for device in devices: - mac = device[2] - if mac != None: - macs_list.append(mac) + MAC = device[2] + if MAC != None: + macs_list.append(MAC) if privileges == "-1": add_user(username, p) @@ -1486,14 +1672,15 @@ def UpdateACL(username, token): add_user(username, p) #add_user(username+"_", token) - for mac in macs_list: - add_topic_permission(username, "/"+mac, "r") - #add_topic_permission(username+"_", "/"+mac, "r") + for MAC in macs_list: + add_topic_permission(username, "/"+MAC, "r") + #add_topic_permission(username+"_", "/"+MAC, "r") update_acl_file() def fetch_device_data(user_name, token): # API endpoint url = "https://eluxnetworks.net/function/well-api/api" + #url = "http://192.168.68.70/function/well-api/api" # Request headers and data headers = { @@ -1611,7 +1798,7 @@ def create_symbolic_link(source_dir_in, target_dir_in): print(f"Error creating symbolic link: {e}") return False -preload_template_path = os.path.expanduser("~/.node-red/preload_template.js") +preload_template_path = os.path.expanduser("/home/ubuntu/.node-red/preload_template.js") if not os.path.exists(preload_template_path): preload_template_content = """// CommonJS wrapper for tslib try { @@ -1633,15 +1820,15 @@ def SetupUserEnvironment(username, token, ps): if True: #Lets do it always UpdateACL(username, token) - user_dir = os.path.expanduser(f"~/.node-red/users/{username}") + user_dir = os.path.expanduser(f"/home/ubuntu/.node-red/users/{username}") os.makedirs(user_dir, exist_ok=True) # 2. Copy settings template - settings_template_file = os.path.expanduser("~/.node-red/settings_template.js") + settings_template_file = os.path.expanduser("/home/ubuntu/.node-red/settings_template.js") user_settings_file = os.path.join(user_dir, "settings.js") replace_and_save(settings_template_file, user_settings_file, [("###USER_NAME###", username),("###TOKEN###", token),("###PS###", ps)]) - timeout_script_template_file = os.path.expanduser("~/.node-red/timeout-script_template.js") + timeout_script_template_file = os.path.expanduser("/home/ubuntu/.node-red/timeout-script_template.js") user_timeout_script_file = os.path.join(user_dir, "timeout-script.js") replace_and_save(timeout_script_template_file, user_timeout_script_file, [("###USER_NAME###", username),("###TOKEN###", token),("###PS###", ps)]) @@ -1684,16 +1871,16 @@ def SetupUserEnvironment(username, token, ps): json.dump(activity_data, f) - preload_template_file = os.path.expanduser("~/.node-red/preload_template.js") + preload_template_file = os.path.expanduser("/home/ubuntu/.node-red/preload_template.js") user_preload_file = os.path.join(user_dir, "preload.js") replace_and_save(preload_template_file, user_preload_file, []) # No replacements needed - shared_nodes_dir = os.path.expanduser("~/.node-red/shared_nodes") + shared_nodes_dir = os.path.expanduser("/home/ubuntu/.node-red/shared_nodes") if not os.path.exists(os.path.join(shared_nodes_dir, "node-red-contrib-wellnuo")): # Copy from the main node_modules if it exists there - src_path = os.path.expanduser("~/.node-red/node_modules/node-red-contrib-wellnuo") + src_path = os.path.expanduser("/home/ubuntu/.node-red/node_modules/node-red-contrib-wellnuo") if os.path.exists(src_path): import shutil os.makedirs(shared_nodes_dir, exist_ok=True) @@ -1706,7 +1893,7 @@ def SetupUserEnvironment(username, token, ps): # 6. Create symbolic link to common Wellnuo Nodes - #success = create_symbolic_link("~/.node-red/node_modules/node-red-contrib-wellnuo", f"~/.node-red/users/{username}/node_modules") + #success = create_symbolic_link("/home/ubuntu/.node-red/node_modules/node-red-contrib-wellnuo", f"/home/ubuntu/.node-red/users/{username}/node_modules") #if success: #print("Link creation completed successfully.") #else: @@ -2017,7 +2204,7 @@ def combine_user_flows(user_directories, output_dir='/home/ubuntu/.node-red', mqtt_username = re.search(r'MQTT_USERNAME\s*[=:]\s*["\']([^"\']+)["\']', config_content) mqtt_password = re.search(r'MQTT_PASSWORD\s*[=:]\s*["\']([^"\']+)["\']', config_content) mqtt_server = re.search(r'MQTT_SERVER\s*[=:]\s*["\']([^"\']+)["\']', config_content) - mqtt_port = re.search(r'MQTT_PORT\s*[=:]\s*["\']?(\d+)["\']?', config_content) + mqtt_port_nr = re.search(r'MQTT_PORT\s*[=:]\s*["\']?(\d+)["\']?', config_content) if mqtt_username: wellnuo_config['MQTT_USERNAME'] = mqtt_username.group(1) @@ -2025,8 +2212,8 @@ def combine_user_flows(user_directories, output_dir='/home/ubuntu/.node-red', wellnuo_config['MQTT_PASSWORD'] = mqtt_password.group(1) if mqtt_server: wellnuo_config['MQTT_SERVER'] = mqtt_server.group(1) - if mqtt_port: - wellnuo_config['MQTT_PORT'] = int(mqtt_port.group(1)) + if mqtt_port_nr: + wellnuo_config['MQTT_PORT'] = int(mqtt_port_nr.group(1)) # Extract deployment data using a different approach deploy_start = config_content.find('deploymentData') @@ -2434,8 +2621,10 @@ def combine_user_flows(user_directories, output_dir='/home/ubuntu/.node-red', combined_wellnuo_config = "// Combined WellNuo configuration\n\n" combined_wellnuo_config += "module.exports = {\n" combined_wellnuo_config += " // Default MQTT configuration\n" - combined_wellnuo_config += " MQTT_SERVER: \"mqtt.eluxnetworks.net\",\n" - combined_wellnuo_config += " MQTT_PORT: 8883,\n" + #combined_wellnuo_config += " MQTT_SERVER: \"mqtt.eluxnetworks.net\",\n" + #combined_wellnuo_config += " MQTT_PORT: 8883,\n" + combined_wellnuo_config += " MQTT_SERVER: \"127.0.0.1\",\n" + combined_wellnuo_config += " MQTT_PORT: 1883,\n" #combined_wellnuo_config += " MQTT_USERNAME: \"artib\",\n" #combined_wellnuo_config += " MQTT_PASSWORD: \"well_arti_2027\",\n\n" combined_wellnuo_config += " // User credentials mapping - used by custom node handler\n" @@ -2777,8 +2966,10 @@ def restart_node_red_service(service_name="node-red.service"): def task_a(arg): + print(f"task_a {arg}") + def task_b(arg): print(f"task_b {arg}") @@ -2851,7 +3042,7 @@ class EmailSender: except: pass - def _send_single_email(self, to_email, text_content, subject): + def _send_single_email(self, to_email, html_content, subject): """ Send a single email using Gmail SMTP. @@ -2867,7 +3058,15 @@ class EmailSender: msg['Subject'] = subject # Attach the text content - msg.attach(MIMEText(text_content, 'plain')) + + #plain_text = "Please view this email in an HTML-compatible client to see its full content." + #part1 = MIMEText(plain_text, 'plain') + + # 2. Create the HTML part + part2 = MIMEText(html_content, 'html') + + #msg.attach(part1) + msg.attach(part2) print("Connecting to SMTP server...") server = smtplib.SMTP_SSL('smtp.gmail.com', 465) @@ -2877,7 +3076,7 @@ class EmailSender: server.login(self.gmail_user, self.gmail_password) print("Login successful") - print("Sending message...") + print("Sending email...") server.send_message(msg) print("Message sent") @@ -2900,10 +3099,39 @@ class EmailSender: if email_address[0] != "_" and email_address[0] != "-": self.email_queue.put((email_address, text_str, subject)) +def GetMQTTid(user_id): + with get_db_connection() as conn: + with conn.cursor() as cur: + sqlr = f"SELECT mqtt_id from public.mobile_clients WHERE user_id ={user_id}" + mqtt_id = ReadCleanStringDB(cur, sqlr) + return mqtt_id def SendMessageTo(user_id, text_str): + #we need mqtt_id for user print(user_id, text_str) + mqtt_id = GetMQTTid(user_id) + mqtt_message_str = f""" + {{ + "Command": "REPORT","body":"{text_str}","reflected":"","language":"","time":{time.time() *1000} + }} + """ + print(mqtt_message_str) + + if mqtt_id != "": + MQSendM(mqtt_id, mqtt_message_str) + +def SendMessageDirectTo(topic, text_str): + + mqtt_message_str = f""" + {{ + "Command": "REPORT","body":"{text_str}","reflected":"","language":"","time":{time.time() *1000} + }} + """ + #print(mqtt_message_str) + + if topic != "": + MQSend_worker(topic, mqtt_message_str, True)#(topic, mqtt_message_str, True) def normalize_phone_number(phone_number_str: str) -> str: if not phone_number_str: return "" @@ -3307,6 +3535,7 @@ def SetupTasks(): global local_daily_minute_last print("SetupTasks") + stt = time.time() #This will determine current state of the Z-graf based warnings/alarms, and setup future triggers #This warnings/alarms that are sensor based need to be done in @@ -3445,7 +3674,7 @@ def SetupTasks(): #lets check alarms first, because if satisfied, no need to check for warning if enabled_alarms_str[BitIndex(1)] == "1" and armed_states[BitIndex(1)] == "1": #Too long present alarm if enabled and not already triggerred (triggered = 0)! if device_id == smallest[1]: #now present... how long? - if (second_smallest[0] - smallest[0]) > device_alarm_settings["stuck_minutes_alarm"] * 60: + if (second_smallest[0] - smallest[0]) > float(device_alarm_settings["stuck_minutes_alarm"]) * 60: #cancel alarm and warning, until re-armed armed_states = set_character(armed_states, 0, "0") armed_states = set_character(armed_states, 1, "0") @@ -3462,12 +3691,12 @@ def SetupTasks(): device_alerts_all[device_id] = device_alarm_settings StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: - check_after = device_alarm_settings["stuck_minutes_alarm"] - int((second_smallest[0] - smallest[0])/60) + check_after = float(device_alarm_settings["stuck_minutes_alarm"]) - int((second_smallest[0] - smallest[0])/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after elif enabled_alarms_str[BitIndex(0)] == "1" and armed_states[BitIndex(0)] == "1": #Too long present warning - if (second_smallest[0] - smallest[0]) > device_alarm_settings["stuck_minutes_warning"] * 60: + if (second_smallest[0] - smallest[0]) > float(device_alarm_settings["stuck_minutes_warning"]) * 60: #cancel warning, until re-armed armed_states = set_character(armed_states, 0, "0") dev_det = devices_details_map[device_id] @@ -3483,13 +3712,13 @@ def SetupTasks(): device_alerts_all[device_id] = device_alarm_settings StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: - check_after = device_alarm_settings["stuck_minutes_warning"] - int((second_smallest[0] - smallest[0])/60) + check_after = float(device_alarm_settings["stuck_minutes_warning"]) - int((second_smallest[0] - smallest[0])/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after if enabled_alarms_str[BitIndex(3)] == "1" and armed_states[BitIndex(3)] == "1": #Too long absent alarm - if last_seen_ago > device_alarm_settings["absent_minutes_alarm"] * 60: + if abs(last_seen_ago) > abs(float(device_alarm_settings["absent_minutes_alarm"])) * 60: armed_states = set_character(armed_states, 3, "0") armed_states = set_character(armed_states, 2, "0") @@ -3505,12 +3734,12 @@ def SetupTasks(): StoreLastSentToRedis(deployment_id) StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: - check_after = device_alarm_settings["absent_minutes_alarm"] - int(last_seen_ago/60) + check_after = abs(float(device_alarm_settings["absent_minutes_alarm"])) - int(last_seen_ago/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after if enabled_alarms_str[BitIndex(2)] == "1" and armed_states[BitIndex(2)] == "1": #Too long absent alarm - if last_seen_ago > device_alarm_settings["absent_minutes_warning"] * 60: + if abs(last_seen_ago) > abs(float(device_alarm_settings["absent_minutes_warning"])) * 60: armed_states = set_character(armed_states, 2, "0") dev_det = devices_details_map[device_id] @@ -3526,7 +3755,7 @@ def SetupTasks(): StoreLastSentToRedis(deployment_id) StoreDeviceAlarmsToRedis(device_id, device_alarm_settings) else: - check_after = device_alarm_settings["absent_minutes_warning"] - int(last_seen_ago/60) + check_after = abs(float(device_alarm_settings["absent_minutes_warning"])) - int(last_seen_ago/60) if check_after < next_run_in_minutes: next_run_in_minutes = check_after @@ -3547,269 +3776,572 @@ def SetupTasks(): if next_run_in_minutes > 1: next_run_in_minutes = 1 - schedule_task(SetupTasks, "", next_run_in_minutes) + + try: + schedule_task(SetupTasks, "", next_run_in_minutes) + except OSError as e: + logger.warning(f"Could not SetupTasks: {e}") + print(time.time()-stt) + +def create_device_row_html(device_id): + """Generates the HTML for a single device row.""" + options = [ + "?", "Office", "Hallway", "Garage", "Outside", "Conference Room", "Room", + "Kitchen", "Bedroom", "Living Room", "Bathroom", "Dining Room", + "Bathroom Main", "Bathroom Guest", "Bedroom Master", "Bedroom Guest", + "Basement", "Attic", "Other" + ] + + location_combobox = f'' + + description_textbox = f'' + + return f""" +
+
Device #{device_id}
+
+ {location_combobox} + {description_textbox} +
+
+ """ + +def populate_welcome_email(template_content, user_data): + """ + Populates the welcome email template with user-specific data. + + Args: + template_path (str): The file path to the HTML email template. + user_data (dict): A dictionary containing user information: + 'first_name', 'last_name', 'user_name', + 'password', 'devices' (a list of device IDs). + + Returns: + str: The populated HTML content of the email. + """ + + # Use str.replace() for each placeholder to avoid conflicts with CSS/JS + content = template_content.replace('{first_name}', user_data.get("first_name", "")) + content = content.replace('{last_name}', user_data.get("last_name", "")) + content = content.replace('{user_name}', user_data.get("user_name", "")) + content = content.replace('{password}', user_data.get("password", "")) + content = content.replace('{signature}', user_data.get("signature", "")) + + return content + + +def read_file(file_name, source = "LOCAL", type_ = "TEXT", bucket_name="daily-maps"): + + blob_data = "" + if source == "MINIO": + #blob_data = ReadObjectMinIO(bucket_name, file_name) + pass + elif source == "LOCAL": + login_file = os.path.join(filesDir, file_name) + login_file = login_file.replace("\\","/") + logger.debug(f"Full file path: {login_file}") + logger.debug(f"File exists: {os.path.exists(login_file)}") + #print(login_file) + if type_ == "TEXT": + with open(login_file, encoding="utf8") as f: + blob_data = f.read() + else: + with open(login_file, 'rb') as f: + blob_data = f.read() + + elif source == "AZURE": + try: + blob_data = ""#container_client.download_blob(file_name).readall() + except Exception as err: + logger.error("Not reading Azure blob "+str(err)) + blob_data = "" + return blob_data + else: + pass + return blob_data + # --- Main Monitoring Loop --- def monitor_devices(): """Main monitoring loop that checks devices periodically.""" logger.info("Starting monitoring loop...") while not stop_event.is_set(): - start_time = time.monotonic() - #logger.info("Starting monitoring cycle...") + try: # Add this outer try block + start_time = time.monotonic() + #current_time_unix = time.time() - if not devices_config: - logger.warning("No device configurations loaded, skipping monitoring cycle.") - success = load_device_configurations() # Try reloading if empty - if not success: - # Wait longer before retrying if config load fails - stop_event.wait(CHECK_INTERVAL_SECONDS * 5) - continue # Skip to next cycle attempt + #======================== Do node-red instances management ================================================== - current_time_unix = time.time() + hash_data = GetRedisMap('node-red_deployed') + #logger.debug(f"node_red_requests: {hash_data}") + requests_count = 0 + if hash_data != {}: + requests_count = int(hash_data['requests']) + if requests_count > 0: + logger.debug(f"node-red_deployed: {str(hash_data)}") + user_name = hash_data['user_name'] + #delete request #this might need switching to queue... todo + redis_conn.hset('node-red_deployed', mapping={ + 'user_name': user_name, + 'token': "", + 'time': "", + 'requests': 0 + }) + + + sync_success = False + changes_were_made = False # Flag to track if sync modified files + signal_file = Path("/tmp/node-red-sync-changes.signal") # Define path again + + try: + # Use sys.executable to ensure using the same python interpreter + print("Starting node synchronization...") + sync_script_path = "/home/ubuntu/.node-red/sync_nodes.py" + # Add error checking + result = subprocess.run( + [sys.executable, sync_script_path], + check=True, # Raise CalledProcessError if script exits with non-zero code + capture_output=True, + text=True, + timeout=600 # Add a generous timeout (10 minutes?) for npm operations + ) + print("Node synchronization script output:\n", result.stdout) + + if result.stderr: print("Node synchronization script stderr:\n", result.stderr) + print("Node synchronization completed successfully.") + sync_success = True + if signal_file.exists(): + changes_were_made = True + logger.info("Sync script indicated changes were made (signal file found).") + try: + signal_file.unlink() # Consume the signal + logger.debug("Removed signal file.") + except OSError as e: + logger.warning(f"Could not remove signal file {signal_file}: {e}") + else: + logger.info("Sync script indicated no changes were made (signal file not found).") + print("No node changes detected by sync script.") - #======================== Do node-red instances management ================================================== - hash_data = GetRedisMap('node_red_requests') - #logger.debug(f"node_red_requests: {hash_data}") - requests_count = 0 - if hash_data != {}: - requests_count = int(hash_data['requests']) - if requests_count > 0: - logger.debug(f"node_red_requests: {str(hash_data)}") - user_name = hash_data['user_name'] - #user needs Node-red. Is his session up and running? - port, pid = IsNRRunning(user_name) - #delete request #this might need switching to queue... todo - redis_conn.hset('node_red_requests', mapping={ - 'user_name': user_name, - 'token': "", - 'time': "", - 'requests': 0 - }) + except FileNotFoundError: + print(f"ERROR: Sync script not found at {sync_script_path}") + # Decide how to handle: exit, skip deployment, etc. + #return # Or raise an exception + except subprocess.CalledProcessError as e: + print(f"ERROR: Node synchronization script failed with exit code {e.returncode}") + print(f"Stderr:\n{e.stderr}") + print(f"Stdout:\n{e.stdout}") + # Decide how to handle: exit, skip deployment, etc. + #return # Or raise an exception + except subprocess.TimeoutExpired: + print(f"ERROR: Node synchronization script timed out.") + # Decide how to handle + #return # Or raise an exception + except Exception as e_sync: + print(f"ERROR: An unexpected error occurred while running sync script: {e_sync}") + #return # Or raise - if port == 0 or pid == 0: #not running - token = hash_data['token'] + # --- 2. Restart Node-RED Service IF Sync Succeeded AND Changes Were Made --- + restart_successful = True # Default to true if no restart needed + if sync_success and changes_were_made: # Only restart if sync OK AND changes happened + print("Node changes detected, restarting Node-RED service...") + restart_successful = restart_node_red_service("node-red.service") + if not restart_successful: + print("ERROR: Node-RED service failed to restart properly after changes. Aborting deployment.") + return # Exit processing this request + elif sync_success and not changes_were_made: + print("No node changes required restart.") + else: + # Sync failed, don't restart + print("Skipping Node-RED restart due to sync script failure.") + + + # --- 3. Combine and Deploy Flows (only if sync didn't fail AND restart was ok/not needed) --- + if sync_success and restart_successful: # Proceed if sync ran ok and restart (if needed) was ok + print("Proceeding with flow combination and deployment...") + # ... (rest of combine and deploy logic) ... + else: + print("Skipping flow deployment due to previous errors during sync or restart.") + + # Get list of user directories + base_dir = "/home/ubuntu/.node-red/users" + user_dirs = [os.path.join(base_dir, d) for d in os.listdir(base_dir) + if os.path.isdir(os.path.join(base_dir, d))] + + # Combine flows + output_dir = "/home/ubuntu/.node-red/main_instance" + flow_file = combine_user_flows(user_dirs, output_dir) + + # Deploy combined flow + creds_file = os.path.join(output_dir, "common_flow_cred.json") + success = deploy_combined_flow( + flow_file, + node_red_url="https://eluxnetworks.net:1999", + credentials_file=creds_file, + admin_user=ADMIN_USER, + admin_password=ADMIN_PASSWORD + ) + + #print(f"Combined flow created at: {combined_file}") + + + + + #lets check if any node-reds are stale + pattern = 'node_red_status_*' + threshold = time.time() - 10 + + # Find all matching keys + matching_keys = [] + cursor = 0 + + while True: + cursor, keys = redis_conn.scan(cursor=cursor, match=pattern, count=100) + + for key in keys: + key_str = key.decode('utf-8') # Convert bytes to string + + # Get the last_activity value + last_activity = redis_conn.hget(key_str, 'last_activity').decode('utf-8') + pid = 0 + try: + pid = int(redis_conn.hget(key_str, 'pid')) + if pid > 0: + if last_activity: + last_activity_value = int(float(last_activity)) + + # Check if it's below the threshold + print(last_activity_value-threshold) + if last_activity_value < threshold: + #if True: + if key_str not in permament_users: + matching_keys.append(key_str) + except: + pass + + # Exit the loop when scan is complete + if cursor == 0: + break + + # Print the result + if len(matching_keys) > 0: + + print(f"Found {len(matching_keys)} keys with last_activity < {threshold}:") + for key in matching_keys: + print(key) + user_name = key[len("node_red_status_"):] time_s = time.time()#datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + port, pid = GetNodeRedDetails(user_name) - port = GetAvailablePort(user_name) - - users_p = GetUserPriviledges(user_name) - ps = users_p[0][1] - - SetupUserEnvironment(user_name, token, ps) - - pid = StartNodeRed(port, user_name) + result = StopNodeRed(pid) #Lets store it to postgres and REDIS - StoreNodeRedUserPortOn(port, user_name, pid, time_s) + StoreNodeRedUserPortOff(port, user_name, time_s) redis_conn.hset(f'node_red_status_{user_name}', mapping={ - 'port': port, + 'port': 0, 'started_time': time_s, 'last_activity': time_s, - 'pid': pid, - }) - else: - #time_s = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - time_s = time.time() - redis_conn.hset(f'node_red_status_{user_name}', mapping={ - 'port': port, - 'started_time': time_s, - 'last_activity': time.time(), - 'pid': pid + 'pid': 0, }) + print(result) + # --- Cycle Cleanup & Wait --- + elapsed_time = time.monotonic() - start_time + #logger.info(f"Monitoring cycle finished in {elapsed_time:.2f} seconds.") - hash_data = GetRedisMap('node-red_deployed') - #logger.debug(f"node_red_requests: {hash_data}") - requests_count = 0 - if hash_data != {}: - requests_count = int(hash_data['requests']) - if requests_count > 0: - logger.debug(f"node-red_deployed: {str(hash_data)}") - user_name = hash_data['user_name'] - #delete request #this might need switching to queue... todo - redis_conn.hset('node-red_deployed', mapping={ - 'user_name': user_name, - 'token': "", - 'time': "", - 'requests': 0 - }) + wait_time = CHECK_INTERVAL_SECONDS - elapsed_time + if wait_time > 0: + #logger.debug(f"Waiting for {wait_time:.2f} seconds before next cycle.") + # Use stop_event.wait() for graceful shutdown during sleep + stop_event.wait(wait_time) + else: + logger.warning("Monitoring cycle took longer than the configured interval.") - - sync_success = False - changes_were_made = False # Flag to track if sync modified files - signal_file = Path("/tmp/node-red-sync-changes.signal") # Define path again - - try: - # Use sys.executable to ensure using the same python interpreter - print("Starting node synchronization...") - sync_script_path = "/home/ubuntu/.node-red/sync_nodes.py" - # Add error checking - result = subprocess.run( - [sys.executable, sync_script_path], - check=True, # Raise CalledProcessError if script exits with non-zero code - capture_output=True, - text=True, - timeout=600 # Add a generous timeout (10 minutes?) for npm operations - ) - print("Node synchronization script output:\n", result.stdout) - - if result.stderr: print("Node synchronization script stderr:\n", result.stderr) - print("Node synchronization completed successfully.") - sync_success = True - if signal_file.exists(): - changes_were_made = True - logger.info("Sync script indicated changes were made (signal file found).") - try: - signal_file.unlink() # Consume the signal - logger.debug("Removed signal file.") - except OSError as e: - logger.warning(f"Could not remove signal file {signal_file}: {e}") - else: - logger.info("Sync script indicated no changes were made (signal file not found).") - print("No node changes detected by sync script.") - - - - except FileNotFoundError: - print(f"ERROR: Sync script not found at {sync_script_path}") - # Decide how to handle: exit, skip deployment, etc. - #return # Or raise an exception - except subprocess.CalledProcessError as e: - print(f"ERROR: Node synchronization script failed with exit code {e.returncode}") - print(f"Stderr:\n{e.stderr}") - print(f"Stdout:\n{e.stdout}") - # Decide how to handle: exit, skip deployment, etc. - #return # Or raise an exception - except subprocess.TimeoutExpired: - print(f"ERROR: Node synchronization script timed out.") - # Decide how to handle - #return # Or raise an exception - except Exception as e_sync: - print(f"ERROR: An unexpected error occurred while running sync script: {e_sync}") - #return # Or raise - - - # --- 2. Restart Node-RED Service IF Sync Succeeded AND Changes Were Made --- - restart_successful = True # Default to true if no restart needed - if sync_success and changes_were_made: # Only restart if sync OK AND changes happened - print("Node changes detected, restarting Node-RED service...") - restart_successful = restart_node_red_service("node-red.service") - if not restart_successful: - print("ERROR: Node-RED service failed to restart properly after changes. Aborting deployment.") - return # Exit processing this request - elif sync_success and not changes_were_made: - print("No node changes required restart.") - else: - # Sync failed, don't restart - print("Skipping Node-RED restart due to sync script failure.") - - - # --- 3. Combine and Deploy Flows (only if sync didn't fail AND restart was ok/not needed) --- - if sync_success and restart_successful: # Proceed if sync ran ok and restart (if needed) was ok - print("Proceeding with flow combination and deployment...") - # ... (rest of combine and deploy logic) ... - else: - print("Skipping flow deployment due to previous errors during sync or restart.") - - # Get list of user directories - base_dir = "/home/ubuntu/.node-red/users" - user_dirs = [os.path.join(base_dir, d) for d in os.listdir(base_dir) - if os.path.isdir(os.path.join(base_dir, d))] - - # Combine flows - output_dir = "/home/ubuntu/.node-red/main_instance" - flow_file = combine_user_flows(user_dirs, output_dir) - - # Deploy combined flow - creds_file = os.path.join(output_dir, "common_flow_cred.json") - success = deploy_combined_flow( - flow_file, - node_red_url="https://eluxnetworks.net:1999", - credentials_file=creds_file, - admin_user=ADMIN_USER, - admin_password=ADMIN_PASSWORD - ) - - #print(f"Combined flow created at: {combined_file}") - - - - - #lets check if any node-reds are stale - pattern = 'node_red_status_*' - threshold = time.time() - 10 - - # Find all matching keys - matching_keys = [] - cursor = 0 - - while True: - cursor, keys = redis_conn.scan(cursor=cursor, match=pattern, count=100) - - for key in keys: - key_str = key.decode('utf-8') # Convert bytes to string - - # Get the last_activity value - last_activity = redis_conn.hget(key_str, 'last_activity').decode('utf-8') - pid = 0 - try: - pid = int(redis_conn.hget(key_str, 'pid')) - if pid > 0: - if last_activity: - last_activity_value = int(float(last_activity)) - - # Check if it's below the threshold - print(last_activity_value-threshold) - if last_activity_value < threshold: - #if True: - if key_str not in permament_users: - matching_keys.append(key_str) - except: - pass - - # Exit the loop when scan is complete - if cursor == 0: - break - - # Print the result - if len(matching_keys) > 0: - - print(f"Found {len(matching_keys)} keys with last_activity < {threshold}:") - for key in matching_keys: - print(key) - user_name = key[len("node_red_status_"):] - time_s = time.time()#datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") - port, pid = GetNodeRedDetails(user_name) - - result = StopNodeRed(pid) - #Lets store it to postgres and REDIS - StoreNodeRedUserPortOff(port, user_name, time_s) - redis_conn.hset(f'node_red_status_{user_name}', mapping={ - 'port': 0, - 'started_time': time_s, - 'last_activity': time_s, - 'pid': 0, - }) - print(result) - - # --- Cycle Cleanup & Wait --- - elapsed_time = time.monotonic() - start_time - #logger.info(f"Monitoring cycle finished in {elapsed_time:.2f} seconds.") - - wait_time = CHECK_INTERVAL_SECONDS - elapsed_time - if wait_time > 0: - #logger.debug(f"Waiting for {wait_time:.2f} seconds before next cycle.") - # Use stop_event.wait() for graceful shutdown during sleep - stop_event.wait(wait_time) - else: - logger.warning("Monitoring cycle took longer than the configured interval.") + except Exception as e: + logger.exception(f"Critical error in monitor_devices outer loop: {e}") + time.sleep(10) # Wait before retrying the whole loop logger.info("Monitoring loop stopped.") +def monitor_messages(): + """Main monitoring loop that checks devices periodically.""" + logger.info("Starting monitoring loop...") + + last_id = '0' + try: + + while not stop_event.is_set(): + try: # Add this outer try block + + #======================== Process messaging requests ======================================================== + + # Read from stream + #logger.debug("About to read from Redis stream...") + messages = redis_conn.xread({'messaging_requests_stream': last_id}) + #logger.debug(f"Redis read successful, got {len(messages) if messages else 0} messages") + #print(".", end="") + if messages: + call_acl = False + for stream, msgs in messages: + for msg_id, fields in msgs: + try: + logger.debug(f"Processing messaging request: {str(fields)}") + function = fields[b'function'].decode('utf-8') + if function == "new_caretaker": + + email = fields[b'email'].decode('utf-8') + user_name = fields[b'user_name'].decode('utf-8') + first_name = fields[b'first_name'].decode('utf-8') + last_name = fields[b'last_name'].decode('utf-8') + phone_number = fields[b'phone_number'].decode('utf-8') + password = fields[b'password'].decode('utf-8') + #devices = fields[b'devices'].decode('utf-8') + signature = fields[b'signature'].decode('utf-8') + + #user_name = hash_data['user_name'] + #first_name = hash_data['first_name'] + #last_name = hash_data['last_name'] + #phone_number = hash_data['phone_number'] + #password = hash_data['password'] + #devices = hash_data['devices'] + #signature = hash_data['signature'] + + #requests = hash_data['requests'] + template_html = read_file("welcome_template_short.html") + #devices_lst = devices.split(",") + new_user = { + "first_name": first_name, + "last_name": last_name, + "user_name": user_name, + "phone_number": phone_number, + "signature": signature, + "password": password + } + email_body = populate_welcome_email(template_html, new_user) + + # 4. Save the populated email to a new HTML file to preview it + if "Error" not in email_body: + with open('populated_welcome_email.html', 'w') as file: + file.write(email_body) + print("Successfully generated 'populated_welcome_email.html'.") + # In a real application, you would use a library like smtplib to send this 'email_body' + + + text_str = "CREDS," + first_name + "," + last_name + "," + user_name + "," + password + "," + email + #SendMessageTo(user_id, text_str) + #need to send it to device via signature, since, if device is new, not MQTT_id is known. + topic = "/well_" + signature + SendMessageDirectTo(topic, text_str) + sender.SendEmailTo(email, email_body, "Wellcome to WellNuo!") + call_acl = True + else: + print(email_body) + + redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later + last_id = msg_id + + elif function == "new_beneficiary": + email = fields[b'email'].decode('utf-8') + user_name = fields[b'user_name'].decode('utf-8') + first_name = fields[b'first_name'].decode('utf-8') + last_name = fields[b'last_name'].decode('utf-8') + phone_number = fields[b'phone_number'].decode('utf-8') + password = fields[b'password'].decode('utf-8') + #devices = fields[b'devices'].decode('utf-8') + signature = fields[b'signature'].decode('utf-8') + + #user_name = hash_data['user_name'] + #first_name = hash_data['first_name'] + #last_name = hash_data['last_name'] + #phone_number = hash_data['phone_number'] + #password = hash_data['password'] + #devices = hash_data['devices'] + #signature = hash_data['signature'] + + #requests = hash_data['requests'] + template_html = read_file("new_beneficiary_template_short.html") + #devices_lst = devices.split(",") + new_user = { + "first_name": first_name, + "last_name": last_name, + "user_name": user_name, + "phone_number": phone_number, + "signature": signature, + "password": password + } + email_body = populate_welcome_email(template_html, new_user) + + # 4. Save the populated email to a new HTML file to preview it + if "Error" not in email_body: + with open('populated_welcome_email.html', 'w') as file: + file.write(email_body) + print("Successfully generated 'populated_welcome_email.html'.") + # In a real application, you would use a library like smtplib to send this 'email_body' + + + text_str = "CREDS," + first_name + "," + last_name + "," + user_name + "," + password + "," + email + #SendMessageTo(user_id, text_str) + #need to send it to device via signature, since, if device is new, not MQTT_id is known. + topic = "/well_" + signature + SendMessageDirectTo(topic, text_str) + sender.SendEmailTo(email, email_body, "Wellcome to WellNuo!") + call_acl = True + else: + print(email_body) + + redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later + last_id = msg_id + elif function == "credentials_updated": + email = fields[b'email'].decode('utf-8') + user_name = fields[b'user_name'].decode('utf-8') + first_name = fields[b'first_name'].decode('utf-8') + last_name = fields[b'last_name'].decode('utf-8') + phone_number = fields[b'phone_number'].decode('utf-8') + password = fields[b'password'].decode('utf-8') + #devices = fields[b'devices'].decode('utf-8') + signature = fields[b'signature'].decode('utf-8') + + #user_name = hash_data['user_name'] + #first_name = hash_data['first_name'] + #last_name = hash_data['last_name'] + #phone_number = hash_data['phone_number'] + #password = hash_data['password'] + #devices = hash_data['devices'] + #signature = hash_data['signature'] + + #requests = hash_data['requests'] + template_html = read_file("new_credentials_template_short.html") + #devices_lst = devices.split(",") + new_user = { + "first_name": first_name, + "last_name": last_name, + "user_name": user_name, + "phone_number": phone_number, + "signature": signature, + "password": password + } + email_body = populate_welcome_email(template_html, new_user) + + # 4. Save the populated email to a new HTML file to preview it + if "Error" not in email_body: + with open('populated_welcome_email.html', 'w') as file: + file.write(email_body) + print("Successfully generated 'populated_welcome_email.html'.") + # In a real application, you would use a library like smtplib to send this 'email_body' + + + text_str = "CREDS," + first_name + "," + last_name + "," + user_name + "," + password + "," + email + #SendMessageTo(user_id, text_str) + #need to send it to device via signature, since, if device is new, not MQTT_id is known. + topic = "/well_" + signature + SendMessageDirectTo(topic, text_str) + sender.SendEmailTo(email, email_body, "Wellcome to WellNuo!") + call_acl = True + else: + print(email_body) + + redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later + last_id = msg_id + elif function == "update_acl": + call_acl = True + redis_conn.xdel('messaging_requests_stream', msg_id) #uncomment later + last_id = msg_id + except Exception as e: + print(f"Error in Processing messaging request: {e}") + + #call acl_manager.py from /home/ubuntu/mqtt-auth-service. New user was added in well-api + if call_acl: + try: + result = subprocess.run( + [sys.executable, '/home/ubuntu/mqtt-auth-service/acl_manager.py'], + capture_output=True, + text=True, + timeout=60 # Add timeout to prevent hanging + ) + + if result.returncode == 0: + print("ACL manager executed successfully") + print("Output:", result.stdout) + else: + print("ACL manager failed with return code:", result.returncode) + print("Error:", result.stderr) + + except subprocess.TimeoutExpired: + print("ACL manager execution timed out") + except Exception as e: + print(f"Error executing ACL manager: {e}") + + time.sleep(0.1) + + + hash_data = GetRedisMap('node_red_requests') + #logger.debug(f"node_red_requests: {hash_data}") + requests_count = 0 + if hash_data != {}: + requests_count = int(hash_data['requests']) + if requests_count > 0: + logger.debug(f"node_red_requests: {str(hash_data)}") + user_name = hash_data['user_name'] + #user needs Node-red. Is his session up and running? + port, pid = IsNRRunning(user_name) + #delete request #this might need switching to queue... todo + redis_conn.hset('node_red_requests', mapping={ + 'user_name': user_name, + 'token': "", + 'time': "", + 'requests': 0 + }) + + if port == 0 or pid == 0: #not running + token = hash_data['token'] + + time_s = time.time()#datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + + port = GetAvailablePort(user_name) + + users_p = GetUserPriviledges(user_name) + ps = users_p[0][1] + + SetupUserEnvironment(user_name, token, ps) + + pid = StartNodeRed(port, user_name) + #Lets store it to postgres and REDIS + StoreNodeRedUserPortOn(port, user_name, pid, time_s) + redis_conn.hset(f'node_red_status_{user_name}', mapping={ + 'port': port, + 'started_time': time_s, + 'last_activity': time_s, + 'pid': pid, + }) + else: + #time_s = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") + time_s = time.time() + redis_conn.hset(f'node_red_status_{user_name}', mapping={ + 'port': port, + 'started_time': time_s, + 'last_activity': time.time(), + 'pid': pid + }) + + except redis.RedisError as e: + logger.error(f"Redis error in message monitoring: {e}") + time.sleep(5) # Wait before retry + except Exception as e: + logger.error(f"Unexpected error in message processing: {e}") + time.sleep(1) + + except Exception as e: + logger.exception(f"Critical error in monitor_messages: {e}") + finally: + logger.info("monitor_messages loop stopped.") + + # --- Signal Handling --- @@ -3817,8 +4349,8 @@ def signal_handler(signum, frame): """Handles termination signals gracefully.""" logger.warning(f"Received signal {signal.Signals(signum).name}. Initiating graceful shutdown...") stop_event.set() - t1.cancel() - t2.cancel() + t1.stop() + t2.stop() for job_id in job_registry: function_name = job_registry[job_id]['function_name'] @@ -3895,9 +4427,10 @@ def ProcessQueue(): while in_queue: try: tim, topic, messagein = in_queue.pop(0) - mac = topic[1:] - if mac in mac_to_device_id: - device_id = mac_to_device_id[mac] + #print(".", end="") + MAC = topic[1:] + if MAC in mac_to_device_id: + device_id = mac_to_device_id[MAC] deployment_id = device_to_deployment[device_id] if deployment_id in alarms_settings_all: alarms_settings = alarms_settings_all[deployment_id] @@ -3944,7 +4477,9 @@ def ProcessQueue(): "temperature_low_alarm":"50", "temperature_low_alarm_method_7":"PHONE", "radar_alarm_method_8":"MSG", + "pressure_threshold":"15", "pressure_alarm_method_9":"MSG", + "light_threshold":"15", "light_alarm_method_10":"MSG", "smell_alarm_method_11":"EMAIL", "rearm_policy":"At midnight" @@ -4030,6 +4565,8 @@ def ProcessQueue(): if "radar" in message_dict: enabled_alarms_str = device_alarm_settings["enabled_alarms"] armed_states = device_alarm_settings["armed_states"] #1=armed 0=triggered + #if device_id == 534: + # print("Here") if enabled_alarms_str[BitIndex(8)] == "1" and armed_states[BitIndex(8)] == "1": radar = message_dict["radar"] radar_threshold_str = GetRedisString(f"radar_threshold{device_id}") @@ -4044,12 +4581,58 @@ def ProcessQueue(): location = (dev_det[4] + " " + dev_det[5].strip()).strip() method = device_alarm_settings["radar_alarm_method_8"] first_last_name = GetBeneficiaryFromDeployment(deployment_id) - SendAlerts(deployment_id, method, f"At {first_last_name} burglar alarm is triggered in the {location}", "", "") + SendAlerts(deployment_id, method, f"At {first_last_name} burglar alarm detected presence in the {location}", "", "") device_alarm_settings["armed_states"] = armed_states device_alerts_all[device_id] = device_alarm_settings StoreAllToRedisAndDB(deployment_id, device_id) + if "pressure" in message_dict: + bit_nr = 9 + enabled_alarms_str = device_alarm_settings["enabled_alarms"] + armed_states = device_alarm_settings["armed_states"] #1=armed 0=triggered + if enabled_alarms_str[BitIndex(bit_nr)] == "1" and armed_states[BitIndex(bit_nr)] == "1": + if message_dict["mtype"] == 1: + + pressure = message_dict["pressure"] + pressure_threshold = float(device_alarm_settings["pressure_threshold"]) #GetRedisString(f"radar_threshold{device_id}") + print(device_id, pressure, pressure_threshold) + if pressure > pressure_threshold: + + #cancel alarm for this room, until re-armed + armed_states = set_character(armed_states, bit_nr, "0") + dev_det = devices_details_map[device_id] + location = (dev_det[4] + " " + dev_det[5].strip()).strip() + method = device_alarm_settings["pressure_alarm_method_9"] + first_last_name = GetBeneficiaryFromDeployment(deployment_id) + SendAlerts(deployment_id, method, f"At {first_last_name} burglar alarm detected doors opening in the {location}", "", "") + device_alarm_settings["armed_states"] = armed_states + device_alerts_all[device_id] = device_alarm_settings + StoreAllToRedisAndDB(deployment_id, device_id) + + if "light" in message_dict: + bit_nr = 10 + enabled_alarms_str = device_alarm_settings["enabled_alarms"] + armed_states = device_alarm_settings["armed_states"] #1=armed 0=triggered + if enabled_alarms_str[BitIndex(bit_nr)] == "1" and armed_states[BitIndex(bit_nr)] == "1": + if message_dict["mtype"] == 4: + + light = message_dict["light"] + light_threshold = float(device_alarm_settings["light_threshold"]) #GetRedisString(f"radar_threshold{device_id}") + print(device_id, light, light_threshold) + if light > light_threshold: + + #cancel alarm for this room, until re-armed + armed_states = set_character(armed_states, bit_nr, "0") + dev_det = devices_details_map[device_id] + location = (dev_det[4] + " " + dev_det[5].strip()).strip() + method = device_alarm_settings["light_alarm_method_10"] + first_last_name = GetBeneficiaryFromDeployment(deployment_id) + SendAlerts(deployment_id, method, f"At {first_last_name} burglar alarm detected light change in the {location}", "", "") + device_alarm_settings["armed_states"] = armed_states + device_alerts_all[device_id] = device_alarm_settings + StoreAllToRedisAndDB(deployment_id, device_id) + else: pass #alarm not setup for this device @@ -4061,8 +4644,58 @@ def ProcessQueue(): except Exception as e: logger.error(f"Error: {str(e)} {traceback.format_exc()}") + +def ProcessQueueM(): + #global queue + global in_queueM, PROCESSED_COUNTER, in_packets_count + #print(".") + if in_queueM: + #queue=qinfo.Open(2,0) # Open a ref to queue + #msgm=win32com.client.Dispatch("MSMQ.MSMQMessage") + + while in_queueM: + #if True: + try: + topic, messagein = in_queueM.pop(0) + messagein = messagein.decode('utf-8').upper() + logger.info(f"Processing: {topic} {messagein}") + if messagein != "PIN|OK": #ignore this one + + mac_pattern = r'([0-9A-Fa-f]{12})' + match = re.search(mac_pattern, messagein) + MAC = match.group(1) + + record = { + 'time': time.time(), + 'result': messagein + } + + # Convert dictionary to JSON string for storage in Redis list + record_json = json.dumps(record) + logger.info(f"Pushing to REDIS: from_alerter_{MAC} {record_json}") + redis_conn.lpush(f'from_alerter_{MAC}', record_json) + + # ProcessPacket(new_message_dict) + except Exception as err: + logger.error(err) + +def MAC_from_id(device_id): + + conn = get_db_connection() + + sql = f"SELECT device_mac FROM public.devices WHERE device_id = {device_id}" + with conn.cursor() as cur: + cur.execute(sql) + result = cur.fetchone() #cur.fetchall() + if result != None: + return result[0] + else: + return "" + def CheckRedisMessages(): + global reload_needed + requests_count = 0 #print(f"CheckRedisMessages") # Check if queue exists and has items. These items are from manual GUI interactions in alerts page @@ -4071,11 +4704,12 @@ def CheckRedisMessages(): queue_length = redis_conn.llen('send_requests') if queue_length > 0: - print(f"Processing send_requests message from queue...") # Process each item for i in range(queue_length): item_json = redis_conn.rpop('send_requests') + #logger.info(f"Received in REDIS: send_requests {item_json}") + print(f"Received in REDIS: send_requests {item_json}") if item_json is None: break @@ -4085,44 +4719,137 @@ def CheckRedisMessages(): requests_count += 1 # Print the record - print(f"Request #{requests_count}:") + action = "" + logger.info(f"Request #{requests_count}:") for key, value in record.items(): - print(f" {key}: {value}") + logger.info(f" {key}: {value}") - method = record["method"] - location_str = record["location"] - location = location_str.split("_")[2] - deployment_id = record["deployment_id"] - content = record["content"] - feature = record["feature"] - enabledCellContent = record["enabledCellContent"] - currentUnits = record["currentUnits"] - currentAlertTableMode = record["currentAlertTableMode"] #Warning/Alarm - test_only = record["test_only"] - #action = record["action"] - user_name = record["user_name"] - user_first_last_name = GetBeneficiaryFromDeployment(deployment_id) + function = "" + if "function" in record: + function = record["function"] + if function == "set_group": + group_id = record["group_id"] - if feature == "stuck": - msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} is spending more than {enabledCellContent} in {location}" - elif feature == "absent": - msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} did not visit {location} in more than {enabledCellContent[1:-1]} {currentUnits}" - elif feature == "tempLow": - msg_ext = f"{currentAlertTableMode}: {content} temperature is lower then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}" - elif feature == "tempHigh": - msg_ext = f"{currentAlertTableMode}: {content} temperature is higher then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}" - elif feature == "pressure": - msg_ext = f"{currentAlertTableMode}: {content} door was opened or closed in the {location} at {user_first_last_name}" - elif feature == "radar": - msg_ext = f"{currentAlertTableMode}: {content} motion detected in the {location} at {user_first_last_name}" + if "mac" in record: + MAC = record["mac"].upper() + + elif "device_id" in record: + device_id = record["device_id"] + MAC = MAC_from_id(device_id).upper() + + content = UNLOCK + MQSendM(f"/{MAC}", content) + time.sleep(2) + content = f"Z|{group_id}" + MQSendM(f"/{MAC}", content) + elif function == "reboot": + + if "mac" in record: + MAC = record["mac"].upper() + + elif "device_id" in record: + device_id = record["device_id"] + MAC = MAC_from_id(device_id).upper() + + content = UNLOCK + MQSendM(f"/{MAC}", content) + time.sleep(2) + content = "s" + MQSendM(f"/{MAC}", content) + record = { + 'time': time.time(), + 'result': "Reboot sent..." + } + + # Convert dictionary to JSON string for storage in Redis list + record_json = json.dumps(record) + logger.info(f"Pushing to REDIS: from_alerter_{MAC} {record_json}") + redis_conn.lpush(f'from_alerter_{MAC}', record_json) + + elif function == "set_well_id": + if "mac" in record: + MAC = record["mac"].upper() + else: + device_id = record["device_id"] + MAC = MAC_from_id(device_id).upper() + + well_id = record["well_id"] + + + content = UNLOCK + MQSendM(f"/{MAC}", content) + time.sleep(2) + content = f"-|{well_id}" + MQSendM(f"/{MAC}", content) + + elif function == "get_device_live": + if "mac" in record: + MAC = record["mac"].upper() + else: + device_id = record["device_id"] + MAC = MAC_from_id(device_id).upper() + + content = UNLOCK + MQSendM(f"/{MAC}", content) + time.sleep(2) + content = "r" + MQSendM(f"/{MAC}", content) + elif function == "set_network_id": + if "mac" in record: + MAC = record["mac"].upper() + else: + device_id = record["device_id"] + MAC = MAC_from_id(device_id).upper() + + + network_id = record["network_id"] + + content = UNLOCK + MQSendM(f"/{MAC}", content) + time.sleep(2) + content = f"0|{network_id}" + MQSendM(f"/{MAC}", content) else: - msg_ext = f"{currentAlertTableMode}: {content} {feature} in {location} at {user_first_last_name}" + if "action" in record: + action = record["action"] + if action == "reload": + reload_needed = True - SendAlerts(deployment_id, method, msg_ext, "Test message", user_name) - #these are testing messages, so do not count them as real triggered... so do not update in REDIS - #StoreLastSentToRedis(deployment_id) + if action != "reload": + method = record["method"] + location_str = record["location"] + location = location_str.split("_")[2] + deployment_id = record["deployment_id"] + content = record["content"] + feature = record["feature"] + enabledCellContent = record["enabledCellContent"] + currentUnits = record["currentUnits"] + currentAlertTableMode = record["currentAlertTableMode"] #Warning/Alarm + test_only = record["test_only"] - print("-" * 40) + user_name = record["user_name"] + user_first_last_name = GetBeneficiaryFromDeployment(deployment_id) + + if feature == "stuck": + msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} is spending more than {enabledCellContent} in {location}" + elif feature == "absent": + msg_ext = f"{currentAlertTableMode}: {content} {user_first_last_name} did not visit {location} in more than {enabledCellContent[1:-1]} {currentUnits}" + elif feature == "tempLow": + msg_ext = f"{currentAlertTableMode}: {content} temperature is lower then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}" + elif feature == "tempHigh": + msg_ext = f"{currentAlertTableMode}: {content} temperature is higher then {enabledCellContent} {currentUnits} in {location} at {user_first_last_name}" + elif feature == "pressure": + msg_ext = f"{currentAlertTableMode}: {content} door was opened or closed in the {location} at {user_first_last_name}" + elif feature == "radar": + msg_ext = f"{currentAlertTableMode}: {content} motion detected in the {location} at {user_first_last_name}" + else: + msg_ext = f"{currentAlertTableMode}: {content} {feature} in {location} at {user_first_last_name}" + + SendAlerts(deployment_id, method, msg_ext, "Test message", user_name) + #these are testing messages, so do not count them as real triggered... so do not update in REDIS + #StoreLastSentToRedis(deployment_id) + + print("-" * 40) except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON from queue item: {e}") @@ -4136,6 +4863,8 @@ def CheckRedisMessages(): print(f"Processing send_requests message from queue...") item_json = redis_conn.rpop('new_alarms') + logger.info(f"Received in REDIS: new_alarms {item_json}") + if item_json is None: break @@ -4143,18 +4872,22 @@ def CheckRedisMessages(): record = json.loads(item_json) deployment_id = int(record["deployment_id"]) device_id = int(record["device_id"]) - print(record) + if device_id == 0: + reload_needed = True + else: - device_alarms_json = GetRedisString('alarm_device_settings_'+str(device_id)) - deployment_alarms_json = GetRedisString('alarm_deployment_settings_'+str(deployment_id)) + print(record) - alarms_settings_all[deployment_id] = json.loads(deployment_alarms_json) - device_alerts_all[device_id] = json.loads(device_alarms_json) - print(device_alarms_json) - print(deployment_alarms_json) + device_alarms_json = GetRedisString('alarm_device_settings_'+str(device_id)) + deployment_alarms_json = GetRedisString('alarm_deployment_settings_'+str(deployment_id)) - #method = record["method"] - #location_str = record["location"] + alarms_settings_all[deployment_id] = json.loads(deployment_alarms_json) + device_alerts_all[device_id] = json.loads(device_alarms_json) + print(device_alarms_json) + print(deployment_alarms_json) + + #method = record["method"] + #location_str = record["location"] except json.JSONDecodeError as e: logger.error(f"Failed to parse JSON from queue item: {e}") continue @@ -4162,8 +4895,19 @@ def CheckRedisMessages(): #print(f"Total requests processed: {requests_count}") return requests_count +def ReloadIfNeeded(): + global reload_needed, t1, t2 + if reload_needed: + t1.pause() + t2.pause() + ok = load_device_configurations() + print(f"load_device_configurations in ReloadIfNeeded! {ok}") + reload_needed = False + t1.resume() + t2.resume() # --- Main Execution --- + if __name__ == "__main__": logger.info(f"Starting Well Alert Monitoring Service (PID: {os.getpid()})...") @@ -4183,10 +4927,17 @@ if __name__ == "__main__": t1 = perpetualTimer(1, ensure_mqtt_connection) t2 = perpetualTimer(.01, ProcessQueue) + t3 = perpetualTimer(1, ReloadIfNeeded) + t4 = perpetualTimer(1, ensure_mqtt_connectionM) + t5 = perpetualTimer(.01, ProcessQueueM) SetupTasks() t1.start() t2.start() + t3.start() + t4.start() + t5.start() + schedule_task(task_a, "first", 1) schedule_task(task_b, "second", 1) musr = os.getenv('GU') @@ -4196,7 +4947,7 @@ if __name__ == "__main__": #SendAlerts(21, "MSG", f"Test: User way too long ({120} minutes) in Bathroom", "") SendAlerts(21, "EMAIL", f"well-alert was started", "Program started", "robster") - #SendAlerts(21, "SMS", f"Test: User way too long ({120} minutes) in Bathroom", "") + #SendAlerts(21, "SMS", f"Test: User way too long ({120} minutes) in Bathroom", "", "") #SendAlerts(21, "PHONE", f"Test: User way too long ({120} minutes) in Bathroom", "") SendPhoneCall("-4086462191", "Hi Robert. How are you?") #SendPhoneCall("4086462191", "Hi Robert. How are you?", "") @@ -4208,22 +4959,49 @@ if __name__ == "__main__": #SendPhoneCall("4086462191", "Hi Danko. How are you? Your mom seems stable.", "") #success = SendSMSTo("(408)646-2191", "This is only a test") + #time.sleep(10) + #content = UNLOCK + #MQSendM("/901506CA3C7C", content) + #time.sleep(2) + #content = "Z|1" + #MQSendM("/901506CA3C7C", content) + template_html = read_file("welcome_template_short.html") try: # Start the main monitoring loop monitor_thread = threading.Thread(target=monitor_devices, daemon=True) monitor_thread.start() + monitor_m_thread = threading.Thread(target=monitor_messages, daemon=True) + monitor_m_thread.start() # Keep the main thread alive until stop_event is set by signal or error while not stop_event.is_set(): # Can add periodic health checks here if needed CheckRedisMessages() + + # Optional: Check if threads are still alive and restart if needed + if not monitor_thread.is_alive(): + logger.error("Device monitor thread died! Restarting...") + monitor_thread = threading.Thread(target=monitor_devices, daemon=True) + monitor_thread.start() + + if not monitor_m_thread.is_alive(): + logger.error("Message monitor thread died! Restarting...") + monitor_m_thread = threading.Thread(target=monitor_messages, daemon=True) + monitor_m_thread.start() + time.sleep(1) # Check stop_event periodically - logger.info("Stop event received, waiting for monitoring thread to finish...") - monitor_thread.join(timeout=CHECK_INTERVAL_SECONDS + 10) # Wait for thread with timeout + logger.info("Stop event received, waiting for monitoring threads to finish...") + + # Wait for both threads to finish + monitor_thread.join(timeout=CHECK_INTERVAL_SECONDS + 10) if monitor_thread.is_alive(): - logger.warning("Monitoring thread did not finish cleanly.") + logger.warning("Device monitoring thread did not finish cleanly.") + + monitor_m_thread.join(timeout=10) + if monitor_m_thread.is_alive(): + logger.warning("Message monitoring thread did not finish cleanly.") except Exception as e: logger.exception(f"An unexpected critical error occurred in main execution: {e}") @@ -4231,4 +5009,4 @@ if __name__ == "__main__": logger.info("Well Alert Monitoring Service finished.") sys.exit(0) # Explicitly exit with success code after cleanup - print("Last line") + print("Last line") # This will never execute due to sys.exit(0) above