--- parser3/src/classes/amqp.C 2025/11/07 23:00:11 1.4 +++ parser3/src/classes/amqp.C 2025/11/07 23:53:42 1.5 @@ -16,12 +16,13 @@ #ifdef WITH_AMQP #include #include +#include #include #include #include #endif -volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.4 2025/11/07 23:00:11 moko Exp $" IDENT_PA_VAMQP_H; +volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.5 2025/11/07 23:53:42 moko Exp $" IDENT_PA_VAMQP_H; class MAmqp: public Methoded { public: // VStateless_class @@ -43,6 +44,11 @@ VAmqp& self=GET_SELF(r, VAmqp); const char* vhost_c = "/"; const char* locale_c = "en_US"; int heartbeat = 30; // seconds + const char* tls_ca = 0; + const char* tls_cert = 0; + const char* tls_key = 0; + bool tls_specified = false; + bool tls_verify = true; if(params.count()>0){ if(HashStringValue* options=params.as_hash(0)){ @@ -63,6 +69,24 @@ VAmqp& self=GET_SELF(r, VAmqp); locale_c=value->as_string().cstr(); } else if(key=="heartbeat"){ heartbeat=r.process(*value).as_int(); + } else if(key=="tls"){ + tls_specified = true; + if(HashStringValue* tls_options=value->get_hash()){ + for(HashStringValue::Iterator t(*tls_options); t; t.next()){ + String::Body tkey=t.key(); + Value* tval=t.value(); + if(tkey=="ca"){ + tls_ca=tval->as_string().cstr(); + } else if(tkey=="cert"){ + tls_cert=tval->as_string().cstr(); + } else if(tkey=="key"){ + tls_key=tval->as_string().cstr(); + } else if(tkey=="verify"){ + tls_verify=r.process(*tval).as_bool(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); + } + } } else throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } @@ -70,11 +94,38 @@ VAmqp& self=GET_SELF(r, VAmqp); } amqp_connection_state_t conn = amqp_new_connection(); - amqp_socket_t* socket = amqp_tcp_socket_new(conn); - if(!socket) - throw Exception("amqp", 0, "failed to create TCP socket"); + amqp_socket_t* socket = 0; + + if(tls_specified) { + socket = amqp_ssl_socket_new(conn); + if(!socket) + throw Exception("amqp", 0, "failed to create SSL socket"); + + // Set CA certificate if provided + if(tls_ca) + if(amqp_ssl_socket_set_cacert(socket, tls_ca)) + throw Exception("amqp", 0, "failed to set CA certificate"); + + // Set client certificate and key if provided + if(tls_cert && tls_key) { + if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key)) + throw Exception("amqp", 0, "failed to set client certificate/key"); + } else if(tls_cert || tls_key) { + throw Exception("amqp", 0, "both cert and key must be specified for TLS"); + } + + // If CA is provided, peer verification will use it + amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca); + // If verify=true, enable hostname verification + amqp_ssl_socket_set_verify_hostname(socket, tls_verify); + } else { + socket = amqp_tcp_socket_new(conn); + if(!socket) + throw Exception("amqp", 0, "failed to create TCP socket"); + } + if(amqp_socket_open(socket, host_c, port)) - throw Exception("amqp", 0, "failed to open TCP socket"); + throw Exception("amqp", 0, "failed to open socket"); amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c); if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){