#!/usr/bin/env python3
"""Messenger Tool: send SMS / voice calls via Telnyx and work with a RabbitMQ queue.

Sub-commands (see ``--help``):

* ``sms``            - send an SMS directly through the Telnyx SDK.
* ``voice-tts-call`` - start a voice call through the Telnyx REST API.
* ``send-rmq``       - publish a notification payload to the alerts queue.
* ``peek-rmq``       - print messages from the alerts queue without consuming them.
* ``process-one``    - take one queued message and send its body as an SMS.
* ``clear-rmq``      - remove one message, or purge the whole alerts queue.

Configuration is read from environment variables, optionally loaded from
``$HOME/.env`` and then ``./.env`` (the latter overrides the former).
"""
import argparse
import json
import logging
import os
import sys
import time  # noqa: F401  (kept: imported by the original file)
from datetime import datetime, timezone
from typing import List, Optional

import requests  # Added for voice calls
import telnyx  # For SMS
from dotenv import load_dotenv
from kombu import Connection, Exchange, Queue
from kombu.exceptions import KombuError, OperationalError
from kombu.simple import SimpleQueue  # noqa: F401  (kept: imported by the original file)

# --- Configuration Loading ---
# Load environment variables from $HOME/.env then ./.env (current dir overrides home)
dotenv_path_home = os.path.join(os.environ.get("HOME", "."), '.env')
load_dotenv(dotenv_path=dotenv_path_home)
dotenv_path_cwd = os.path.join(os.getcwd(), '.env')
load_dotenv(dotenv_path=dotenv_path_cwd, override=True)

# --- Configuration ---
# General Telnyx
TELNYX_API_KEY = os.environ.get("TELNYX_API_KEY")
TELNYX_MESSAGING_PROFILE_ID = os.environ.get("TELNYX_MESSAGING_PROFILE_ID")

# SMS Specific
TELNYX_SENDER_ID = os.environ.get("TELNYX_SENDER_ID")  # Sending SMS *FROM* this E.164 number
TELNYX_SENDER_ID_ALPHA = os.environ.get("TELNYX_SENDER_ID_ALPHA")  # Sending SMS/Voice *FROM* this Alpha Sender ID

# Voice Call Specific
# NOTE(review): the connection-id and webhook defaults below are hard-coded
# fallbacks; confirm they are appropriate outside the original deployment.
TELNYX_CONNECTION_ID_VOICE = os.environ.get("TELNYX_CONNECTION_ID_VOICE", "2671409623596009055")
TELNYX_WEBHOOK_URL_VOICE = os.environ.get("TELNYX_WEBHOOK_URL_VOICE", "http://your-webhook-server.com/telnyx-webhook")

# Upper bound (seconds) for the voice-call HTTP request so the CLI cannot hang forever.
TELNYX_HTTP_TIMEOUT_SECONDS = 30

# RabbitMQ Configuration
raw_rabbitmq_url = os.environ.get("RABBITMQ_URL", "localhost")
if raw_rabbitmq_url == "localhost":
    RABBITMQ_URL = "amqp://guest:guest@localhost:5672//"
else:
    RABBITMQ_URL = raw_rabbitmq_url
RABBITMQ_ALERTS_QNAME = os.environ.get("RABBITMQ_ALERTS_QNAME", "alerts")

MOSQUITTO_PASSWORD_FILE = os.environ.get('MOSQUITTO_PASSWORD_FILE', '/etc/mosquitto/passwd')
MOSQUITTO_ACL_FILE = os.environ.get('MOSQUITTO_ACL_FILE', '/etc/mosquitto/acl')

# Kombu Exchange and Queue definitions
exchange = Exchange("", type='direct')
alert_queue_obj = Queue(RABBITMQ_ALERTS_QNAME, exchange=exchange,
                        routing_key=RABBITMQ_ALERTS_QNAME, durable=True)

# --- Setup Logging ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - [%(funcName)s] - %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger("MessengerTool")


