#!/usr/bin/env python3
"""Alert delivery service.

Consumes alert messages from a RabbitMQ queue via Kombu and forwards each one
through the channel named in the message's ``send_method`` field:

* ``"sms"``   -> Telnyx SMS
* ``"email"`` -> SendGrid email
* ``"phone"`` -> Telnyx Call Control call (TTS text passed as a custom header)

Each message is a JSON object with the keys ``send_method``, ``destination``
and ``body``, plus an optional ``subject`` (used for email only).
"""
import json
import logging
import os
import socket
import sys
import time
from urllib.parse import urlsplit

import telnyx
from dotenv import load_dotenv
# Consumer is not imported: it is created through connection.Consumer in main().
from kombu import Connection, Exchange, Queue
from kombu.exceptions import KombuError  # Specific exception for connection errors
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Content, Email, Mail, To

# --- Configuration Loading ---
# Load environment variables from $HOME/.env and ./.env files.
# $HOME/.env is loaded with override=True, so its values replace variables that
# are already set; ./.env is loaded afterwards without override.
dotenv_path = os.path.join(os.environ.get("HOME", "."), '.env')
load_dotenv(dotenv_path=dotenv_path, override=True)
dotenv_path = os.path.join(os.getcwd(), '.env')
load_dotenv(dotenv_path=dotenv_path)

# --- Configuration ---
TELNYX_API_KEY = os.environ.get("TELNYX_API_KEY")
TELNYX_SENDER_ID = os.environ.get("TELNYX_SENDER_ID", "+16505820706")
TELNYX_CONNECTION_ID = os.environ.get("TELNYX_CONNECTION_ID")  # For Voice/TTS calls
messaging_profile_id = os.environ.get("TELNYX_MESSAGING_PROFILE_ID")  # Optional but good practice

# RabbitMQ configuration using the Kombu URL format.
# A bare "localhost" (also the default) expands to the standard local guest
# account; anything else is assumed to be a full URL such as
# "amqp://user:password@host:port/vhost".
raw_rabbitmq_url = os.environ.get("RABBITMQ_URL", "localhost")
if raw_rabbitmq_url == "localhost":
    RABBITMQ_URL = "amqp://guest:guest@localhost:5672//"
else:
    RABBITMQ_URL = raw_rabbitmq_url
RABBITMQ_ALERTS_QNAME = os.environ.get("RABBITMQ_ALERTS_QNAME", "alerts")

# Default (nameless) direct exchange.
exchange = Exchange("", type='direct')
# Queue bound to the default exchange with its own name as routing key.
# Durable so that it survives broker restarts.
alert_queue = Queue(
    RABBITMQ_ALERTS_QNAME,
    exchange=exchange,
    routing_key=RABBITMQ_ALERTS_QNAME,
    durable=True,
)

# SendGrid configuration
SENDGRID_API_KEY = os.environ.get("SENDGRID_API_KEY")
SENDGRID_FROM_EMAIL = os.environ.get("SENDGRID_FROM_EMAIL")  # Verified sender email

# --- Setup Logging ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)


def _redact_url(url: str) -> str:
    """Return *url* with any password replaced by ``***`` so it can be logged."""
    try:
        parts = urlsplit(url)
    except ValueError:
        # Do not echo a string we could not inspect for credentials.
        return "<unparseable URL>"
    if parts.password is None:
        return url
    userinfo, _, hostinfo = parts.netloc.rpartition("@")
    username = userinfo.partition(":")[0]
    return parts._replace(netloc=f"{username}:***@{hostinfo}").geturl()


