Fixed alarm table entry

This commit is contained in:
NucBox_EVO-X2\robert 2026-01-11 19:49:35 -08:00
parent 375096e303
commit 8ed841afd3

View File

@ -877,6 +877,7 @@ def GetLastPointQuery(device_id, field_name, threshold_value):
return query
def GetIdFromMAC(cursor, MAC):
query = ""
try:
query = "SELECT device_id FROM devices WHERE device_mac = '" + MAC + "'"
@ -898,6 +899,11 @@ def GetIdFromMAC(cursor, MAC):
except Exception as e:
logger.error(f"Error in GetIdFromMAC sql= {query} {e}")
# Rollback the failed transaction to allow subsequent queries to work
try:
cursor.connection.rollback()
except Exception:
pass
return 0
def GetLastDetected(device_id, field_name, threshold_value):
@ -988,7 +994,10 @@ def UpdateLastSeen(new_message_dict):
component = radar_threshold_signal.split("_")[0]
radar_val = 0
if component == "s28":
if new_message_dict['radar'][1][0] > 0:
radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0]
else:
radar_val = 0
elif component[0] == "s":
component_index = ast.literal_eval(component[1])
if new_message_dict['radar'][1][0] > 0:
@ -1017,63 +1026,69 @@ def UpdateLastSeen(new_message_dict):
logger.error(f"Error: {traceback.format_exc()}")
logger.error(f"UpdateLastSeen failed: {e}")
def get_or_create_device(mac: str) -> int:
"""Get ble_id, creating device if needed"""
def is_random_mac(mac: str) -> bool:
"""Check if MAC address is a random/private address (bit 1 of first octet is set)"""
try:
first_octet = int(mac.split(':')[0], 16)
return bool(first_octet & 0x02) # Check locally administered bit
except (ValueError, IndexError):
return False
def get_or_create_device(cursor, mac: str, rssi: int = None, device_name: str = None, reporter_id: int = None) -> int:
"""Get ble_id, creating device if needed, optionally updating last_rssi, device_name, last_reporter"""
sql = """
INSERT INTO ble_devices (mac_address)
VALUES (%s)
ON CONFLICT (mac_address) DO UPDATE SET mac_address = EXCLUDED.mac_address
INSERT INTO ble_devices (mac_address, last_rssi, device_name, last_reporter)
VALUES (%(mac)s, %(rssi)s, %(name)s, %(reporter)s)
ON CONFLICT (mac_address) DO UPDATE SET
last_seen = NOW(),
last_rssi = COALESCE(EXCLUDED.last_rssi, ble_devices.last_rssi),
device_name = COALESCE(EXCLUDED.device_name, ble_devices.device_name),
last_reporter = COALESCE(EXCLUDED.last_reporter, ble_devices.last_reporter)
RETURNING ble_id
"""
with conn.cursor() as cursor: # or your db connection variable
cursor.execute(sql, (mac,))
conn.commit() # needed for INSERT
cursor.execute(sql, {'mac': mac, 'rssi': rssi, 'name': device_name, 'reporter': reporter_id})
return cursor.fetchone()[0]
def ingest_ble_report(topic: str, payload: dict):
"""Process both full and delta reports identically"""
reporter_id = payload['id']
report_time = datetime.fromtimestamp(payload['time'] / 1000, tz=timezone.utc)
def insert_ble_alarm(cursor, ble_id: int, rssi: int, prev_alarm: int, curr_alarm: int, event_time: datetime, reporter_id: int):
"""Insert alarm event into ble_alarms table"""
cursor.execute("""
INSERT INTO ble_alarms (ble_id, rssi, prev_alarm, curr_alarm, event_time, reporter_id)
VALUES (%s, %s, %s, %s, %s, %s)
""", (ble_id, rssi, prev_alarm, curr_alarm, event_time, reporter_id))
devices = payload.get('devices', []) if payload['type'] == 'full' else payload.get('changes', [])
def ingest_ble_alarm_event(cursor, decrypt_dict: dict):
"""Process direct BLE alarm event from mesh/ble_alarm/ topic
Format 1: {"id":703,"time":1768183818208,"seq":5,"type":"alarm","mac":"F0:CA:2A:62:00:83","rssi":-73,"prev_a_state":0,"curr_a_state":1}
Format 2: {"id":701,"time":1768185508583,"mac":"F0:CA:2A:62:00:83","name":"K620083","a_state":1,"event":"panic_button"}
"""
reporter_id = decrypt_dict['id']
event_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc)
mac = decrypt_dict['mac']
rssi = decrypt_dict.get('rssi', -100)
device_name = decrypt_dict.get('name')
# Handle both message formats: curr_a_state/prev_a_state or just a_state
if 'curr_a_state' in decrypt_dict:
curr_alarm = decrypt_dict.get('curr_a_state', 0)
prev_alarm = decrypt_dict.get('prev_a_state', 0)
else:
curr_alarm = decrypt_dict.get('a_state', 0)
prev_alarm = 0 # Not available in format 2
for dev in devices:
mac = dev['mac']
rssi = dev['rssi']
alarm_state = dev.get('a_state', 0)
# Get or create the device
ble_id = get_or_create_device(cursor, mac, rssi, device_name, reporter_id)
sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \
if 'time' in dev else report_time
# Always insert alarm events from this topic (they are explicit alarm notifications)
insert_ble_alarm(cursor, ble_id, rssi, prev_alarm, curr_alarm, event_time, reporter_id)
if is_random_mac(mac):
# Option A: Store in separate short-lived table
execute("""
INSERT INTO ble_random_sightings (mac_address, rssi, alarm_state, sighting_time, reporter_id)
VALUES (%s, %s, %s, %s, %s)
""", (mac, rssi, alarm_state, sighting_time, reporter_id))
continue # Skip main device table
# Also update the sighting
upsert_sighting(cursor, ble_id, rssi, curr_alarm, event_time, reporter_id)
# Option B: Just skip entirely
# continue
device_name = dev.get('name') # Only in full reports
ble_id = get_or_create_device(mac)
# Full reports have per-device timestamp, deltas use report time
sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \
if 'time' in dev else report_time
# 1. Insert to history (always)
insert_sighting(mac, rssi, alarm_state, device_name, sighting_time, reporter_id)
# 2. Upsert current state
upsert_device_state(mac, rssi, alarm_state, device_name, sighting_time, reporter_id)
def upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int):
def upsert_sighting(cursor, ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int):
"""Single query: update if no change, insert if changed"""
RSSI_THRESHOLD = 5
execute("""
cursor.execute("""
WITH latest AS (
SELECT id, rssi, alarm_state
FROM ble_sightings
@ -1107,6 +1122,128 @@ def upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: dat
'threshold': RSSI_THRESHOLD
})
def insert_random_sighting(cursor, mac: str, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int):
"""Insert sighting for random/private MAC addresses"""
cursor.execute("""
INSERT INTO ble_random_sightings (mac_address, rssi, alarm_state, sighting_time, reporter_id)
VALUES (%s, %s, %s, %s, %s)
""", (mac, rssi, alarm_state, sighting_time, reporter_id))
def ingest_ble_full_report(cursor, decrypt_dict: dict):
"""Process BLE full report with device list"""
reporter_id = decrypt_dict['id']
report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc)
devices = decrypt_dict.get('devices', [])
for device in devices:
mac = device['mac']
rssi = device.get('rssi', device.get('rssi_raw', -100))
alarm_state = device.get('a_state', 0)
device_name = device.get('name') # Only present in full reports
sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc) \
if 'time' in device else report_time
if is_random_mac(mac):
insert_random_sighting(cursor, mac, rssi, alarm_state, sighting_time, reporter_id)
else:
ble_id = get_or_create_device(cursor, mac, rssi, device_name, reporter_id)
upsert_sighting(cursor, ble_id, rssi, alarm_state, sighting_time, reporter_id)
# Insert alarm if alarm_state indicates an active alarm (not 0 or 255)
if alarm_state not in (0, 255):
# Full reports don't have prev_a_state, use 0 as default
insert_ble_alarm(cursor, ble_id, rssi, 0, alarm_state, sighting_time, reporter_id)
def ingest_ble_delta_report(cursor, decrypt_dict: dict):
"""Process BLE delta report with changes only"""
reporter_id = decrypt_dict['id']
report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc)
changes = decrypt_dict.get('changes', [])
for device in changes:
mac = device['mac']
rssi = device.get('rssi', device.get('rssi_raw', -100))
alarm_state = device.get('a_state', 0)
device_name = device.get('name') # May be present in delta too
sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc) \
if 'time' in device else report_time
if is_random_mac(mac):
insert_random_sighting(cursor, mac, rssi, alarm_state, sighting_time, reporter_id)
else:
ble_id = get_or_create_device(cursor, mac, rssi, device_name, reporter_id)
upsert_sighting(cursor, ble_id, rssi, alarm_state, sighting_time, reporter_id)
# Insert alarm if alarm_state indicates an active alarm (not 0 or 255)
if alarm_state not in (0, 255):
# Delta reports don't have prev_a_state, use 0 as default
insert_ble_alarm(cursor, ble_id, rssi, 0, alarm_state, sighting_time, reporter_id)
def ingest_topology_summary(cursor, decrypt_dict: dict):
"""Process mesh topology summary message"""
reporter_id = decrypt_dict.get('id', 0)
report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) \
if 'time' in decrypt_dict else datetime.now(timezone.utc)
cursor.execute("""
INSERT INTO mesh_topology_summary (reporter_id, report_time, node_count, root_mac, layer_counts, raw_data)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
reporter_id,
report_time,
decrypt_dict.get('node_count'),
decrypt_dict.get('root_mac'),
json.dumps(decrypt_dict.get('layers', {})),
json.dumps(decrypt_dict)
))
def ingest_topology_node(cursor, decrypt_dict: dict, topic: str):
"""Process individual mesh node report"""
reporter_id = decrypt_dict.get('id', 0)
report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) \
if 'time' in decrypt_dict else datetime.now(timezone.utc)
# Extract node MAC from topic if present (e.g., mesh/topology/node/AA:BB:CC:DD:EE:FF)
node_mac = decrypt_dict.get('mac', '')
if not node_mac and '/node/' in topic:
parts = topic.split('/node/')
if len(parts) > 1:
node_mac = parts[1].split('/')[0]
cursor.execute("""
INSERT INTO mesh_topology_nodes (reporter_id, report_time, node_mac, parent_mac, layer, rssi, is_root, child_count, raw_data)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
reporter_id,
report_time,
node_mac,
decrypt_dict.get('parent_mac'),
decrypt_dict.get('layer'),
decrypt_dict.get('rssi'),
decrypt_dict.get('is_root', False),
decrypt_dict.get('child_count', 0),
json.dumps(decrypt_dict)
))
def ingest_topology_links(cursor, decrypt_dict: dict):
"""Process mesh topology links report"""
reporter_id = decrypt_dict.get('id', 0)
report_time = datetime.fromtimestamp(decrypt_dict['time'] / 1000, tz=timezone.utc) \
if 'time' in decrypt_dict else datetime.now(timezone.utc)
links = decrypt_dict.get('links', [])
for link in links:
cursor.execute("""
INSERT INTO mesh_topology_links (reporter_id, report_time, from_mac, to_mac, rssi, link_type, raw_data)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (
reporter_id,
report_time,
link.get('from_mac', link.get('parent', '')),
link.get('to_mac', link.get('child', '')),
link.get('rssi'),
link.get('type', 'parent-child'),
json.dumps(link)
))
def process_message_data(cursor, body, is_mesh, topic):
global MACS2id, ids2MAC
@ -1132,7 +1269,45 @@ def process_message_data(cursor, body, is_mesh, topic):
if len(decrypt_data) > 14:
if "data_base64" in decrypt_data:
# Handle mesh messages that are directly JSON (not double-wrapped)
if is_mesh and decrypt_data.startswith('{') and "data_base64" not in decrypt_data:
try:
decrypt_dict = json.loads(decrypt_data)
if "/full" in topic:
ingest_ble_full_report(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested BLE full report: {len(decrypt_dict.get('devices', []))} devices from reporter {decrypt_dict.get('id')}")
elif "/delta" in topic:
ingest_ble_delta_report(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested BLE delta report: {len(decrypt_dict.get('changes', []))} changes from reporter {decrypt_dict.get('id')}")
elif "mesh/topology/summary" in topic:
ingest_topology_summary(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested topology summary from reporter {decrypt_dict.get('id')}")
elif "mesh/topology/node/" in topic:
ingest_topology_node(cursor, decrypt_dict, topic)
cursor.connection.commit()
logger.debug(f"Ingested topology node from reporter {decrypt_dict.get('id')}")
elif "mesh/topology/links" in topic:
ingest_topology_links(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested topology links from reporter {decrypt_dict.get('id')}")
elif "mesh/ble_alarm/" in topic:
ingest_ble_alarm_event(cursor, decrypt_dict)
cursor.connection.commit()
alarm_state = decrypt_dict.get('curr_a_state', decrypt_dict.get('a_state'))
logger.info(f"Ingested BLE alarm from reporter {decrypt_dict.get('id')}: mac={decrypt_dict.get('mac')} state={alarm_state}")
return "", {} # Mesh messages handled, no sensor data to return
except Exception as mesh_err:
logger.error(f"Error ingesting direct mesh data for topic {topic}: {mesh_err}")
try:
cursor.connection.rollback()
except Exception:
pass
return "", {}
elif "data_base64" in decrypt_data:
data = json.loads(decrypt_data)
payload = data["data_base64"]
decrypt_data = base64.b64decode(payload)
@ -1140,46 +1315,44 @@ def process_message_data(cursor, body, is_mesh, topic):
# print(decrypt_data)
if is_mesh:
decrypt_dict = json.loads(decrypt_data)
try:
if "/full" in topic:
#{'id': 704, 'time': 1768070305983, 'seq': 8662, 'type': 'full', 'packet': 2, 'packets': 2, 'dev_count': 14,
# 'devices': [
# {'mac': '44:AC:AE:BD:95:22', 'rssi': -89, 'a_state': 255, 'time': 1768070302720},
# {'mac': '6D:B9:B5:A5:0D:C6', 'rssi': -62, 'a_state': 255, 'time': 1768070303820},
# {'mac': '38:CB:8F:DC:A8:13', 'rssi': -81, 'a_state': 255, 'time': 1768070252414},
# {'mac': 'A4:C1:38:BE:3D:94', 'rssi': -77, 'a_state': 255, 'time': 1768070300456, 'name': 'NVIDIA SHIELD Remote'},
# {'mac': '1C:67:76:B7:B5:5D', 'rssi': -83, 'a_state': 255, 'time': 1768070304506},
# {'mac': 'E2:25:91:41:CB:4C', 'rssi': -71, 'a_state': 255, 'time': 1768070303194},
# {'mac': 'EF:A5:A3:05:81:B4', 'rssi': -83, 'a_state': 255, 'time': 1768070206193, 'name': '20123099999'},
# {'mac': '15:E9:FB:C0:75:04', 'rssi': -63, 'a_state': 255, 'time': 1768070304705},
# {'mac': '50:21:8C:20:35:27', 'rssi': -96, 'a_state': 255, 'time': 1768070257700},
# {'mac': '64:B7:08:89:08:9A', 'rssi': -97, 'a_state': 255, 'time': 1768070236446, 'name': 'WP_238_890898'},
# {'mac': '33:C1:E9:DC:28:03', 'rssi': -85, 'a_state': 255, 'time': 1768070303919},
# {'mac': 'BC:57:29:05:EF:CF', 'rssi': -79, 'a_state': 255, 'time': 1768070300787, 'name': 'KBPro_472435'},
# {'mac': 'E0:92:95:25:C2:90', 'rssi': -89, 'a_state': 255, 'time': 1768070304907},
# {'mac': '78:03:D9:36:82:4A', 'rssi': -96, 'a_state': 255, 'time': 1768070304915}]}
devices = decrypt_dict["devices"]
for device in devices:
print(device)
#upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int)
ble_id = get_or_create_device(device['mac'])
sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc)
upsert_sighting(
ble_id=ble_id,
rssi=device['rssi'],
alarm_state=device.get('a_state', 0),
sighting_time=sighting_time,
reporter_id=decrypt_dict['id']
)
print(decrypt_dict)
# BLE full report with device list
ingest_ble_full_report(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested BLE full report: {len(decrypt_dict.get('devices', []))} devices from reporter {decrypt_dict.get('id')}")
elif "/delta" in topic:
print(decrypt_dict)
# BLE delta report with changes only
ingest_ble_delta_report(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested BLE delta report: {len(decrypt_dict.get('changes', []))} changes from reporter {decrypt_dict.get('id')}")
elif "mesh/topology/summary" in topic:
print(decrypt_dict)
# Mesh topology summary
ingest_topology_summary(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested topology summary from reporter {decrypt_dict.get('id')}")
elif "mesh/topology/node/" in topic:
print(decrypt_dict)
elif "mesh/topology/links/" in topic:
print(decrypt_dict)
# Individual mesh node report
ingest_topology_node(cursor, decrypt_dict, topic)
cursor.connection.commit()
logger.debug(f"Ingested topology node from reporter {decrypt_dict.get('id')}")
elif "mesh/topology/links" in topic:
# Mesh topology links
ingest_topology_links(cursor, decrypt_dict)
cursor.connection.commit()
logger.debug(f"Ingested topology links from reporter {decrypt_dict.get('id')}")
elif "mesh/ble_alarm/" in topic:
# Direct BLE alarm event
ingest_ble_alarm_event(cursor, decrypt_dict)
cursor.connection.commit()
alarm_state = decrypt_dict.get('curr_a_state', decrypt_dict.get('a_state'))
logger.info(f"Ingested BLE alarm from reporter {decrypt_dict.get('id')}: mac={decrypt_dict.get('mac')} state={alarm_state}")
except Exception as mesh_err:
logger.error(f"Error ingesting mesh data for topic {topic}: {mesh_err}")
try:
cursor.connection.rollback()
except Exception:
pass
else: