#!/usr/bin/env python3
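"""
wellDbQuery.py - Wellnuo database export tool.

Resolves one or more target devices (by device id, well id, MAC address, or
deployment id), runs time-bucketed queries against the sensor_readings and
radar_readings tables one day at a time, pivots the per-mtype rows into wide
s0-s79 columns, and writes one CSV per day into a single ZIP archive.
Connection settings come from a .env file and/or command line flags.

Example (also shown in --help):
    wellDbQuery.py --device_id 560 --date_from 2025-03-09 --date_to 2025-04-22 --outFile c.zip
"""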
import os
import sys
import csv
import json
import zipfile
import argparse
import datetime
import logging
import ast
import io
from collections import defaultdict
from dotenv import load_dotenv

# Try to import psycopg2
try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
except ImportError:
    print("Error: psycopg2 module not found. Please install it: pip install psycopg2-binary")
    sys.exit(1)

# ==========================================
# Configuration & Defaults
# ==========================================

load_dotenv()

DEFAULTS = {
    'DB_NAME': os.getenv('DB_NAME', 'wellnuo'),
    'DB_USER': os.getenv('DB_USER', 'well_app'),
    'DB_PASS': os.getenv('DB_PASSWORD', 'well_app_2024'),
    'DB_HOST': os.getenv('DB_HOST', '192.168.68.70'),
    'DB_PORT': os.getenv('DB_PORT', '5432'),
    'OUT_FILE': "out.zip",
    'RADAR_PART': "s28",
    'GROUP_BY': "by_minute"
}

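# Illustrative .env file for the variables read above (values shown are just the
# built-in defaults; adjust for your own environment):
#   DB_NAME=wellnuo
#   DB_USER=well_app
#   DB_PASSWORD=well_app_2024
#   DB_HOST=192.168.68.70
#   DB_PORT=5432
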
# Custom Logging Levels
LOG_INFO = 20
LOG_STEPS = 15  # Level 1 (-d 1)
LOG_DATA = 12   # Level 2 (-d 2)
LOG_SQL = 5     # Level 3 (-d 3)

logging.addLevelName(LOG_STEPS, "STEPS")
logging.addLevelName(LOG_DATA, "DATA")
logging.addLevelName(LOG_SQL, "SQL")

logger = logging.getLogger("wellDbQuery")
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

# ==========================================
# Database Abstraction
# ==========================================

class Database:
    def __init__(self, args):
        self.conn_params = {
            "host": args.db_host,
            "port": args.db_port,
            "dbname": args.db_name,
            "user": args.db_username,
            "password": args.db_password
        }
        self.conn = None

    def connect(self):
        if self.conn is None or self.conn.closed:
            try:
                logger.log(LOG_STEPS, f"Connecting to database {self.conn_params['host']}...")
                self.conn = psycopg2.connect(**self.conn_params)
            except Exception as e:
                logger.error(f"Database connection failed: {e}")
                sys.exit(1)

    def close(self):
        if self.conn:
            self.conn.close()
            logger.log(LOG_STEPS, "Database connection closed.")

    def execute(self, query, params=None):
        self.connect()
        try:
            with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
                logger.log(LOG_SQL, f"EXECUTING SQL: {query}")
                if params:
                    logger.log(LOG_SQL, f"PARAMS: {params}")

                cur.execute(query, params)

                # If query returns rows
                if cur.description:
                    return cur.fetchall()
                return None
        except Exception as e:
            logger.error(f"Query execution failed: {e}")
            if logger.level <= LOG_SQL:
                import traceback
                traceback.print_exc()
            sys.exit(1)

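# Usage sketch (illustrative, not part of the export flow): Database.execute()
# returns a list of dict-like rows thanks to RealDictCursor, e.g.
#   db = Database(args)
#   rows = db.execute("SELECT device_id FROM public.devices WHERE device_mac = %s", (mac,))
#   # rows -> [{'device_id': 123}, ...] or None for statements without a result set
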
# ==========================================
# Data Processing Logic
# ==========================================

class DataProcessor:
    """
    Handles the complexity of unwrapping 'mtype' sensor readings
    and merging them with radar data.
    """

    @staticmethod
    def get_bucket_interval(group_by):
        if group_by == 'by_10_seconds':
            return '10 seconds'
        elif group_by == 'by_hour':
            return '1 hour'
        elif group_by == 'by_10_minute':
            return '10 minutes'
        return '1 minute'

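    # Shape of the raw rows returned by the query built below (one row per
    # time bucket, device and mtype): time, device_id, mtype, temperature,
    # humidity, pressure, light, s0..s9, plus either a single 'radar' column
    # or, with --allRadar, the radar_* / m0..m8 / radar_s2..radar_s8 columns.
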
    @staticmethod
    def build_fetch_query(device_ids, start_time, end_time, radar_part, all_radar, group_by):
        """
        Builds a query that fetches raw rows. We perform the pivoting in Python
        to dynamically handle the sparse s10-s79 columns.
        """
        device_list_str = ",".join(map(str, device_ids))
        bucket_interval = DataProcessor.get_bucket_interval(group_by)

        # Radar Selection Logic
        if all_radar:
            radar_select = """
                MAX(absent) AS radar_absent, MAX(moving) AS radar_moving, MAX(stationary) AS radar_stationary, MAX("both") AS radar_both,
                MAX(m0) AS m0, MAX(m1) AS m1, MAX(m2) AS m2, MAX(m3) AS m3, MAX(m4) AS m4, MAX(m5) AS m5, MAX(m6) AS m6, MAX(m7) AS m7, MAX(m8) AS m8,
                MAX(s2) AS radar_s2, MAX(s3) AS radar_s3, MAX(s4) AS radar_s4, MAX(s5) AS radar_s5, MAX(s6) AS radar_s6, MAX(s7) AS radar_s7, MAX(s8) AS radar_s8
            """
            radar_outer = """
                rr.radar_absent, rr.radar_moving, rr.radar_stationary, rr.radar_both,
                rr.m0, rr.m1, rr.m2, rr.m3, rr.m4, rr.m5, rr.m6, rr.m7, rr.m8,
                rr.radar_s2, rr.radar_s3, rr.radar_s4, rr.radar_s5, rr.radar_s6, rr.radar_s7, rr.radar_s8
            """
        else:
            radar_expr = radar_part
            if radar_expr == "s28":
                radar_expr = "(s2+s3+s4+s5+s6+s7+s8)/7"
            radar_select = f"MAX({radar_expr}) AS radar"
            radar_outer = "rr.radar"

        # We fetch s0-s9 and mtype. Python will map these to s0-s79.
        # We use MAX/MIN aggregation to flatten duplicates within the time bucket if any exist
        # though typically mtypes are distinct rows.
        sql = f"""
            SELECT
                COALESCE(sr.minute, rr.minute) as time,
                COALESCE(sr.device_id, rr.device_id) as device_id,
                sr.mtype,
                sr.temperature, sr.humidity, sr.pressure, sr.light,
                sr.s0, sr.s1, sr.s2, sr.s3, sr.s4, sr.s5, sr.s6, sr.s7, sr.s8, sr.s9,
                {radar_outer}
            FROM (
                SELECT
                    time_bucket('{bucket_interval}', time) AS minute,
                    device_id,
                    mtype,
                    AVG(temperature) AS temperature,
                    AVG(humidity) AS humidity,
                    AVG(pressure) AS pressure,
                    MAX(light) AS light,
                    MIN(s0) as s0, MIN(s1) as s1, MIN(s2) as s2, MIN(s3) as s3, MIN(s4) as s4,
                    MIN(s5) as s5, MIN(s6) as s6, MIN(s7) as s7, MIN(s8) as s8, MIN(s9) as s9
                FROM sensor_readings
                WHERE device_id IN ({device_list_str})
                  AND time >= '{start_time}'
                  AND time < '{end_time}'
                GROUP BY minute, device_id, mtype
            ) sr
            FULL OUTER JOIN (
                SELECT
                    time_bucket('{bucket_interval}', time) AS minute,
                    device_id,
                    {radar_select}
                FROM radar_readings
                WHERE device_id IN ({device_list_str})
                  AND time >= '{start_time}'
                  AND time < '{end_time}'
                GROUP BY minute, device_id
            ) rr
            ON sr.minute = rr.minute AND sr.device_id = rr.device_id
            ORDER BY time ASC, device_id ASC;
        """
        return sql

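    # Pivot example (derived from the mtype mapping used in process_rows below):
    # a raw row with mtype=100 fills columns s0..s9, while a raw row with
    # mtype=130 fills s30..s39 (offset = mtype - 100); rows sharing the same
    # (time, device_id) key are merged into a single output record.
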
    @staticmethod
    def process_rows(rows):
        """
        Pivots the rows.
        Input: Multiple rows per timestamp (one for each mtype).
        Output: Single row per timestamp with columns s0...s79 populated dynamically.
        """
        if not rows:
            return [], []

        # Dictionary to hold merged records: key = (time, device_id)
        merged_data = defaultdict(dict)

        # Track which sensor columns actually have data to optimize CSV headers
        active_s_columns = set()

        # Base headers that are always present
        base_headers = ['time', 'device_id', 'temperature', 'humidity', 'pressure', 'light']

        # Radar headers depend on query, extract them from the first row excluding known sensor/base cols
        first_row_keys = list(rows[0].keys())
        radar_headers = [k for k in first_row_keys if k not in base_headers and k not in ['mtype'] and not (k.startswith('s') and k[1:].isdigit())]

        for row in rows:
            key = (row['time'], row['device_id'])

            # Initialize base data if not present
            if key not in merged_data:
                for h in base_headers:
                    merged_data[key][h] = row.get(h)
                for h in radar_headers:
                    merged_data[key][h] = row.get(h)

            # Merge Base Sensor Data (Temp/Hum/etc) if the current row has it
            # (This handles FULL OUTER JOIN nulls)
            if row.get('temperature') is not None:
                for h in base_headers[2:]:  # Skip time/id
                    merged_data[key][h] = row.get(h)

            # Merge Radar Data
            if any(row.get(h) is not None for h in radar_headers):
                for h in radar_headers:
                    if row.get(h) is not None:
                        merged_data[key][h] = row.get(h)

            # Process Extended Sensors (s0-s79) based on mtype
            mtype = row.get('mtype')

            # Logic from well-api.py:
            # mtype 0, 17, 100 -> s0-s9
            # mtype 110 -> s10-s19 ... mtype 170 -> s70-s79

            if mtype is not None:
                base_offset = 0
                if mtype in [0, 17, 100]:
                    base_offset = 0
                elif 110 <= mtype <= 170:
                    base_offset = mtype - 100
                else:
                    # Unknown mtype, skip sensor mapping or log debug
                    continue

                for i in range(10):
                    val = row.get(f's{i}')
                    if val is not None:
                        target_idx = base_offset + i
                        col_name = f's{target_idx}'
                        merged_data[key][col_name] = val
                        active_s_columns.add(target_idx)

        # Sort active sensor columns numerically
        sorted_s_cols = [f's{i}' for i in sorted(list(active_s_columns))]

        # Final Header List
        final_headers = base_headers + radar_headers + sorted_s_cols

        # Flatten dictionary to list of dicts
        final_rows = []
        for key in sorted(merged_data.keys()):  # Sort by time, device_id
            row_data = merged_data[key]
            # Ensure all columns exist in dict for CSV writer
            for col in final_headers:
                if col not in row_data:
                    row_data[col] = None
            final_rows.append(row_data)

        return final_rows, final_headers

# ==========================================
# Main Application Logic
# ==========================================

class WellExporter:
    def __init__(self, args):
        self.args = args
        self.db = Database(args)
        self.target_device_ids = []
        self.file_identifier = "unknown"  # Used for filename prefix

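    # The deployment_details.devices column parsed below has been seen in more
    # than one shape, e.g. '["MAC1", "MAC2"]' or '[[id, id, "Loc", "Desc", "MAC"], ...]';
    # resolve_devices() tries JSON first, then ast.literal_eval, then a plain
    # comma-separated split.
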
    def resolve_devices(self):
        """Resolves command line arguments into a list of internal device_ids."""
        ids = set()

        # 1. Explicit Device ID
        if self.args.device_id:
            ids.add(int(self.args.device_id))
            self.file_identifier = str(self.args.device_id)
            logger.log(LOG_STEPS, f"Resolved Device ID: {self.args.device_id}")

        # 2. Well ID
        if self.args.well_id:
            rows = self.db.execute("SELECT device_id FROM public.devices WHERE well_id = %s", (self.args.well_id,))
            if rows:
                ids.add(rows[0]['device_id'])
                self.file_identifier = str(self.args.well_id)
                logger.log(LOG_STEPS, f"Resolved Well ID {self.args.well_id} -> Device ID {rows[0]['device_id']}")
            else:
                logger.warning(f"Well ID {self.args.well_id} not found.")

        # 3. MAC Address
        if self.args.mac:
            rows = self.db.execute("SELECT device_id FROM public.devices WHERE device_mac = %s", (self.args.mac,))
            if rows:
                ids.add(rows[0]['device_id'])
                self.file_identifier = self.args.mac
                logger.log(LOG_STEPS, f"Resolved MAC {self.args.mac} -> Device ID {rows[0]['device_id']}")
            else:
                logger.warning(f"MAC {self.args.mac} not found.")

        # 4. Deployment ID
        if self.args.deployment_id:
            self.file_identifier = str(self.args.deployment_id)
            logger.log(LOG_STEPS, f"Resolving devices for Deployment ID: {self.args.deployment_id}")
            rows = self.db.execute("SELECT devices FROM public.deployment_details WHERE deployment_id = %s", (self.args.deployment_id,))

            if rows and rows[0]['devices']:
                raw_devices = rows[0]['devices']
                macs_to_find = []

                try:
                    # Handle various formats stored in DB (JSON list, string representation, nested lists)
                    # Format seen in well-api: '["MAC1", "MAC2"]' or '[[id,id,loc,desc,MAC],...]'
                    try:
                        device_data = json.loads(raw_devices)
                    except json.JSONDecodeError:
                        try:
                            device_data = ast.literal_eval(raw_devices)
                        except Exception:
                            # Fallback for simple comma separated string
                            device_data = [d.strip() for d in raw_devices.strip('[]"\'').split(',')]

                    for item in device_data:
                        # Format: [[id, id, "Loc", "Desc", "MAC"], ...]
                        if isinstance(item, list) and len(item) > 4:
                            macs_to_find.append(item[4])
                        # Format: ["MAC1", "MAC2"]
                        elif isinstance(item, str):
                            # Clean potential quotes
                            clean_mac = item.strip().replace('"', '').replace("'", "")
                            if len(clean_mac) in [12, 17]:
                                macs_to_find.append(clean_mac)

                    if macs_to_find:
                        placeholders = ','.join(['%s'] * len(macs_to_find))
                        sql = f"SELECT device_id FROM public.devices WHERE device_mac IN ({placeholders})"
                        d_rows = self.db.execute(sql, tuple(macs_to_find))
                        if d_rows:
                            for r in d_rows:
                                ids.add(r['device_id'])
                            logger.info(f"Found {len(d_rows)} devices in deployment {self.args.deployment_id}")
                        else:
                            logger.warning("No matching devices found in DB for the MACs in deployment.")
                except Exception as e:
                    logger.error(f"Failed to parse deployment devices string: {e}")
            else:
                logger.warning(f"Deployment {self.args.deployment_id} not found or empty.")

        self.target_device_ids = sorted(list(ids))
        if not self.target_device_ids:
            logger.error("No valid devices found based on input parameters.")
            sys.exit(1)

        logger.log(LOG_DATA, f"Target Device IDs: {self.target_device_ids}")

    def run(self):
        # 1. Setup
        self.resolve_devices()

        # 2. Parse Dates
        try:
            start_date = datetime.datetime.strptime(self.args.date_from, "%Y-%m-%d")
            end_date = datetime.datetime.strptime(self.args.date_to, "%Y-%m-%d")
        except ValueError:
            logger.error("Invalid date format. Use YYYY-MM-DD")
            sys.exit(1)

        if start_date > end_date:
            start_date, end_date = end_date, start_date

        # 3. Open Zip File
        try:
            logger.info(f"Creating output file: {self.args.outFile}")
            zip_buffer = zipfile.ZipFile(self.args.outFile, 'w', zipfile.ZIP_DEFLATED)
        except Exception as e:
            logger.error(f"Failed to create zip file: {e}")
            sys.exit(1)

        # 4. Iterate Days
        current_date = start_date
        total_rows_exported = 0

        try:
            while current_date <= end_date:
                day_str = current_date.strftime("%Y-%m-%d")

                # Define 24h window (UTC assumed based on schema usage in well-api)
                t_start = f"{day_str} 00:00:00"
                t_end = (current_date + datetime.timedelta(days=1)).strftime("%Y-%m-%d 00:00:00")

                logger.info(f"Processing date: {day_str}...")

                # Build Query
                sql = DataProcessor.build_fetch_query(
                    self.target_device_ids,
                    t_start,
                    t_end,
                    self.args.radar_part,
                    self.args.allRadar,
                    self.args.group_by
                )

                # Execute
                raw_rows = self.db.execute(sql)

                if raw_rows:
                    logger.log(LOG_DATA, f" -> Fetched {len(raw_rows)} raw rows (including mtype splits).")

                    # Process / Pivot Data
                    processed_rows, headers = DataProcessor.process_rows(raw_rows)

                    count = len(processed_rows)
                    total_rows_exported += count
                    logger.log(LOG_STEPS, f" -> Processed into {count} unique time records.")

                    # Generate CSV in memory
                    csv_buffer = io.StringIO()
                    writer = csv.DictWriter(csv_buffer, fieldnames=headers)
                    writer.writeheader()
                    writer.writerows(processed_rows)

                    # Add to Zip
                    # Format: {ID}_{DATE}_{GROUP}_rc_data.csv
                    csv_filename = f"{self.file_identifier}_{day_str}_{self.args.group_by}_rc_data.csv"
                    zip_buffer.writestr(csv_filename, csv_buffer.getvalue())
                    logger.log(LOG_STEPS, f" -> Added {csv_filename} to zip.")
                else:
                    logger.log(LOG_STEPS, " -> No data found for this date.")

                current_date += datetime.timedelta(days=1)

        except KeyboardInterrupt:
            logger.warning("Operation cancelled by user.")
        except Exception as e:
            logger.error(f"An error occurred during processing: {e}")
            if self.args.debug > 0:
                import traceback
                traceback.print_exc()
        finally:
            zip_buffer.close()
            self.db.close()

        logger.info(f"Export complete. Total records: {total_rows_exported}. File: {self.args.outFile}")

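# Each processed day becomes one CSV inside the output ZIP, named
# {identifier}_{date}_{group_by}_rc_data.csv, e.g. 560_2025-03-09_by_minute_rc_data.csv
# for --device_id 560 with the default by_minute grouping.
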
# ==========================================
# Argument Parsing
# ==========================================

def parse_arguments():
    parser = argparse.ArgumentParser(
        description="Wellnuo Database Export Tool",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="""Examples:
# Query by Device ID
wellDbQuery.py --device_id 560 --date_from 2025-03-09 --date_to 2025-04-22 --outFile c.zip

# Query by Deployment ID (all devices in deployment)
wellDbQuery.py --deployment_id 21 --date_from 2025-06-01 --date_to 2025-06-01 --outFile deployment_21.zip

# Query by MAC with all radar columns and high debug
wellDbQuery.py --mac 64B70888FAB0 --date_from 2025-01-01 --date_to 2025-01-01 --allRadar -d 2
"""
    )

    # Selection Group (Mutually Exclusive logic handled in code)
    sel = parser.add_argument_group('Target Selection (One required)')
    sel.add_argument('--device_id', '-di', type=int, help='Target Device ID (internal DB id)')
    sel.add_argument('--well_id', '-wi', type=int, help='Target Well ID (external id)')
    sel.add_argument('--mac', '-m', type=str, help='Target Device MAC Address')
    sel.add_argument('--deployment_id', '-depid', type=int, help='Target Deployment ID (fetches all devices in deployment)')

    # Date Group
    date = parser.add_argument_group('Date Range')
    date.add_argument('--date_from', '-df', type=str, required=True, help='Start Date (YYYY-MM-DD)')
    date.add_argument('--date_to', '-dt', type=str, required=True, help='End Date (YYYY-MM-DD)')

    # DB Config
    db = parser.add_argument_group('Database Configuration')
    db.add_argument('--db_name', default=DEFAULTS['DB_NAME'], help=f"Default: {DEFAULTS['DB_NAME']}")
    db.add_argument('--db_username', default=DEFAULTS['DB_USER'], help=f"Default: {DEFAULTS['DB_USER']}")
    db.add_argument('--db_password', default=DEFAULTS['DB_PASS'], help="Default: from .env")
    db.add_argument('--db_host', default=DEFAULTS['DB_HOST'], help=f"Default: {DEFAULTS['DB_HOST']}")
    db.add_argument('--db_port', default=DEFAULTS['DB_PORT'], help=f"Default: {DEFAULTS['DB_PORT']}")

    # Options
    opts = parser.add_argument_group('Export Options')
    opts.add_argument('--outFile', '-o', default=DEFAULTS['OUT_FILE'], help=f"Output ZIP filename (default: {DEFAULTS['OUT_FILE']})")
    opts.add_argument('--radar_part', '-radar', default=DEFAULTS['RADAR_PART'], help=f"Radar column expression (default: {DEFAULTS['RADAR_PART']})")
    opts.add_argument('--allRadar', '-allr', action='store_true', help="Retrieve all raw radar columns instead of calculated part")
    opts.add_argument('--group_by', '-g', default=DEFAULTS['GROUP_BY'], choices=['by_minute', 'by_10_seconds', 'by_hour', 'by_10_minute'], help="Time aggregation bucket")
    opts.add_argument('-d', '--debug', type=int, default=0, choices=[0, 1, 2, 3], help="Debug level: 0=Info, 1=Steps, 2=Data, 3=SQL")

    args = parser.parse_args()

    if not any([args.device_id, args.well_id, args.mac, args.deployment_id]):
        parser.error("You must provide one of --device_id, --well_id, --mac, or --deployment_id")

    return args

def setup_logging_level(level):
    if level == 0:
        logger.setLevel(logging.INFO)
    elif level == 1:
        logger.setLevel(LOG_STEPS)
    elif level == 2:
        logger.setLevel(LOG_DATA)
    elif level == 3:
        logger.setLevel(LOG_SQL)

# ==========================================
# Entry Point
# ==========================================

if __name__ == "__main__":
    args = parse_arguments()
    setup_logging_level(args.debug)

    exporter = WellExporter(args)
    exporter.run()