# --- Log Initial Configuration ---
# Secrets are never printed: API keys are reported as Set/Not Set and the
# broker URL is logged with its password masked.
logger.info("=============================================")
logger.info(" Initial Service Configuration ")
logger.info("---------------------------------------------")
logger.info(f"RABBITMQ_URL = {_redact_url(RABBITMQ_URL)}")
logger.info(f"RABBITMQ_ALERTS_QNAME = {RABBITMQ_ALERTS_QNAME}")
logger.info(f"TELNYX_SENDER_ID = {TELNYX_SENDER_ID}")
logger.info(f"TELNYX_CONNECTION_ID (TTS) = {TELNYX_CONNECTION_ID or 'Not Set'}")
logger.info(f"SENDGRID_FROM_EMAIL (Email)= {SENDGRID_FROM_EMAIL or 'Not Set'}")
logger.info(f"TELNYX_API_KEY = {'Set' if TELNYX_API_KEY else 'Not Set'}")
logger.info(f"SENDGRID_API_KEY = {'Set' if SENDGRID_API_KEY else 'Not Set'}")
logger.info("=============================================")


# --- Helper Functions ---
def check_env_vars():
    """Validate the resolved configuration and exit if a critical value is empty.

    The check looks at the module-level settings (environment value or built-in
    default), not at ``os.environ`` directly, so settings that have a default
    do not have to be exported explicitly. Missing optional settings only
    produce a warning, because they are needed for some send methods only.

    Raises:
        SystemExit: with status 1 when a critical setting is empty.
    """
    critical = {
        "TELNYX_API_KEY": TELNYX_API_KEY,
        "TELNYX_SENDER_ID": TELNYX_SENDER_ID,
        "RABBITMQ_URL": RABBITMQ_URL,
        "RABBITMQ_ALERTS_QNAME": RABBITMQ_ALERTS_QNAME,
    }
    missing_vars = [name for name, value in critical.items() if not value]

    # Any send method may arrive on the queue, so report every optional gap.
    if not TELNYX_CONNECTION_ID:
        logger.warning("Optional environment variable TELNYX_CONNECTION_ID is missing (required for phone/TTS)")
    if not SENDGRID_API_KEY:
        logger.warning("Optional environment variable SENDGRID_API_KEY is missing (required for email)")
    if not SENDGRID_FROM_EMAIL:
        logger.warning("Optional environment variable SENDGRID_FROM_EMAIL is missing (required for email)")

    if missing_vars:
        logger.error(f"Missing critical environment variables: {', '.join(missing_vars)}")
        sys.exit(1)
    logger.info("Core environment variables appear to be set.")


def setup_telnyx():
    """Set the API key on the ``telnyx`` module.

    Returns:
        True when the key was set, False when TELNYX_API_KEY is not configured.
    """
    if not TELNYX_API_KEY:
        logger.error("Cannot configure Telnyx client: TELNYX_API_KEY not set.")
        return False
    telnyx.api_key = TELNYX_API_KEY
    logger.info("Telnyx client configured.")
    return True


# --- Sending Functions ---
def send_telnyx_sms(recipient: str, message_body: str) -> bool:
    """Send an SMS through the Telnyx API.

    Args:
        recipient: Destination phone number.
        message_body: Text of the SMS.

    Returns:
        True when Telnyx accepted the request, False on any failure
        (failures are logged, never raised).
    """
    if not recipient:
        logger.error("Cannot send SMS: Recipient phone number is missing.")
        return False
    if not TELNYX_SENDER_ID:
        logger.error("Cannot send SMS: TELNYX_SENDER_ID is not set.")
        return False

    logger.info(f"Attempting to send SMS from '{TELNYX_SENDER_ID}' to '{recipient}'")
    try:
        message_create_params = {
            "from_": TELNYX_SENDER_ID,
            "to": recipient,
            "text": message_body,
            "type": "sms",  # Explicitly set type to SMS
        }
        # The messaging profile is optional; leave the field out entirely
        # rather than sending an explicit null when it is not configured.
        if messaging_profile_id:
            message_create_params["messaging_profile_id"] = messaging_profile_id
        response = telnyx.Message.create(**message_create_params)
        logger.info(f"SMS submitted successfully to Telnyx. Message ID: {response.id}")
        return True
    except telnyx.error.TelnyxError as e:
        logger.error(f"Telnyx API Error sending SMS to {recipient}: {e}")
        if hasattr(e, 'http_body') and e.http_body:
            logger.error(f"Telnyx Error Details: {e.http_body}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error sending SMS to {recipient}: {e}")
        return False