# --- Helper Functions ---
def check_env_vars(require_telnyx_sms=False, require_telnyx_voice=False, require_rabbitmq=False):
    """Exit the process with status 1 if configuration needed for an operation is missing.

    Args:
        require_telnyx_sms: Require the API key and the numeric sender ID.
        require_telnyx_voice: Require the API key, voice connection ID, webhook
            URL and both the numeric and alphanumeric sender IDs.
        require_rabbitmq: Require the RabbitMQ URL and alerts queue name.
    """
    missing_vars = []
    if require_rabbitmq:
        if not RABBITMQ_URL:
            missing_vars.append("RABBITMQ_URL")
        if not RABBITMQ_ALERTS_QNAME:
            missing_vars.append("RABBITMQ_ALERTS_QNAME")
    if require_telnyx_sms or require_telnyx_voice:
        if not TELNYX_API_KEY:
            missing_vars.append("TELNYX_API_KEY")
    if require_telnyx_sms:
        if not TELNYX_SENDER_ID:
            missing_vars.append("TELNYX_SENDER_ID (for numeric SMS sender)")
        # TELNYX_SENDER_ID_ALPHA is needed if 'alpha' or 'auto' for non-US SMS is used
        if not TELNYX_SENDER_ID_ALPHA:
            logger.debug("TELNYX_SENDER_ID_ALPHA not set (optional for SMS if only numeric is used or for US destinations in auto mode).")
    if require_telnyx_voice:
        if not TELNYX_CONNECTION_ID_VOICE:
            missing_vars.append("TELNYX_CONNECTION_ID_VOICE")
        if not TELNYX_WEBHOOK_URL_VOICE:
            missing_vars.append("TELNYX_WEBHOOK_URL_VOICE")
        if not TELNYX_SENDER_ID:
            missing_vars.append("TELNYX_SENDER_ID (for numeric voice caller ID)")
        if not TELNYX_SENDER_ID_ALPHA:
            missing_vars.append("TELNYX_SENDER_ID_ALPHA (for alphanumeric voice caller ID)")
    if missing_vars:
        logger.error(f"Missing required environment variables for this operation: {', '.join(missing_vars)}")
        sys.exit(1)


def normalize_phone_number(phone_number_str: str) -> str:
    """Return *phone_number_str* reduced to ``+<digits>``.

    Every non-digit character (spaces, dashes, parentheses, stray ``+`` signs)
    is dropped and a single leading ``+`` is added. An empty/``None`` input, or
    one containing no digits at all, yields ``""`` so callers can reject it.
    """
    if not phone_number_str:
        return ""
    digits = "".join(ch for ch in phone_number_str if ch.isdigit())
    return f"+{digits}" if digits else ""


def _resolve_caller_id(destination: str, caller_id_type: str, channel_label: str) -> Optional[str]:
    """Pick the 'from' identity shared by the SMS and voice paths.

    Args:
        destination: Normalized destination number (``+<digits>``).
        caller_id_type: ``"auto"``, ``"alpha"`` or ``"numeric"``.
        channel_label: ``"SMS"`` or ``"Voice"``; only used in log messages.

    Returns:
        ``TELNYX_SENDER_ID_ALPHA`` when alpha is requested (or 'auto' with a
        destination not starting with ``+1``) and it is configured; otherwise
        ``TELNYX_SENDER_ID``.
    """
    if caller_id_type == "numeric":
        return TELNYX_SENDER_ID
    if caller_id_type == "alpha":
        if TELNYX_SENDER_ID_ALPHA:
            return TELNYX_SENDER_ID_ALPHA
        logger.warning(f"{channel_label} Caller ID type 'alpha' requested, but TELNYX_SENDER_ID_ALPHA is not set. Falling back to numeric.")
        return TELNYX_SENDER_ID
    if caller_id_type == "auto":
        if destination.startswith("+1"):  # US/Canada
            return TELNYX_SENDER_ID
        if TELNYX_SENDER_ID_ALPHA:  # Other international, try Alpha if available
            return TELNYX_SENDER_ID_ALPHA
        logger.warning(f"{channel_label} Caller ID type 'auto' for non-US/Canada destination, but TELNYX_SENDER_ID_ALPHA not set. Using numeric.")
        return TELNYX_SENDER_ID
    # Should not happen with argparse choices
    logger.warning(f"Invalid caller_id_type '{caller_id_type}' for {channel_label}. Defaulting to numeric.")
    return TELNYX_SENDER_ID


