From 1dbb91f8ba4ab5639d45ea376a52d2dad5a35432 Mon Sep 17 00:00:00 2001
From: "RZ_MINIX\\rober"
Date: Sun, 15 Jun 2025 20:31:11 -0700
Subject: [PATCH] Initial push

---
 .env                                |   16 +
 Dockerfile                          |   20 +
 queue-muncher.yml                   |   37 +
 redis_test.py                       |  233 +++
 requirements.txt                    |    6 +
 secrets/mqtt-ca-cert                |   19 +
 src/app.py                          | 1307 +++++++++++++++++++++++++++
 src/data_backups/data_radar.pkl     |  Bin 0 -> 499 bytes
 src/data_backups/data_sensors.pkl   |  Bin 0 -> 511 bytes
 src/data_backups_100/data_radar.pkl |  Bin 0 -> 23032 bytes
 src/new-ca.crt                      |   19 +
 timescaledb_backup.dump             |    0
 12 files changed, 1657 insertions(+)
 create mode 100644 .env
 create mode 100644 Dockerfile
 create mode 100644 queue-muncher.yml
 create mode 100644 redis_test.py
 create mode 100644 requirements.txt
 create mode 100644 secrets/mqtt-ca-cert
 create mode 100644 src/app.py
 create mode 100644 src/data_backups/data_radar.pkl
 create mode 100644 src/data_backups/data_sensors.pkl
 create mode 100644 src/data_backups_100/data_radar.pkl
 create mode 100644 src/new-ca.crt
 create mode 100644 timescaledb_backup.dump

diff --git a/.env b/.env
new file mode 100644
index 0000000..34d5305
--- /dev/null
+++ b/.env
@@ -0,0 +1,16 @@
+# Database settings
+DB_NAME=wellnuo
+DB_USER=well_app
+DB_PASSWORD=well_app_2024
+DB_HOST=192.168.68.70
+DB_PORT=5432
+
+# RabbitMQ settings
+RABBITMQ_URL=amqp://well_pipe:wellnuo_2024@192.168.68.70:5672//
+RABBITMQ_QUEUE=evgrid-dev-demo
+
+# Processing settings
+PROCESSING_PERIOD=300
+QUEUE_LENGTH_THRESHOLD=1000
+BATCH_SIZE=100
+
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..3a39009
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,20 @@
+FROM python:3.9-alpine
+
+WORKDIR /app
+
+# Install build dependencies for psycopg2
+RUN apk add --no-cache postgresql-dev gcc python3-dev musl-dev
+
+# Install Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt + +# Create src directory and copy application code +RUN mkdir -p /app/src +COPY src/app.py /app/src/ +COPY src/new-ca.crt /app/src/new-ca.crt + +# Expose port for health checks +EXPOSE 8086 + +CMD ["python", "src/app.py"] \ No newline at end of file diff --git a/queue-muncher.yml b/queue-muncher.yml new file mode 100644 index 0000000..8e9411d --- /dev/null +++ b/queue-muncher.yml @@ -0,0 +1,37 @@ +version: 1.0 +provider: + name: openfaas + gateway: http://192.168.68.70:8084 +functions: + queue-muncher: + image: repo.eluxnetworks.net/queue-muncher:latest + labels: + com.openfaas.scale.zero: false + network: "host" # Add this line + environment: + read_timeout: "1h" + write_timeout: "1h" + exec_timeout: "1h" + + # RabbitMQ configuration + RABBITMQ_URL: "amqp://well_pipe:wellnuo_2024@192.168.68.70:5672//" + RABBITMQ_QUEUE: "evgrid-dev-demo" + REDIS_HOST: "192.168.68.70" + # Database configuration + DB_NAME: "wellnuo" + DB_USER: "well_app" + DB_PASSWORD: "well_app_2024" + DB_HOST: "192.168.68.70" + DB_PORT: "5432" + MQTTSERVER: "192.168.68.70" + MQTT_PORT: "1883" + # Processing settings + PROCESSING_PERIOD: "300" + QUEUE_LENGTH_THRESHOLD: "1000" + BATCH_SIZE: "100" + deploy: + resources: + limits: + memory: 256Mi + requests: + memory: 128Mi diff --git a/redis_test.py b/redis_test.py new file mode 100644 index 0000000..e99d37b --- /dev/null +++ b/redis_test.py @@ -0,0 +1,233 @@ +import os +import redis +import psycopg2 +from dotenv import load_dotenv +import ast +import time + +load_dotenv() + +DB_NAME = os.getenv('DB_NAME') +DB_USER = os.getenv('DB_USER') +DB_PASSWORD = os.getenv('DB_PASSWORD') +DB_HOST = os.getenv('DB_HOST') +DB_PORT = os.getenv('DB_PORT') + +# Connect to Redis (assuming it's running on localhost with default port) +r = redis.Redis(host='localhost', port=6379, db=0) + +def GetRedisString(key_name): + try: + result = r.get(key_name).decode('utf-8') + except: + result = None + return result + +def GetRedisInt(key_name): + try: + result = int(r.get(key_name).decode('utf-8')) + except: + result = None + return result + +def GetRedisFloat(key_name): + try: + result = float(r.get(key_name).decode('utf-8')) + except: + result = None + + return result + +value = GetRedisFloat('lastseen_501') +print(value) +r.set('lastseen_510', 1.444) + +value = GetRedisFloat('lastseen_510') +print(value) + +def get_db_connection(): + return psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT) + +def GetLastPointQuery(device_id, field_name, threshold_value): + """ + Generate a SQL query to find the last point in radar_readings where the specified field + meets the threshold condition. + + Parameters: + device_id (int): The device ID to filter by + field_name (str): The field to check, e.g., "s2_max", "s28_min", etc. 
+ threshold_value (float): The threshold value to compare against + + Returns: + str: SQL query string + """ + # Parse the field name to get base field and operation + parts = field_name.split('_') + base_field = parts[0] + operation = parts[1] if len(parts) > 1 else None + + # Define the field expression based on base_field + if base_field == 's28': + field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7" + elif base_field == 'm08': + field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9" + else: + field_expr = base_field + + # Define comparison operator based on operation + operator = ">" if operation == "max" else "<" + + # Generate the SQL query + query = f""" + SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id} + AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1; + """ + + return query + + +def GetLastDetected(device_id, field_name, threshold_value): + # Example usage: + + query = GetLastPointQuery(device_id, field_name, threshold_value) + print(query) + last_detected = None + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute(query) + last_detected = cur.fetchone()[0].timestamp() + + return last_detected + +def UpdateLastSeen(new_message_dict): + print(new_message_dict) + + device_id_s = str(new_message_dict['device_id']) + + #matches code in well-api + radar_threshold_signal = "s3_max" + radar_threshold_value = 10 + + threshold_details = GetRedisString('radar_threshold'+device_id_s) + try: + radar_threshold_list = ast.literal_eval(threshold_details) + radar_threshold_signal = radar_threshold_list[0] + radar_threshold_value = radar_threshold_list[1] + except: + #key not found so read from DB, and store to key + sql = f""" + SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = {device_id_s}; + """ + + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute(sql) + threshold_details = cur.fetchone()[0] #cur.fetchall()# + print(threshold_details) + radar_threshold_signal = "s3_max" + radar_threshold_value = 10 + + if threshold_details != None: + + threshold_details_list = ast.literal_eval(threshold_details) + radar_threshold_signal = threshold_details_list[0] + radar_threshold_value = threshold_details_list[1] + + r.set('radar_threshold'+device_id_s, str([radar_threshold_signal, radar_threshold_value])) + + + + #lets determine if presence is detected + component = radar_threshold_signal.split("_")[0] + radar_val = 0 + if component == "s28": + radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0] + elif component[0] == "s": + component_index = ast.literal_eval(component[1]) + radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0] + elif component[0] == "m": + component_index = ast.literal_eval(component[1]) + radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0] + + if radar_val > radar_threshold_value: #seen now + r.set('lastseen_'+device_id_s, new_message_dict['time']) + + else: #not seen now, but lets determine when he was and add the key + last_seen = GetRedisFloat('lastseen_'+device_id_s) + if last_seen == None: + last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value) + r.set('lastseen_'+device_id_s, last_seen) + + +new_message_dict = {'time': 1743554317.579559, 'device_id': 499, 'radar': [[100, 100, 0, 0, 0, 2226, 2654, 425, 523, 445, 340, 436, 339, 548], [100, 0, 0, 706, 657, 547, 570, 553, 509, 499]]} + +st = 
time.time() +UpdateLastSeen(new_message_dict) +print(time.time()-st) + +if False: + # Simple key-value + + value = r.get('simple_key') + print(f"Simple key value: {value.decode('utf-8')}") + + # WRITING DATA TO REDIS + + # Simple key-value + r.set('simple_key', 'Hello, Redis!') + + # Setting expiration (TTL) in seconds + r.setex('expiring_key', 60, 'This will expire in 60 seconds') + + # Working with numbers + r.set('counter', 0) + r.incr('counter') # Increment by 1 + r.incrby('counter', 5) # Increment by 5 + + # Lists + r.rpush('my_list', 'item1') + r.rpush('my_list', 'item2', 'item3') # Add multiple items + + # Sets + r.sadd('my_set', 'member1', 'member2', 'member3') + + # Hashes (dictionaries) + r.hset('user:1000', mapping={ + 'username': 'john_doe', + 'email': 'john@example.com', + 'visits': 10 + }) + + # READING DATA FROM REDIS + + # Simple key-value + value = r.get('simple_key') + print(f"Simple key value: {value.decode('utf-8')}") + + # Check current counter value + counter = r.get('counter') + print(f"Counter value: {counter.decode('utf-8')}") + + # Lists + all_items = r.lrange('my_list', 0, -1) # Get all items + print("List items:", [item.decode('utf-8') for item in all_items]) + + # Sets + all_members = r.smembers('my_set') + print("Set members:", [member.decode('utf-8') for member in all_members]) + + # Check if a member exists in a set + is_member = r.sismember('my_set', 'member1') + print(f"Is 'member1' in set? {is_member}") + + # Hashes + username = r.hget('user:1000', 'username') + print(f"Username: {username.decode('utf-8')}") + + # Get all hash fields + user_data = r.hgetall('user:1000') + print("User data:", {k.decode('utf-8'): v.decode('utf-8') for k, v in user_data.items()}) + + # Check remaining TTL for expiring key (in seconds) + ttl = r.ttl('expiring_key') + print(f"Expiring key TTL: {ttl} seconds") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c466b8f --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +paho-mqtt +kombu>=5.0.0 +psycopg2-binary>=2.8.6 +python-dotenv>=0.15.0 +prometheus-client>=0.11.0 +redis diff --git a/secrets/mqtt-ca-cert b/secrets/mqtt-ca-cert new file mode 100644 index 0000000..fff2847 --- /dev/null +++ b/secrets/mqtt-ca-cert @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDHTCCAgWgAwIBAgIUak6HXnrheUs5SnAXYa+jUU1l28YwDQYJKoZIhvcNAQEL +BQAwHjEcMBoGA1UEAwwTZWx1eG5ldHdvcmtzLm5ldCBDQTAeFw0yNTAyMjYwNTA4 +MDFaFw0zNTAyMjQwNTA4MDFaMB4xHDAaBgNVBAMME2VsdXhuZXR3b3Jrcy5uZXQg +Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC0ooycKCciiC9CY6mP +ph+WS1I42kf3Io7kzZ1/gMFb1EJxabMlAga94NmWO9uQkwiaFQOWrH2cvLyLL9kD +kz7ZmQUij4C2sHU2CQkqG0mo8xeBxaFYmSXwAd0jYd/6GHABCF63/USWIrfUPkNt +f7HaoM6yPZ61w3Ne0G5Kfd5/HsTiRiGbpCXHUpp6NMeuG59j1Ma+eEDXPKMimKti +R3bCMI5tOsCOey6yjEP+DituitqUZZYKPmk+7cvi1tK50OGMT330P+mPZPJQxauK +dew3mhTv5iKiGYhdN5ZFUy1KVJHf3y3rmNjEWesU0X8483v4tuhhcjNIA+D8/Tcn +qKQ5AgMBAAGjUzBRMB0GA1UdDgQWBBTd6ubEStLdE60De4Re5IQENKn/aTAfBgNV +HSMEGDAWgBTd6ubEStLdE60De4Re5IQENKn/aTAPBgNVHRMBAf8EBTADAQH/MA0G +CSqGSIb3DQEBCwUAA4IBAQAMkrP0zdt1uAOI1B77nV7+EZSzJxahrubo8opjrkvd +4/stCVG6OfDutxARvCdC4OoJiRPGXBiA22bIi7ZMl7DTpA+CeFEICfF3MKcf8xIT +V5sCm25dX+KHwACWNccNazpIlIAVGxMmcSs70SlMIfPksCW2FRibsVnzESQCgQcP +e6owfVvsnQNN4UADki4JMZJ1RQ/nUNM3aNJSf/SFVJYjiUXHLNJY65FfiYV4MGRi +Yq+NDPs3D0KLkwQ03FFcw56TdPnCAYuihDHkITuQDtS1DawLhuwFDDPcys5Mnigc +1Y1o5V4Z6L6xkttwgD9zP3DQJscDZ/Gki1NqYV3TvJPr +-----END CERTIFICATE----- diff --git a/src/app.py b/src/app.py new file mode 100644 index 0000000..1b72122 --- /dev/null +++ b/src/app.py @@ -0,0 +1,1307 @@ 
+import signal +import json +import redis +import ast +import sys +import os +import json +import random +from dotenv import load_dotenv +from kombu import Connection #, Exchange, Queue, Consumer, Message +from kombu.exceptions import TimeoutError +from kombu.serialization import register +import paho.mqtt.client as mqtt +from socket import timeout as SocketTimeout +import socket +import psycopg2 +from psycopg2.extras import execute_values +import base64 +import struct +import time +from datetime import datetime, timezone +import traceback +from http.server import HTTPServer, BaseHTTPRequestHandler +import threading + +import time +import logging + +import uuid + +client = None # Initialize to None, not a string +connected = False + + +try: + from prometheus_client import generate_latest, start_http_server, CONTENT_TYPE_LATEST, Gauge, Counter, REGISTRY + prometheus_available = True +except: + prometheus_available = False + +keep_processing = True + + +# Set up logging +default_uuid = str(uuid.uuid4()) +logger = logging.getLogger(default_uuid) +logger.propagate = False +logger.setLevel(logging.DEBUG) #DEBUG, INFO, WARNING, ERROR, and CRITICAL +handler = logging.StreamHandler() +#formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s') +formatter = logging.Formatter('%(levelname)s - %(message)s') +handler.setFormatter(formatter) +logger.addHandler(handler) + +INSTANCE_ID = f"queue-muncher-{default_uuid}" +last_heartbeat = 0 + +logger.warning("Script started") +load_dotenv() + +# Global flag to control the queue processing loop +running = True + +# RabbitMQ configuration +RABBITMQ_URL = os.getenv('RABBITMQ_URL') + +if False: + QUEUE_NAME = "well-devices"#os.getenv('RABBITMQ_QUEUE') +else: + QUEUE_NAME = os.getenv('RABBITMQ_QUEUE') + +DB_NAME = os.getenv('DB_NAME') +DB_USER = os.getenv('DB_USER') +DB_PASSWORD = os.getenv('DB_PASSWORD') +DB_HOST = os.getenv('DB_HOST') +DB_PORT = os.getenv('DB_PORT') +PROCESSING_PERIOD = int(os.getenv('PROCESSING_PERIOD', '300')) # 5 minutes +QUEUE_LENGTH_THRESHOLD = int(os.getenv('QUEUE_LENGTH_THRESHOLD', '1000')) +redis_host = os.getenv('REDIS_HOST', '192.168.68.70') +redis_host = '192.168.68.70' +cache_dir = "cache" +MQTTSERVER = os.getenv('MQTTSERVER', "mqtt.eluxnetworks.net") #"mqtt.eluxnetworks.net" +mqtt_port = int(os.getenv('MQTT_PORT', "8883")) # TLS port + +use_tls = MQTTSERVER not in ["localhost", "127.0.0.1", "192.168.68.70"] + +mqtt_client_id = "client9-authn-ID" + +MACS2id = {} +ids2MAC = {} +BATCH_SIZE = int(os.getenv('BATCH_SIZE', '50')) + + +logger.info(f"DB_NAME= {DB_NAME}") +logger.info(f"DB_USER= {DB_USER}") +logger.info(f"DB_PASSWORD= {DB_PASSWORD}") +logger.info(f"DB_HOST= {DB_HOST}") +logger.info(f"DB_PORT= {DB_PORT}") +logger.info(f"PROCESSING_PERIOD= {PROCESSING_PERIOD}") +logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}") +logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}") +logger.info(f"BATCH_SIZE= {BATCH_SIZE}") + +logger.info(f"mqtt_port= {mqtt_port}") +logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}") +logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}") +logger.info(f"QUEUE_NAME= {QUEUE_NAME}") +logger.info(f"RABBITMQ_URL= {RABBITMQ_URL}") + +# Set up Prometheus metrics if available +if prometheus_available: + QUEUE_SIZE = Gauge('queue_muncher_queue_size', 'Number of messages in the RabbitMQ queue') + MESSAGES_PROCESSED = Counter('queue_muncher_messages_processed', 'Number of messages processed') + PROCESSING_TIME = Gauge('queue_muncher_processing_time', 'Time spent processing messages in 
seconds') + BATCH_SIZE_GAUGE = Gauge('queue_muncher_batch_size', 'Size of used batch') + logger.info(f"QUEUE_SIZE= {QUEUE_SIZE}") + logger.info(f"MESSAGES_PROCESSED= {MESSAGES_PROCESSED}") + logger.info(f"PROCESSING_TIME= {PROCESSING_TIME}") + logger.info(f"BATCH_SIZE_GAUGE= {BATCH_SIZE_GAUGE}") +else: + logger.info(f"prometheus_available= {prometheus_available}") + +class HealthCheckHandler(BaseHTTPRequestHandler): + def do_GET(self): + logger.info(f"in do_GET {self.path}") + if '/health' in self.path: + self.send_response(200) + self.send_header('Content-type', 'application/json') + self.end_headers() + response = {"status": "healthy"} + self.wfile.write(json.dumps(response).encode()) + else: + self.send_response(404) + self.send_header('Content-type', 'text/plain') + self.end_headers() + self.wfile.write(b'Not Found') + + # Suppress console log messages for each request + def log_message(self, format, *args): + return + +def GetRedisString(key_name): + try: + result = r.get(key_name).decode('utf-8') + except: + result = None + return result + +def GetRedisInt(key_name): + try: + result = int(r.get(key_name).decode('utf-8')) + except: + result = None + return result + +def GetRedisFloat(key_name): + try: + result = float(r.get(key_name).decode('utf-8')) + except: + result = None + + return result + + +def run_server(port=8080): + server_address = ('', port) + httpd = HTTPServer(server_address, HealthCheckHandler) + print(f"Health check server running on port {port}") + server_thread = threading.Thread(target=httpd.serve_forever) + server_thread.daemon = True + server_thread.start() + return httpd + +def process_queue(): + while running: + timestamp = datetime.now().isoformat() + print(f"[{timestamp}] Simulated Queue muncher processing...") + time.sleep(1) + + +def MQSend(topic, content): + global client, mqtt_client_id, connected + + # Start a separate thread for MQTT sending to avoid blocking RabbitMQ processing + mqtt_thread = threading.Thread( + target=MQSend_worker, + args=(topic, content), + daemon=True + ) + mqtt_thread.start() + # Return immediately so RabbitMQ processing can continue + return + +def MQSend_worker(topic, content): + """Worker function that handles MQTT sending in a separate thread""" + global client, mqtt_client_id, connected + + currentTime = int(time.time()) + #print(">", currentTime, topic, content) + + # Limit connection attempts to prevent flooding + max_connection_attempts = 3 + connection_attempts = 0 + + # Use a local semaphore to prevent multiple simultaneous connection attempts + if not hasattr(MQSend_worker, 'connection_in_progress'): + MQSend_worker.connection_in_progress = False + + try: + # Only attempt connection if not already in progress + if client is None or not connected: + if not MQSend_worker.connection_in_progress and connection_attempts < max_connection_attempts: + MQSend_worker.connection_in_progress = True + try: + print(f"Thread {threading.current_thread().name} attempting MQTT connection") + client = connect_mqtt(mqtt_client_id) + connection_attempts += 1 + finally: + MQSend_worker.connection_in_progress = False + + # If we have a valid connection, try to send + if connected and client is not None: + try: + result = client.publish(topic, content, qos=0, retain=False) # Lower QoS for better throughput + if hasattr(result, 'rc') and result.rc == mqtt.MQTT_ERR_SUCCESS: + pass + #print(f"Successfully published to {topic}") + else: + print(f"Failed to publish to {topic}") + except Exception as err: + print(f"Error in MQSend publish: 
{str(err)}") + else: + print("MQTT not connected, message not sent") + + except Exception as err: + print(f"Error in MQSend_worker: {str(err)}") + + +def connect_mqtt(client_id): + global client, connected + + # Add timeout for the entire operation + connection_timeout = 5 # 5 seconds max for the whole connection attempt + start_time = time.time() + + try: + random_suffix = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=6)) + unique_client_id = f"{random_suffix}-{int(time.time())}" + print(f"Connecting {unique_client_id}") + + # Determine certificate path based on environment + certificate_path = '/app/src/new-ca.crt' if os.path.exists('/app/src/new-ca.crt') else 'new-ca.crt' + + # Create client with MQTTv5 protocol + mqtt_client = mqtt.Client(client_id=unique_client_id, protocol=mqtt.MQTTv5) + + # Define callbacks with MQTTv5 signature + def on_connect_wrapper(client, userdata, flags, rc, properties=None): + global connected + if rc == 0: + connected = True + print(f"Connected to {MQTTSERVER} with {unique_client_id}") + subbedTo = [("/w",1),("wellnuotopics/topic1",1),("/faws",1)] + print("subscribing to ", subbedTo) + client.subscribe(subbedTo) + else: + print(f"Failed to connect, return code {rc}") + connected = False + + def on_disconnect_wrapper(client, userdata, rc, properties=None): + global connected + connected = False + print(f"Disconnected with result code {rc}") + + # Set the callbacks + mqtt_client.on_connect = on_connect_wrapper + mqtt_client.on_disconnect = on_disconnect_wrapper + + + if use_tls: + certificate_path = '/app/src/new-ca.crt' if os.path.exists('/app/src/new-ca.crt') else 'new-ca.crt' + mqtt_client.tls_set(ca_certs=certificate_path) + + # Set TLS with your certificate + #mqtt_client.tls_set( + # ca_certs=certificate_path, + #) + + # Set username and password + mqtt_client.username_pw_set("well_user", "We3l1_best!") + + # Connect with shorter timeout + mqtt_client.connect(MQTTSERVER, mqtt_port, keepalive=30) + + # Start the loop in a background thread + mqtt_client.loop_start() + + # Wait for connection with timeout + wait_end_time = min(start_time + connection_timeout, time.time() + 3) + while not connected and time.time() < wait_end_time: + time.sleep(0.1) + + # Check if we connected successfully + if connected: + print(f"MQTT connected successfully in {time.time() - start_time:.2f} seconds") + client = mqtt_client + return mqtt_client + else: + print(f"MQTT connection attempt timed out after {time.time() - start_time:.2f} seconds") + mqtt_client.loop_stop() + connected = False + return None + + except Exception as e: + connection_time = time.time() - start_time + print(f"Connection error after {connection_time:.2f} seconds: {e}") + connected = False + return None + +def ensure_mqtt_connection(): + global client, mqtt_client_id, connected + + if not connected: + print("MQTT connection needed, attempting to connect...") + try: + if client: + try: + client.disconnect() + client.loop_stop() + except: + pass # Ignore errors on disconnect + + client = connect_mqtt(mqtt_client_id) + + # Give a moment for connection to establish + time.sleep(2) + + return connected + except Exception as e: + print(f"Reconnection error: {e}") + return False + + return True # Already connected + +# Define the watchdog function after the connect_mqtt function is defined +def mqtt_watchdog(): + """Thread to monitor and maintain MQTT connection""" + global client, mqtt_client_id, connected + + while True: + if not connected: + try: + print("MQTT watchdog: Reconnecting...") 
+ try: + # Only try to disconnect if client is a proper object + if hasattr(client, 'disconnect') and callable(client.disconnect): + client.disconnect() + except: + pass # Ignore errors during disconnect + + # Use the global connect_mqtt function + + client = connect_mqtt(mqtt_client_id) + except Exception as e: + print(f"MQTT watchdog: Reconnection failed: {e}") + + time.sleep(30) # Check every 30 seconds + +# Start the watchdog thread +#mqtt_watchdog_thread = threading.Thread(target=mqtt_watchdog, daemon=True) +#mqtt_watchdog_thread.start() + +def signal_handler(sig, frame): + global running + print("Shutting down gracefully...") + running = False + httpd.shutdown() + print("HTTP server closed") + sys.exit(0) + +def monitor_db_connection(conn): + """Test the database connection and return diagnostic information.""" + try: + # Create a new cursor + with conn.cursor() as cursor: + # Simple query to check connection + cursor.execute("SELECT 1") + result = cursor.fetchone() + + # Get connection info + cursor.execute(""" + SELECT + datname, + usename, + client_addr, + state, + backend_start, + xact_start, + query_start, + state_change + FROM + pg_stat_activity + WHERE + pid = pg_backend_pid() + """) + conn_info = cursor.fetchone() + + return { + "status": "connected" if result == (1,) else "error", + "backend_pid": conn_info[0] if conn_info else None, + "database": conn_info[0] if conn_info else None, + "user": conn_info[1] if conn_info else None, + "client_addr": conn_info[2] if conn_info else None, + "state": conn_info[3] if conn_info else None, + "backend_start": conn_info[4] if conn_info else None, + "xact_start": conn_info[5] if conn_info else None, + "query_start": conn_info[6] if conn_info else None, + "state_change": conn_info[7] if conn_info else None + } + except Exception as e: + return { + "status": "error", + "error_type": type(e).__name__, + "error_message": str(e) + } + +def check_connection_periodically(conn, interval=5): + """Check connection health periodically and log results.""" + last_check_time = 0 + while True: + current_time = time.time() + if current_time - last_check_time >= interval: + last_check_time = current_time + try: + status = monitor_db_connection(conn) + if status["status"] == "connected": + logger.debug(f"DB connection healthy: pid={status.get('backend_pid')}, state={status.get('state')}") + else: + logger.warning(f"DB connection issue: {status}") + except Exception as e: + logger.error(f"Failed to check DB connection: {e}") + yield # Allow other operations to proceed + +def process_messages(connection, db_conn): + global last_heartbeat + logger.info("Starting batch processing") + + # Start connection monitoring + connection_monitor = check_connection_periodically(db_conn, interval=3) + connection_health_thread = threading.Thread( + target=lambda: [next(connection_monitor) or time.sleep(0.1) for _ in range(int(PROCESSING_PERIOD * 10))], + daemon=True + ) + connection_health_thread.start() + + + try: + # Get a channel + channel = connection.channel() + logger.info("Channel created") + + # Basic QoS with all required parameters + channel.basic_qos( + prefetch_size=0, + prefetch_count=1, + a_global=False + ) + logger.info("Basic QoS set") + + data_batches = { + 'sensors': [], + 'radar': [], + 'temperature': [], + 'humidity': [], + 'pressure': [], + 'light': [] + } + + #def diagnostic_callback(msg): + #"""Simple diagnostic callback that just acknowledges messages""" + #logger.info("Diagnostic callback received message") + 
#channel.basic_ack(delivery_tag=msg.delivery_info['delivery_tag']) + #MESSAGES_PROCESSED.inc() + + ## Use this instead of your normal callback for testing + #channel.basic_consume( + #queue=QUEUE_NAME, + #callback=diagnostic_callback, + #no_ack=False + #) + + def callback(msg): + """ + Callback function for processing messages + msg: amqp.basic_message.Message object containing the message data + """ + try: + #logger.info("\n=== Message Received ===") + #logger.info(f"Delivery info: {msg.delivery_info}") + #logger.info(f"Properties: {msg.properties}") + #logger.info(f"Body type: {type(msg.body)}") + #logger.info(f"Body length: {len(msg.body) if msg.body else 0}") + + if isinstance(msg.body, (bytes, bytearray)): + hex_dump = ' '.join(f'{b:02x}' for b in msg.body[:32]) + #logger.info(f"First 32 bytes (hex): {hex_dump}") + data = {'data_base64': base64.b64encode(msg.body).decode()} + else: + if isinstance(msg.body, str): + if msg.body[0] == "{": + original_data = msg.body.encode() + data = {'data_base64': base64.b64encode(original_data).decode()} + else: + data = msg.body + #when from well_cloud_server + #'{"id": "c9afe11a-ffe2-4a95-ad9b-828deffa70b5", "source": "well_cloud_server", "type": "MQTT.EventPublished", "data_base64": "f88ad36750980e0064b708890414ed1300001104557844f628ee41ea0136eb52410a00db97ca480193f2a54c02cde59f4c03c97db04c04ec54974a051f64914a0631aa8b4a07d39da74808654fb4480931ddc048", "time": "2025-03-14T01:48:41.000+00:00", "specversion": "1.0", "datacontenttype": "application/octet-stream", "subject": "wellnuotopics/topic1"}' + + with PROCESSING_TIME.time(): + with db_conn.cursor() as cursor: + data_type, parsed_data = process_message_data(cursor, data) + if data_type and parsed_data: + #logger.info(f"Successfully processed as {data_type}") + MQSend("/"+ids2MAC[parsed_data["device_id"]], json.dumps(parsed_data)) + if data_type in data_batches: + data_batches[data_type].append(parsed_data) + batch_size = len(data_batches[data_type]) + #logger.info(f"Added to {data_type} batch. Size[radar]: {len(data_batches["radar"])} Size[sensors]: {len(data_batches["sensors"])} mlen: {len(msg.body)}") + + if batch_size >= BATCH_SIZE: + #logger.info(f"Processing {data_type} batch") + + + #filename = f"data_{data_type}.pkl" # Example: data_temperature.pkl + #backup_dir = "data_backups_100" # Optional: Place backups in a subdirectory + + #os.makedirs(backup_dir, exist_ok=True) # Create directory if it doesn't exist + #full_filename = os.path.join(backup_dir, filename) + + + ## 2. Pickle the data + #try: + #with open(full_filename, 'wb') as f: + #pickle.dump(data_batches, f) + #print(f"Data of type '{data_type}' pickled to: {full_filename}") # Confirmation message + + #except IOError as e: + #print(f"Error writing pickle file: {e}") + #raise # Re-raise the exception for handling upstream. 
+ + + insert_into_timescaledb_My(cursor, data_type, data_batches[data_type]) + db_conn.commit() + data_batches[data_type].clear() + + channel.basic_ack(delivery_tag=msg.delivery_info['delivery_tag']) + MESSAGES_PROCESSED.inc() + #logger.info("Message acknowledged") + + except Exception as e: + logger.error(f"Message processing error: {str(e)}") + logger.exception("Full traceback:") + # Reject the message without requeue if we can't process it + channel.basic_reject(delivery_tag=msg.delivery_info['delivery_tag'], requeue=False) + + # Try to get queue info + try: + queue_info = channel.queue_declare(queue=QUEUE_NAME, passive=True) + logger.info(f"Queue info: message_count={queue_info.message_count}, consumer_count={queue_info.consumer_count}") + except Exception as e: + logger.error(f"Error getting queue info: {e}") + raise + + # Basic consume with correct callback signature + logger.info(f"Starting basic_consume on queue {QUEUE_NAME}") + channel.basic_consume( + queue=QUEUE_NAME, + callback=callback, + no_ack=False + ) + + logger.info("Starting to consume messages...") + + start_time = time.time() + last_queue_check = 0 + queue_check_interval = 10 + + # Start consuming with timeout checks + while time.time() - start_time < PROCESSING_PERIOD: + try: + # Process messages with very short timeout + #connection.drain_events(timeout=0.1) + + try: + # Log before attempting to process + connection.drain_events(timeout=0.1) + except (socket.timeout, SocketTimeout, TimeoutError): + continue + except Exception as e: + logger.error(f"Error during drain_events: {e}") + continue + + + # Periodic queue check + current_time = time.time() + if current_time - last_queue_check > queue_check_interval: + try: + queue_info = channel.queue_declare(queue=QUEUE_NAME, passive=True) + logger.info(f"Queue status - Messages: {queue_info.message_count}, " + f"Consumers: {queue_info.consumer_count}") + last_queue_check = current_time + except Exception as e: + logger.error(f"Error checking queue status: {e}") + + # Heartbeat update + if time.time() - last_heartbeat > 10: + update_heartbeat(db_conn) + last_heartbeat = time.time() + logger.info("Heartbeat updated") + + except (socket.timeout, SocketTimeout, TimeoutError): + continue # Just continue on timeout + + except ConnectionError as e: + logger.error(f"Connection error: {e}") + raise + + logger.info("Consumption period ended, closing channel") + channel.close() + + except Exception as e: + logger.error(f"Critical error in process_messages: {e}") + logger.exception("Full traceback:") + raise + finally: + # Process any remaining batches + with db_conn.cursor() as cursor: + for data_type, batch in data_batches.items(): + if batch: + insert_into_timescaledb(cursor, data_type, batch) + db_conn.commit() + + logger.info("Batch processing completed") + +################################################################################################# +def main_loop(): + global keep_processing, client + + print("Initializing MQTT client at startup") + client = connect_mqtt(mqtt_client_id) + + while keep_processing: + try: + with Connection(RABBITMQ_URL) as rabbit_conn: + # Get a fresh connection for each batch processing session + db_conn = get_db_connection() + #process_messages(rabbit_conn, db_conn, max_processing_time=60) # Shorter processing window + process_messages(rabbit_conn, db_conn) # Shorter processing window + + # Check if we should continue after each batch + if not should_continue_processing(db_conn): + logger.info("Preparing to exit...") + keep_processing = 
False + deregister_instance(db_conn) + break + except Exception as e: + logger.error(f"Error in main loop: {e}") + time.sleep(5) # Wait before retrying + +def insert_into_timescaledb(cursor, data_type, data_batch): + if not data_batch: + return + + # Use extremely small batches to test + MAX_BATCH = 5 + + total_inserted = 0 + + for i in range(0, len(data_batch), MAX_BATCH): + sub_batch = data_batch[i:i+MAX_BATCH] + + # Prepare query and values based on data type + if data_type == 'sensors': + query = """ + INSERT INTO sensor_readings (time, device_id, temperature, humidity, pressure, light, + s0, s1, s2, s3, s4, s5, s6, s7, s8, s9, mtype) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), + row.get('device_id'), row.get('temperature'), row.get('humidity'), row.get('pressure'), row.get('light'), + *(row.get('smell', [None]*10)), row.get('mtype')) for row in sub_batch] + + elif data_type == 'radar': + query = """ + INSERT INTO radar_readings (time, device_id, absent, moving, stationary, "both", + m0, m1, m2, m3, m4, m5, m6, m7, m8, + s0, s1, s2, s3, s4, s5, s6, s7, s8) + VALUES %s + """ + values = [] + for row in sub_batch: + motion_energy = row['radar'][0] + stationary_energy = row['radar'][1] + + # Perform element-wise division + absent = motion_energy[1] / motion_energy[0] if motion_energy[0] != 0 else 0 + moving = motion_energy[2] / motion_energy[0] if motion_energy[0] != 0 else 0 + stationary = motion_energy[3] / motion_energy[0] if motion_energy[0] != 0 else 0 + both = motion_energy[4] / motion_energy[0] if motion_energy[0] != 0 else 0 + + m_values = [v / motion_energy[0] if motion_energy[0] != 0 else 0 for v in motion_energy[5:14]] + s_values = [v / stationary_energy[0] if stationary_energy[0] != 0 else 0 for v in stationary_energy[1:10]] + + values.append(( + datetime.fromtimestamp(row['time'], tz=timezone.utc), + row['device_id'], + absent, moving, stationary, both, + *m_values, + *s_values + )) + + elif data_type in ['temperature', 'humidity', 'pressure', 'light']: + query = f""" + INSERT INTO sensor_readings (time, device_id, {data_type}) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), row['device_id'], + row.get(data_type)) for row in sub_batch] + else: + raise ValueError(f"Unknown data type: {data_type}") + + try: + # Add explicit transaction control + cursor.execute("BEGIN") + execute_values(cursor, query, values) + cursor.execute("COMMIT") + + batch_num = i//MAX_BATCH + 1 + total_inserted += len(values) + #logger.warning(f"Inserted {len(values)} {data_type} readings (batch {batch_num}, total: {total_inserted}/{len(data_batch)})") + + except Exception as e: + # Roll back transaction on error + try: + cursor.execute("ROLLBACK") + except Exception as rollback_error: + logger.error(f"Error during rollback: {rollback_error}") + + logger.error(f"Error inserting {data_type} data: {e}") + if values: + logger.error(f"Problematic values: {values}") + + # Re-raise the exception so the caller can handle it + raise + + return total_inserted + +def insert_into_timescaledb_My(cursor, data_type, data_batch): + if not data_batch: + return + + if data_type == 'sensors': + query = """ + INSERT INTO sensor_readings (time, device_id, temperature, humidity, pressure, light, + s0, s1, s2, s3, s4, s5, s6, s7, s8, s9, mtype) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), + row.get('device_id'), row.get('temperature'), row.get('humidity'), row.get('pressure'), row.get('light'), + 
*(row.get('smell', [None]*10)), row.get('mtype')) for row in data_batch] + + elif data_type == 'radar': + query = """ + INSERT INTO radar_readings (time, device_id, absent, moving, stationary, "both", + m0, m1, m2, m3, m4, m5, m6, m7, m8, + s0, s1, s2, s3, s4, s5, s6, s7, s8) + VALUES %s + """ + values = [] + for row in data_batch: + motion_energy = row['radar'][0] + stationary_energy = row['radar'][1] + + # Perform element-wise division + absent = motion_energy[1] / motion_energy[0] if motion_energy[0] != 0 else 0 + moving = motion_energy[2] / motion_energy[0] if motion_energy[0] != 0 else 0 + stationary = motion_energy[3] / motion_energy[0] if motion_energy[0] != 0 else 0 + both = motion_energy[4] / motion_energy[0] if motion_energy[0] != 0 else 0 + + m_values = [v / motion_energy[0] if motion_energy[0] != 0 else 0 for v in motion_energy[5:14]] + s_values = [v / stationary_energy[0] if stationary_energy[0] != 0 else 0 for v in stationary_energy[1:10]] + + values.append(( + datetime.fromtimestamp(row['time'], tz=timezone.utc), + row['device_id'], + absent, moving, stationary, both, + *m_values, + *s_values + )) + + elif data_type in ['temperature', 'humidity', 'pressure', 'light']: + query = f""" + INSERT INTO sensor_readings (time, device_id, {data_type}) + VALUES %s + """ + values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), row['device_id'], + row.get(data_type)) for row in data_batch] + + else: + raise ValueError(f"Unknown data type: {data_type}") + + try: + execute_values(cursor, query, values) + #logger.warning(f"Inserted {len(values)} {data_type} readings") + except Exception as e: + print(f"Error inserting {data_type} data: {e}") + print(f"Problematic values: {values[:5]}...") # Print first 5 values for debugging + + logger.error(f"Error inserting {data_type} data: {e}") + logger.error(f"Problematic values: {values[:5]}...") # Print first 5 values for debugging + raise + +def GetLastPointQuery(device_id, field_name, threshold_value): + """ + Generate a SQL query to find the last point in radar_readings where the specified field + meets the threshold condition. + + Parameters: + device_id (int): The device ID to filter by + field_name (str): The field to check, e.g., "s2_max", "s28_min", etc. 
+ threshold_value (float): The threshold value to compare against + + Returns: + str: SQL query string + """ + # Parse the field name to get base field and operation + parts = field_name.split('_') + base_field = parts[0] + operation = parts[1] if len(parts) > 1 else None + + # Define the field expression based on base_field + if base_field == 's28': + field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7" + elif base_field == 'm08': + field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9" + else: + field_expr = base_field + + # Define comparison operator based on operation + operator = ">" if operation == "max" else "<" + + # Generate the SQL query + query = f""" + SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id} + AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1; + """ + + return query + +def GetIdFromMAC(cursor, MAC): + try: + query = "SELECT device_id FROM devices WHERE device_mac = '" + MAC + "'" + + cursor.execute(query) + result = cursor.fetchone() + if result and len(result) > 0: + return result[0] + else: + query = f""" + INSERT INTO public.devices (device_mac, well_id, description) VALUES ( + '{MAC}', (SELECT COALESCE(MAX(well_id), 0) + 1 FROM public.devices), '') + RETURNING well_id, device_id + """ + cursor.execute(query) + result = cursor.fetchone() + cursor.connection.commit() # Use cursor's connection, not undefined 'conn' + new_device_id = result[1] + return new_device_id + + except Exception as e: + logger.error(f"Error in GetIdFromMAC sql= {query} {e}") + return 0 + +def GetLastDetected(device_id, field_name, threshold_value): + # Example usage: + + query = GetLastPointQuery(device_id, field_name, threshold_value) + print(query) + last_detected = None + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute(query) + last_detected = cur.fetchone()[0].timestamp() + + return last_detected + +def UpdateLastSeen(new_message_dict): + print(new_message_dict) + + device_id_s = str(new_message_dict['device_id']) + + #matches code in well-api + radar_threshold_signal = "s3_max" + radar_threshold_value = 10 + + threshold_details = GetRedisString('radar_threshold'+device_id_s) + try: + radar_threshold_list = ast.literal_eval(threshold_details) + radar_threshold_signal = radar_threshold_list[0] + radar_threshold_value = radar_threshold_list[1] + except: + #key not found so read from DB, and store to key + sql = f""" + SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = {device_id_s}; + """ + + with get_db_connection() as conn: + with conn.cursor() as cur: + cur.execute(sql) + threshold_details = cur.fetchone()[0] #cur.fetchall()# + print(threshold_details) + radar_threshold_signal = "s3_max" + radar_threshold_value = 10 + + if threshold_details != None: + + threshold_details_list = ast.literal_eval(threshold_details) + radar_threshold_signal = threshold_details_list[0] + radar_threshold_value = threshold_details_list[1] + + r.set('radar_threshold'+device_id_s, str([radar_threshold_signal, radar_threshold_value])) + + + try: + + #lets determine if presence is detected + component = radar_threshold_signal.split("_")[0] + radar_val = 0 + if component == "s28": + radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0] + elif component[0] == "s": + component_index = ast.literal_eval(component[1]) + radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0] + elif component[0] == "m": + component_index = 
ast.literal_eval(component[1]) + radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0] + + if radar_val > radar_threshold_value: #seen now + r.set('lastseen_'+device_id_s, new_message_dict['time']) + + else: #not seen now, but lets determine when he was and add the key + last_seen = GetRedisFloat('lastseen_'+device_id_s) + if last_seen == None: + last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value) + r.set('lastseen_'+device_id_s, last_seen) + + + except Exception as e: + print(f"UpdateLastSeen failed: {e}") + +def process_message_data(cursor, body): + + global MACS2id, ids2MAC + """Process a single message from the queue.""" + try: + if isinstance(body, dict): + data = body + else: + data = json.loads(body) + + if "data_base64" in data: + + decrypt_data = data["data_base64"] + #print(data["time"]) + decrypt_data = base64.b64decode(decrypt_data) + decrypt_data_old = decrypt_data + if isinstance(decrypt_data, bytes): + decrypt_data = decrypt_data.decode('utf-8') + m_type = 0 + new_message_dict = {} + #print(decrypt_data) + if len(decrypt_data) > 14: + + if "data_base64" in decrypt_data: + data = json.loads(decrypt_data) + payload = data["data_base64"] + decrypt_data = base64.b64decode(payload) + + if len(decrypt_data) > 14: + pointer = 0 + lenn = 4 + #print ("") + seconds = struct.unpack(' 0: + + + new_message_dict = {"time":packet_time, "device_id":device_id} + + line_entry = str(packet_time) + dcr_l = len(decrypt_data) + lenn_1 = 1 + + message_type = struct.unpack(' 0: + st = time.time() + if message_type == 16: #Radar message + m_type = "radar" + #counter, no targets, moving targets, stationary targets, both, 9 moving gates energy + #counter, 9 stationary gates energy + pointer = pointer + lenn_1 + motion_energy = [0] * 14 + stationary_energy = [0] * 10 + #print("@1",time.time()-st) + lenn = 2 + for i in range(14): + V = struct.unpack('= 0x20 and message_type <= 0x2F): #Sensors message + #work_dict = dict(common_dict) + #print("@4",time.time()-st) + m_type = "sensors" + pointer = pointer + lenn_1 + + lenn = 4 + P = struct.unpack(' NOW() - INTERVAL '60 seconds' + """) + + return cur.fetchone()[0] + +def register_instance(conn): + + with conn.cursor() as cur: + cur.execute(""" + INSERT INTO queue_muncher_instances (instance_id, last_heartbeat, status, container_id) + VALUES (%s, NOW(), 'active', %s) + ON CONFLICT (instance_id) DO UPDATE + SET last_heartbeat = NOW(), status = 'active' + """, (INSTANCE_ID, "")) + conn.commit() + + +def should_continue_processing(conn): + queue_length = get_queue_length() + instance_count = get_active_instances(conn) + + logger.info(f"{INSTANCE_ID}Current queue length: {queue_length}, Instance count: {instance_count}") + + if queue_length > QUEUE_LENGTH_THRESHOLD: + return True + elif instance_count > 1: + return False + else: + return True # If this is the last instance, it should continue processing + +def deregister_instance(conn): + with conn.cursor() as cur: + cur.execute(""" + UPDATE queue_muncher_instances + SET status = 'inactive' + WHERE instance_id = %s + """, (INSTANCE_ID,)) + conn.commit() + +client = connect_mqtt(mqtt_client_id) + +def mqtt_network_diagnostics(): + """Run network diagnostics to troubleshoot MQTT connection""" + try: + print("Running MQTT connection diagnostics...") + + # Try DNS resolution + try: + import socket + ip_address = socket.gethostbyname(MQTTSERVER) + print(f"DNS resolution for {MQTTSERVER}: {ip_address}") + except Exception as e: + print(f"DNS 
resolution failed: {e}") + + # Try a basic socket connection to test reachability + try: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(5) + result = s.connect_ex((MQTTSERVER, mqtt_port)) + if result == 0: + print(f"Socket connection to {MQTTSERVER}:{mqtt_port} successful") + else: + print(f"Socket connection failed with error code: {result}") + s.close() + except Exception as e: + print(f"Socket test failed: {e}") + + # Try a ping (requires shell access) + try: + import subprocess + ping_result = subprocess.run(["ping", "-c", "1", "-W", "5", MQTTSERVER], + capture_output=True, text=True, timeout=6) + print(f"Ping result: {ping_result.returncode}") + print(ping_result.stdout) + except Exception as e: + print(f"Ping test failed: {e}") + + except Exception as e: + print(f"Diagnostics failed: {e}") + + +if __name__ == "__main__": + + # Run diagnostics + mqtt_network_diagnostics() + + r = redis.Redis(host=redis_host, port=6379, db=0) + #r = redis.Redis(host='redis', port=6379, db=0) + + # This block will only run when the script is executed directly (not through OpenFaaS) + logger.info("Running in debug mode") + queue_size = get_queue_length() + # This block will only run when the script is executed directly (not through OpenFaaS) + debug_port = int(os.getenv('DEBUG_PORT', '8085')) # Use a different default port for debugging + + processing_thread = threading.Thread(target=main_loop, name="processing_thread") + processing_thread.start() + + #server_thread = threading.Thread(target=run_server, args=(debug_port,), name="server_thread") + #server_thread.start() + httpd = run_server(8085) + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Shutting down...") + with get_db_connection() as conn: + deregister_instance(conn) + +else: + + #r = redis.Redis(host='localhost', port=6379, db=0) + try: + r = redis.Redis(host=redis_host, port=6379, db=0) + except Exception as e: + print(f"redis.Redis failed: {e}") + + # This block runs when the script is imported by OpenFaaS + logger.info("Running in production mode") + processing_thread = threading.Thread(target=main_loop, name="processing_thread") + processing_thread.start() + + server_thread = threading.Thread(target=run_server, name="server_thread") + server_thread.start() diff --git a/src/data_backups/data_radar.pkl b/src/data_backups/data_radar.pkl new file mode 100644 index 0000000000000000000000000000000000000000..256d9ecabe07ea27f234f07477bf27f7f64413cb GIT binary patch literal 499 zcmX|8L9UxH6l|sLdY8&Th{U3c5CSnDJ2hY%yr}?34aOn#hlCKRQV-B&daoX%$0$VV zf6_=#ni*-{yq_O_|9$`NIX@o1KFXpmzLv##zg#@BKNGPZ!%nQ>5~|9;s zd9=sg28h}?pmbwJsjKrNv9Y(ZggU5vglE{|tIR?rlF5HGZgNX{5im2#u&!F6t9sVN zb`{WiSBIFgm_`jV1-Aj+vbqfPy$DzX>!B8b4|3+z`Q*il%zDHv;Xl&;1og#+_8_D-$Lor1SWeH&169*}n$(E8cO zAUHNf7Tyc)T%bDDC|L||_i=nJ+gMKk`^?odpJ(nc^`P=B?n-lUElmtWM*7N%N5TDm o_Er|Fiz8EV{!=S;8Qdh($ literal 0 HcmV?d00001 diff --git a/src/data_backups/data_sensors.pkl b/src/data_backups/data_sensors.pkl new file mode 100644 index 0000000000000000000000000000000000000000..7b7e4db6f6c9a56b58fc8658b00d99ce4b3f23bd GIT binary patch literal 511 zcmX|-O|F|T6op%O2I0?`fFpivVoxftUlkid^8^WrrAl2uhw1LRj4q($ z4SJ$7a8=4;!GuN&ua__G{d@5!0_XoGZkVXspycg=J zDb@lKwO0uyv><6mEncqyiK%K~ZyHEDq-Moo1Q>M{TUe~~r#>zNWNPJ_hG08CO+Z?G zva{Sb8B#kZQA7+^oKjK^XPOj&@YOnW4&x=j|a zWw)1o5SpFY&U?L`pNz1*p4m$W13tHwv*bayRBu)4CmlN!M`pPf3F$?~|NA+L*c+P~ zAD4gER=L;3e=q%&k#W+q8a!De>*S5s7$E&!NsEpxT(4TYxC+>7ad$y$duL`s^(}vS Id5mBF0sZ8vk^lez literal 0 HcmV?d00001 diff --git 
a/src/data_backups_100/data_radar.pkl b/src/data_backups_100/data_radar.pkl new file mode 100644 index 0000000000000000000000000000000000000000..d556c98c6a395875a71da192eb3d8c3b460ada95 GIT binary patch literal 23032 zcmb7L2YeLO_GV@_$tEFy6zOCVN3K42_%$9TS_3I211h}t8|HU1x0Zw z!ULr$MX@Z33WECJsh|j~f+8yF!vd)2|2y~2%+Blvf4~2Gk$ktzobTRy?z!ilGYPmJ zztpFk_}`2bnWYIO1!GFajxSl!e?=yjT@#8%7p(AkKfQFe+o7v%OB06{Oez{uFtBLo z3Tm#FCQm3BU0g7J(1eNO(aUppNox;YRq8IBIJ#(P(S!$4mp){$pX=gA6%8+(up*~M zsk?Z5K}m^xV)5*uA<r(4frqVYC+}_&k-KjQ22fQ!t@0 z(dU^vdvActNrF=Pq*whY*(fm_aH>lPwE%5yF`LuMEeeXdwAUUF2$0@?e;B|);!OQH zAkm6Rc<$c$9d~3BdeQ;zTIkLx#~c=KeM#q{M-=Ef)v12Zf8QS0*#e55oW!R?AJ zfbM?YBf+qN0SV+QoK#-k1}ce?O5W5kIx>~|{o8sNx@kwe;L<;+b6Il9o?n$qWi61k zWaD?TizL``T+(ya`Wd?ETTRST>--RxC6}6}RBKLRL0-WRK|JB}=EW}# z3(#XxNBl-j^-%MW05$v_!V+9BX5e7<+t4atR#nbYAB$PNQ8PX03TAC82xi0MLtK{3 zI)8i&h3HvY5o+-rW~mWE;q|H z$+pa1-NAFh=n5{rM8RdqI{_|BE*a;NP;heL2^p1=r=uPqYeGPPj42(%63qF?BZ1sP zP7>G7gTysO*k1I)|9C`JzSO~T2#81OE%50pwC;p`jTucmi84j z9rHq>dvmR_0oLB8%bE-2`~VJ0Pi{B|O10%0aW_e++VE!r4Bb@SEhq)s`nhb(%Y-jb zh)}W-vdqlzq4B!tGwj0=0SOK#NYKdCr$%N?Mm;aOIGOD^_6C@>6wL0fXxZTU3wHU@ z)zaXN6#`t8#Pn5<>L}SZG{b?D2B4C`Ev}#d1HP#57eKG@w?~5R>+u_X^m*I4#xY5I z|N1wm)RD@wZm}Qjyl%;#+!bPIo|$V~iK3*E^{P;^C@LerKIO+FKJQ3rK)AW__a0r; zWzBy&EWv?pK>#O*537PhEy1DVSZg|x&b$;tS1i;IQg^mq_HbE}$h@|iOHm|>TI>m+ z*5@s%Q4|!Q@P{;y0GX!2$(+<)mo;DR3_D!+M4A9Kgyu_yqt9PjQWd!xyLafWt3hFLj)C0?mR$ z`r6jq74vb#B}hykAj*T+8lm&zh!ZK;v@t32q# zb2Q?mhJMk#@!&)Fjhe>qb`1!SmFUv}?3B9452_AI%><>p=56D4VR&Y{gACntS$Lj{ zJC6}fl2WUtv%+Y#C^?pH{LiCg1;mu}U&~#g)jtgiN$|uLzW`Zp z@AU}KB;(Jp1hL0e_nT{Hfl+z6Zto29qa#mJ+M_Rq7@E8Mg-Zc0N=BK|Ke9~?!{IM? z^q>#Gw-xbQtXSlP_+JfOG-g(v&5e#(Nqaa69MUC+@<&5xXAbEZQ~V6wId_OznfblW zMaiMr^@S+dH@P)0KJV!AFK{Ek$QBZex^g`zYMKRR1td7oJ}kl2I6r`cQC64XVAMu1 zy0fEY`_gZ@m7js?DB|E;^q&g>E=x)+pUn-SWaD8eu?5DyBi_h0L`G(+j*?AJY;!Y8-{%*N+?>(pmkl ziCQn$3Nv)m&(iu0SmWoip{yH+22iq*U|4u?7}o-r7nH&iUBjX#Gk2R`f|U;iC78b@ z4B+76>-#*oD1yt~5qsBQR!j5*7vn}bmqJ`L7G?iYKT5Vqu=K(LC8L*zMW2#s^YI&< zOL{&O5}@hzMgakuEm^5ca5N3Soy11|@+*j?3SynwSd&!g-Bx~dHHdj#6aEf!Q4-5) zzb=51Ma(u?@U7ae4vEI%YQ1#<#(vdDg2rq11w~Ec6VHbP$jW*v4B(imiKSh^=(2PG zBSx>mtRv{j%Qi>a;fH&8xGX89nlg-;aZnmqe4mc42n77~C_e-6dV_!f^$x#-->69) zP$w*b+;R>|Gl$FvrC%kbhL$=vEG_Zi8BppW{fbe41i36JWi;O#M#;g%TDQR~?$Xf+ zUwwQj_X3bNW1qL-XTxPpquwKQ(PW&+4oWj>o&qIJD)%Fn$+@X?Hy$z9&9k`lSq~Q_ zqqH8!LMTOD8~hvVg;DGC`e*0)CFojH0{!@DkEltj@L^bjJH;wEsBB695>)C+-JUR& zJMuoOHp%BO1lxeLnv7)WC5P8Y46<=7L5kry0&s(0({%eFB;qVnsYA) zl^w3zL8XeI;=Sp+0NR;K%F%Ovh9;Hy6GL2-RMJ;X52IA}I`y4D(St^xx9{zlA){BC zoCCOAhi{XO=`|h*pwU5PZ`Tu`QeAGWh-+Tw(c%0IbmqVEGBx<3&Sl9Z`~BM7YRyJj zV8Ui^Q1qGbdUwABW{u@lL<0x2m#W20TtZ**t)ZRZ(jnq>)&JG)$1~va zu(Z*)RrhdNa;dwzcMzr6$ze);Oxx1uk*7nVS09r}op!YYqNZ-bjGzRPt%K73W6eP6 zCQ0dfF?SSaS5XHbrZlm<@Gsi*)VVAvrL9g1qf|Df-sZ{B>uN)f=+&#|>YxCT@Ya#< z{GujJ2HzZ%&Zs_6swXIUzaGXNnNpQ*!v%EH-xUR=#dAa4Bq?P+_)Qq4sNG)BV4nxI zKJU;ocZ4M9`wD)erqL5)g92o1yPyk@sUE;@r(S~f)C_lPdTWnoU zfQyn!6W^y?vS!DoMR3!P{~Q*LBXgAy05@iBKNtW+FiE=RG9b&NHi4l(dn}HkoG2>WT(cW_A5b(cXK zo`zgA5L>D0ssyw^+wMEm25$!<1LmUPVe$@2wkx>lZBRSP4#e5 zQqgP74WMLEDGT@mzlTL1zZp02r`Oa)P3nlYAqhe*S);4{{HSpdYyINGAof48P#q)s zTYu1>VRXepossU~L)|@Gmdp&7&LZSgnz*__bcOcFJ0UkVdbO)x)McbSs!JgKL!i@H7bphI@9}P$l zTIUg<-I#3w00)-|2X}%?bGer$TFzH`-?iKo3szYy*z5%;nbXpC#>yZ{Wl`~Yr%v1K zK_fu76Wj}+ORum1Z5vIt0`Ixt$v_?0F5?A3#^eWebj9rKH?B47!d#YIvSoxO$_|g)IMa_lKCh8}VSd@BW05^>zAhTG zCw6AwAhE0<01~lzm&J2J=n4{tB?(8~CGF2#G(v{ufm-Uzl6xO&5*E$9O)b%D@6kn# z{=*r+0BL1#S-o!xxWr~R#@!yqGvJafvm3*%p=eG>XEWFt&BexUX$$cN=C*XrXF)vd z^IEwL-!E%)SrZ#S?y=#mg`i}lf?l!gaeBK$Av^<0KN+c@!XTIB8r8g!J8GkpH}hE! 
zp742loGlJX5NNFnQ2niq0Rd{r&FS;jj7trn#<6gl!yM|5O}~6oOGj5Qnz& zfDh)&KDGbVQb!-hTz#-$1&A5xAwOCrF};%~z|fsDQdq`KFNvZg)@FD~2#ppox#OnP z-l&V(DJjG78_knt#sXQQd%g&Ynl=?a4oe`tBRiG6Iv4qRV`ONC4fCTT&r#Ydr}-Hy zTd|R$nT(=2Nk)byip$^zMFI4|WzPa;_|Le7_`K<6tgwEAi9vLBaB1{aZ*Vb^BZ`mP znM;G3PYCFyfktv9Aj)!HT6A@B$>L%G)}`V_MY?D#HV^q?^Y+)=4E{8Ku(EtX^Y70F z(AhyLs}jd3W0OYXhodJ?&uk-U^t7MLl2XR8#X*!TCKg~yqdI;QK%;=}nI6$E!_1Uq z%uEc*n%(aQ032L8Z%70eBT-Y&Qs-*Jm+0sUF0V<4Ft|3%WrNGTLZ=)Q3`J%N!%bIx z*qi_6x4_RBSIGcBt(U;K>N~g;6qJBVY-(oS*f8E=ZLN*TelAKXx_JY)Hc~7YxHc<* zCvb?>Y3z}}HQq0PZbn>mdHngjWgKF8h3A`tn2{tYULVGztV2m>PIwr)>4K3YITGNq zWY$>mg;264$pSn@MgKbM5sgK4x`YKNlnD`^*NEQ&n0KH|8A@}^)tJXMP>RhR9nA{j zb=F+1y(7d$$*4)wIYE>xA{JmqBfqQ?5{;v(t6>2~zIqdWqbcjAH696OibDxCn-X-? zI2bK(9|fb>%*mQP0d&PA8L>lW=ES@}+0@5cz(~rZ#VbL)11S^J>NM(puP$mD&6pgJ zVDtT4Tm3uGg`;81`eHkhoxU?EG(1TWsOIcw2x6kk!Y-yzNU$+g5npTy3_>HDk=0^an^nqOCV0N%FXHJal z&AZ!lw2Px`@s%(`^SFQESw9!03T1Xa8bGNmK9(j$aycIu`)>G>fdz z zjJ%2Y%Vi|Wr zlHa@(HQuX&A|5Xz+n=A+p2K#r`IDtv`MuWmF73;&Fy<(2gyBP!HAns#>q6*?G>Z9^ zkzYm-d|v&+$9~b2R_$6qf?H>X0UUGGqDM6_GLlU7tqs=j$>kn&#j^b%L*b)Bel8mu ztoqI%N>RauNOs9QRR){iDMYeM*5?`WgEa>i|1AEVGB%I0Z&H|Ln;UF?*AN#am2@eq z?Dgt9u7d|%ecpfl76?diQ4a}_W`4~W`6Tz2=eUo9QQmeAy_At;8myx$7*!YNd+>D+ z7bPPj$y632uA3GT(OiJX>iO{$B4X<{8{pS>Lb4|Ilg|91rTsxAHnX#Mc>vE~q3R|H zE`xvLcNko-8Ny193;lM`hkRz z#Ch~iAqh6H{;yR%cm^a^8cCeaqAW?&oz{dqS`$(h=p%(8sk8AY`l556yF2;?s57>= zM*`CeFp@eBW<`7N1GCs%&ajgqJPc;VMlNSakjs*pOHb;0F8)cd|MH}8JM3R zL@r!b_NL~NAo@5+EDApa60sSRFN4wsklZT;Y z1Q&{;WK?Tu=KvaQG;C)@cbO(eUdR0QKDVxMhp%o1%xajkTvP5i2cx6oUj`#1!PC06 zkjrlJz2!p?3PyJd!}G%0AQvSgUiQ-ap_8dPBhsXRpugKi) zjeb7MkHa0bQaZ6A-~3D86@yD`N~ewV|J-!WNa=_Z$KcXV-Xo+eK$onx)b^D$P~tk*IYz{V72L;^z?wyv###6q;S0;^Qe% zqfRQ_qtaQ0;#E4Y;HuU#r_fL=JqkefJ#FLuZ$#YliE4o;#Y3car2 zhQn&wPo@3}jj;>(MWdTEdRwKXDy>xMfJ&c638*}pKUIRi6eED3O8CLMH}G3&mO`Bs znx@byyM*N~x>utQRC-wq;kZhQQ(1HCU)m1|xSPgGxFwl3xmkTO6k4O;6sATh59w{Y z1kB2VDveaBqtMzeDy+U^8FCveSqTZqi^fWTDxFDU4H~3U6O|4s^dE))t!7{7)h@cv zMK5dAOe4QWIT{6`#*jVzASA$vDpLZ4Jg?Fmg?>PNYzFp@OJ2wqnj&Tf!>d!A2j-n-%SrIFom`;U_0QaV|`j#n>tjW0r8X&=Uj)r zgW5p?E|L)m=63isnMTL4F~Cy1rqE=C{=}-QFn=})b2OSR_On}~&sAE0=BP1DDdd<7 zPW@O3e{iXn4 zrODJVj>fC}?(Y@e2`?(-S823;4C!%H$wlKd>Y&lh8a=>Ag>xNBQ}~NW__VPS_Le6# zfu2_=sL&*Z-cc|`Ie)^4+lKHzl__BM@N-o`LW}=!<1Y#4@BNQOyLzm#Bw| z4%Av$z%6_h*~XCIqLV6ZS81el1ik)vhynABz{}L=BL4S z3Gg)V`YkH0RcV~c3Ux3_!imB#Bp|~dE8*8f%5%{e7(Ru^a6y4*N<9>q5!)<%qERwV zHI-J0t&*<-J5}jaF<(aD`Nv9lB#}D1SW||vt-%q(>kVbSV3%+;o({t2anX+&J+09a zjp{ki(A+OQ4?qGkClLwehKqO8S8?>H(9|SxKL4iBeuWC`+u^8+zaM4GgpJTjrRxgP zoz5{_>(&|)kW-6Dpd2OVX6VvONuF-1Zn#277pXP!sq*4q4^5@9-DvzE=qUNP>o=_L-2Z3x)vp1*b3g#@LXdAuvz^+ z0e@A*$Brt_2pn=1R0y$2xLug`dBPRYVOdp-**SzYQ$|4ozDtOe0D9PJO%$9?tf`L) zhX6bLSG$DgR6fD+gJr<^fth^4Y52Nt4c!3=NUFt3cq<+sn)A&83y2dAM-Uuu7}}$D z30pL(F4p=DmDa1Yk+-*V4BPW|KmxJ~5eXG?;&ZxT5q?aenF)NjKc`SPJg?Bl!Yv+U zmwpkK09gZ5q8;@Rq1O!VX5?W%zwZ<4+3H)%fc=sm?El}8eLV{x*u-o9y;9&R+ zZVx*FG|YKQcMk=IKmxvNh)6KUfW1&732rv^VpYxgi<Qn7sq&uFw9XOl_~U_&VE zQNh!(-{HEy_cH(G1;X?Z0j5=gvTjIXqlv{`tMlmHLgkCl78NeC zQ^KK9Y*~>LjYu$ukdR0-Tr^vuiQ*WA%EDj5x)<8V&_knZE>h!Yjxb4`Rq7li;j3xv zwBm!fSP2+I-4wol;>_8pP#Vq{g>Gd7XxnhbF7{aAeos+pfY^4WQ4)UYJr@#?OpTQQ zKc*m#7rKu^n-w~y@M#CDw8bu=v5Peb>wu$ttjagOyeJ93FXL}&kim&az*fkq3RO~) z>7#h^;ba${(vy5X^Er9eE@5ORXLgNQ&4tKJJm|-{?;nUhShONRyND)UH zm>@8rL`i6LaUUeyI4W~REHYfNqYCv^5jl6>)nC*oMdMr8+t@5(%dL+V@cT>%z~{WN zLqJsKY9ir)Lo5Wg6+Z70g$^mWQ`?5{ql@8Un9E;A~F>*WD)AaoV3#*5;F z#=C0R59w5Kgbq@vG9RAorJsnB&}(P_5^fZg`6ZrxJS^i#1v5!rVZc-D5;n$D(*(L( zc&@u~C+6#%Q(60e^bsV)L}j3`q@~b3@${X_RtZ1yEX8^|lofm7HW%f%=oO7Th;3+e zFlrs9{5~EM%8bgiaMK=MA`bW5iTIrezQGN+W?$()HOj+ms)PX?RQbyNdDIvdlyH9X 
z22q)Jl6ehb^RT`!dsyxP3JtdlfbKw9e^B`l!r=zTes`3BWkrJ_;2)wg6;-~$!Czgf z&|L~&v!1g{fMLxQp`ZV#>{;%*4~h|3KQyM1;E zm&HB{h^P$osFDf?+bLl~DZAY_ipum!;>%wr6}K1OYKR8FJUhZ~=Uuc;qg4nbYBUkv zhDP~O3%zp+UmgA-Dg!SIaa^nb?hv>HVbTzVjD&KJi*@*&gJKjEcdV$WOpkY}zX=It zMr9tv!$N}|Mi4-F?r<5QS(oh+5CLec;tI&e6rzB&RLYN{D%0as{#8gQGb(dEnO=|M1#X++9_ALc{`am_l3E+9hBN zZ8iR>s&UL4!IP-FLyxP2o`i%l<37_|Qn{bc$iO(k7nH?K0 zulnFA5OAZY3<9mV;o=7};FHyWNnn@HCSfW}Db7FP@_Zo~s1lqq zxMT3ea0Gp69|H`=+XB^!!c9B!$0&pz)K@kcZ_iptc{IA)k3u5j=Jj))+o!+2lN zIB&2Wu_ukMaDS?BlARLzj#`B=lo>NQnMhMbRHj&%8=Y?rysVC3@!ojyy9iN-LmHK8 z9H%@PHA}q)^BG!Z+(%8K-U!L7lq)<4n5d2_!HRFS&k_87An{bNEFz#H7El0@Bq#UvlQZVnR6XBSK%9o)4?I?c0^pYo6n9=EM2tu|s}n!^5Z6S1$B+b0Redb8Ni@F!D6#;=Z3LVmpo z{AVm>MrG!>c=oY}{VKd#Hs3uk4GMi=m+-T2RH0^&gYm$(f?sam()lfNIXLEsnUu?M z<@ljWH&>#4Zo)p>t$+e2QE*#a!f6`YERAtdXAzNx&j(lOBIo0xB%YKFlWh?oid4TsTKp>a4MPK3tb_nk+;F-k(S*P1{=nUSTI3DiA~kDc-&PB~NM zcsksfkL_c?{q~qpA-sOJ2;fHrDf2tb;Y^<+Dq|Y&g$Z;TDHm~reqRK<`>E77>ZXe; zVn=aHgzZ}=V&1Ky#^8DIG9=t6Dl<2ZLMNw*&0aXmN)_^a^)HoN!UM#QZLWe9kySfXliJm5a(s=NIR#hJ-R>CTHSkBpWQs z6KjC88V(=MIfrY19p13YX*9T-1tM&+G-{R#COr!YH;Vf_nZiy$bK!NvXl%f~;8n40 zhr`H<;rxJTm9rE$DfUK5n7Ww9;E2kYs`OAYosZ{$(rY3pmW>b-g5B^b>=G8%rA;Z6 z1Vlw;l$M&AS%=P5qVyDMoJ1@CUs0L-XH)Kngnx*cz-q%} zCy5+Y0c)x=bk$hnsV8ApF9oeJAl9Ho%+dQ?&XonYA?5Z50S836@5%ef32 zf0_UZH;T$U?Bwe+Z6s z6@Es0VK!gok4$iDx9x^mF18uCujBQQbDDL^Ifm26`8&p#h{oU?L{3xaiFi%|cjTKN zTi6OB!sD{91FX{PF8UY;skrODs?n!WV>sd0A>l?*naATfRPu(x2^3h^_F_XhzV}fo zkhd}|gR3G=Yn-$FqQ-Fb*XJOi%$P}iH+{;^CP&IKOEX1)?0|xgb8NFT&qYWvZBjY+ zvqo5@V@?S@D-Y%9*gu3cs)&#V?uM|hy~Mvy*d>&>XpjiLT@q0l_%u;Llb(sizd*tb zqB3>Mvj>T=*ta6mP#c=5kRx+~Pe70i#nB8e0PZWm?>vNp!ub9x3KQ^OBK~vtUoo-y F{{b^TPeA|x literal 0 HcmV?d00001 diff --git a/src/new-ca.crt b/src/new-ca.crt new file mode 100644 index 0000000..fff2847 --- /dev/null +++ b/src/new-ca.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDHTCCAgWgAwIBAgIUak6HXnrheUs5SnAXYa+jUU1l28YwDQYJKoZIhvcNAQEL +BQAwHjEcMBoGA1UEAwwTZWx1eG5ldHdvcmtzLm5ldCBDQTAeFw0yNTAyMjYwNTA4 +MDFaFw0zNTAyMjQwNTA4MDFaMB4xHDAaBgNVBAMME2VsdXhuZXR3b3Jrcy5uZXQg +Q0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC0ooycKCciiC9CY6mP +ph+WS1I42kf3Io7kzZ1/gMFb1EJxabMlAga94NmWO9uQkwiaFQOWrH2cvLyLL9kD +kz7ZmQUij4C2sHU2CQkqG0mo8xeBxaFYmSXwAd0jYd/6GHABCF63/USWIrfUPkNt +f7HaoM6yPZ61w3Ne0G5Kfd5/HsTiRiGbpCXHUpp6NMeuG59j1Ma+eEDXPKMimKti +R3bCMI5tOsCOey6yjEP+DituitqUZZYKPmk+7cvi1tK50OGMT330P+mPZPJQxauK +dew3mhTv5iKiGYhdN5ZFUy1KVJHf3y3rmNjEWesU0X8483v4tuhhcjNIA+D8/Tcn +qKQ5AgMBAAGjUzBRMB0GA1UdDgQWBBTd6ubEStLdE60De4Re5IQENKn/aTAfBgNV +HSMEGDAWgBTd6ubEStLdE60De4Re5IQENKn/aTAPBgNVHRMBAf8EBTADAQH/MA0G +CSqGSIb3DQEBCwUAA4IBAQAMkrP0zdt1uAOI1B77nV7+EZSzJxahrubo8opjrkvd +4/stCVG6OfDutxARvCdC4OoJiRPGXBiA22bIi7ZMl7DTpA+CeFEICfF3MKcf8xIT +V5sCm25dX+KHwACWNccNazpIlIAVGxMmcSs70SlMIfPksCW2FRibsVnzESQCgQcP +e6owfVvsnQNN4UADki4JMZJ1RQ/nUNM3aNJSf/SFVJYjiUXHLNJY65FfiYV4MGRi +Yq+NDPs3D0KLkwQ03FFcw56TdPnCAYuihDHkITuQDtS1DawLhuwFDDPcys5Mnigc +1Y1o5V4Z6L6xkttwgD9zP3DQJscDZ/Gki1NqYV3TvJPr +-----END CERTIFICATE----- diff --git a/timescaledb_backup.dump b/timescaledb_backup.dump new file mode 100644 index 0000000..e69de29
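
Note (not part of the patch): the radar payload handling above normalizes every gate energy by the leading counter before the presence/threshold check. The following is a minimal, self-contained sketch of that logic as implemented in insert_into_timescaledb() and UpdateLastSeen(); the helper names normalize_radar and passes_threshold are illustrative only, the single-gate branches mirror the patch's indexing (radar[1][1+N]/radar[1][0] for sN, radar[0][5+N]/radar[0][0] for mN), and the s28 branch follows the (s2+...+s8)/7 expression from GetLastPointQuery rather than the slightly different sum used in UpdateLastSeen.

def normalize_radar(radar):
    # radar[0] = [counter, absent, moving, stationary, both, m0..m8]
    # radar[1] = [counter, s0..s8]; every value is scaled by its counter.
    motion, stationary = radar

    def scale(values, counter):
        # Same guard as the patch: fall back to 0 when the counter is 0.
        return [v / counter if counter else 0 for v in values]

    absent, moving, still, both = scale(motion[1:5], motion[0])
    return {
        "absent": absent, "moving": moving, "stationary": still, "both": both,
        "m": scale(motion[5:14], motion[0]),
        "s": scale(stationary[1:10], stationary[0]),
    }

def passes_threshold(radar, signal="s3_max", value=10):
    # Presence test for a threshold signal such as "s3_max" or "s28_min".
    component = signal.split("_")[0]
    norm = normalize_radar(radar)
    if component == "s28":
        # Average of gates s2..s8, per the (s2+...+s8)/7 SQL expression.
        radar_val = sum(norm["s"][2:9]) / 7
    elif component[0] == "s":
        radar_val = norm["s"][int(component[1])]
    else:  # m0..m8
        radar_val = norm["m"][int(component[1])]
    return radar_val > value

# Sample message from redis_test.py: device 499, s3 energy 657/100 = 6.57,
# which is below the default threshold of 10, so presence is not detected.
sample = [[100, 100, 0, 0, 0, 2226, 2654, 425, 523, 445, 340, 436, 339, 548],
          [100, 0, 0, 706, 657, 547, 570, 553, 509, 499]]
print(passes_threshold(sample))  # False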