def send_email(recipient_email: str, subject: str, body: str) -> bool:
    """Send a plain-text email through SendGrid.

    Args:
        recipient_email: Destination email address.
        subject: Subject line; 'Alert Notification' is used when empty.
        body: Plain-text content of the email.

    Returns:
        True when SendGrid answered with a 2xx status, False otherwise
        (failures are logged, never raised).
    """
    if not recipient_email:
        logger.error("Cannot send email: Recipient email address is missing.")
        return False
    if not SENDGRID_API_KEY or not SENDGRID_FROM_EMAIL:
        logger.error("Cannot send email: SENDGRID_API_KEY or SENDGRID_FROM_EMAIL not configured.")
        return False

    logger.info(f"Attempting to send email via SendGrid from '{SENDGRID_FROM_EMAIL}' to '{recipient_email}'")
    message = Mail(
        from_email=Email(SENDGRID_FROM_EMAIL),
        to_emails=To(recipient_email),
        subject=subject if subject else 'Alert Notification',  # Default subject
        plain_text_content=Content("text/plain", body),
    )
    try:
        sg = SendGridAPIClient(SENDGRID_API_KEY)
        response = sg.send(message)
        # SendGrid returns 2xx status codes on success.
        if 200 <= response.status_code < 300:
            logger.info(
                f"Email submitted successfully to SendGrid for {recipient_email}. "
                f"Status: {response.status_code}"
            )
            return True
        logger.error(
            f"SendGrid API Error sending email to {recipient_email}. "
            f"Status: {response.status_code}, Body: {response.body}"
        )
        return False
    except Exception as e:
        # logger.exception records the full traceback from SendGrid/http client.
        logger.exception(f"Unexpected error sending email via SendGrid to {recipient_email}: {e}")
        return False


def make_telnyx_tts_call(recipient: str, tts_body: str) -> bool:
    """Start a Telnyx call whose TTS text travels in the X-TTS-Payload header.

    This only places the call. The text is not spoken by this function: as the
    warning logged on success states, a Call Control Application webhook has to
    handle the call events and issue the speak command itself.

    Args:
        recipient: Destination phone number.
        tts_body: Text to be spoken.

    Returns:
        True when Telnyx accepted the call request, False on any failure
        (failures are logged, never raised).
    """
    if not recipient:
        logger.error("Cannot initiate TTS call: Recipient phone number is missing.")
        return False
    if not TELNYX_SENDER_ID:
        logger.error("Cannot initiate TTS call: TELNYX_SENDER_ID is not set.")
        return False
    if not TELNYX_CONNECTION_ID:
        logger.error("Cannot initiate TTS call: TELNYX_CONNECTION_ID (Call Control Application) is not set.")
        return False

    logger.info(
        f"Attempting to initiate TTS call from '{TELNYX_SENDER_ID}' to '{recipient}' "
        f"via Connection ID '{TELNYX_CONNECTION_ID}'"
    )
    try:
        call_request = {
            "to": recipient,
            "from_": TELNYX_SENDER_ID,
            "connection_id": TELNYX_CONNECTION_ID,
            "custom_headers": [
                {"name": "X-TTS-Payload", "value": tts_body},
            ],
        }
        response = telnyx.Call.create(**call_request)
        logger.info(
            f"Telnyx call initiation request successful for {recipient}. "
            f"Call Control ID: {response.call_control_id}, Call Leg ID: {response.call_leg_id}"
        )
        logger.warning(
            f"Reminder: A Call Control Application webhook MUST handle call events for "
            f"Connection ID '{TELNYX_CONNECTION_ID}' and issue the 'speak_text' command "
            f"using the provided body: '{tts_body[:50]}...'"
        )
        return True
    except telnyx.error.TelnyxError as e:
        logger.error(f"Telnyx API Error initiating call to {recipient}: {e}")
        if hasattr(e, 'http_body') and e.http_body:
            logger.error(f"Telnyx Error Details: {e.http_body}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error initiating call to {recipient}: {e}")
        return False


