diff --git a/.env b/.env
new file mode 100644
index 0000000..34d5305
--- /dev/null
+++ b/.env
@@ -0,0 +1,16 @@
+# Database settings
+DB_NAME=wellnuo
+DB_USER=well_app
+DB_PASSWORD=well_app_2024
+DB_HOST=192.168.68.70
+DB_PORT=5432
+
+# RabbitMQ settings
+RABBITMQ_URL=amqp://well_pipe:wellnuo_2024@192.168.68.70:5672//
+RABBITMQ_QUEUE=evgrid-dev-demo
+
+# Processing settings
+PROCESSING_PERIOD=300
+QUEUE_LENGTH_THRESHOLD=1000
+BATCH_SIZE=100
+
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..3a39009
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,20 @@
+FROM python:3.9-alpine
+
+WORKDIR /app
+
+# Install build dependencies for psycopg2
+RUN apk add --no-cache postgresql-dev gcc python3-dev musl-dev
+
+# Install Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Create src directory and copy application code
+RUN mkdir -p /app/src
+COPY src/app.py /app/src/
+COPY src/new-ca.crt /app/src/new-ca.crt
+
+# Expose port for health checks
+EXPOSE 8086
+
+CMD ["python", "src/app.py"]
\ No newline at end of file
diff --git a/queue-muncher.yml b/queue-muncher.yml
new file mode 100644
index 0000000..8e9411d
--- /dev/null
+++ b/queue-muncher.yml
@@ -0,0 +1,37 @@
+version: 1.0
+provider:
+  name: openfaas
+  gateway: http://192.168.68.70:8084
+functions:
+  queue-muncher:
+    image: repo.eluxnetworks.net/queue-muncher:latest
+    labels:
+      com.openfaas.scale.zero: false
+    network: "host" # Add this line
+    environment:
+      read_timeout: "1h"
+      write_timeout: "1h"
+      exec_timeout: "1h"
+
+      # RabbitMQ configuration
+      RABBITMQ_URL: "amqp://well_pipe:wellnuo_2024@192.168.68.70:5672//"
+      RABBITMQ_QUEUE: "evgrid-dev-demo"
+      REDIS_HOST: "192.168.68.70"
+      # Database configuration
+      DB_NAME: "wellnuo"
+      DB_USER: "well_app"
+      DB_PASSWORD: "well_app_2024"
+      DB_HOST: "192.168.68.70"
+      DB_PORT: "5432"
+      MQTTSERVER: "192.168.68.70"
+      MQTT_PORT: "1883"
+      # Processing settings
+      PROCESSING_PERIOD: "300"
+      QUEUE_LENGTH_THRESHOLD: "1000"
+      BATCH_SIZE: "100"
+    deploy:
+      resources:
+        limits:
+          memory: 256Mi
+        requests:
+          memory: 128Mi
diff --git a/redis_test.py b/redis_test.py
new file mode 100644
index 0000000..e99d37b
--- /dev/null
+++ b/redis_test.py
@@ -0,0 +1,233 @@
+import os
+import redis
+import psycopg2
+from dotenv import load_dotenv
+import ast
+import time
+
+load_dotenv()
+
+DB_NAME = os.getenv('DB_NAME')
+DB_USER = os.getenv('DB_USER')
+DB_PASSWORD = os.getenv('DB_PASSWORD')
+DB_HOST = os.getenv('DB_HOST')
+DB_PORT = os.getenv('DB_PORT')
+
+# Connect to Redis (assuming it's running on localhost with default port)
+r = redis.Redis(host='localhost', port=6379, db=0)
+
+def GetRedisString(key_name):
+    try:
+        result = r.get(key_name).decode('utf-8')
+    except:
+        result = None
+    return result
+
+def GetRedisInt(key_name):
+    try:
+        result = int(r.get(key_name).decode('utf-8'))
+    except:
+        result = None
+    return result
+
+def GetRedisFloat(key_name):
+    try:
+        result = float(r.get(key_name).decode('utf-8'))
+    except:
+        result = None
+
+    return result
+
+value = GetRedisFloat('lastseen_501')
+print(value)
+r.set('lastseen_510', 1.444)
+
+value = GetRedisFloat('lastseen_510')
+print(value)
+
+def get_db_connection():
+    return psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
+
+def GetLastPointQuery(device_id, field_name, threshold_value):
+    """
+    Generate a SQL query to find the last point in radar_readings where the specified field
+    meets the threshold condition.
+
+    Parameters:
+    device_id (int): The device ID to filter by
+    field_name (str): The field to check, e.g., "s2_max", "s28_min", etc.
+    threshold_value (float): The threshold value to compare against
+
+    Returns:
+    str: SQL query string
+    """
+    # Parse the field name to get base field and operation
+    parts = field_name.split('_')
+    base_field = parts[0]
+    operation = parts[1] if len(parts) > 1 else None
+
+    # Define the field expression based on base_field
+    if base_field == 's28':
+        field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7"
+    elif base_field == 'm08':
+        field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9"
+    else:
+        field_expr = base_field
+
+    # Define comparison operator based on operation
+    operator = ">" if operation == "max" else "<"
+
+    # Generate the SQL query
+    query = f"""
+    SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id}
+    AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1;
+    """
+
+    return query
+
+
+def GetLastDetected(device_id, field_name, threshold_value):
+    # Example usage:
+
+    query = GetLastPointQuery(device_id, field_name, threshold_value)
+    print(query)
+    last_detected = None
+    with get_db_connection() as conn:
+        with conn.cursor() as cur:
+            cur.execute(query)
+            last_detected = cur.fetchone()[0].timestamp()
+
+    return last_detected
+
+def UpdateLastSeen(new_message_dict):
+    print(new_message_dict)
+
+    device_id_s = str(new_message_dict['device_id'])
+
+    #matches code in well-api
+    radar_threshold_signal = "s3_max"
+    radar_threshold_value = 10
+
+    threshold_details = GetRedisString('radar_threshold'+device_id_s)
+    try:
+        radar_threshold_list = ast.literal_eval(threshold_details)
+        radar_threshold_signal = radar_threshold_list[0]
+        radar_threshold_value = radar_threshold_list[1]
+    except:
+        #key not found so read from DB, and store to key
+        sql = f"""
+        SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = {device_id_s};
+        """
+
+        with get_db_connection() as conn:
+            with conn.cursor() as cur:
+                cur.execute(sql)
+                threshold_details = cur.fetchone()[0] #cur.fetchall()#
+        print(threshold_details)
+
+        radar_threshold_signal = "s3_max"
+        radar_threshold_value = 10
+
+        if threshold_details != None:
+
+            threshold_details_list = ast.literal_eval(threshold_details)
+            radar_threshold_signal = threshold_details_list[0]
+            radar_threshold_value = threshold_details_list[1]
+
+        r.set('radar_threshold'+device_id_s, str([radar_threshold_signal, radar_threshold_value]))
+
+
+
+    #lets determine if presence is detected
+    component = radar_threshold_signal.split("_")[0]
+    radar_val = 0
+    if component == "s28":
+        radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0]
+    elif component[0] == "s":
+        component_index = ast.literal_eval(component[1])
+        radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0]
+    elif component[0] == "m":
+        component_index = ast.literal_eval(component[1])
+        radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0]
+
+    if radar_val > radar_threshold_value: #seen now
+        r.set('lastseen_'+device_id_s, new_message_dict['time'])
+
+    else: #not seen now, but lets determine when he was and add the key
+        last_seen = GetRedisFloat('lastseen_'+device_id_s)
+        if last_seen == None:
+            last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value)
+            r.set('lastseen_'+device_id_s, last_seen)
+
+
+new_message_dict = {'time': 1743554317.579559, 'device_id': 499, 'radar': [[100, 100, 0, 0, 0, 2226, 2654, 425, 523, 445, 340, 436, 339, 548], [100, 0, 0, 706, 657, 547, 570, 553, 509, 499]]}
+
+st = time.time()
+UpdateLastSeen(new_message_dict)
+print(time.time()-st)
+
+if False:
+    # Simple key-value
+
+    value = r.get('simple_key')
+    print(f"Simple key value: {value.decode('utf-8')}")
+
+    # WRITING DATA TO REDIS
+
+    # Simple key-value
+    r.set('simple_key', 'Hello, Redis!')
+
+    # Setting expiration (TTL) in seconds
+    r.setex('expiring_key', 60, 'This will expire in 60 seconds')
+
+    # Working with numbers
+    r.set('counter', 0)
+    r.incr('counter')  # Increment by 1
+    r.incrby('counter', 5)  # Increment by 5
+
+    # Lists
+    r.rpush('my_list', 'item1')
+    r.rpush('my_list', 'item2', 'item3')  # Add multiple items
+
+    # Sets
+    r.sadd('my_set', 'member1', 'member2', 'member3')
+
+    # Hashes (dictionaries)
+    r.hset('user:1000', mapping={
+        'username': 'john_doe',
+        'email': 'john@example.com',
+        'visits': 10
+    })
+
+    # READING DATA FROM REDIS
+
+    # Simple key-value
+    value = r.get('simple_key')
+    print(f"Simple key value: {value.decode('utf-8')}")
+
+    # Check current counter value
+    counter = r.get('counter')
+    print(f"Counter value: {counter.decode('utf-8')}")
+
+    # Lists
+    all_items = r.lrange('my_list', 0, -1)  # Get all items
+    print("List items:", [item.decode('utf-8') for item in all_items])
+
+    # Sets
+    all_members = r.smembers('my_set')
+    print("Set members:", [member.decode('utf-8') for member in all_members])
+
+    # Check if a member exists in a set
+    is_member = r.sismember('my_set', 'member1')
+    print(f"Is 'member1' in set? {is_member}")
+
+    # Hashes
+    username = r.hget('user:1000', 'username')
+    print(f"Username: {username.decode('utf-8')}")
+
+    # Get all hash fields
+    user_data = r.hgetall('user:1000')
+    print("User data:", {k.decode('utf-8'): v.decode('utf-8') for k, v in user_data.items()})
+
+    # Check remaining TTL for expiring key (in seconds)
+    ttl = r.ttl('expiring_key')
+    print(f"Expiring key TTL: {ttl} seconds")
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..c466b8f
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+paho-mqtt
+kombu>=5.0.0
+psycopg2-binary>=2.8.6
+python-dotenv>=0.15.0
+prometheus-client>=0.11.0
+redis
diff --git a/secrets/mqtt-ca-cert b/secrets/mqtt-ca-cert
new file mode 100644
index 0000000..fff2847
--- /dev/null
+++ b/secrets/mqtt-ca-cert
@@ -0,0 +1,19 @@
+-----BEGIN CERTIFICATE-----
+MIIDHTCCAgWgAwIBAgIUak6HXnrheUs5SnAXYa+jUU1l28YwDQYJKoZIhvcNAQEL
+BQAwHjEcMBoGA1UEAwwTZWx1eG5ldHdvcmtzLm5ldCBDQTAeFw0yNTAyMjYwNTA4
+MDFaFw0zNTAyMjQwNTA4MDFaMB4xHDAaBgNVBAMME2VsdXhuZXR3b3Jrcy5uZXQg
+Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC0ooycKCciiC9CY6mP
+ph+WS1I42kf3Io7kzZ1/gMFb1EJxabMlAga94NmWO9uQkwiaFQOWrH2cvLyLL9kD
+kz7ZmQUij4C2sHU2CQkqG0mo8xeBxaFYmSXwAd0jYd/6GHABCF63/USWIrfUPkNt
+f7HaoM6yPZ61w3Ne0G5Kfd5/HsTiRiGbpCXHUpp6NMeuG59j1Ma+eEDXPKMimKti
+R3bCMI5tOsCOey6yjEP+DituitqUZZYKPmk+7cvi1tK50OGMT330P+mPZPJQxauK
+dew3mhTv5iKiGYhdN5ZFUy1KVJHf3y3rmNjEWesU0X8483v4tuhhcjNIA+D8/Tcn
+qKQ5AgMBAAGjUzBRMB0GA1UdDgQWBBTd6ubEStLdE60De4Re5IQENKn/aTAfBgNV
+HSMEGDAWgBTd6ubEStLdE60De4Re5IQENKn/aTAPBgNVHRMBAf8EBTADAQH/MA0G
+CSqGSIb3DQEBCwUAA4IBAQAMkrP0zdt1uAOI1B77nV7+EZSzJxahrubo8opjrkvd
+4/stCVG6OfDutxARvCdC4OoJiRPGXBiA22bIi7ZMl7DTpA+CeFEICfF3MKcf8xIT
+V5sCm25dX+KHwACWNccNazpIlIAVGxMmcSs70SlMIfPksCW2FRibsVnzESQCgQcP
+e6owfVvsnQNN4UADki4JMZJ1RQ/nUNM3aNJSf/SFVJYjiUXHLNJY65FfiYV4MGRi
+Yq+NDPs3D0KLkwQ03FFcw56TdPnCAYuihDHkITuQDtS1DawLhuwFDDPcys5Mnigc
+1Y1o5V4Z6L6xkttwgD9zP3DQJscDZ/Gki1NqYV3TvJPr
+-----END CERTIFICATE-----
diff 
--git a/src/app.py b/src/app.py new file mode 100644 index 0000000..1b72122 --- /dev/null +++ b/src/app.py @@ -0,0 +1,1307 @@ +import signal +import json +import redis +import ast +import sys +import os +import json +import random +from dotenv import load_dotenv +from kombu import Connection #, Exchange, Queue, Consumer, Message +from kombu.exceptions import TimeoutError +from kombu.serialization import register +import paho.mqtt.client as mqtt +from socket import timeout as SocketTimeout +import socket +import psycopg2 +from psycopg2.extras import execute_values +import base64 +import struct +import time +from datetime import datetime, timezone +import traceback +from http.server import HTTPServer, BaseHTTPRequestHandler +import threading + +import time +import logging + +import uuid + +client = None # Initialize to None, not a string +connected = False + + +try: + from prometheus_client import generate_latest, start_http_server, CONTENT_TYPE_LATEST, Gauge, Counter, REGISTRY + prometheus_available = True +except: + prometheus_available = False + +keep_processing = True + + +# Set up logging +default_uuid = str(uuid.uuid4()) +logger = logging.getLogger(default_uuid) +logger.propagate = False +logger.setLevel(logging.DEBUG) #DEBUG, INFO, WARNING, ERROR, and CRITICAL +handler = logging.StreamHandler() +#formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter('%(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + +INSTANCE_ID = f"queue-muncher-{default_uuid}" +last_heartbeat = 0 + +logger.warning("Script started") +load_dotenv() + +# Global flag to control the queue processing loop +running = True + +# RabbitMQ configuration +RABBITMQ_URL = os.getenv('RABBITMQ_URL') + +if False: + QUEUE_NAME = "well-devices"#os.getenv('RABBITMQ_QUEUE') +else: + QUEUE_NAME = os.getenv('RABBITMQ_QUEUE') + +DB_NAME = os.getenv('DB_NAME') +DB_USER = os.getenv('DB_USER') +DB_PASSWORD = os.getenv('DB_PASSWORD') +DB_HOST = os.getenv('DB_HOST') +DB_PORT = os.getenv('DB_PORT') +PROCESSING_PERIOD = int(os.getenv('PROCESSING_PERIOD', '300')) # 5 minutes +QUEUE_LENGTH_THRESHOLD = int(os.getenv('QUEUE_LENGTH_THRESHOLD', '1000')) +redis_host = os.getenv('REDIS_HOST', '192.168.68.70') +redis_host = '192.168.68.70' +cache_dir = "cache" +MQTTSERVER = os.getenv('MQTTSERVER', "mqtt.eluxnetworks.net") #"mqtt.eluxnetworks.net" +mqtt_port = int(os.getenv('MQTT_PORT', "8883")) # TLS port + +use_tls = MQTTSERVER not in ["localhost", "127.0.0.1", "192.168.68.70"] + +mqtt_client_id = "client9-authn-ID" + +MACS2id = {} +ids2MAC = {} +BATCH_SIZE = int(os.getenv('BATCH_SIZE', '50')) + + +logger.info(f"DB_NAME= {DB_NAME}") +logger.info(f"DB_USER= {DB_USER}") +logger.info(f"DB_PASSWORD= {DB_PASSWORD}") +logger.info(f"DB_HOST= {DB_HOST}") +logger.info(f"DB_PORT= {DB_PORT}") +logger.info(f"PROCESSING_PERIOD= {PROCESSING_PERIOD}") +logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}") +logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}") +logger.info(f"BATCH_SIZE= {BATCH_SIZE}") + +logger.info(f"mqtt_port= {mqtt_port}") +logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}") +logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}") +logger.info(f"QUEUE_NAME= {QUEUE_NAME}") +logger.info(f"RABBITMQ_URL= {RABBITMQ_URL}") + +# Set up Prometheus metrics if available +if prometheus_available: + QUEUE_SIZE = Gauge('queue_muncher_queue_size', 'Number of messages in the RabbitMQ queue') + MESSAGES_PROCESSED = Counter('queue_muncher_messages_processed', 
'Number of messages processed') + PROCESSING_TIME = Gauge('queue_muncher_processing_time', 'Time spent processing messages in seconds') + BATCH_SIZE_GAUGE = Gauge('queue_muncher_batch_size', 'Size of used batch') + logger.info(f"QUEUE_SIZE= {QUEUE_SIZE}") + logger.info(f"MESSAGES_PROCESSED= {MESSAGES_PROCESSED}") + logger.info(f"PROCESSING_TIME= {PROCESSING_TIME}") + logger.info(f"BATCH_SIZE_GAUGE= {BATCH_SIZE_GAUGE}") +else: + logger.info(f"prometheus_available= {prometheus_available}") + +class HealthCheckHandler(BaseHTTPRequestHandler): + def do_GET(self): + logger.info(f"in do_GET {self.path}") + if '/health' in self.path: + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + response = {"status": "healthy"} + self.wfile.write(json.dumps(response).encode()) + else: + self.send_response(404) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(b'Not Found') + + # Suppress console log messages for each request + def log_message(self, format, *args): + return + +def GetRedisString(key_name): + try: + result = r.get(key_name).decode('utf-8') + except: + result = None + return result + +def GetRedisInt(key_name): + try: + result = int(r.get(key_name).decode('utf-8')) + except: + result = None + return result + +def GetRedisFloat(key_name): + try: + result = float(r.get(key_name).decode('utf-8')) + except: + result = None + + return result + + +def run_server(port=8080): + server_address = ('', port) + httpd = HTTPServer(server_address, HealthCheckHandler) + print(f"Health check server running on port {port}") + server_thread = threading.Thread(target=httpd.serve_forever) + server_thread.daemon = True + server_thread.start() + return httpd + +def process_queue(): + while running: + timestamp = datetime.now().isoformat() + print(f"[{timestamp}] Simulated Queue muncher processing...") + time.sleep(1) + + +def MQSend(topic, content): + global client, mqtt_client_id, connected + + # Start a separate thread for MQTT sending to avoid blocking RabbitMQ processing + mqtt_thread = threading.Thread( + target=MQSend_worker, + args=(topic, content), + daemon=True + ) + mqtt_thread.start() + # Return immediately so RabbitMQ processing can continue + return + +def MQSend_worker(topic, content): + """Worker function that handles MQTT sending in a separate thread""" + global client, mqtt_client_id, connected + + currentTime = int(time.time()) + #print(">", currentTime, topic, content) + + # Limit connection attempts to prevent flooding + max_connection_attempts = 3 + connection_attempts = 0 + + # Use a local semaphore to prevent multiple simultaneous connection attempts + if not hasattr(MQSend_worker, 'connection_in_progress'): + MQSend_worker.connection_in_progress = False + + try: + # Only attempt connection if not already in progress + if client is None or not connected: + if not MQSend_worker.connection_in_progress and connection_attempts < max_connection_attempts: + MQSend_worker.connection_in_progress = True + try: + print(f"Thread {threading.current_thread().name} attempting MQTT connection") + client = connect_mqtt(mqtt_client_id) + connection_attempts += 1 + finally: + MQSend_worker.connection_in_progress = False + + # If we have a valid connection, try to send + if connected and client is not None: + try: + result = client.publish(topic, content, qos=0, retain=False) # Lower QoS for better throughput + if hasattr(result, 'rc') and result.rc == mqtt.MQTT_ERR_SUCCESS: + pass + #print(f"Successfully published to 
{topic}") + else: + print(f"Failed to publish to {topic}") + except Exception as err: + print(f"Error in MQSend publish: {str(err)}") + else: + print("MQTT not connected, message not sent") + + except Exception as err: + print(f"Error in MQSend_worker: {str(err)}") + + +def connect_mqtt(client_id): + global client, connected + + # Add timeout for the entire operation + connection_timeout = 5 # 5 seconds max for the whole connection attempt + start_time = time.time() + + try: + random_suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=6)) + unique_client_id = f"{random_suffix}-{int(time.time())}" + print(f"Connecting {unique_client_id}") + + # Determine certificate path based on environment + certificate_path = '/app/src/new-ca.crt' if os.path.exists('/app/src/new-ca.crt') else 'new-ca.crt' + + # Create client with MQTTv5 protocol + mqtt_client = mqtt.Client(client_id=unique_client_id, protocol=mqtt.MQTTv5) + + # Define callbacks with MQTTv5 signature + def on_connect_wrapper(client, userdata, flags, rc, properties=None): + global connected + if rc == 0: + connected = True + print(f"Connected to {MQTTSERVER} with {unique_client_id}") + subbedTo = [("/w",1),("wellnuotopics/topic1",1),("/faws",1)] + print("subscribing to ", subbedTo) + client.subscribe(subbedTo) + else: + print(f"Failed to connect, return code {rc}") + connected = False + + def on_disconnect_wrapper(client, userdata, rc, properties=None): + global connected + connected = False + print(f"Disconnected with result code {rc}") + + # Set the callbacks + mqtt_client.on_connect = on_connect_wrapper + mqtt_client.on_disconnect = on_disconnect_wrapper + + + if use_tls: + certificate_path = '/app/src/new-ca.crt' if os.path.exists('/app/src/new-ca.crt') else 'new-ca.crt' + mqtt_client.tls_set(ca_certs=certificate_path) + + # Set TLS with your certificate + #mqtt_client.tls_set( + # ca_certs=certificate_path, + #) + + # Set username and password + mqtt_client.username_pw_set("well_user", "We3l1_best!") + + # Connect with shorter timeout + mqtt_client.connect(MQTTSERVER, mqtt_port, keepalive=30) + + # Start the loop in a background thread + mqtt_client.loop_start() + + # Wait for connection with timeout + wait_end_time = min(start_time + connection_timeout, time.time() + 3) + while not connected and time.time() < wait_end_time: + time.sleep(0.1) + + # Check if we connected successfully + if connected: + print(f"MQTT connected successfully in {time.time() - start_time:.2f} seconds") + client = mqtt_client + return mqtt_client + else: + print(f"MQTT connection attempt timed out after {time.time() - start_time:.2f} seconds") + mqtt_client.loop_stop() + connected = False + return None + + except Exception as e: + connection_time = time.time() - start_time + print(f"Connection error after {connection_time:.2f} seconds: {e}") + connected = False + return None + +def ensure_mqtt_connection(): + global client, mqtt_client_id, connected + + if not connected: + print("MQTT connection needed, attempting to connect...") + try: + if client: + try: + client.disconnect() + client.loop_stop() + except: + pass # Ignore errors on disconnect + + client = connect_mqtt(mqtt_client_id) + + # Give a moment for connection to establish + time.sleep(2) + + return connected + except Exception as e: + print(f"Reconnection error: {e}") + return False + + return True # Already connected + +# Define the watchdog function after the connect_mqtt function is defined +def mqtt_watchdog(): + """Thread to monitor and maintain MQTT connection""" + 
global client, mqtt_client_id, connected + + while True: + if not connected: + try: + print("MQTT watchdog: Reconnecting...") + try: + # Only try to disconnect if client is a proper object + if hasattr(client, 'disconnect') and callable(client.disconnect): + client.disconnect() + except: + pass # Ignore errors during disconnect + + # Use the global connect_mqtt function + + client = connect_mqtt(mqtt_client_id) + except Exception as e: + print(f"MQTT watchdog: Reconnection failed: {e}") + + time.sleep(30) # Check every 30 seconds + +# Start the watchdog thread +#mqtt_watchdog_thread = threading.Thread(target=mqtt_watchdog, daemon=True) +#mqtt_watchdog_thread.start() + +def signal_handler(sig, frame): + global running + print("Shutting down gracefully...") + running = False + httpd.shutdown() + print("HTTP server closed") + sys.exit(0) + +def monitor_db_connection(conn): + """Test the database connection and return diagnostic information.""" + try: + # Create a new cursor + with conn.cursor() as cursor: + # Simple query to check connection + cursor.execute("SELECT 1") + result = cursor.fetchone() + + # Get connection info + cursor.execute(""" + SELECT + datname, + usename, + client_addr, + state, + backend_start, + xact_start, + query_start, + state_change + FROM + pg_stat_activity + WHERE + pid = pg_backend_pid() + """) + conn_info = cursor.fetchone() + + return { + "status": "connected" if result == (1,) else "error", + "backend_pid": conn_info[0] if conn_info else None, + "database": conn_info[0] if conn_info else None, + "user": conn_info[1] if conn_info else None, + "client_addr": conn_info[2] if conn_info else None, + "state": conn_info[3] if conn_info else None, + "backend_start": conn_info[4] if conn_info else None, + "xact_start": conn_info[5] if conn_info else None, + "query_start": conn_info[6] if conn_info else None, + "state_change": conn_info[7] if conn_info else None + } + except Exception as e: + return { + "status": "error", + "error_type": type(e).__name__, + "error_message": str(e) + } + +def check_connection_periodically(conn, interval=5): + """Check connection health periodically and log results.""" + last_check_time = 0 + while True: + current_time = time.time() + if current_time - last_check_time >= interval: + last_check_time = current_time + try: + status = monitor_db_connection(conn) + if status["status"] == "connected": + logger.debug(f"DB connection healthy: pid={status.get('backend_pid')}, state={status.get('state')}") + else: + logger.warning(f"DB connection issue: {status}") + except Exception as e: + logger.error(f"Failed to check DB connection: {e}") + yield # Allow other operations to proceed + +def process_messages(connection, db_conn): + global last_heartbeat + logger.info("Starting batch processing") + + # Start connection monitoring + connection_monitor = check_connection_periodically(db_conn, interval=3) + connection_health_thread = threading.Thread( + target=lambda: [next(connection_monitor) or time.sleep(0.1) for _ in range(int(PROCESSING_PERIOD * 10))], + daemon=True + ) + connection_health_thread.start() + + + try: + # Get a channel + channel = connection.channel() + logger.info("Channel created") + + # Basic QoS with all required parameters + channel.basic_qos( + prefetch_size=0, + prefetch_count=1, + a_global=False + ) + logger.info("Basic QoS set") + + data_batches = { + 'sensors': [], + 'radar': [], + 'temperature': [], + 'humidity': [], + 'pressure': [], + 'light': [] + } + + #def diagnostic_callback(msg): + #"""Simple diagnostic callback that 
just acknowledges messages""" + #logger.info("Diagnostic callback received message") + #channel.basic_ack(delivery_tag=msg.delivery_info['delivery_tag']) + #MESSAGES_PROCESSED.inc() + + ## Use this instead of your normal callback for testing + #channel.basic_consume( + #queue=QUEUE_NAME, + #callback=diagnostic_callback, + #no_ack=False + #) + + def callback(msg): + """ + Callback function for processing messages + msg: amqp.basic_message.Message object containing the message data + """ + try: + #logger.info("\n=== Message Received ===") + #logger.info(f"Delivery info: {msg.delivery_info}") + #logger.info(f"Properties: {msg.properties}") + #logger.info(f"Body type: {type(msg.body)}") + #logger.info(f"Body length: {len(msg.body) if msg.body else 0}") + + if isinstance(msg.body, (bytes, bytearray)): + hex_dump = ' '.join(f'{b:02x}' for b in msg.body[:32]) + #logger.info(f"First 32 bytes (hex): {hex_dump}") + data = {'data_base64': base64.b64encode(msg.body).decode()} + else: + if isinstance(msg.body, str): + if msg.body[0] == "{": + original_data = msg.body.encode() + data = {'data_base64': base64.b64encode(original_data).decode()} + else: + data = msg.body + #when from well_cloud_server + #'{"id": "c9afe11a-ffe2-4a95-ad9b-828deffa70b5", "source": "well_cloud_server", "type": "MQTT.EventPublished", "data_base64": "f88ad36750980e0064b708890414ed1300001104557844f628ee41ea0136eb52410a00db97ca480193f2a54c02cde59f4c03c97db04c04ec54974a051f64914a0631aa8b4a07d39da74808654fb4480931ddc048", "time": "2025-03-14T01:48:41.000+00:00", "specversion": "1.0", "datacontenttype": "application/octet-stream", "subject": "wellnuotopics/topic1"}' + + with PROCESSING_TIME.time(): + with db_conn.cursor() as cursor: + data_type, parsed_data = process_message_data(cursor, data) + if data_type and parsed_data: + #logger.info(f"Successfully processed as {data_type}") + MQSend("/"+ids2MAC[parsed_data["device_id"]], json.dumps(parsed_data)) + if data_type in data_batches: + data_batches[data_type].append(parsed_data) + batch_size = len(data_batches[data_type]) + #logger.info(f"Added to {data_type} batch. Size[radar]: {len(data_batches["radar"])} Size[sensors]: {len(data_batches["sensors"])} mlen: {len(msg.body)}") + + if batch_size >= BATCH_SIZE: + #logger.info(f"Processing {data_type} batch") + + + #filename = f"data_{data_type}.pkl" # Example: data_temperature.pkl + #backup_dir = "data_backups_100" # Optional: Place backups in a subdirectory + + #os.makedirs(backup_dir, exist_ok=True) # Create directory if it doesn't exist + #full_filename = os.path.join(backup_dir, filename) + + + ## 2. Pickle the data + #try: + #with open(full_filename, 'wb') as f: + #pickle.dump(data_batches, f) + #print(f"Data of type '{data_type}' pickled to: {full_filename}") # Confirmation message + + #except IOError as e: + #print(f"Error writing pickle file: {e}") + #raise # Re-raise the exception for handling upstream. 
+ + + insert_into_timescaledb_My(cursor, data_type, data_batches[data_type]) + db_conn.commit() + data_batches[data_type].clear() + + channel.basic_ack(delivery_tag=msg.delivery_info['delivery_tag']) + MESSAGES_PROCESSED.inc() + #logger.info("Message acknowledged") + + except Exception as e: + logger.error(f"Message processing error: {str(e)}") + logger.exception("Full traceback:") + # Reject the message without requeue if we can't process it + channel.basic_reject(delivery_tag=msg.delivery_info['delivery_tag'], requeue=False) + + # Try to get queue info + try: + queue_info = channel.queue_declare(queue=QUEUE_NAME, passive=True) + logger.info(f"Queue info: message_count={queue_info.message_count}, consumer_count={queue_info.consumer_count}") + except Exception as e: + logger.error(f"Error getting queue info: {e}") + raise + + # Basic consume with correct callback signature + logger.info(f"Starting basic_consume on queue {QUEUE_NAME}") + channel.basic_consume( + queue=QUEUE_NAME, + callback=callback, + no_ack=False + ) + + logger.info("Starting to consume messages...") + + start_time = time.time() + last_queue_check = 0 + queue_check_interval = 10 + + # Start consuming with timeout checks + while time.time() - start_time < PROCESSING_PERIOD: + try: + # Process messages with very short timeout + #connection.drain_events(timeout=0.1) + + try: + # Log before attempting to process + connection.drain_events(timeout=0.1) + except (socket.timeout, SocketTimeout, TimeoutError): + continue + except Exception as e: + logger.error(f"Error during drain_events: {e}") + continue + + + # Periodic queue check + current_time = time.time() + if current_time - last_queue_check > queue_check_interval: + try: + queue_info = channel.queue_declare(queue=QUEUE_NAME, passive=True) + logger.info(f"Queue status - Messages: {queue_info.message_count}, " + f"Consumers: {queue_info.consumer_count}") + last_queue_check = current_time + except Exception as e: + logger.error(f"Error checking queue status: {e}") + + # Heartbeat update + if time.time() - last_heartbeat > 10: + update_heartbeat(db_conn) + last_heartbeat = time.time() + logger.info("Heartbeat updated") + + except (socket.timeout, SocketTimeout, TimeoutError): + continue # Just continue on timeout + + except ConnectionError as e: + logger.error(f"Connection error: {e}") + raise + + logger.info("Consumption period ended, closing channel") + channel.close() + + except Exception as e: + logger.error(f"Critical error in process_messages: {e}") + logger.exception("Full traceback:") + raise + finally: + # Process any remaining batches + with db_conn.cursor() as cursor: + for data_type, batch in data_batches.items(): + if batch: + insert_into_timescaledb(cursor, data_type, batch) + db_conn.commit() + + logger.info("Batch processing completed") + +################################################################################################# +def main_loop(): + global keep_processing, client + + print("Initializing MQTT client at startup") + client = connect_mqtt(mqtt_client_id) + + while keep_processing: + try: + with Connection(RABBITMQ_URL) as rabbit_conn: + # Get a fresh connection for each batch processing session + db_conn = get_db_connection() + #process_messages(rabbit_conn, db_conn, max_processing_time=60) # Shorter processing window + process_messages(rabbit_conn, db_conn) # Shorter processing window + + # Check if we should continue after each batch + if not should_continue_processing(db_conn): + logger.info("Preparing to exit...") + keep_processing = 
False + deregister_instance(db_conn) + break + except Exception as e: + logger.error(f"Error in main loop: {e}") + time.sleep(5) # Wait before retrying + +def insert_into_timescaledb(cursor, data_type, data_batch): + if not data_batch: + return + + # Use extremely small batches to test + MAX_BATCH = 5 + + total_inserted = 0 + + for i in range(0, len(data_batch), MAX_BATCH): + sub_batch = data_batch[i:i+MAX_BATCH] + + # Prepare query and values based on data type + if data_type == 'sensors': + query = """ + INSERT INTO sensor_readings (time, device_id, temperature, humidity, pressure, light, + s0, s1, s2, s3, s4, s5, s6, s7, s8, s9, mtype) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), + row.get('device_id'), row.get('temperature'), row.get('humidity'), row.get('pressure'), row.get('light'), + *(row.get('smell', [None]*10)), row.get('mtype')) for row in sub_batch] + + elif data_type == 'radar': + query = """ + INSERT INTO radar_readings (time, device_id, absent, moving, stationary, "both", + m0, m1, m2, m3, m4, m5, m6, m7, m8, + s0, s1, s2, s3, s4, s5, s6, s7, s8) + VALUES %s + """ + values = [] + for row in sub_batch: + motion_energy = row['radar'][0] + stationary_energy = row['radar'][1] + + # Perform element-wise division + absent = motion_energy[1] / motion_energy[0] if motion_energy[0] != 0 else 0 + moving = motion_energy[2] / motion_energy[0] if motion_energy[0] != 0 else 0 + stationary = motion_energy[3] / motion_energy[0] if motion_energy[0] != 0 else 0 + both = motion_energy[4] / motion_energy[0] if motion_energy[0] != 0 else 0 + + m_values = [v / motion_energy[0] if motion_energy[0] != 0 else 0 for v in motion_energy[5:14]] + s_values = [v / stationary_energy[0] if stationary_energy[0] != 0 else 0 for v in stationary_energy[1:10]] + + values.append(( + datetime.fromtimestamp(row['time'], tz=timezone.utc), + row['device_id'], + absent, moving, stationary, both, + *m_values, + *s_values + )) + + elif data_type in ['temperature', 'humidity', 'pressure', 'light']: + query = f""" + INSERT INTO sensor_readings (time, device_id, {data_type}) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), row['device_id'], + row.get(data_type)) for row in sub_batch] + else: + raise ValueError(f"Unknown data type: {data_type}") + + try: + # Add explicit transaction control + cursor.execute("BEGIN") + execute_values(cursor, query, values) + cursor.execute("COMMIT") + + batch_num = i//MAX_BATCH + 1 + total_inserted += len(values) + #logger.warning(f"Inserted {len(values)} {data_type} readings (batch {batch_num}, total: {total_inserted}/{len(data_batch)})") + + except Exception as e: + # Roll back transaction on error + try: + cursor.execute("ROLLBACK") + except Exception as rollback_error: + logger.error(f"Error during rollback: {rollback_error}") + + logger.error(f"Error inserting {data_type} data: {e}") + if values: + logger.error(f"Problematic values: {values}") + + # Re-raise the exception so the caller can handle it + raise + + return total_inserted + +def insert_into_timescaledb_My(cursor, data_type, data_batch): + if not data_batch: + return + + if data_type == 'sensors': + query = """ + INSERT INTO sensor_readings (time, device_id, temperature, humidity, pressure, light, + s0, s1, s2, s3, s4, s5, s6, s7, s8, s9, mtype) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), + row.get('device_id'), row.get('temperature'), row.get('humidity'), row.get('pressure'), row.get('light'), + 
*(row.get('smell', [None]*10)), row.get('mtype')) for row in data_batch] + + elif data_type == 'radar': + query = """ + INSERT INTO radar_readings (time, device_id, absent, moving, stationary, "both", + m0, m1, m2, m3, m4, m5, m6, m7, m8, + s0, s1, s2, s3, s4, s5, s6, s7, s8) + VALUES %s + """ + values = [] + for row in data_batch: + motion_energy = row['radar'][0] + stationary_energy = row['radar'][1] + + # Perform element-wise division + absent = motion_energy[1] / motion_energy[0] if motion_energy[0] != 0 else 0 + moving = motion_energy[2] / motion_energy[0] if motion_energy[0] != 0 else 0 + stationary = motion_energy[3] / motion_energy[0] if motion_energy[0] != 0 else 0 + both = motion_energy[4] / motion_energy[0] if motion_energy[0] != 0 else 0 + + m_values = [v / motion_energy[0] if motion_energy[0] != 0 else 0 for v in motion_energy[5:14]] + s_values = [v / stationary_energy[0] if stationary_energy[0] != 0 else 0 for v in stationary_energy[1:10]] + + values.append(( + datetime.fromtimestamp(row['time'], tz=timezone.utc), + row['device_id'], + absent, moving, stationary, both, + *m_values, + *s_values + )) + + elif data_type in ['temperature', 'humidity', 'pressure', 'light']: + query = f""" + INSERT INTO sensor_readings (time, device_id, {data_type}) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), row['device_id'], + row.get(data_type)) for row in data_batch] + + else: + raise ValueError(f"Unknown data type: {data_type}") + + try: + execute_values(cursor, query, values) + #logger.warning(f"Inserted {len(values)} {data_type} readings") + except Exception as e: + print(f"Error inserting {data_type} data: {e}") + print(f"Problematic values: {values[:5]}...") # Print first 5 values for debugging + + logger.error(f"Error inserting {data_type} data: {e}") + logger.error(f"Problematic values: {values[:5]}...") # Print first 5 values for debugging + raise + +def GetLastPointQuery(device_id, field_name, threshold_value): + """ + Generate a SQL query to find the last point in radar_readings where the specified field + meets the threshold condition. + + Parameters: + device_id (int): The device ID to filter by + field_name (str): The field to check, e.g., "s2_max", "s28_min", etc. 
+ threshold_value (float): The threshold value to compare against + + Returns: + str: SQL query string + """ + # Parse the field name to get base field and operation + parts = field_name.split('_') + base_field = parts[0] + operation = parts[1] if len(parts) > 1 else None + + # Define the field expression based on base_field + if base_field == 's28': + field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7" + elif base_field == 'm08': + field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9" + else: + field_expr = base_field + + # Define comparison operator based on operation + operator = ">" if operation == "max" else "<" + + # Generate the SQL query + query = f""" + SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id} + AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1; + """ + + return query + +def GetIdFromMAC(cursor, MAC): + try: + query = "SELECT device_id FROM devices WHERE device_mac = '" + MAC + "'" + + cursor.execute(query) + result = cursor.fetchone() + if result and len(result) > 0: + return result[0] + else: + query = f""" + INSERT INTO public.devices (device_mac, well_id, description) VALUES ( + '{MAC}', (SELECT COALESCE(MAX(well_id), 0) + 1 FROM public.devices), '') + RETURNING well_id, device_id + """ + cursor.execute(query) + result = cursor.fetchone() + cursor.connection.commit() # Use cursor's connection, not undefined 'conn' + new_device_id = result[1] + return new_device_id + + except Exception as e: + logger.error(f"Error in GetIdFromMAC sql= {query} {e}") + return 0 + +def GetLastDetected(device_id, field_name, threshold_value): + # Example usage: + + query = GetLastPointQuery(device_id, field_name, threshold_value) + print(query) + last_detected = None + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute(query) + last_detected = cur.fetchone()[0].timestamp() + + return last_detected + +def UpdateLastSeen(new_message_dict): + print(new_message_dict) + + device_id_s = str(new_message_dict['device_id']) + + #matches code in well-api + radar_threshold_signal = "s3_max" + radar_threshold_value = 10 + + threshold_details = GetRedisString('radar_threshold'+device_id_s) + try: + radar_threshold_list = ast.literal_eval(threshold_details) + radar_threshold_signal = radar_threshold_list[0] + radar_threshold_value = radar_threshold_list[1] + except: + #key not found so read from DB, and store to key + sql = f""" + SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = {device_id_s}; + """ + + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute(sql) + threshold_details = cur.fetchone()[0] #cur.fetchall()# + print(threshold_details) + radar_threshold_signal = "s3_max" + radar_threshold_value = 10 + + if threshold_details != None: + + threshold_details_list = ast.literal_eval(threshold_details) + radar_threshold_signal = threshold_details_list[0] + radar_threshold_value = threshold_details_list[1] + + r.set('radar_threshold'+device_id_s, str([radar_threshold_signal, radar_threshold_value])) + + + try: + + #lets determine if presence is detected + component = radar_threshold_signal.split("_")[0] + radar_val = 0 + if component == "s28": + radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0] + elif component[0] == "s": + component_index = ast.literal_eval(component[1]) + radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0] + elif component[0] == "m": + component_index = 
ast.literal_eval(component[1]) + radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0] + + if radar_val > radar_threshold_value: #seen now + r.set('lastseen_'+device_id_s, new_message_dict['time']) + + else: #not seen now, but lets determine when he was and add the key + last_seen = GetRedisFloat('lastseen_'+device_id_s) + if last_seen == None: + last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value) + r.set('lastseen_'+device_id_s, last_seen) + + + except Exception as e: + print(f"UpdateLastSeen failed: {e}") + +def process_message_data(cursor, body): + + global MACS2id, ids2MAC + """Process a single message from the queue.""" + try: + if isinstance(body, dict): + data = body + else: + data = json.loads(body) + + if "data_base64" in data: + + decrypt_data = data["data_base64"] + #print(data["time"]) + decrypt_data = base64.b64decode(decrypt_data) + decrypt_data_old = decrypt_data + if isinstance(decrypt_data, bytes): + decrypt_data = decrypt_data.decode('utf-8') + m_type = 0 + new_message_dict = {} + #print(decrypt_data) + if len(decrypt_data) > 14: + + if "data_base64" in decrypt_data: + data = json.loads(decrypt_data) + payload = data["data_base64"] + decrypt_data = base64.b64decode(payload) + + if len(decrypt_data) > 14: + pointer = 0 + lenn = 4 + #print ("") + seconds = struct.unpack(' 0: + + + new_message_dict = {"time":packet_time, "device_id":device_id} + + line_entry = str(packet_time) + dcr_l = len(decrypt_data) + lenn_1 = 1 + + message_type = struct.unpack(' 0: + st = time.time() + if message_type == 16: #Radar message + m_type = "radar" + #counter, no targets, moving targets, stationary targets, both, 9 moving gates energy + #counter, 9 stationary gates energy + pointer = pointer + lenn_1 + motion_energy = [0] * 14 + stationary_energy = [0] * 10 + #print("@1",time.time()-st) + lenn = 2 + for i in range(14): + V = struct.unpack('= 0x20 and message_type <= 0x2F): #Sensors message + #work_dict = dict(common_dict) + #print("@4",time.time()-st) + m_type = "sensors" + pointer = pointer + lenn_1 + + lenn = 4 + P = struct.unpack(' NOW() - INTERVAL '60 seconds' + """) + + return cur.fetchone()[0] + +def register_instance(conn): + + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO queue_muncher_instances (instance_id, last_heartbeat, status, container_id) + VALUES (%s, NOW(), 'active', %s) + ON CONFLICT (instance_id) DO UPDATE + SET last_heartbeat = NOW(), status = 'active' + """, (INSTANCE_ID, "")) + conn.commit() + + +def should_continue_processing(conn): + queue_length = get_queue_length() + instance_count = get_active_instances(conn) + + logger.info(f"{INSTANCE_ID}Current queue length: {queue_length}, Instance count: {instance_count}") + + if queue_length > QUEUE_LENGTH_THRESHOLD: + return True + elif instance_count > 1: + return False + else: + return True # If this is the last instance, it should continue processing + +def deregister_instance(conn): + with conn.cursor() as cur: + cur.execute(""" + UPDATE queue_muncher_instances + SET status = 'inactive' + WHERE instance_id = %s + """, (INSTANCE_ID,)) + conn.commit() + +client = connect_mqtt(mqtt_client_id) + +def mqtt_network_diagnostics(): + """Run network diagnostics to troubleshoot MQTT connection""" + try: + print("Running MQTT connection diagnostics...") + + # Try DNS resolution + try: + import socket + ip_address = socket.gethostbyname(MQTTSERVER) + print(f"DNS resolution for {MQTTSERVER}: {ip_address}") + except Exception as e: + print(f"DNS 
resolution failed: {e}") + + # Try a basic socket connection to test reachability + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(5) + result = s.connect_ex((MQTTSERVER, mqtt_port)) + if result == 0: + print(f"Socket connection to {MQTTSERVER}:{mqtt_port} successful") + else: + print(f"Socket connection failed with error code: {result}") + s.close() + except Exception as e: + print(f"Socket test failed: {e}") + + # Try a ping (requires shell access) + try: + import subprocess + ping_result = subprocess.run(["ping", "-c", "1", "-W", "5", MQTTSERVER], + capture_output=True, text=True, timeout=6) + print(f"Ping result: {ping_result.returncode}") + print(ping_result.stdout) + except Exception as e: + print(f"Ping test failed: {e}") + + except Exception as e: + print(f"Diagnostics failed: {e}") + + +if __name__ == "__main__": + + # Run diagnostics + mqtt_network_diagnostics() + + r = redis.Redis(host=redis_host, port=6379, db=0) + #r = redis.Redis(host='redis', port=6379, db=0) + + # This block will only run when the script is executed directly (not through OpenFaaS) + logger.info("Running in debug mode") + queue_size = get_queue_length() + # This block will only run when the script is executed directly (not through OpenFaaS) + debug_port = int(os.getenv('DEBUG_PORT', '8085')) # Use a different default port for debugging + + processing_thread = threading.Thread(target=main_loop, name="processing_thread") + processing_thread.start() + + #server_thread = threading.Thread(target=run_server, args=(debug_port,), name="server_thread") + #server_thread.start() + httpd = run_server(8085) + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Shutting down...") + with get_db_connection() as conn: + deregister_instance(conn) + +else: + + #r = redis.Redis(host='localhost', port=6379, db=0) + try: + r = redis.Redis(host=redis_host, port=6379, db=0) + except Exception as e: + print(f"redis.Redis failed: {e}") + + # This block runs when the script is imported by OpenFaaS + logger.info("Running in production mode") + processing_thread = threading.Thread(target=main_loop, name="processing_thread") + processing_thread.start() + + server_thread = threading.Thread(target=run_server, name="server_thread") + server_thread.start() diff --git a/src/data_backups/data_radar.pkl b/src/data_backups/data_radar.pkl new file mode 100644 index 0000000..256d9ec Binary files /dev/null and b/src/data_backups/data_radar.pkl differ diff --git a/src/data_backups/data_sensors.pkl b/src/data_backups/data_sensors.pkl new file mode 100644 index 0000000..7b7e4db Binary files /dev/null and b/src/data_backups/data_sensors.pkl differ diff --git a/src/data_backups_100/data_radar.pkl b/src/data_backups_100/data_radar.pkl new file mode 100644 index 0000000..d556c98 Binary files /dev/null and b/src/data_backups_100/data_radar.pkl differ diff --git a/src/new-ca.crt b/src/new-ca.crt new file mode 100644 index 0000000..fff2847 --- /dev/null +++ b/src/new-ca.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDHTCCAgWgAwIBAgIUak6HXnrheUs5SnAXYa+jUU1l28YwDQYJKoZIhvcNAQEL +BQAwHjEcMBoGA1UEAwwTZWx1eG5ldHdvcmtzLm5ldCBDQTAeFw0yNTAyMjYwNTA4 +MDFaFw0zNTAyMjQwNTA4MDFaMB4xHDAaBgNVBAMME2VsdXhuZXR3b3Jrcy5uZXQg +Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC0ooycKCciiC9CY6mP +ph+WS1I42kf3Io7kzZ1/gMFb1EJxabMlAga94NmWO9uQkwiaFQOWrH2cvLyLL9kD +kz7ZmQUij4C2sHU2CQkqG0mo8xeBxaFYmSXwAd0jYd/6GHABCF63/USWIrfUPkNt +f7HaoM6yPZ61w3Ne0G5Kfd5/HsTiRiGbpCXHUpp6NMeuG59j1Ma+eEDXPKMimKti 
+R3bCMI5tOsCOey6yjEP+DituitqUZZYKPmk+7cvi1tK50OGMT330P+mPZPJQxauK
+dew3mhTv5iKiGYhdN5ZFUy1KVJHf3y3rmNjEWesU0X8483v4tuhhcjNIA+D8/Tcn
+qKQ5AgMBAAGjUzBRMB0GA1UdDgQWBBTd6ubEStLdE60De4Re5IQENKn/aTAfBgNV
+HSMEGDAWgBTd6ubEStLdE60De4Re5IQENKn/aTAPBgNVHRMBAf8EBTADAQH/MA0G
+CSqGSIb3DQEBCwUAA4IBAQAMkrP0zdt1uAOI1B77nV7+EZSzJxahrubo8opjrkvd
+4/stCVG6OfDutxARvCdC4OoJiRPGXBiA22bIi7ZMl7DTpA+CeFEICfF3MKcf8xIT
+V5sCm25dX+KHwACWNccNazpIlIAVGxMmcSs70SlMIfPksCW2FRibsVnzESQCgQcP
+e6owfVvsnQNN4UADki4JMZJ1RQ/nUNM3aNJSf/SFVJYjiUXHLNJY65FfiYV4MGRi
+Yq+NDPs3D0KLkwQ03FFcw56TdPnCAYuihDHkITuQDtS1DawLhuwFDDPcys5Mnigc
+1Y1o5V4Z6L6xkttwgD9zP3DQJscDZ/Gki1NqYV3TvJPr
+-----END CERTIFICATE-----
diff --git a/timescaledb_backup.dump b/timescaledb_backup.dump
new file mode 100644
index 0000000..e69de29
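
Note: the sketch below is illustrative only and is not part of any file in this diff. Assuming GetLastPointQuery (defined identically in redis_test.py and src/app.py) is in scope, it shows the query text the function builds for device 499 (the device used in the redis_test.py test message) with the default "s3_max" / 10 threshold; because the device id, field expression and threshold are interpolated directly into the SQL string, only trusted values should be passed.

    # Hypothetical usage of GetLastPointQuery; not part of the diff.
    query = GetLastPointQuery(499, "s3_max", 10)
    # "s3_max" maps to field expression "s3" and operator ">", so query is (up to whitespace):
    #   SELECT "time" AS point_time, s3 AS point_value FROM radar_readings WHERE device_id = 499
    #   AND s3 > 10 ORDER BY "time" DESC LIMIT 1;
    # A field name of "s28_max" would instead use the averaged expression (s2+s3+s4+s5+s6+s7+s8)/7.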