diff --git a/queue-muncher.yml b/queue-muncher.yml index 8e9411d..f7edbdb 100644 --- a/queue-muncher.yml +++ b/queue-muncher.yml @@ -12,7 +12,7 @@ functions: read_timeout: "1h" write_timeout: "1h" exec_timeout: "1h" - + max_inflight: 16 # RabbitMQ configuration RABBITMQ_URL: "amqp://well_pipe:wellnuo_2024@192.168.68.70:5672//" RABBITMQ_QUEUE: "evgrid-dev-demo" diff --git a/src/app.py b/src/app.py index 688de40..21ae5b5 100644 --- a/src/app.py +++ b/src/app.py @@ -78,6 +78,9 @@ PROCESSING_PERIOD = int(os.getenv('PROCESSING_PERIOD', '300')) # 5 minutes QUEUE_LENGTH_THRESHOLD = int(os.getenv('QUEUE_LENGTH_THRESHOLD', '1000')) redis_host = os.getenv('REDIS_HOST', '192.168.68.70') redis_host = '192.168.68.70' +#in debugger: +#redis_host = '127.0.0.1' + cache_dir = "cache" MQTTSERVER = os.getenv('MQTTSERVER', "mqtt.eluxnetworks.net") #"mqtt.eluxnetworks.net" mqtt_port = int(os.getenv('MQTT_PORT', "8883")) # TLS port @@ -98,12 +101,10 @@ logger.info(f"DB_HOST= {DB_HOST}") logger.info(f"DB_PORT= {DB_PORT}") logger.info(f"PROCESSING_PERIOD= {PROCESSING_PERIOD}") logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}") -logger.info(f"QUEUE_LENGTH_THRESHOLD= {QUEUE_LENGTH_THRESHOLD}") logger.info(f"BATCH_SIZE= {BATCH_SIZE}") logger.info(f"mqtt_port= {mqtt_port}") logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}") -logger.info(f"MQTT_CLIENT_ID= {mqtt_client_id}") logger.info(f"QUEUE_NAME= {QUEUE_NAME}") logger.info(f"RABBITMQ_URL= {RABBITMQ_URL}") @@ -260,7 +261,7 @@ def connect_mqtt(client_id): if rc == 0: connected = True print(f"Connected to {MQTTSERVER} with {unique_client_id}") - subbedTo = [("/w",1),("wellnuotopics/topic1",1),("/faws",1)] + subbedTo = [("/w",1),("wellnuotopics/topic1",1),("/faws",1),("mesh/#",1)] print("subscribing to ", subbedTo) client.subscribe(subbedTo) else: @@ -462,7 +463,7 @@ def process_messages(connection, db_conn): # Basic QoS with all required parameters channel.basic_qos( prefetch_size=0, - prefetch_count=1, + prefetch_count=100, a_global=False ) logger.info("Basic QoS set") @@ -500,6 +501,11 @@ def process_messages(connection, db_conn): #logger.info(f"Properties: {msg.properties}") #logger.info(f"Body type: {type(msg.body)}") #logger.info(f"Body length: {len(msg.body) if msg.body else 0}") + is_mesh = False + topic = "" + if "mesh" in msg.body: + topic = json.loads(msg.body)["subject"] + is_mesh = True if isinstance(msg.body, (bytes, bytearray)): hex_dump = ' '.join(f'{b:02x}' for b in msg.body[:32]) @@ -517,7 +523,7 @@ def process_messages(connection, db_conn): with PROCESSING_TIME.time(): with db_conn.cursor() as cursor: - data_type, parsed_data = process_message_data(cursor, data) + data_type, parsed_data = process_message_data(cursor, data, is_mesh, topic) if data_type and parsed_data: #logger.info(f"Successfully processed as {data_type}") @@ -527,6 +533,8 @@ def process_messages(connection, db_conn): parsed_data_mqtt["temperature"] = parsed_data["temperature"] + temperature_offset parsed_data_mqtt["humidity"] = parsed_data["humidity"] + humidity_offset + if parsed_data["device_id"] == 679: + print("stop") MQSend("/"+ids2MAC[parsed_data["device_id"]], json.dumps(parsed_data_mqtt)) if data_type in data_batches: data_batches[data_type].append(parsed_data) @@ -903,7 +911,8 @@ def GetLastDetected(device_id, field_name, threshold_value): cur.execute(query) record = cur.fetchone() if record != None: - last_detected = cur.fetchone()[0].timestamp() + last_detected = record[0].timestamp() + #last_detected = cur.fetchone()[0].timestamp() return last_detected @@ -982,10 +991,16 @@ def UpdateLastSeen(new_message_dict): radar_val = sum(new_message_dict['radar'][1][3:9]) / new_message_dict['radar'][1][0] elif component[0] == "s": component_index = ast.literal_eval(component[1]) - radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0] + if new_message_dict['radar'][1][0] > 0: + radar_val = new_message_dict['radar'][1][1 + component_index] / new_message_dict['radar'][1][0] + else: + radar_val = 0 elif component[0] == "m": component_index = ast.literal_eval(component[1]) - radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0] + if new_message_dict['radar'][0][0] > 0: + radar_val = new_message_dict['radar'][0][5 + component_index] / new_message_dict['radar'][0][0] + else: + radar_val = 0 if radar_val > radar_threshold_value: #seen now r.set('lastseen_'+device_id_s, new_message_dict['time']) @@ -1002,7 +1017,97 @@ def UpdateLastSeen(new_message_dict): logger.error(f"Error: {traceback.format_exc()}") logger.error(f"UpdateLastSeen failed: {e}") -def process_message_data(cursor, body): +def get_or_create_device(mac: str) -> int: + """Get ble_id, creating device if needed""" + sql = """ + INSERT INTO ble_devices (mac_address) + VALUES (%s) + ON CONFLICT (mac_address) DO UPDATE SET mac_address = EXCLUDED.mac_address + RETURNING ble_id + """ + with conn.cursor() as cursor: # or your db connection variable + cursor.execute(sql, (mac,)) + conn.commit() # needed for INSERT + return cursor.fetchone()[0] + +def ingest_ble_report(topic: str, payload: dict): + """Process both full and delta reports identically""" + reporter_id = payload['id'] + report_time = datetime.fromtimestamp(payload['time'] / 1000, tz=timezone.utc) + + devices = payload.get('devices', []) if payload['type'] == 'full' else payload.get('changes', []) + + + for dev in devices: + mac = dev['mac'] + rssi = dev['rssi'] + alarm_state = dev.get('a_state', 0) + + sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \ + if 'time' in dev else report_time + + if is_random_mac(mac): + # Option A: Store in separate short-lived table + execute(""" + INSERT INTO ble_random_sightings (mac_address, rssi, alarm_state, sighting_time, reporter_id) + VALUES (%s, %s, %s, %s, %s) + """, (mac, rssi, alarm_state, sighting_time, reporter_id)) + continue # Skip main device table + + # Option B: Just skip entirely + # continue + device_name = dev.get('name') # Only in full reports + ble_id = get_or_create_device(mac) + # Full reports have per-device timestamp, deltas use report time + sighting_time = datetime.fromtimestamp(dev['time'] / 1000, tz=timezone.utc) \ + if 'time' in dev else report_time + + # 1. Insert to history (always) + insert_sighting(mac, rssi, alarm_state, device_name, sighting_time, reporter_id) + + # 2. Upsert current state + upsert_device_state(mac, rssi, alarm_state, device_name, sighting_time, reporter_id) + +def upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int): + """Single query: update if no change, insert if changed""" + + RSSI_THRESHOLD = 5 + + execute(""" + WITH latest AS ( + SELECT id, rssi, alarm_state + FROM ble_sightings + WHERE ble_id = %(ble_id)s + ORDER BY last_seen DESC + LIMIT 1 + ), + updated AS ( + UPDATE ble_sightings s + SET last_seen = %(time)s, reporter_id = %(reporter)s + FROM latest l + WHERE s.id = l.id + AND abs(l.rssi - %(rssi)s) < %(threshold)s + AND l.alarm_state = %(alarm)s + RETURNING s.id + ) + INSERT INTO ble_sightings (ble_id, rssi, alarm_state, first_seen, last_seen, reporter_id) + SELECT %(ble_id)s, %(rssi)s, %(alarm)s, %(time)s, %(time)s, %(reporter)s + WHERE NOT EXISTS (SELECT 1 FROM updated) + AND NOT EXISTS ( + SELECT 1 FROM latest l + WHERE abs(l.rssi - %(rssi)s) < %(threshold)s + AND l.alarm_state = %(alarm)s + ) + """, { + 'ble_id': ble_id, + 'rssi': rssi, + 'alarm': alarm_state, + 'time': sighting_time, + 'reporter': reporter_id, + 'threshold': RSSI_THRESHOLD + }) + +def process_message_data(cursor, body, is_mesh, topic): global MACS2id, ids2MAC """Process a single message from the queue.""" @@ -1012,6 +1117,8 @@ def process_message_data(cursor, body): else: data = json.loads(body) + #print(decrypt_data) + if "data_base64" in data: decrypt_data = data["data_base64"] @@ -1022,179 +1129,225 @@ def process_message_data(cursor, body): decrypt_data = decrypt_data.decode('utf-8') m_type = 0 new_message_dict = {} - #print(decrypt_data) + if len(decrypt_data) > 14: if "data_base64" in decrypt_data: data = json.loads(decrypt_data) payload = data["data_base64"] decrypt_data = base64.b64decode(payload) + #if data["subject"] != "wellnuotopics/topic1": + # print(decrypt_data) + if is_mesh: + decrypt_dict = json.loads(decrypt_data) + if "/full" in topic: + #{'id': 704, 'time': 1768070305983, 'seq': 8662, 'type': 'full', 'packet': 2, 'packets': 2, 'dev_count': 14, + # 'devices': [ + # {'mac': '44:AC:AE:BD:95:22', 'rssi': -89, 'a_state': 255, 'time': 1768070302720}, + # {'mac': '6D:B9:B5:A5:0D:C6', 'rssi': -62, 'a_state': 255, 'time': 1768070303820}, + # {'mac': '38:CB:8F:DC:A8:13', 'rssi': -81, 'a_state': 255, 'time': 1768070252414}, + # {'mac': 'A4:C1:38:BE:3D:94', 'rssi': -77, 'a_state': 255, 'time': 1768070300456, 'name': 'NVIDIA SHIELD Remote'}, + # {'mac': '1C:67:76:B7:B5:5D', 'rssi': -83, 'a_state': 255, 'time': 1768070304506}, + # {'mac': 'E2:25:91:41:CB:4C', 'rssi': -71, 'a_state': 255, 'time': 1768070303194}, + # {'mac': 'EF:A5:A3:05:81:B4', 'rssi': -83, 'a_state': 255, 'time': 1768070206193, 'name': '20123099999'}, + # {'mac': '15:E9:FB:C0:75:04', 'rssi': -63, 'a_state': 255, 'time': 1768070304705}, + # {'mac': '50:21:8C:20:35:27', 'rssi': -96, 'a_state': 255, 'time': 1768070257700}, + # {'mac': '64:B7:08:89:08:9A', 'rssi': -97, 'a_state': 255, 'time': 1768070236446, 'name': 'WP_238_890898'}, + # {'mac': '33:C1:E9:DC:28:03', 'rssi': -85, 'a_state': 255, 'time': 1768070303919}, + # {'mac': 'BC:57:29:05:EF:CF', 'rssi': -79, 'a_state': 255, 'time': 1768070300787, 'name': 'KBPro_472435'}, + # {'mac': 'E0:92:95:25:C2:90', 'rssi': -89, 'a_state': 255, 'time': 1768070304907}, + # {'mac': '78:03:D9:36:82:4A', 'rssi': -96, 'a_state': 255, 'time': 1768070304915}]} + devices = decrypt_dict["devices"] + for device in devices: + print(device) + #upsert_sighting(ble_id: int, rssi: int, alarm_state: int, sighting_time: datetime, reporter_id: int) + ble_id = get_or_create_device(device['mac']) + sighting_time = datetime.fromtimestamp(device['time'] / 1000, tz=timezone.utc) + upsert_sighting( + ble_id=ble_id, + rssi=device['rssi'], + alarm_state=device.get('a_state', 0), + sighting_time=sighting_time, + reporter_id=decrypt_dict['id'] + ) - if len(decrypt_data) > 14: - pointer = 0 - lenn = 4 - #print ("") - seconds = struct.unpack(' 0: + print(decrypt_dict) + elif "/delta" in topic: + print(decrypt_dict) + elif "mesh/topology/summary" in topic: + print(decrypt_dict) + elif "mesh/topology/node/" in topic: + print(decrypt_dict) + elif "mesh/topology/links/" in topic: + print(decrypt_dict) - new_message_dict = {"time":packet_time, "device_id":device_id} + else: + if len(decrypt_data) > 14: + pointer = 0 + lenn = 4 + #print ("") + seconds = struct.unpack(' 0: - st = time.time() - if message_type == 16: #Radar message + packet_time = float(seconds+useconds/1000000) + this_time = time.localtime(packet_time) + year = this_time[0] + month = this_time[1] + day = this_time[2] + hour = this_time[3] + minute = this_time[4] - if MAC == "10061C15C330": - print("Stop") + #YMDHm_str = time.strftime('%Y-%m-%d_%H-%M', this_time) + #minute_in_day = 60*hour + minute + #lets populate report structure + #new_message_dict = {"time":packet_time, "temperature":0, "humidity":0, "light":0, "pressure":0, "smell":[], "radar":[]} + if MAC in MACS2id: + device_id = MACS2id[MAC] + else: + device_id = GetIdFromMAC(cursor, MAC) + MACS2id[MAC] = device_id + ids2MAC[device_id] = MAC - m_type = "radar" - #counter, no targets, moving targets, stationary targets, both, 9 moving gates energy - #counter, 9 stationary gates energy - pointer = pointer + lenn_1 - motion_energy = [0] * 14 - stationary_energy = [0] * 10 - #print("@1",time.time()-st) - lenn = 2 - for i in range(14): - V = struct.unpack('= 0x20 and message_type <= 0x2F): #Sensors message - #work_dict = dict(common_dict) - #print("@4",time.time()-st) - m_type = "sensors" - pointer = pointer + lenn_1 - - lenn = 4 - P = struct.unpack(' 0: - #logging.info(str(packet_time)) - #logging.info("P:"+str(P)) - #logging.info("T:"+str(T)) - #logging.info("L:"+str(L)) - #logging.info("H:"+str(H)) - S=[0,0,0,0,0,0,0,0,0,0] - for index in range(dp): + new_message_dict = {"time":packet_time, "device_id":device_id} + + line_entry = str(packet_time) + dcr_l = len(decrypt_data) + lenn_1 = 1 + + message_type = struct.unpack(' 0: + st = time.time() + if message_type == 16: #Radar message + + if MAC == "10061C15C330": + print("Stop") + + m_type = "radar" + #counter, no targets, moving targets, stationary targets, both, 9 moving gates energy + #counter, 9 stationary gates energy + pointer = pointer + lenn_1 + motion_energy = [0] * 14 + stationary_energy = [0] * 10 + #print("@1",time.time()-st) + lenn = 2 + for i in range(14): + V = struct.unpack('= 0x20 and message_type <= 0x2F): #Sensors message + #work_dict = dict(common_dict) + #print("@4",time.time()-st) + m_type = "sensors" + pointer = pointer + lenn_1 + + lenn = 4 + P = struct.unpack(' 100: - P = 100 - new_message_dict["pressure"] = P - pointer = pointer + lenn - elif message_type == 2: #temperature event - m_type = "temperature" - print(f"{m_type} event detected") - pointer = pointer + lenn_1 - lenn = 4 - T = struct.unpack(' 100: + P = 100 + new_message_dict["pressure"] = P + pointer = pointer + lenn + elif message_type == 2: #temperature event + m_type = "temperature" + print(f"{m_type} event detected") + pointer = pointer + lenn_1 + lenn = 4 + T = struct.unpack('