In process of adding parsers for other MQTT formats

This commit is contained in:
NucBox_EVO-X2\robert 2026-01-11 13:56:37 -08:00
parent e22c41e20a
commit 375096e303
2 changed files with 316 additions and 163 deletions

View File

@ -12,7 +12,7 @@ functions:
read_timeout: "1h" read_timeout: "1h"
write_timeout: "1h" write_timeout: "1h"
exec_timeout: "1h" exec_timeout: "1h"
max_inflight: 16
# RabbitMQ configuration # RabbitMQ configuration
RABBITMQ_URL: "amqp://well_pipe:wellnuo_2024@192.168.68.70:5672//" RABBITMQ_URL: "amqp://well_pipe:wellnuo_2024@192.168.68.70:5672//"
RABBITMQ_QUEUE: "evgrid-dev-demo" RABBITMQ_QUEUE: "evgrid-dev-demo"

View File

@ -78,6 +78,9 @@ PROCESSING_PERIOD = int(os.getenv('PROCESSING_PERIOD', '300')) # 5 minutes
QUEUE_LENGTH_THRESHOLD = int(os.getenv('QUEUE_LENGTH_THRESHOLD', '1000')) QUEUE_LENGTH_THRESHOLD = int(os.getenv('QUEUE_LENGTH_THRESHOLD', '1000'))
redis_host = os.getenv('REDIS_HOST', '192.168.68.70') redis_host = os.getenv('REDIS_HOST', '192.168.68.70')
redis_host = '192.168.68.70' redis_host = '192.168.68.70'
#in debugger:
#redis_host = '127.0.0.1'
cache_dir = "cache" cache_dir = "cache"
MQTTSERVER = os.getenv('MQTTSERVER', "mqtt.eluxnetworks.net") #"mqtt.eluxnetworks.net" MQTTSERVER = os.getenv('MQTTSERVER', "mqtt.eluxnetworks.net") #"mqtt.eluxnetworks.net"
mqtt_port = int(os.getenv('MQTT_PORT', "8883")) # TLS port mqtt_port = int(os.getenv('MQTT_PORT', "8883")) # TLS port
@ -98,12 +101,10 @@ logger.info(f"DB_HOST= {DB_HOST}")
logger.info(f"DB_PORT= {DB_PORT}") logger.info(f"DB_PORT= {DB_PORT}")
logger.info(f"PROCESSING_PERIOD= {PROCESSING_PERIOD}") logger.info(f"PROCESSING_PERIOD= {PROCESSING_PERIOD}")
logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}") logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}")
logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}")
logger.info(f"BATCH_SIZE= {BATCH_SIZE}") logger.info(f"BATCH_SIZE= {BATCH_SIZE}")
logger.info(f"mqtt_port= {mqtt_port}") logger.info(f"mqtt_port= {mqtt_port}")
logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}") logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}")
logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}")
logger.info(f"QUEUE_NAME= {QUEUE_NAME}") logger.info(f"QUEUE_NAME= {QUEUE_NAME}")
logger.info(f"RABBITMQ_URL= {RABBITMQ_URL}") logger.info(f"RABBITMQ_URL= {RABBITMQ_URL}")
@ -260,7 +261,7 @@ def connect_mqtt(client_id):
if rc == 0: if rc == 0:
connected = True connected = True
print(f"Connected to {MQTTSERVER} with {unique_client_id}") print(f"Connected to {MQTTSERVER} with {unique_client_id}")
subbedTo = [("/w",1),("wellnuotopics/topic1",1),("/faws",1)] subbedTo = [("/w",1),("wellnuotopics/topic1",1),("/faws",1),("mesh/#",1)]
print("subscribing to ", subbedTo) print("subscribing to ", subbedTo)
client.subscribe(subbedTo) client.subscribe(subbedTo)
else: else:
@ -462,7 +463,7 @@ def process_messages(connection, db_conn):
# Basic QoS with all required parameters # Basic QoS with all required parameters
channel.basic_qos( channel.basic_qos(
prefetch_size=0, prefetch_size=0,
prefetch_count=1, prefetch_count=100,
a_global=False a_global=False
) )
logger.info("Basic QoS set") logger.info("Basic QoS set")
@ -500,6 +501,11 @@ def process_messages(connection, db_conn):
#logger.info(f"Properties: {msg.properties}") #logger.info(f"Properties: {msg.properties}")
#logger.info(f"Body type: {type(msg.body)}") #logger.info(f"Body type: {type(msg.body)}")
#logger.info(f"Body length: {len(msg.body) if msg.body else 0}") #logger.info(f"Body length: {len(msg.body) if msg.body else 0}")
is_mesh = False
topic = ""
if "mesh" in msg.body:
topic = json.loads(msg.body)["subject"]
is_mesh = True
if isinstance(msg.body, (bytes, bytearray)): if isinstance(msg.body, (bytes, bytearray)):
hex_dump = ' '.join(f'{b:02x}' for b in msg.body[:32]) hex_dump = ' '.join(f'{b:02x}' for b in msg.body[:32])
@ -517,7 +523,7 @@ def process_messages(connection, db_conn):
with PROCESSING_TIME.time(): with PROCESSING_TIME.time():
with db_conn.cursor() as cursor: with db_conn.cursor() as cursor:
data_type, parsed_data = process_message_data(cursor, data) data_type, parsed_data = process_message_data(cursor, data, is_mesh, topic)
if data_type and parsed_data: if data_type and parsed_data:
#logger.info(f"Successfully processed as {data_type}") #logger.info(f"Successfully processed as {data_type}")
@ -527,6 +533,8 @@ def process_messages(connection, db_conn):
parsed_data_mqtt["temperature"] = parsed_data["temperature"] + temperature_offset parsed_data_mqtt["temperature"] = parsed_data["temperature"] + temperature_offset
parsed_data_mqtt["humidity"] = parsed_data["humidity"] + humidity_offset parsed_data_mqtt["humidity"] = parsed_data["humidity"] + humidity_offset
if parsed_data["device_id"] == 679:
print("stop")
MQSend("/"+ids2MAC[parsed_data["device_id"]], json.dumps(parsed_data_mqtt)) MQSend("/"+ids2MAC[parsed_data["device_id"]], json.dumps(parsed_data_mqtt))
if data_type in data_batches: if data_type in data_batches:
data_batches[data_type].append(parsed_data) data_batches[data_type].append(parsed_data)
@ -903,7 +911,8 @@ def GetLastDetected(device_id, field_name, threshold_value):
cur.execute(query) cur.execute(query)
record = cur.fetchone() record = cur.fetchone()
if record != None: if record != None:
last_detected = cur.fetchone()[0].timestamp() last_detected = record[0].timestamp()
#last_detected = cur.fetchone()[0].timestamp()
return last_detected return last_detected
@ -982,10 +991,16 @@ def UpdateLastSeen(new_message_dict):
radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0] radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0]
elif component[0] == "s": elif component[0] == "s":
component_index = ast.literal_eval(component[1]) component_index = ast.literal_eval(component[1])
if new_message_dict['radar'][1][0] > 0:
radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0] radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0]
else:
radar_val = 0
elif component[0] == "m": elif component[0] == "m":
component_index = ast.literal_eval(component[1]) component_index = ast.literal_eval(component[1])
if new_message_dict['radar'][0][0] > 0:
radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0] radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0]
else:
radar_val = 0
if radar_val > radar_threshold_value: #seen now if radar_val > radar_threshold_value: #seen now
r.set('lastseen_'+device_id_s, new_message_dict['time']) r.set('lastseen_'+device_id_s, new_message_dict['time'])
@ -1002,7 +1017,97 @@ def UpdateLastSeen(new_message_dict):
logger.error(f"Error: {traceback.format_exc()}") logger.error(f"Error: {traceback.format_exc()}")
logger.error(f"UpdateLastSeen failed: {e}") logger.error(f"UpdateLastSeen failed: {e}")
def process_message_data(cursor, body): def get_or_create_device(mac: str) -> int:
"""Get ble_id, creating device if needed"""
sql = """
INSERT INTO ble_devices (mac_address)
VALUES (%s)
ON CONFLICT (mac_address) DO UPDATE SET mac_address = EXCLUDED.mac_address
RETURNING ble_id
"""
with conn.cursor() as cursor: # or your db connection variable
cursor.execute(sql, (mac,))
conn.commit() # needed for INSERT
return cursor.fetchone()[0]
def ingest_ble_report(topic: str, payload: dict):
"""Process both full and delta reports identically"""
reporter_id = payload['id']
report_time = datetime.fromtimestamp(payload['time'] / 1000, tz=timezone.utc)
devices = payload.get('devices', []) if payload['type'] == 'full' else payload.get('changes', [])
for dev in devices:
mac = dev['mac']
rssi = dev['rssi']
alarm_state = dev.get('a_state', 0)
sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \
if 'time' in dev else report_time
if is_random_mac(mac):
# Option A: Store in separate short-lived table
execute("""
INSERT INTO ble_random_sightings (mac_address, rssi, alarm_state, sighting_time, reporter_id)
VALUES (%s, %s, %s, %s, %s)
""", (mac, rssi, alarm_state, sighting_time, reporter_id))
continue # Skip main device table
# Option B: Just skip entirely
# continue
device_name = dev.get('name') # Only in full reports
ble_id = get_or_create_device(mac)
# Full reports have per-device timestamp, deltas use report time
sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \
if 'time' in dev else report_time
# 1. Insert to history (always)
insert_sighting(mac, rssi, alarm_state, device_name, sighting_time, reporter_id)
# 2. Upsert current state
upsert_device_state(mac, rssi, alarm_state, device_name, sighting_time, reporter_id)
def upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int):
"""Single query: update if no change, insert if changed"""
RSSI_THRESHOLD = 5
execute("""
WITH latest AS (
SELECT id, rssi, alarm_state
FROM ble_sightings
WHERE ble_id = %(ble_id)s
ORDER BY last_seen DESC
LIMIT 1
),
updated AS (
UPDATE ble_sightings s
SET last_seen = %(time)s, reporter_id = %(reporter)s
FROM latest l
WHERE s.id = l.id
AND abs(l.rssi - %(rssi)s) < %(threshold)s
AND l.alarm_state = %(alarm)s
RETURNING s.id
)
INSERT INTO ble_sightings (ble_id, rssi, alarm_state, first_seen, last_seen, reporter_id)
SELECT %(ble_id)s, %(rssi)s, %(alarm)s, %(time)s, %(time)s, %(reporter)s
WHERE NOT EXISTS (SELECT 1 FROM updated)
AND NOT EXISTS (
SELECT 1 FROM latest l
WHERE abs(l.rssi - %(rssi)s) < %(threshold)s
AND l.alarm_state = %(alarm)s
)
""", {
'ble_id': ble_id,
'rssi': rssi,
'alarm': alarm_state,
'time': sighting_time,
'reporter': reporter_id,
'threshold': RSSI_THRESHOLD
})
def process_message_data(cursor, body, is_mesh, topic):
global MACS2id, ids2MAC global MACS2id, ids2MAC
"""Process a single message from the queue.""" """Process a single message from the queue."""
@ -1012,6 +1117,8 @@ def process_message_data(cursor, body):
else: else:
data = json.loads(body) data = json.loads(body)
#print(decrypt_data)
if "data_base64" in data: if "data_base64" in data:
decrypt_data = data["data_base64"] decrypt_data = data["data_base64"]
@ -1022,14 +1129,60 @@ def process_message_data(cursor, body):
decrypt_data = decrypt_data.decode('utf-8') decrypt_data = decrypt_data.decode('utf-8')
m_type = 0 m_type = 0
new_message_dict = {} new_message_dict = {}
#print(decrypt_data)
if len(decrypt_data) > 14: if len(decrypt_data) > 14:
if "data_base64" in decrypt_data: if "data_base64" in decrypt_data:
data = json.loads(decrypt_data) data = json.loads(decrypt_data)
payload = data["data_base64"] payload = data["data_base64"]
decrypt_data = base64.b64decode(payload) decrypt_data = base64.b64decode(payload)
#if data["subject"] != "wellnuotopics/topic1":
# print(decrypt_data)
if is_mesh:
decrypt_dict = json.loads(decrypt_data)
if "/full" in topic:
#{'id': 704, 'time': 1768070305983, 'seq': 8662, 'type': 'full', 'packet': 2, 'packets': 2, 'dev_count': 14,
# 'devices': [
# {'mac': '44:AC:AE:BD:95:22', 'rssi': -89, 'a_state': 255, 'time': 1768070302720},
# {'mac': '6D:B9:B5:A5:0D:C6', 'rssi': -62, 'a_state': 255, 'time': 1768070303820},
# {'mac': '38:CB:8F:DC:A8:13', 'rssi': -81, 'a_state': 255, 'time': 1768070252414},
# {'mac': 'A4:C1:38:BE:3D:94', 'rssi': -77, 'a_state': 255, 'time': 1768070300456, 'name': 'NVIDIA SHIELD Remote'},
# {'mac': '1C:67:76:B7:B5:5D', 'rssi': -83, 'a_state': 255, 'time': 1768070304506},
# {'mac': 'E2:25:91:41:CB:4C', 'rssi': -71, 'a_state': 255, 'time': 1768070303194},
# {'mac': 'EF:A5:A3:05:81:B4', 'rssi': -83, 'a_state': 255, 'time': 1768070206193, 'name': '20123099999'},
# {'mac': '15:E9:FB:C0:75:04', 'rssi': -63, 'a_state': 255, 'time': 1768070304705},
# {'mac': '50:21:8C:20:35:27', 'rssi': -96, 'a_state': 255, 'time': 1768070257700},
# {'mac': '64:B7:08:89:08:9A', 'rssi': -97, 'a_state': 255, 'time': 1768070236446, 'name': 'WP_238_890898'},
# {'mac': '33:C1:E9:DC:28:03', 'rssi': -85, 'a_state': 255, 'time': 1768070303919},
# {'mac': 'BC:57:29:05:EF:CF', 'rssi': -79, 'a_state': 255, 'time': 1768070300787, 'name': 'KBPro_472435'},
# {'mac': 'E0:92:95:25:C2:90', 'rssi': -89, 'a_state': 255, 'time': 1768070304907},
# {'mac': '78:03:D9:36:82:4A', 'rssi': -96, 'a_state': 255, 'time': 1768070304915}]}
devices = decrypt_dict["devices"]
for device in devices:
print(device)
#upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int)
ble_id = get_or_create_device(device['mac'])
sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc)
upsert_sighting(
ble_id=ble_id,
rssi=device['rssi'],
alarm_state=device.get('a_state', 0),
sighting_time=sighting_time,
reporter_id=decrypt_dict['id']
)
print(decrypt_dict)
elif "/delta" in topic:
print(decrypt_dict)
elif "mesh/topology/summary" in topic:
print(decrypt_dict)
elif "mesh/topology/node/" in topic:
print(decrypt_dict)
elif "mesh/topology/links/" in topic:
print(decrypt_dict)
else:
if len(decrypt_data) > 14: if len(decrypt_data) > 14:
pointer = 0 pointer = 0
lenn = 4 lenn = 4