well-svc-alert/well-svc-msg.py
2025-06-16 10:31:41 -07:00

347 lines
17 KiB
Python

#!/usr/bin/env python3
import os
import sys
import logging
import json
import time
import telnyx
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, Email, To, Content
from kombu import Connection, Exchange, Queue # Remove Consumer import here
# Import Consumer where it's used or rely on connection.Consumer
from kombu.exceptions import KombuError # Import specific exception for connection errors
from dotenv import load_dotenv
# --- Configuration Loading ---
# Load environment variables from from $HOME/.env and ./.env files
dotenv_path = os.path.join(os.environ.get("HOME", "."), '.env')
load_dotenv(dotenv_path=dotenv_path, override=True) # Allow overriding if $HOME/.env exists
dotenv_path = os.path.join(os.getcwd(), '.env')
load_dotenv(dotenv_path=dotenv_path)
# --- Configuration ---
TELNYX_API_KEY = os.environ.get("TELNYX_API_KEY")
TELNYX_SENDER_ID = os.environ.get("TELNYX_SENDER_ID", "+16505820706")
TELNYX_CONNECTION_ID = os.environ.get("TELNYX_CONNECTION_ID") # For Voice/TTS Calls
messaging_profile_id = os.environ.get("TELNYX_MESSAGING_PROFILE_ID") # Optional but good practice
# RabbitMQ Configuration using Kombu URL format
# Default to standard local guest user if only hostname 'localhost' is provided
raw_rabbitmq_url = os.environ.get("RABBITMQ_URL", "localhost")
if raw_rabbitmq_url == "localhost":
RABBITMQ_URL = "amqp://guest:guest@localhost:5672//"
# If your RabbitMQ needs different user/pass/vhost, provide the full URL
# Example: "amqp://user:password@host:port/vhost"
else:
# If the URL is not localhost, assume it's a full URL
RABBITMQ_URL = raw_rabbitmq_url
RABBITMQ_ALERTS_QNAME = os.environ.get("RABBITMQ_ALERTS_QNAME", "alerts")
# Define the exchange (default direct exchange)
exchange = Exchange("", type='direct')
# Define the queue, binding it to the default exchange using its name as routing key
# Make sure queue is durable to survive broker restarts
alert_queue = Queue(RABBITMQ_ALERTS_QNAME, exchange=exchange, routing_key=RABBITMQ_ALERTS_QNAME, durable=True)
# SendGrid configuration
SENDGRID_API_KEY = os.environ.get("SENDGRID_API_KEY")
SENDGRID_FROM_EMAIL = os.environ.get("SENDGRID_FROM_EMAIL") # Verified sender email
# --- Setup Logging ---
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
stream=sys.stdout,
)
logger = logging.getLogger(__name__)
# --- Log Initial Configuration ---
logger.info("=============================================")
logger.info(" Initial Service Configuration ")
logger.info("---------------------------------------------")
logger.info(f"RABBITMQ_URL = {RABBITMQ_URL}")
logger.info(f"RABBITMQ_ALERTS_QNAME = {RABBITMQ_ALERTS_QNAME}")
logger.info(f"TELNYX_SENDER_ID = {TELNYX_SENDER_ID}")
logger.info(f"TELNYX_CONNECTION_ID (TTS) = {TELNYX_CONNECTION_ID if TELNYX_CONNECTION_ID else 'Not Set'}")
logger.info(f"SENDGRID_FROM_EMAIL (Email)= {SENDGRID_FROM_EMAIL if SENDGRID_FROM_EMAIL else 'Not Set'}")
logger.info(f"TELNYX_API_KEY = {'Set' if TELNYX_API_KEY else 'Not Set'}")
logger.info(f"SENDGRID_API_KEY = {'Set' if SENDGRID_API_KEY else 'Not Set'}")
logger.info("=============================================")
# --- Helper Functions ---
def check_env_vars():
"""Checks if required environment variables are set based on needs."""
required_vars = ["TELNYX_API_KEY", "TELNYX_SENDER_ID", "RABBITMQ_URL", "RABBITMQ_ALERTS_QNAME"]
missing_vars = [var for var in required_vars if not os.environ.get(var)]
# Check conditional variables needed for specific methods
# Assume we might get *any* type, so check all potential optional vars
if not TELNYX_CONNECTION_ID:
logger.warning("Optional environment variable TELNYX_CONNECTION_ID is missing (required for phone/TTS)")
if not SENDGRID_API_KEY:
logger.warning("Optional environment variable SENDGRID_API_KEY is missing (required for email)")
if not SENDGRID_FROM_EMAIL:
logger.warning("Optional environment variable SENDGRID_FROM_EMAIL is missing (required for email)")
if any(v in missing_vars for v in ["TELNYX_API_KEY", "TELNYX_SENDER_ID", "RABBITMQ_URL", "RABBITMQ_ALERTS_QNAME"]):
logger.error(f"Missing critical environment variables: {', '.join(missing_vars)}")
sys.exit(1)
else:
logger.info("Core environment variables appear to be set.")
def setup_telnyx():
"""Configures the Telnyx client."""
if not TELNYX_API_KEY:
logger.error("Cannot configure Telnyx client: TELNYX_API_KEY not set.")
return False
telnyx.api_key = TELNYX_API_KEY
logger.info("Telnyx client configured.")
return True
# --- Sending Functions (Keep these identical) ---
def send_telnyx_sms(recipient: str, message_body: str) -> bool:
"""Sends an SMS using the Telnyx API."""
if not recipient:
logger.error("Cannot send SMS: Recipient phone number is missing.")
return False
if not TELNYX_SENDER_ID:
logger.error("Cannot send SMS: TELNYX_SENDER_ID is not set.")
return False
logger.info(f"Attempting to send SMS from '{TELNYX_SENDER_ID}' to '{recipient}'")
try:
message_create_params = {
"from_": TELNYX_SENDER_ID,
"to": recipient,
"text": message_body,
"messaging_profile_id": messaging_profile_id, # Optional but good practice
"type": "sms" # Explicitly set type to SMS
}
response = telnyx.Message.create(**message_create_params)
logger.info(f"SMS submitted successfully to Telnyx. Message ID: {response.id}")
return True
except telnyx.error.TelnyxError as e:
logger.error(f"Telnyx API Error sending SMS to {recipient}: {e}")
if hasattr(e, 'http_body') and e.http_body:
logger.error(f"Telnyx Error Details: {e.http_body}")
return False
except Exception as e:
logger.error(f"Unexpected error sending SMS to {recipient}: {e}")
return False
def send_email(recipient_email: str, subject: str, body: str) -> bool:
"""Sends an email using SendGrid."""
if not recipient_email:
logger.error("Cannot send email: Recipient email address is missing.")
return False
if not SENDGRID_API_KEY or not SENDGRID_FROM_EMAIL:
logger.error("Cannot send email: SENDGRID_API_KEY or SENDGRID_FROM_EMAIL not configured.")
return False
logger.info(f"Attempting to send email via SendGrid from '{SENDGRID_FROM_EMAIL}' to '{recipient_email}'")
message = Mail(
from_email=Email(SENDGRID_FROM_EMAIL), # Use Email object
to_emails=To(recipient_email), # Use To object
subject=subject if subject else 'Alert Notification', # Default subject
plain_text_content=Content("text/plain", body) # Use Content object
)
try:
sg = SendGridAPIClient(SENDGRID_API_KEY)
response = sg.send(message)
# SendGrid returns 2xx status codes on success
if 200 <= response.status_code < 300:
logger.info(f"Email submitted successfully to SendGrid for {recipient_email}. Status: {response.status_code}")
return True
else:
logger.error(f"SendGrid API Error sending email to {recipient_email}. Status: {response.status_code}, Body: {response.body}")
return False
except Exception as e:
# Log the full exception details from SendGrid/http client if possible
logger.exception(f"Unexpected error sending email via SendGrid to {recipient_email}: {e}")
return False
def make_telnyx_tts_call(recipient: str, tts_body: str) -> bool:
"""Initiates a Telnyx Call Control V2 call to speak TTS."""
if not recipient:
logger.error("Cannot initiate TTS call: Recipient phone number is missing.")
return False
if not TELNYX_SENDER_ID:
logger.error("Cannot initiate TTS call: TELNYX_SENDER_ID is not set.")
return False
if not TELNYX_CONNECTION_ID:
logger.error("Cannot initiate TTS call: TELNYX_CONNECTION_ID (Call Control Application) is not set.")
return False
logger.info(f"Attempting to initiate TTS call from '{TELNYX_SENDER_ID}' to '{recipient}' via Connection ID '{TELNYX_CONNECTION_ID}'")
try:
call_request = {
"to": recipient,
"from_": TELNYX_SENDER_ID,
"connection_id": TELNYX_CONNECTION_ID,
"custom_headers": [
{"name": "X-TTS-Payload", "value": tts_body}
],
}
response = telnyx.Call.create(**call_request)
logger.info(f"Telnyx call initiation request successful for {recipient}. Call Control ID: {response.call_control_id}, Call Leg ID: {response.call_leg_id}")
logger.warning(f"Reminder: A Call Control Application webhook MUST handle call events for Connection ID '{TELNYX_CONNECTION_ID}' and issue the 'speak_text' command using the provided body: '{tts_body[:50]}...'")
return True
except telnyx.error.TelnyxError as e:
logger.error(f"Telnyx API Error initiating call to {recipient}: {e}")
if hasattr(e, 'http_body') and e.http_body:
logger.error(f"Telnyx Error Details: {e.http_body}")
return False
except Exception as e:
logger.error(f"Unexpected error initiating call to {recipient}: {e}")
return False
# --- Kombu Callback ---
def handle_message(body, message):
"""Callback function for Kombu to process messages."""
delivery_tag = message.delivery_tag
logger.info(f"Received message via Kombu (Delivery Tag: {delivery_tag})")
try:
# Decode JSON payload
if isinstance(body, bytes):
message_data = json.loads(body.decode('utf-8'))
elif isinstance(body, str):
message_data = json.loads(body)
elif isinstance(body, dict):
message_data = body # Already decoded
else:
raise TypeError(f"Unexpected message body type: {type(body)}")
logger.debug(f"Decoded message data: {message_data}")
send_method = message_data.get('send_method')
destination = message_data.get('destination')
message_body = message_data.get('body')
subject = message_data.get('subject', '') # Subject primarily for email
if not all([send_method, destination, message_body]):
logger.error(f"Invalid message format: Missing 'send_method', 'destination', or 'body'. Message: {body}")
message.reject(requeue=False) # Reject and discard malformed message
logger.warning(f"Message rejected (tag: {delivery_tag}) due to invalid format.")
return
logger.info(f"Processing alert - Method: {send_method}, Destination: {destination[:15]}...")
success = False
if send_method == "sms":
success = send_telnyx_sms(recipient=destination, message_body=message_body)
elif send_method == "email":
# Check if email vars are present before calling
if not SENDGRID_API_KEY or not SENDGRID_FROM_EMAIL:
logger.error(f"Cannot send email for tag {delivery_tag}: SendGrid config missing.")
success = False # Mark as failure for this specific task
else:
success = send_email(recipient_email=destination, subject=subject, body=message_body)
elif send_method == "phone":
# Check if TTS var is present before calling
if not TELNYX_CONNECTION_ID:
logger.error(f"Cannot make TTS call for tag {delivery_tag}: TELNYX_CONNECTION_ID missing.")
success = False # Mark as failure
else:
success = make_telnyx_tts_call(recipient=destination, tts_body=message_body)
else:
logger.error(f"Unsupported send_method: '{send_method}'. Discarding message (tag: {delivery_tag}).")
message.reject(requeue=False) # Reject and discard
return
# Acknowledge or Requeue/Reject based on success
if success:
logger.info(f"Successfully submitted '{send_method}' task for tag {delivery_tag}. Acknowledging.")
message.ack()
else:
logger.error(f"Failed to submit '{send_method}' task for tag {delivery_tag}. Rejecting message (requeue=False).")
message.reject(requeue=False)
except (json.JSONDecodeError, TypeError) as decode_err:
logger.error(f"Failed to decode or process message body (Tag: {delivery_tag}): {decode_err}. Body sample: {str(body)[:100]}. Rejecting (requeue=False).")
message.reject(requeue=False) # Reject malformed/unexpected message type
except Exception as e:
# Catch-all for unexpected errors during processing
logger.exception(f"Unexpected error processing message (Tag: {delivery_tag}): {e}. Rejecting message (requeue=False).")
try:
message.reject(requeue=False) # Try to reject even after an exception
except Exception as reject_e:
logger.error(f"Further error trying to reject message (Tag: {delivery_tag}): {reject_e}")
# --- Main Service Logic using Kombu (Corrected) ---
def main():
"""Main function to start the service using Kombu."""
logger.info("Starting Alert Service using Kombu...")
check_env_vars()
if not setup_telnyx():
sys.exit("Failed to configure Telnyx client. Exiting.")
logger.info(f"Attempting to connect to RabbitMQ via Kombu at {RABBITMQ_URL}")
# Connection loop
while True:
try:
# Establish connection using a context manager
with Connection(RABBITMQ_URL, connect_timeout=10, heartbeat=60) as connection:
logger.info("Kombu connection established.")
# *** FIX: Create the Consumer *using* the connection object ***
# This ensures the consumer is properly bound to the connection/channel.
consumer = connection.Consumer(
queues=[alert_queue], # List of queues to consume from
callbacks=[handle_message], # List of callbacks
accept=['json', 'text/plain'],# Define acceptable content-types
prefetch_count=1 # Process one message at a time
)
# Start consuming messages. This declares queues, sets QoS etc.
consumer.consume()
logger.info(f"Kombu consumer started. Waiting for messages on '{RABBITMQ_ALERTS_QNAME}'. To exit press CTRL+C")
# Inner loop to process messages using drain_events
while True:
try:
# Process messages indefinitely, checking for events every second
connection.drain_events(timeout=1)
except socket.timeout:
# No events within the timeout, connection likely still fine. Continue loop.
# You could add a check here connection.connected if needed.
pass
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received during drain_events. Shutting down...")
consumer.cancel() # Stop consuming
logger.info("Kombu consumer cancelled.")
# Exit the inner loop to allow the 'with Connection' block to close
raise # Re-raise KeyboardInterrupt to exit outer loop/program
except (KombuError, ConnectionResetError, ConnectionAbortedError) as e: # Catch specific connection/channel errors
logger.error(f"Kombu connection/channel error during drain_events: {e}. Breaking inner loop to reconnect...")
consumer.cancel() # Try to cancel consumer cleanly
break # Exit inner loop to force reconnection via outer loop
except Exception as e: # Catch other unexpected errors in consumer loop
logger.exception(f"Unexpected error in Kombu drain_events loop: {e}. Breaking inner loop to reconnect...")
consumer.cancel() # Try to cancel consumer cleanly
break # Exit inner loop
# Handle outer loop exceptions (connection failures, final KeyboardInterrupt)
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received during connection attempt or after exit from inner loop. Exiting.")
break # Exit the main while loop
except (ConnectionRefusedError, KombuError, Exception) as conn_err:
logger.error(f"Failed to connect or catastrophic failure ({type(conn_err).__name__}): {conn_err}. Retrying in 10 seconds...")
# Consumer is already cancelled or wasn't created if connection failed initially
time.sleep(10)
logger.info("Alert Service stopped.")
if __name__ == "__main__":
main()