# --- Kombu Callback ---
def handle_message(body, message):
    """Kombu callback: decode one alert, dispatch it, then ack or reject it.

    Every message is settled exactly once. It is acknowledged when the send
    function reports success and rejected with ``requeue=False`` in every other
    case (malformed payload, unsupported method, send failure, unexpected
    error), so a bad message is never redelivered.

    Args:
        body: Message payload as bytes, str (both parsed as JSON) or an
            already-decoded dict.
        message: The Kombu message object; its ``delivery_tag``, ``ack()`` and
            ``reject()`` are used.
    """
    delivery_tag = message.delivery_tag
    logger.info(f"Received message via Kombu (Delivery Tag: {delivery_tag})")

    try:
        # Decode JSON payload
        if isinstance(body, bytes):
            message_data = json.loads(body.decode('utf-8'))
        elif isinstance(body, str):
            message_data = json.loads(body)
        elif isinstance(body, dict):
            message_data = body  # Already decoded
        else:
            raise TypeError(f"Unexpected message body type: {type(body)}")

        # Valid JSON that is not an object (list, number, ...) has no fields to
        # read; treat it as a malformed message, not as an unexpected error.
        if not isinstance(message_data, dict):
            raise TypeError(f"Unexpected decoded payload type: {type(message_data)}")

        logger.debug(f"Decoded message data: {message_data}")

        send_method = message_data.get('send_method')
        destination = message_data.get('destination')
        message_body = message_data.get('body')
        subject = message_data.get('subject', '')  # Subject primarily for email

        if not all([send_method, destination, message_body]):
            logger.error(
                f"Invalid message format: Missing 'send_method', 'destination', or 'body'. "
                f"Message: {body}"
            )
            message.reject(requeue=False)  # Reject and discard malformed message
            logger.warning(f"Message rejected (tag: {delivery_tag}) due to invalid format.")
            return

        logger.info(f"Processing alert - Method: {send_method}, Destination: {destination[:15]}...")

        success = False
        if send_method == "sms":
            success = send_telnyx_sms(recipient=destination, message_body=message_body)
        elif send_method == "email":
            # Check that the email settings are present before calling.
            if not SENDGRID_API_KEY or not SENDGRID_FROM_EMAIL:
                logger.error(f"Cannot send email for tag {delivery_tag}: SendGrid config missing.")
            else:
                success = send_email(recipient_email=destination, subject=subject, body=message_body)
        elif send_method == "phone":
            # Check that the TTS setting is present before calling.
            if not TELNYX_CONNECTION_ID:
                logger.error(f"Cannot make TTS call for tag {delivery_tag}: TELNYX_CONNECTION_ID missing.")
            else:
                success = make_telnyx_tts_call(recipient=destination, tts_body=message_body)
        else:
            logger.error(f"Unsupported send_method: '{send_method}'. Discarding message (tag: {delivery_tag}).")
            message.reject(requeue=False)  # Reject and discard
            return

        # Acknowledge or reject based on the outcome.
        if success:
            logger.info(f"Successfully submitted '{send_method}' task for tag {delivery_tag}. Acknowledging.")
            message.ack()
        else:
            logger.error(
                f"Failed to submit '{send_method}' task for tag {delivery_tag}. "
                f"Rejecting message (requeue=False)."
            )
            message.reject(requeue=False)

    except (json.JSONDecodeError, TypeError) as decode_err:
        logger.error(
            f"Failed to decode or process message body (Tag: {delivery_tag}): {decode_err}. "
            f"Body sample: {str(body)[:100]}. Rejecting (requeue=False)."
        )
        message.reject(requeue=False)  # Reject malformed/unexpected message type
    except Exception as e:
        # Catch-all so that one bad message cannot take the consumer down.
        logger.exception(
            f"Unexpected error processing message (Tag: {delivery_tag}): {e}. "
            f"Rejecting message (requeue=False)."
        )
        try:
            message.reject(requeue=False)  # Try to reject even after an exception
        except Exception as reject_e:
            logger.error(f"Further error trying to reject message (Tag: {delivery_tag}): {reject_e}")


# --- Main Service Logic using Kombu ---
def _cancel_consumer(consumer) -> None:
    """Cancel *consumer* on a best-effort basis.

    The channel may already be dead when this runs. A failure is logged rather
    than raised so that it cannot replace the exception being handled by the
    caller (in particular a KeyboardInterrupt).
    """
    try:
        consumer.cancel()
    except Exception as cancel_err:
        logger.warning(f"Could not cancel Kombu consumer cleanly: {cancel_err}")


def _drain_until_disconnected(connection, consumer) -> None:
    """Process broker events until the connection needs to be re-established.

    Returns normally when a connection/channel error or an unexpected error
    occurred, which tells the caller to reconnect.

    Raises:
        KeyboardInterrupt: re-raised after the consumer has been cancelled.
    """
    while True:
        try:
            try:
                # Wait at most one second so the loop stays responsive.
                connection.drain_events(timeout=1)
            except socket.timeout:
                # No events within the timeout: the connection is idle, not broken.
                pass
            # A heartbeat interval is negotiated on the connection, so the
            # client has to service it; without this call the broker drops an
            # idle consumer for missing heartbeats.
            connection.heartbeat_check()
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received during drain_events. Shutting down...")
            _cancel_consumer(consumer)
            logger.info("Kombu consumer cancelled.")
            raise  # Let main() leave the connection block and stop the service
        except (KombuError, ConnectionResetError, ConnectionAbortedError) as e:
            logger.error(
                f"Kombu connection/channel error during drain_events: {e}. "
                f"Breaking inner loop to reconnect..."
            )
            _cancel_consumer(consumer)
            return
        except Exception as e:
            logger.exception(
                f"Unexpected error in Kombu drain_events loop: {e}. "
                f"Breaking inner loop to reconnect..."
            )
            _cancel_consumer(consumer)
            return


def main():
    """Run the alert service until interrupted with CTRL+C.

    Validates the configuration, configures Telnyx, then keeps a Kombu consumer
    attached to the alerts queue. A lost connection is re-established
    immediately; a failed connection attempt is retried after 10 seconds.
    """
    logger.info("Starting Alert Service using Kombu...")
    check_env_vars()
    if not setup_telnyx():
        sys.exit("Failed to configure Telnyx client. Exiting.")

    logger.info(f"Attempting to connect to RabbitMQ via Kombu at {_redact_url(RABBITMQ_URL)}")

    # Connection loop
    while True:
        try:
            with Connection(RABBITMQ_URL, connect_timeout=10, heartbeat=60) as connection:
                # Kombu connects lazily; connect explicitly so that the log
                # line below is only written once the broker was reached.
                connection.connect()
                logger.info("Kombu connection established.")

                # Create the Consumer through the connection so that it is
                # bound to this connection's channel.
                consumer = connection.Consumer(
                    queues=[alert_queue],
                    callbacks=[handle_message],
                    accept=['json', 'text/plain'],  # Acceptable content types
                    prefetch_count=1,  # Process one message at a time
                )
                # Start consuming; this declares the queue and applies QoS.
                consumer.consume()
                logger.info(
                    f"Kombu consumer started. Waiting for messages on '{RABBITMQ_ALERTS_QNAME}'. "
                    f"To exit press CTRL+C"
                )

                _drain_until_disconnected(connection, consumer)

        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received during connection attempt or after exit from inner loop. Exiting.")
            break
        except Exception as conn_err:
            logger.error(
                f"Failed to connect or catastrophic failure ({type(conn_err).__name__}): {conn_err}. "
                f"Retrying in 10 seconds..."
            )
            # The consumer is already cancelled, or was never created.
            time.sleep(10)

    logger.info("Alert Service stopped.")


if __name__ == "__main__":
    main()