Fixed events storing to DB on time

This commit is contained in:
RZ_MINIX\rober 2025-06-26 12:57:50 -07:00
parent ef9bc7f17f
commit 60ffb1cfe6

View File

@ -524,8 +524,13 @@ def process_messages(connection, db_conn):
if data_type in data_batches: if data_type in data_batches:
data_batches[data_type].append(parsed_data) data_batches[data_type].append(parsed_data)
batch_size = len(data_batches[data_type]) batch_size = len(data_batches[data_type])
#logger.info(f"Added to {data_type} batch. Size[radar]: {len(data_batches["radar"])} Size[sensors]: {len(data_batches["sensors"])} mlen: {len(msg.body)}")
#if data_type is not "sensors" or "radar", it is event, so store it without waiting
if data_type in ["light", "pressure", "temperature", "humidity"]:
batch_size = BATCH_SIZE
#logger.info(f"Added to {data_type} batch. Size[radar]: {len(data_batches["radar"])} Size[sensors]: {len(data_batches["sensors"])} mlen: {len(msg.body)}")
#logger.info(f"batch_size = {batch_size}")
if batch_size >= BATCH_SIZE: if batch_size >= BATCH_SIZE:
#logger.info(f"Processing {data_type} batch") #logger.info(f"Processing {data_type} batch")
@ -547,7 +552,7 @@ def process_messages(connection, db_conn):
#print(f"Error writing pickle file: {e}") #print(f"Error writing pickle file: {e}")
#raise # Re-raise the exception for handling upstream. #raise # Re-raise the exception for handling upstream.
logger.info("insert_into_timescaledb_My")
insert_into_timescaledb_My(cursor, data_type, data_batches[data_type]) insert_into_timescaledb_My(cursor, data_type, data_batches[data_type])
db_conn.commit() db_conn.commit()
data_batches[data_type].clear() data_batches[data_type].clear()
@ -799,11 +804,11 @@ def insert_into_timescaledb_My(cursor, data_type, data_batch):
elif data_type in ['temperature', 'humidity', 'pressure', 'light']: elif data_type in ['temperature', 'humidity', 'pressure', 'light']:
query = f""" query = f"""
INSERT INTO sensor_readings (time, device_id, {data_type}) INSERT INTO sensor_readings (time, device_id, {data_type}, mtype)
VALUES %s VALUES %s
""" """
values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), row['device_id'], values = [(datetime.fromtimestamp(row['time'], tz=timezone.utc), row['device_id'],
row.get(data_type)) for row in data_batch] row.get(data_type), row.get('mtype')) for row in data_batch]
else: else:
raise ValueError(f"Unknown data type: {data_type}") raise ValueError(f"Unknown data type: {data_type}")
@ -1108,6 +1113,7 @@ def process_message_data(cursor, body):
new_message_dict["smell"] = S new_message_dict["smell"] = S
elif message_type == 1: #pressure event elif message_type == 1: #pressure event
m_type = "pressure" m_type = "pressure"
print(f"{m_type} event detected")
#print("@7",time.time()-st) #print("@7",time.time()-st)
pointer = pointer + lenn_1 pointer = pointer + lenn_1
lenn = 4 lenn = 4
@ -1116,6 +1122,7 @@ def process_message_data(cursor, body):
pointer = pointer + lenn pointer = pointer + lenn
elif message_type == 2: #temperature event elif message_type == 2: #temperature event
m_type = "temperature" m_type = "temperature"
print(f"{m_type} event detected")
pointer = pointer + lenn_1 pointer = pointer + lenn_1
lenn = 4 lenn = 4
T = struct.unpack('<f', decrypt_data[pointer:pointer+lenn])[0] T = struct.unpack('<f', decrypt_data[pointer:pointer+lenn])[0]
@ -1123,6 +1130,7 @@ def process_message_data(cursor, body):
pointer = pointer + lenn pointer = pointer + lenn
elif message_type == 3: #humidity event elif message_type == 3: #humidity event
m_type = "humidity" m_type = "humidity"
print(f"{m_type} event detected")
#print("@12",time.time()-st) #print("@12",time.time()-st)
pointer = pointer + lenn_1 pointer = pointer + lenn_1
lenn = 4 lenn = 4
@ -1131,7 +1139,7 @@ def process_message_data(cursor, body):
pointer = pointer + lenn pointer = pointer + lenn
elif message_type == 4: #light event elif message_type == 4: #light event
m_type = "light" m_type = "light"
#print("@15",time.time()-st) print(f"{m_type} event detected")
pointer = pointer + lenn_1 pointer = pointer + lenn_1
lenn = 2 lenn = 2
L = struct.unpack('<H', decrypt_data[pointer:pointer+lenn])[0] L = struct.unpack('<H', decrypt_data[pointer:pointer+lenn])[0]