From 60ffb1cfe6a82ee239d228ba56c5c1d7dc9c2b49 Mon Sep 17 00:00:00 2001 From: "RZ_MINIX\\rober" Date: Thu, 26 Jun 2025 12:57:50 -0700 Subject: [PATCH] Fixed events storing to DB on time --- src/app.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/app.py b/src/app.py index aceb214..9021f8c 100644 --- a/src/app.py +++ b/src/app.py @@ -524,8 +524,13 @@ def process_messages(connection, db_conn): if data_type in data_batches: data_batches[data_type].append(parsed_data) batch_size = len(data_batches[data_type]) - #logger.info(f"Added to {data_type} batch. Size[radar]: {len(data_batches["radar"])} Size[sensors]: {len(data_batches["sensors"])} mlen: {len(msg.body)}") + #if data_type is not "sensors" or "radar", it is event, so store it without waiting + if data_type in ["light", "pressure", "temperature", "humidity"]: + batch_size = BATCH_SIZE + + #logger.info(f"Added to {data_type} batch. Size[radar]: {len(data_batches["radar"])} Size[sensors]: {len(data_batches["sensors"])} mlen: {len(msg.body)}") + #logger.info(f"batch_size = {batch_size}") if batch_size >= BATCH_SIZE: #logger.info(f"Processing {data_type} batch") @@ -547,7 +552,7 @@ def process_messages(connection, db_conn): #print(f"Error writing pickle file: {e}") #raise # Re-raise the exception for handling upstream. - + logger.info("insert_into_timescaledb_My") insert_into_timescaledb_My(cursor, data_type, data_batches[data_type]) db_conn.commit() data_batches[data_type].clear() @@ -799,11 +804,11 @@ def insert_into_timescaledb_My(cursor, data_type, data_batch): elif data_type in ['temperature', 'humidity', 'pressure', 'light']: query = f""" - INSERT INTO sensor_readings (time, device_id, {data_type}) + INSERT INTO sensor_readings (time, device_id, {data_type}, mtype) VALUES %s """ values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), row['device_id'], - row.get(data_type)) for row in data_batch] + row.get(data_type), row.get('mtype')) for row in data_batch] else: raise ValueError(f"Unknown data type: {data_type}") @@ -1108,6 +1113,7 @@ def process_message_data(cursor, body): new_message_dict["smell"] = S elif message_type == 1: #pressure event m_type = "pressure" + print(f"{m_type} event detected") #print("@7",time.time()-st) pointer = pointer + lenn_1 lenn = 4 @@ -1116,6 +1122,7 @@ def process_message_data(cursor, body): pointer = pointer + lenn elif message_type == 2: #temperature event m_type = "temperature" + print(f"{m_type} event detected") pointer = pointer + lenn_1 lenn = 4 T = struct.unpack('