def _masked_rabbitmq_url() -> str:
    """Return RABBITMQ_URL with the password masked, safe to write to logs."""
    try:
        # Building a Connection does not open a socket; as_uri() masks the
        # password unless include_password=True is passed.
        return Connection(RABBITMQ_URL).as_uri()
    except Exception:
        return "<unparseable RABBITMQ_URL>"


# --- Telnyx SMS Sending Function ---
def setup_telnyx_sms_client():
    """Validate SMS configuration and set the Telnyx SDK API key.

    Returns:
        True on success, False if configuring the SDK raised. Exits the
        process (via ``check_env_vars``) when required variables are missing.
    """
    check_env_vars(require_telnyx_sms=True)
    try:
        telnyx.api_key = TELNYX_API_KEY
        logger.info("Telnyx client for SMS configured.")
        return True
    except Exception as e:
        logger.error(f"Failed to configure Telnyx client for SMS: {e}")
        return False


def send_telnyx_sms(recipient_phone: str, message_body: str, caller_id_type: str = "auto") -> bool:
    """Sends an SMS using the Telnyx API, with dynamic 'from_' based on caller_id_type.

    Args:
        recipient_phone: Destination number; normalized to ``+<digits>``.
        message_body: Text of the SMS.
        caller_id_type: ``"auto"`` (numeric for ``+1`` destinations, alpha
            otherwise when configured), ``"alpha"`` or ``"numeric"``.

    Returns:
        True if Telnyx accepted the message, False on any validation or API error.
    """
    if not setup_telnyx_sms_client():
        return False

    # Validate *after* normalization so inputs such as "---" are rejected too.
    recipient_phone = normalize_phone_number(recipient_phone)
    if not recipient_phone or not message_body:
        logger.error("Cannot send Telnyx SMS: Recipient phone and message are required.")
        return False

    from_id = _resolve_caller_id(recipient_phone, caller_id_type, "SMS")
    logger.info(f"Attempting to send Telnyx SMS from '{from_id}' to '{recipient_phone}'")

    message_create_params = {
        "from_": from_id,
        "to": recipient_phone,
        "text": message_body,
        "type": "sms",
    }
    # The messaging profile is optional; only send it when configured.
    if TELNYX_MESSAGING_PROFILE_ID:
        message_create_params["messaging_profile_id"] = TELNYX_MESSAGING_PROFILE_ID

    try:
        response = telnyx.Message.create(**message_create_params)
        logger.info(f"SMS submitted successfully. Message ID: {response.id}")
        return True
    except telnyx.error.TelnyxError as e:
        logger.error(f"Telnyx API Error sending SMS to {recipient_phone} from '{from_id}': {e}")
        json_body = getattr(e, 'json_body', None)
        if json_body and 'errors' in json_body:
            for err in json_body['errors']:
                logger.error(f"  - Code: {err.get('code')}, Title: {err.get('title')}, Detail: {err.get('detail')}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error sending Telnyx SMS to {recipient_phone}: {e}")
        return False


# --- Telnyx Voice Call Function ---
def _build_custom_headers(tts_message: Optional[str], audio_url: Optional[str],
                          extra_custom_headers: Optional[List[str]]) -> list:
    """Build the ``custom_headers`` list for the call payload.

    ``extra_custom_headers`` entries must look like ``NAME=VALUE``; entries
    without ``=`` are skipped with a warning.
    """
    headers = []
    if tts_message:
        headers.append({"name": "X-TTS-Payload", "value": tts_message})
    if audio_url:
        headers.append({"name": "X-Audio-Url", "value": audio_url})
    for header_item in extra_custom_headers or []:
        if "=" in header_item:
            name, value = header_item.split("=", 1)
            headers.append({"name": name.strip(), "value": value.strip()})
        else:
            logger.warning(f"Skipping malformed custom header: {header_item}. Expected NAME=VALUE")
    return headers


def _log_telnyx_http_error(e: requests.exceptions.HTTPError) -> None:
    """Log the status code and error details of a failed Telnyx HTTP response."""
    logger.error(f"Telnyx API HTTP Error initiating voice call: {e}")
    if e.response is None:
        return
    logger.error(f"Status Code: {e.response.status_code}")
    try:
        error_details = e.response.json()
    except ValueError:
        # ValueError is the common base of the JSON decode errors requests can
        # raise (stdlib json or simplejson), so non-JSON bodies always land here.
        logger.error(f"Response Body (raw): {e.response.text}")
        return
    if isinstance(error_details, dict) and "errors" in error_details:
        for err in error_details["errors"]:
            logger.error(f"  - Code: {err.get('code')}, Title: {err.get('title')}, Detail: {err.get('detail')}")
    else:
        logger.error(f"Response Body: {json.dumps(error_details, indent=2)}")


def make_telnyx_voice_call(to_phone: str, tts_message: str = None, audio_url: str = None,
                           connection_id_override: str = None, webhook_url_override: str = None,
                           caller_id_type: str = "auto", amd_mode: str = "disabled",
                           extra_custom_headers: list = None):
    """Start an outbound voice call by POSTing to the Telnyx ``/v2/calls`` endpoint.

    Args:
        to_phone: Destination number; normalized to ``+<digits>``.
        tts_message: Optional text, forwarded in the ``X-TTS-Payload`` custom header.
        audio_url: Optional audio URL, forwarded in the ``X-Audio-Url`` custom header.
        connection_id_override: Used instead of ``TELNYX_CONNECTION_ID_VOICE`` when given.
        webhook_url_override: Used instead of ``TELNYX_WEBHOOK_URL_VOICE`` when given.
        caller_id_type: ``"auto"``, ``"alpha"`` or ``"numeric"``.
        amd_mode: Value sent as ``answering_machine_detection``.
        extra_custom_headers: Optional list of ``NAME=VALUE`` strings.

    Returns:
        True if the API accepted the request, False otherwise. Exits the
        process (via ``check_env_vars``) when required variables are missing.
    """
    check_env_vars(require_telnyx_voice=True)
    api_url = "https://api.telnyx.com/v2/calls"

    to_phone = normalize_phone_number(to_phone)
    if not to_phone:
        logger.error("Cannot initiate Telnyx voice call: a valid destination phone number is required.")
        return False

    from_id = _resolve_caller_id(to_phone, caller_id_type, "Voice")
    telnyx_connection_id = connection_id_override or TELNYX_CONNECTION_ID_VOICE
    telnyx_webhook_url = webhook_url_override or TELNYX_WEBHOOK_URL_VOICE

    headers = {
        "Authorization": f"Bearer {TELNYX_API_KEY}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    api_payload = {
        "to": to_phone,
        "from": from_id,
        "connection_id": telnyx_connection_id,
        "webhook_url": telnyx_webhook_url,
        "webhook_url_method": "POST",
        "answering_machine_detection": amd_mode,
    }
    payload_custom_headers = _build_custom_headers(tts_message, audio_url, extra_custom_headers)
    if payload_custom_headers:
        api_payload["custom_headers"] = payload_custom_headers

    logger.info(f"Initiating Telnyx voice call from '{from_id}' to '{to_phone}' using connection '{telnyx_connection_id}'")
    logger.debug(f"Voice Call API Payload: {json.dumps(api_payload, indent=2)}")

    try:
        # Without a timeout requests may block indefinitely on a stalled connection.
        response = requests.post(api_url, headers=headers, json=api_payload,
                                 timeout=TELNYX_HTTP_TIMEOUT_SECONDS)
        response.raise_for_status()
        try:
            call_data = response.json().get("data", {})
        except (ValueError, AttributeError):
            # The call was accepted (2xx); an unreadable body only costs us the IDs.
            call_data = {}
        logger.info("Voice call initiated successfully!")
        logger.info(f"  Call Control ID: {call_data.get('call_control_id')}, Session ID: {call_data.get('call_session_id')}, Leg ID: {call_data.get('call_leg_id')}")
        return True
    except requests.exceptions.HTTPError as e:
        _log_telnyx_http_error(e)
        return False
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error initiating voice call: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error initiating voice call: {e}")
        return False


# --- RabbitMQ Interaction Functions ---
def publish_to_rmq(payload: dict):
    """Publish *payload* as persistent JSON to the alerts queue.

    Returns:
        True when the message was published, False on any error.
    """
    check_env_vars(require_rabbitmq=True)
    try:
        with Connection(RABBITMQ_URL) as connection:
            # as_uri() masks the password by default.
            logger.info(f"Connecting to RabbitMQ at {connection.as_uri()} to publish...")
            producer = connection.Producer(serializer='json')
            producer.publish(
                payload,
                exchange=exchange,
                routing_key=RABBITMQ_ALERTS_QNAME,
                declare=[alert_queue_obj],
                retry=True,
                retry_policy={'interval_start': 0, 'interval_step': 2, 'interval_max': 30, 'max_retries': 3},
                delivery_mode='persistent',
            )
            logger.info(f"Message published successfully to queue '{RABBITMQ_ALERTS_QNAME}'.")
            return True
    except Exception as e:
        # Log the masked URI: the raw URL may embed credentials.
        logger.error(f"Failed to publish message to RabbitMQ ({_masked_rabbitmq_url()}): {e}")
        return False


def peek_messages_rmq(limit=5):
    """Print up to *limit* messages from the alerts queue and put them back.

    Messages are fetched and held un-acknowledged until the end, then all
    requeued; requeuing each one immediately could hand the same message back
    on the next fetch.

    Returns:
        List of the peeked payloads (empty on error or when the queue is empty).
    """
    check_env_vars(require_rabbitmq=True)
    messages_peeked = []
    try:
        with Connection(RABBITMQ_URL) as connection:
            logger.info(f"Connecting to RabbitMQ at {connection.as_uri()} to peek...")
            # passive=True: fail instead of creating the queue if it does not exist.
            alert_queue_obj(connection.channel()).declare(passive=True)
            queue = connection.SimpleQueue(RABBITMQ_ALERTS_QNAME)
            logger.info(f"Peeking up to {limit} messages from queue '{RABBITMQ_ALERTS_QNAME}'...")
            held_messages = []
            try:
                while len(held_messages) < limit:
                    try:
                        message = queue.get(block=False)
                    except queue.Empty:
                        logger.info("Queue is empty or no more messages.")
                        break
                    held_messages.append(message)
                    try:
                        print("-" * 20 + f" Message {len(held_messages)} " + "-" * 20)
                        print(json.dumps(message.payload, indent=2))
                        messages_peeked.append(message.payload)
                    except Exception as get_err:
                        logger.error(f"Error during message get/requeue: {get_err}")
                        break
            finally:
                # Always hand every fetched message back, exactly once.
                for message in held_messages:
                    try:
                        message.requeue()
                    except Exception as req_err:
                        logger.error(f"Failed to requeue msg after error: {req_err}")
                queue.close()
            if not messages_peeked:
                logger.info(f"No messages found in queue '{RABBITMQ_ALERTS_QNAME}'.")
            return messages_peeked
    except (KombuError, OperationalError, ConnectionRefusedError) as e:
        logger.error(f"Failed to peek from RMQ: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error peeking: {e}")
        return []


def get_and_process_one_rmq_sms(target_phone: str):
    """Take one message from the alerts queue and send its ``body`` as an SMS.

    The message is acknowledged when the SMS is accepted and rejected without
    requeue when sending fails or the payload is not a dict with a ``body``.

    Args:
        target_phone: Number the SMS is sent to (the payload's own destination
            field is not used here).

    Returns:
        True if an SMS was sent, False otherwise (including an empty queue).
    """
    check_env_vars(require_rabbitmq=True, require_telnyx_sms=True)
    message = None
    try:
        with Connection(RABBITMQ_URL) as connection:
            logger.info("Connecting to RMQ for get_and_process_one_rmq_sms...")
            alert_queue_obj(connection.channel()).declare(passive=True)
            queue = connection.SimpleQueue(RABBITMQ_ALERTS_QNAME)
            try:
                message = queue.get(block=True, timeout=1)
                logger.info(f"Got message from queue (Tag: {message.delivery_tag}).")
                payload = message.payload
                if not isinstance(payload, dict):
                    raise TypeError("Msg payload not dict.")
                body = payload.get("body")
                if not body:
                    raise ValueError("Msg payload missing 'body'.")
                # For this function, caller_id_type for SMS from RMQ will be 'auto' by default
                # as it's not specified in the RMQ message itself.
                sms_sent = send_telnyx_sms(recipient_phone=target_phone, message_body=body, caller_id_type="auto")
                if sms_sent:
                    logger.info(f"Telnyx SMS sent. Acking RMQ msg (Tag: {message.delivery_tag}).")
                    message.ack()
                else:
                    logger.error(f"Failed to send SMS. Rejecting RMQ msg (Tag: {message.delivery_tag}).")
                    message.reject(requeue=False)
                return sms_sent
            except queue.Empty:
                logger.info(f"Queue '{RABBITMQ_ALERTS_QNAME}' empty.")
                return False
            except (KombuError, OperationalError, ConnectionRefusedError, TypeError, ValueError, json.JSONDecodeError) as e:
                logger.error(f"RMQ/Payload error: {e}.")
                if message and hasattr(message, 'delivery_tag'):
                    try:
                        message.reject(requeue=False)
                    except Exception as reject_e:
                        logger.error(f"Failed to reject msg after error: {reject_e}")
                return False
            finally:
                queue.close()
    except Exception as e:
        logger.error(f"Unexpected error in get_and_process_one_rmq_sms: {e}")
        return False


def clear_rmq_messages(mode='one'):
    """Remove messages from the alerts queue.

    Args:
        mode: ``'one'`` acknowledges (drops) the next message; ``'all'`` purges
            the queue after an interactive ``yes`` confirmation.

    Returns:
        True if the operation ran (including an empty queue or a cancelled
        purge), False on an invalid mode or any error.
    """
    check_env_vars(require_rabbitmq=True)
    if mode not in ('one', 'all'):
        logger.error(f"Invalid clear mode '{mode}'.")
        return False
    try:
        with Connection(RABBITMQ_URL) as connection:
            logger.info(f"Connecting to RMQ to clear messages (mode: {mode})...")
            channel = connection.channel()
            bound_queue = alert_queue_obj(channel)
            bound_queue.declare(passive=True)
            if mode == 'one':
                # Queue.get() returns a message or None; it is not a
                # pika-style (method, properties, body) tuple.
                message = bound_queue.get()
                if message is not None:
                    message.ack()
                    logger.info("One msg acked.")
                else:
                    logger.info(f"Queue '{RABBITMQ_ALERTS_QNAME}' empty.")
            else:
                try:
                    confirm = input("PURGE ALL messages from queue? (yes/no): ").lower()
                except EOFError:
                    confirm = ''  # no interactive stdin: treat as "no"
                if confirm == 'yes':
                    msg_count = bound_queue.purge()
                    logger.info(f"Queue purged. Approx {msg_count} msgs removed.")
                else:
                    logger.info("Purge cancelled.")
            channel.close()
            return True
    except (KombuError, OperationalError, ConnectionRefusedError) as e:
        logger.error(f"Failed to clear RMQ: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error clearing RMQ: {e}")
        return False


# --- Main Execution ---
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with one sub-parser per action."""
    parser = argparse.ArgumentParser(
        description="Messenger Tool: Send SMS/Voice via Telnyx and interact with RabbitMQ.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest='command', help='Available actions', required=True)

    # --- Subparser: sms ---
    parser_sms = subparsers.add_parser('sms', help='Send an SMS directly using Telnyx.')
    parser_sms.add_argument('--to', required=True, help='Recipient phone number (E.164 format, e.g., +14155552671).')
    parser_sms.add_argument('--message', required=True, help='The content of the SMS message.')
    parser_sms.add_argument(
        '--caller-id-type', choices=['auto', 'alpha', 'numeric'], default='auto',
        help="Choose SMS caller ID type: 'auto' (numeric for +1, alpha otherwise), 'alpha', or 'numeric'. Default: auto.",
    )

    # --- Subparser: voice-tts-call ---
    parser_voice = subparsers.add_parser(
        'voice-tts-call',
        help='Initiate a voice call with TTS or audio playback using Telnyx.',
        description=("Initiates a voice call. Your webhook needs to be configured in Telnyx\nto handle `call.answered` and issue `speak_text` or `play_audio`."),
    )
    # NOTE(review): a real-looking phone number is the default destination; confirm this is intended.
    parser_voice.add_argument('--to', default='+14082397258', help='Recipient phone for voice call (default: +14082397258).')
    parser_voice.add_argument('--message', default='Test message from Telnyx.', help='TTS message for call (default: "Test message from Telnyx."). Sent via X-TTS-Payload.')
    parser_voice.add_argument('--audio-url', default=None, help='URL of audio file (MP3/WAV) to play. Sent via X-Audio-Url.')
    parser_voice.add_argument('--connection-id', default=None, help='Override Telnyx Connection ID for the call.')
    parser_voice.add_argument('--webhook-url', default=None, help='Override webhook URL for call events.')
    parser_voice.add_argument('--caller-id-type', choices=['auto', 'alpha', 'numeric'], default='auto', help="Choose voice caller ID type: 'auto', 'alpha', or 'numeric'. Default: auto.")
    parser_voice.add_argument('--amd', choices=['disabled', 'detect', 'detect_beep', 'detect_words', 'greeting_end'], default='disabled', help="Answering Machine Detection mode (default: disabled).")
    parser_voice.add_argument('--custom-header', action='append', help='Add custom header. Format: NAME=VALUE. Multiple allowed.')

    # --- Subparser: send-rmq ---
    parser_rmq = subparsers.add_parser('send-rmq', help="Send formatted message to RabbitMQ 'alerts' queue.")
    parser_rmq.add_argument('--method', choices=['sms', 'email', 'phone'], default='sms', help='Method for consumer (default: sms).')
    parser_rmq.add_argument('--to', required=True, help='Destination (phone/email).')
    parser_rmq.add_argument('--body', required=True, help='Main content of message.')
    parser_rmq.add_argument('--subject', default='Notification', help='Subject line.')
    parser_rmq.add_argument('--timestamp', default=None, help='ISO 8601 timestamp (optional, defaults to now UTC).')

    # --- Subparser: peek-rmq ---
    parser_peek = subparsers.add_parser('peek-rmq', help="View messages in 'alerts' queue.")
    parser_peek.add_argument('--limit', type=int, default=5, help='Max messages to peek (default: 5).')

    # --- Subparser: process-one ---
    parser_process = subparsers.add_parser('process-one', help="Get message from RMQ, send via Telnyx SMS to specified number, then ACK.")
    parser_process.add_argument('--to', required=True, help='Phone number (E.164) to send SMS TO.')

    # --- Subparser: clear-rmq ---
    parser_clear = subparsers.add_parser('clear-rmq', help="Remove messages from 'alerts' queue.")
    parser_clear.add_argument('--mode', choices=['one', 'all'], required=True, help="'one': next message, 'all': ALL messages (confirm).")

    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """Parse *argv* (defaults to ``sys.argv[1:]``), run the command, return the exit code."""
    parser = _build_arg_parser()
    args = parser.parse_args(argv)
    success = False

    if args.command == 'sms':
        logger.info("Action: Send Telnyx SMS")
        success = send_telnyx_sms(recipient_phone=args.to, message_body=args.message,
                                  caller_id_type=args.caller_id_type)
    elif args.command == 'voice-tts-call':
        logger.info("Action: Initiate Telnyx Voice Call")
        success = make_telnyx_voice_call(
            to_phone=args.to,
            tts_message=args.message,
            audio_url=args.audio_url,
            connection_id_override=args.connection_id,
            webhook_url_override=args.webhook_url,
            caller_id_type=args.caller_id_type,
            amd_mode=args.amd,
            extra_custom_headers=args.custom_header,
        )
    elif args.command == 'send-rmq':
        logger.info("Action: Send Message to RabbitMQ")
        timestamp = args.timestamp if args.timestamp else datetime.now(timezone.utc).isoformat()
        payload = {
            "send_method": args.method,
            "subject": args.subject,
            "body": args.body,
            "timestamp": timestamp,
            "destination": args.to,
        }
        success = publish_to_rmq(payload)
    elif args.command == 'peek-rmq':
        logger.info("Action: Peek RabbitMQ Messages")
        # Peeking reports success even when nothing was found.
        peek_messages_rmq(limit=args.limit)
        success = True
    elif args.command == 'process-one':
        logger.info("Action: Process One RMQ Message as Telnyx SMS")
        success = get_and_process_one_rmq_sms(target_phone=args.to)
    elif args.command == 'clear-rmq':
        logger.info(f"Action: Clear RabbitMQ Messages (Mode: {args.mode})")
        success = clear_rmq_messages(mode=args.mode)
    else:
        logger.error(f"Unknown command: {args.command}")
        parser.print_help()
        return 1

    if success:
        logger.info(f"Command '{args.command}' completed successfully.")
        return 0
    logger.error(f"Command '{args.command}' failed.")
    return 1


if __name__ == "__main__":
    sys.exit(main())