diff --git a/src/app.py b/src/app.py index 21ae5b5..3bdfa0d 100644 --- a/src/app.py +++ b/src/app.py @@ -877,6 +877,7 @@ def GetLastPointQuery(device_id, field_name, threshold_value): return query def GetIdFromMAC(cursor, MAC): + query = "" try: query = "SELECT device_id FROM devices WHERE device_mac = '" + MAC + "'" @@ -898,6 +899,11 @@ def GetIdFromMAC(cursor, MAC): except Exception as e: logger.error(f"Error in GetIdFromMAC sql= {query} {e}") + # Rollback the failed transaction to allow subsequent queries to work + try: + cursor.connection.rollback() + except Exception: + pass return 0 def GetLastDetected(device_id, field_name, threshold_value): @@ -988,7 +994,10 @@ def UpdateLastSeen(new_message_dict): component = radar_threshold_signal.split("_")[0] radar_val = 0 if component == "s28": - radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0] + if new_message_dict['radar'][1][0] > 0: + radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0] + else: + radar_val = 0 elif component[0] == "s": component_index = ast.literal_eval(component[1]) if new_message_dict['radar'][1][0] > 0: @@ -1017,63 +1026,69 @@ def UpdateLastSeen(new_message_dict): logger.error(f"Error: {traceback.format_exc()}") logger.error(f"UpdateLastSeen failed: {e}") -def get_or_create_device(mac: str) -> int: - """Get ble_id, creating device if needed""" +def is_random_mac(mac: str) -> bool: + """Check if MAC address is a random/private address (bit 1 of first octet is set)""" + try: + first_octet = int(mac.split(':')[0], 16) + return bool(first_octet & 0x02) # Check locally administered bit + except (ValueError, IndexError): + return False + +def get_or_create_device(cursor, mac: str, rssi: int = None, device_name: str = None, reporter_id: int = None) -> int: + """Get ble_id, creating device if needed, optionally updating last_rssi, device_name, last_reporter""" sql = """ - INSERT INTO ble_devices (mac_address) - VALUES (%s) - ON CONFLICT (mac_address) DO UPDATE SET mac_address = EXCLUDED.mac_address + INSERT INTO ble_devices (mac_address, last_rssi, device_name, last_reporter) + VALUES (%(mac)s, %(rssi)s, %(name)s, %(reporter)s) + ON CONFLICT (mac_address) DO UPDATE SET + last_seen = NOW(), + last_rssi = COALESCE(EXCLUDED.last_rssi, ble_devices.last_rssi), + device_name = COALESCE(EXCLUDED.device_name, ble_devices.device_name), + last_reporter = COALESCE(EXCLUDED.last_reporter, ble_devices.last_reporter) RETURNING ble_id """ - with conn.cursor() as cursor: # or your db connection variable - cursor.execute(sql, (mac,)) - conn.commit() # needed for INSERT - return cursor.fetchone()[0] + cursor.execute(sql, {'mac': mac, 'rssi': rssi, 'name': device_name, 'reporter': reporter_id}) + return cursor.fetchone()[0] -def ingest_ble_report(topic: str, payload: dict): - """Process both full and delta reports identically""" - reporter_id = payload['id'] - report_time = datetime.fromtimestamp(payload['time'] / 1000, tz=timezone.utc) +def insert_ble_alarm(cursor, ble_id: int, rssi: int, prev_alarm: int, curr_alarm: int, event_time: datetime, reporter_id: int): + """Insert alarm event into ble_alarms table""" + cursor.execute(""" + INSERT INTO ble_alarms (ble_id, rssi, prev_alarm, curr_alarm, event_time, reporter_id) + VALUES (%s, %s, %s, %s, %s, %s) + """, (ble_id, rssi, prev_alarm, curr_alarm, event_time, reporter_id)) - devices = payload.get('devices', []) if payload['type'] == 'full' else payload.get('changes', []) +def ingest_ble_alarm_event(cursor, decrypt_dict: dict): + """Process direct BLE alarm event from mesh/ble_alarm/ topic + Format 1: {"id":703,"time":1768183818208,"seq":5,"type":"alarm","mac":"F0:CA:2A:62:00:83","rssi":-73,"prev_a_state":0,"curr_a_state":1} + Format 2: {"id":701,"time":1768185508583,"mac":"F0:CA:2A:62:00:83","name":"K620083","a_state":1,"event":"panic_button"} + """ + reporter_id = decrypt_dict['id'] + event_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) + mac = decrypt_dict['mac'] + rssi = decrypt_dict.get('rssi', -100) + device_name = decrypt_dict.get('name') + # Handle both message formats: curr_a_state/prev_a_state or just a_state + if 'curr_a_state' in decrypt_dict: + curr_alarm = decrypt_dict.get('curr_a_state', 0) + prev_alarm = decrypt_dict.get('prev_a_state', 0) + else: + curr_alarm = decrypt_dict.get('a_state', 0) + prev_alarm = 0 # Not available in format 2 - for dev in devices: - mac = dev['mac'] - rssi = dev['rssi'] - alarm_state = dev.get('a_state', 0) + # Get or create the device + ble_id = get_or_create_device(cursor, mac, rssi, device_name, reporter_id) - sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \ - if 'time' in dev else report_time + # Always insert alarm events from this topic (they are explicit alarm notifications) + insert_ble_alarm(cursor, ble_id, rssi, prev_alarm, curr_alarm, event_time, reporter_id) - if is_random_mac(mac): - # Option A: Store in separate short-lived table - execute(""" - INSERT INTO ble_random_sightings (mac_address, rssi, alarm_state, sighting_time, reporter_id) - VALUES (%s, %s, %s, %s, %s) - """, (mac, rssi, alarm_state, sighting_time, reporter_id)) - continue # Skip main device table + # Also update the sighting + upsert_sighting(cursor, ble_id, rssi, curr_alarm, event_time, reporter_id) - # Option B: Just skip entirely - # continue - device_name = dev.get('name') # Only in full reports - ble_id = get_or_create_device(mac) - # Full reports have per-device timestamp, deltas use report time - sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \ - if 'time' in dev else report_time - - # 1. Insert to history (always) - insert_sighting(mac, rssi, alarm_state, device_name, sighting_time, reporter_id) - - # 2. Upsert current state - upsert_device_state(mac, rssi, alarm_state, device_name, sighting_time, reporter_id) - -def upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int): +def upsert_sighting(cursor, ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int): """Single query: update if no change, insert if changed""" - RSSI_THRESHOLD = 5 - execute(""" + cursor.execute(""" WITH latest AS ( SELECT id, rssi, alarm_state FROM ble_sightings @@ -1107,6 +1122,128 @@ def upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: dat 'threshold': RSSI_THRESHOLD }) +def insert_random_sighting(cursor, mac: str, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int): + """Insert sighting for random/private MAC addresses""" + cursor.execute(""" + INSERT INTO ble_random_sightings (mac_address, rssi, alarm_state, sighting_time, reporter_id) + VALUES (%s, %s, %s, %s, %s) + """, (mac, rssi, alarm_state, sighting_time, reporter_id)) + +def ingest_ble_full_report(cursor, decrypt_dict: dict): + """Process BLE full report with device list""" + reporter_id = decrypt_dict['id'] + report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) + devices = decrypt_dict.get('devices', []) + + for device in devices: + mac = device['mac'] + rssi = device.get('rssi', device.get('rssi_raw', -100)) + alarm_state = device.get('a_state', 0) + device_name = device.get('name') # Only present in full reports + sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc) \ + if 'time' in device else report_time + + if is_random_mac(mac): + insert_random_sighting(cursor, mac, rssi, alarm_state, sighting_time, reporter_id) + else: + ble_id = get_or_create_device(cursor, mac, rssi, device_name, reporter_id) + upsert_sighting(cursor, ble_id, rssi, alarm_state, sighting_time, reporter_id) + # Insert alarm if alarm_state indicates an active alarm (not 0 or 255) + if alarm_state not in (0, 255): + # Full reports don't have prev_a_state, use 0 as default + insert_ble_alarm(cursor, ble_id, rssi, 0, alarm_state, sighting_time, reporter_id) + +def ingest_ble_delta_report(cursor, decrypt_dict: dict): + """Process BLE delta report with changes only""" + reporter_id = decrypt_dict['id'] + report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) + changes = decrypt_dict.get('changes', []) + + for device in changes: + mac = device['mac'] + rssi = device.get('rssi', device.get('rssi_raw', -100)) + alarm_state = device.get('a_state', 0) + device_name = device.get('name') # May be present in delta too + sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc) \ + if 'time' in device else report_time + + if is_random_mac(mac): + insert_random_sighting(cursor, mac, rssi, alarm_state, sighting_time, reporter_id) + else: + ble_id = get_or_create_device(cursor, mac, rssi, device_name, reporter_id) + upsert_sighting(cursor, ble_id, rssi, alarm_state, sighting_time, reporter_id) + # Insert alarm if alarm_state indicates an active alarm (not 0 or 255) + if alarm_state not in (0, 255): + # Delta reports don't have prev_a_state, use 0 as default + insert_ble_alarm(cursor, ble_id, rssi, 0, alarm_state, sighting_time, reporter_id) + +def ingest_topology_summary(cursor, decrypt_dict: dict): + """Process mesh topology summary message""" + reporter_id = decrypt_dict.get('id', 0) + report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) \ + if 'time' in decrypt_dict else datetime.now(timezone.utc) + + cursor.execute(""" + INSERT INTO mesh_topology_summary (reporter_id, report_time, node_count, root_mac, layer_counts, raw_data) + VALUES (%s, %s, %s, %s, %s, %s) + """, ( + reporter_id, + report_time, + decrypt_dict.get('node_count'), + decrypt_dict.get('root_mac'), + json.dumps(decrypt_dict.get('layers', {})), + json.dumps(decrypt_dict) + )) + +def ingest_topology_node(cursor, decrypt_dict: dict, topic: str): + """Process individual mesh node report""" + reporter_id = decrypt_dict.get('id', 0) + report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) \ + if 'time' in decrypt_dict else datetime.now(timezone.utc) + + # Extract node MAC from topic if present (e.g., mesh/topology/node/AA:BB:CC:DD:EE:FF) + node_mac = decrypt_dict.get('mac', '') + if not node_mac and '/node/' in topic: + parts = topic.split('/node/') + if len(parts) > 1: + node_mac = parts[1].split('/')[0] + + cursor.execute(""" + INSERT INTO mesh_topology_nodes (reporter_id, report_time, node_mac, parent_mac, layer, rssi, is_root, child_count, raw_data) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + """, ( + reporter_id, + report_time, + node_mac, + decrypt_dict.get('parent_mac'), + decrypt_dict.get('layer'), + decrypt_dict.get('rssi'), + decrypt_dict.get('is_root', False), + decrypt_dict.get('child_count', 0), + json.dumps(decrypt_dict) + )) + +def ingest_topology_links(cursor, decrypt_dict: dict): + """Process mesh topology links report""" + reporter_id = decrypt_dict.get('id', 0) + report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) \ + if 'time' in decrypt_dict else datetime.now(timezone.utc) + + links = decrypt_dict.get('links', []) + for link in links: + cursor.execute(""" + INSERT INTO mesh_topology_links (reporter_id, report_time, from_mac, to_mac, rssi, link_type, raw_data) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """, ( + reporter_id, + report_time, + link.get('from_mac', link.get('parent', '')), + link.get('to_mac', link.get('child', '')), + link.get('rssi'), + link.get('type', 'parent-child'), + json.dumps(link) + )) + def process_message_data(cursor, body, is_mesh, topic): global MACS2id, ids2MAC @@ -1132,7 +1269,45 @@ def process_message_data(cursor, body, is_mesh, topic): if len(decrypt_data) > 14: - if "data_base64" in decrypt_data: + # Handle mesh messages that are directly JSON (not double-wrapped) + if is_mesh and decrypt_data.startswith('{') and "data_base64" not in decrypt_data: + try: + decrypt_dict = json.loads(decrypt_data) + if "/full" in topic: + ingest_ble_full_report(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested BLE full report: {len(decrypt_dict.get('devices', []))} devices from reporter {decrypt_dict.get('id')}") + elif "/delta" in topic: + ingest_ble_delta_report(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested BLE delta report: {len(decrypt_dict.get('changes', []))} changes from reporter {decrypt_dict.get('id')}") + elif "mesh/topology/summary" in topic: + ingest_topology_summary(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested topology summary from reporter {decrypt_dict.get('id')}") + elif "mesh/topology/node/" in topic: + ingest_topology_node(cursor, decrypt_dict, topic) + cursor.connection.commit() + logger.debug(f"Ingested topology node from reporter {decrypt_dict.get('id')}") + elif "mesh/topology/links" in topic: + ingest_topology_links(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested topology links from reporter {decrypt_dict.get('id')}") + elif "mesh/ble_alarm/" in topic: + ingest_ble_alarm_event(cursor, decrypt_dict) + cursor.connection.commit() + alarm_state = decrypt_dict.get('curr_a_state', decrypt_dict.get('a_state')) + logger.info(f"Ingested BLE alarm from reporter {decrypt_dict.get('id')}: mac={decrypt_dict.get('mac')} state={alarm_state}") + return "", {} # Mesh messages handled, no sensor data to return + except Exception as mesh_err: + logger.error(f"Error ingesting direct mesh data for topic {topic}: {mesh_err}") + try: + cursor.connection.rollback() + except Exception: + pass + return "", {} + + elif "data_base64" in decrypt_data: data = json.loads(decrypt_data) payload = data["data_base64"] decrypt_data = base64.b64decode(payload) @@ -1140,46 +1315,44 @@ def process_message_data(cursor, body, is_mesh, topic): # print(decrypt_data) if is_mesh: decrypt_dict = json.loads(decrypt_data) - if "/full" in topic: - #{'id': 704, 'time': 1768070305983, 'seq': 8662, 'type': 'full', 'packet': 2, 'packets': 2, 'dev_count': 14, - # 'devices': [ - # {'mac': '44:AC:AE:BD:95:22', 'rssi': -89, 'a_state': 255, 'time': 1768070302720}, - # {'mac': '6D:B9:B5:A5:0D:C6', 'rssi': -62, 'a_state': 255, 'time': 1768070303820}, - # {'mac': '38:CB:8F:DC:A8:13', 'rssi': -81, 'a_state': 255, 'time': 1768070252414}, - # {'mac': 'A4:C1:38:BE:3D:94', 'rssi': -77, 'a_state': 255, 'time': 1768070300456, 'name': 'NVIDIA SHIELD Remote'}, - # {'mac': '1C:67:76:B7:B5:5D', 'rssi': -83, 'a_state': 255, 'time': 1768070304506}, - # {'mac': 'E2:25:91:41:CB:4C', 'rssi': -71, 'a_state': 255, 'time': 1768070303194}, - # {'mac': 'EF:A5:A3:05:81:B4', 'rssi': -83, 'a_state': 255, 'time': 1768070206193, 'name': '20123099999'}, - # {'mac': '15:E9:FB:C0:75:04', 'rssi': -63, 'a_state': 255, 'time': 1768070304705}, - # {'mac': '50:21:8C:20:35:27', 'rssi': -96, 'a_state': 255, 'time': 1768070257700}, - # {'mac': '64:B7:08:89:08:9A', 'rssi': -97, 'a_state': 255, 'time': 1768070236446, 'name': 'WP_238_890898'}, - # {'mac': '33:C1:E9:DC:28:03', 'rssi': -85, 'a_state': 255, 'time': 1768070303919}, - # {'mac': 'BC:57:29:05:EF:CF', 'rssi': -79, 'a_state': 255, 'time': 1768070300787, 'name': 'KBPro_472435'}, - # {'mac': 'E0:92:95:25:C2:90', 'rssi': -89, 'a_state': 255, 'time': 1768070304907}, - # {'mac': '78:03:D9:36:82:4A', 'rssi': -96, 'a_state': 255, 'time': 1768070304915}]} - devices = decrypt_dict["devices"] - for device in devices: - print(device) - #upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int) - ble_id = get_or_create_device(device['mac']) - sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc) - upsert_sighting( - ble_id=ble_id, - rssi=device['rssi'], - alarm_state=device.get('a_state', 0), - sighting_time=sighting_time, - reporter_id=decrypt_dict['id'] - ) - - print(decrypt_dict) - elif "/delta" in topic: - print(decrypt_dict) - elif "mesh/topology/summary" in topic: - print(decrypt_dict) - elif "mesh/topology/node/" in topic: - print(decrypt_dict) - elif "mesh/topology/links/" in topic: - print(decrypt_dict) + try: + if "/full" in topic: + # BLE full report with device list + ingest_ble_full_report(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested BLE full report: {len(decrypt_dict.get('devices', []))} devices from reporter {decrypt_dict.get('id')}") + elif "/delta" in topic: + # BLE delta report with changes only + ingest_ble_delta_report(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested BLE delta report: {len(decrypt_dict.get('changes', []))} changes from reporter {decrypt_dict.get('id')}") + elif "mesh/topology/summary" in topic: + # Mesh topology summary + ingest_topology_summary(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested topology summary from reporter {decrypt_dict.get('id')}") + elif "mesh/topology/node/" in topic: + # Individual mesh node report + ingest_topology_node(cursor, decrypt_dict, topic) + cursor.connection.commit() + logger.debug(f"Ingested topology node from reporter {decrypt_dict.get('id')}") + elif "mesh/topology/links" in topic: + # Mesh topology links + ingest_topology_links(cursor, decrypt_dict) + cursor.connection.commit() + logger.debug(f"Ingested topology links from reporter {decrypt_dict.get('id')}") + elif "mesh/ble_alarm/" in topic: + # Direct BLE alarm event + ingest_ble_alarm_event(cursor, decrypt_dict) + cursor.connection.commit() + alarm_state = decrypt_dict.get('curr_a_state', decrypt_dict.get('a_state')) + logger.info(f"Ingested BLE alarm from reporter {decrypt_dict.get('id')}: mac={decrypt_dict.get('mac')} state={alarm_state}") + except Exception as mesh_err: + logger.error(f"Error ingesting mesh data for topic {topic}: {mesh_err}") + try: + cursor.connection.rollback() + except Exception: + pass else: