--- parser3/src/classes/amqp.C 2025/11/07 01:26:13 1.2 +++ parser3/src/classes/amqp.C 2025/11/22 15:36:40 1.7 @@ -16,12 +16,13 @@ #ifdef WITH_AMQP #include #include +#include #include #include #include #endif -volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.2 2025/11/07 01:26:13 moko Exp $" IDENT_PA_VAMQP_H; +volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.7 2025/11/22 15:36:40 moko Exp $" IDENT_PA_VAMQP_H; class MAmqp: public Methoded { public: // VStateless_class @@ -32,6 +33,55 @@ public: DECLARE_CLASS_VAR(amqp, new MAmqp); +#ifdef WITH_AMQP + +static void status_check(int ret, const char *detail=""){ + if(ret == AMQP_STATUS_OK) + return; + + const char* error_str = amqp_error_string2(ret); + if(error_str) { + throw Exception("amqp", 0, "%sfailed: %s", detail, error_str); + } else { + throw Exception("amqp", 0, "%sfailed: error %d", detail, ret); + } +} + +static void check(amqp_rpc_reply_t rr, const char *detail=""){ + if(rr.reply_type == AMQP_RESPONSE_NORMAL) + return; + + // Extract error message from reply + const char* error_msg = 0; + size_t error_len = 0; + if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) { + if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { + amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded; + if(m->reply_text.len > 0 && m->reply_text.bytes) { + error_msg = (const char*)m->reply_text.bytes; + error_len = m->reply_text.len; + } + } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) { + amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded; + if(m->reply_text.len > 0 && m->reply_text.bytes) { + error_msg = (const char*)m->reply_text.bytes; + error_len = m->reply_text.len; + } + } + } + + if(error_msg) { + throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg); + } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { + status_check(rr.library_error, detail); + } + + throw Exception("amqp", 0, "%sfailed", detail); +} + +#endif // WITH_AMQP + + static void _create(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); @@ -43,6 +93,11 @@ VAmqp& self=GET_SELF(r, VAmqp); const char* vhost_c = "/"; const char* locale_c = "en_US"; int heartbeat = 30; // seconds + const char* tls_ca = 0; + const char* tls_cert = 0; + const char* tls_key = 0; + bool tls_specified = false; + bool tls_verify = true; if(params.count()>0){ if(HashStringValue* options=params.as_hash(0)){ @@ -63,6 +118,24 @@ VAmqp& self=GET_SELF(r, VAmqp); locale_c=value->as_string().cstr(); } else if(key=="heartbeat"){ heartbeat=r.process(*value).as_int(); + } else if(key=="tls"){ + tls_specified = true; + if(HashStringValue* tls_options=value->get_hash()){ + for(HashStringValue::Iterator t(*tls_options); t; t.next()){ + String::Body tkey=t.key(); + Value* tval=t.value(); + if(tkey=="ca"){ + tls_ca=tval->as_string().cstr(); + } else if(tkey=="cert"){ + tls_cert=tval->as_string().cstr(); + } else if(tkey=="key"){ + tls_key=tval->as_string().cstr(); + } else if(tkey=="verify"){ + tls_verify=r.process(*tval).as_bool(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); + } + } } else throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } @@ -70,16 +143,42 @@ VAmqp& self=GET_SELF(r, VAmqp); } amqp_connection_state_t conn = amqp_new_connection(); - amqp_socket_t* socket = amqp_tcp_socket_new(conn); - if(!socket) - throw Exception("amqp", 0, "failed to create TCP socket"); - if(amqp_socket_open(socket, host_c, port)) - throw Exception("amqp", 0, "failed to open TCP socket"); + amqp_socket_t* socket = 0; + + if(tls_specified) { + socket = amqp_ssl_socket_new(conn); + if(!socket) + throw Exception("amqp", 0, "failed to create SSL socket"); + + // Set CA certificate if provided + if(tls_ca) + if(amqp_ssl_socket_set_cacert(socket, tls_ca)) + throw Exception("amqp", 0, "failed to set CA certificate"); + + // Set client certificate and key if provided + if(tls_cert && tls_key) { + if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key)) + throw Exception("amqp", 0, "failed to set client certificate/key"); + } else if(tls_cert || tls_key) { + throw Exception("amqp", 0, "both cert and key must be specified for TLS"); + } + + // If CA is provided, peer verification will use it + amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca); + // If verify=true, enable hostname verification + amqp_ssl_socket_set_verify_hostname(socket, tls_verify); + } else { + socket = amqp_tcp_socket_new(conn); + if(!socket) + throw Exception("amqp", 0, "failed to create TCP socket"); + } + + status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket "); amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c); if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){ amqp_destroy_connection(conn); - throw Exception("amqp", 0, "login failed"); + check(rlogin, "login "); } int channel = 1; @@ -88,7 +187,7 @@ VAmqp& self=GET_SELF(r, VAmqp); if(ropen.reply_type != AMQP_RESPONSE_NORMAL){ amqp_connection_close(conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(conn); - throw Exception("amqp", 0, "channel open failed"); + check(ropen, "open channel "); } self.fconnection = conn; @@ -96,16 +195,11 @@ VAmqp& self=GET_SELF(r, VAmqp); #else (void)params; (void)self; throw Exception("amqp", 0, "compiled without amqp support"); -#endif +#endif // WITH_AMQP } #ifdef WITH_AMQP -static void check(const char* action, amqp_rpc_reply_t rr){ - if(rr.reply_type != AMQP_RESPONSE_NORMAL) - throw Exception("amqp", 0, "%s failed", action); -} - #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l))) #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l)) @@ -114,7 +208,6 @@ static void _publish(Request& r, MethodP const String &msg=params.as_string(0, "msg must be string"); const char* exchange_c = ""; // default exchange const char* routing_key_c = 0; - bool have_queue_sugar=false; bool mandatory=false; amqp_basic_properties_t props; @@ -130,85 +223,75 @@ static void _publish(Request& r, MethodP } else if(key=="routing_key"){ routing_key_c=value->as_string().cstr(); } else if(key=="queue"){ - routing_key_c=value->as_string().cstr(); have_queue_sugar=true; + routing_key_c=value->as_string().cstr(); } else if(key=="mandatory"){ mandatory=r.process(*value).as_bool(); - } else if(key=="properties"){ - // parse message properties - if(HashStringValue* ph=value->get_hash()){ - for(HashStringValue::Iterator p(*ph); p; p.next()){ - String::Body pkey=p.key(); - Value* pval=p.value(); - if(pkey=="content_type"){ - const char* v=pval->as_string().cstr(); - props.content_type=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG; - } else if(pkey=="content_encoding"){ - const char* v=pval->as_string().cstr(); - props.content_encoding=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG; - } else if(pkey=="delivery_mode"){ - uint8_t dm=(uint8_t)pval->as_int(); - props.delivery_mode=dm; - props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG; - } else if(pkey=="priority"){ - uint8_t pr=(uint8_t)pval->as_int(); - props.priority=pr; - props._flags|=AMQP_BASIC_PRIORITY_FLAG; - } else if(pkey=="correlation_id"){ - const char* v=pval->as_string().cstr(); - props.correlation_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG; - } else if(pkey=="reply_to"){ - const char* v=pval->as_string().cstr(); - props.reply_to=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_REPLY_TO_FLAG; - } else if(pkey=="expiration"){ - const char* v=pval->as_string().cstr(); - props.expiration=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_EXPIRATION_FLAG; - } else if(pkey=="message_id"){ - const char* v=pval->as_string().cstr(); - props.message_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG; - } else if(pkey=="timestamp"){ - uint64_t ts=(uint64_t)pval->as_double(); - props.timestamp=ts; - props._flags|=AMQP_BASIC_TIMESTAMP_FLAG; - } else if(pkey=="type"){ - const char* v=pval->as_string().cstr(); - props.type=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_TYPE_FLAG; - } else if(pkey=="user_id"){ - const char* v=pval->as_string().cstr(); - props.user_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_USER_ID_FLAG; - } else if(pkey=="app_id"){ - const char* v=pval->as_string().cstr(); - props.app_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_APP_ID_FLAG; - } else if(pkey=="headers"){ -/* if(HashStringValue* hh=pval->get_hash()){ - size_t count=hh->count(); - amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0; - size_t idx=0; - for(HashStringValue::Iterator hi(*hh); hi; hi.next()){ - String::Body hkey=hi.key(); - const char* hv=hi.value()->as_string().cstr(); - entries[idx].key=amqp_cstring_bytes(hkey.cstr()); - entries[idx].value.kind=AMQP_FIELD_KIND_UTF8; - entries[idx].value.value.bytes=amqp_cstring_bytes(hv); - idx++; - } - props.headers.num_entries=(int)count; - props.headers.entries=entries; - props._flags|=AMQP_BASIC_HEADERS_FLAG; - } -*/ } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); + } else if(key=="content_type"){ + const char* v=value->as_string().cstr(); + props.content_type=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG; + } else if(key=="content_encoding"){ + const char* v=value->as_string().cstr(); + props.content_encoding=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG; + } else if(key=="delivery_mode"){ + uint8_t dm=(uint8_t)value->as_int(); + props.delivery_mode=dm; + props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG; + } else if(key=="priority"){ + uint8_t pr=(uint8_t)value->as_int(); + props.priority=pr; + props._flags|=AMQP_BASIC_PRIORITY_FLAG; + } else if(key=="correlation_id"){ + const char* v=value->as_string().cstr(); + props.correlation_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG; + } else if(key=="reply_to"){ + const char* v=value->as_string().cstr(); + props.reply_to=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_REPLY_TO_FLAG; + } else if(key=="expiration"){ + const char* v=value->as_string().cstr(); + props.expiration=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_EXPIRATION_FLAG; + } else if(key=="message_id"){ + const char* v=value->as_string().cstr(); + props.message_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG; + } else if(key=="timestamp"){ + uint64_t ts=(uint64_t)value->as_double(); + props.timestamp=ts; + props._flags|=AMQP_BASIC_TIMESTAMP_FLAG; + } else if(key=="type"){ + const char* v=value->as_string().cstr(); + props.type=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_TYPE_FLAG; + } else if(key=="user_id"){ + const char* v=value->as_string().cstr(); + props.user_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_USER_ID_FLAG; + } else if(key=="app_id"){ + const char* v=value->as_string().cstr(); + props.app_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_APP_ID_FLAG; + } else if(key=="headers"){ +/* if(HashStringValue* hh=pval->get_hash()){ + size_t count=hh->count(); + amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0; + size_t idx=0; + for(HashStringValue::Iterator hi(*hh); hi; hi.next()){ + String::Body hkey=hi.key(); + const char* hv=hi.value()->as_string().cstr(); + entries[idx].key=amqp_cstring_bytes(hkey.cstr()); + entries[idx].value.kind=AMQP_FIELD_KIND_UTF8; + entries[idx].value.value.bytes=amqp_cstring_bytes(hv); + idx++; } + props.headers.num_entries=(int)count; + props.headers.entries=entries; + props._flags|=AMQP_BASIC_HEADERS_FLAG; } - } else +*/ } else throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } @@ -221,7 +304,7 @@ static void _publish(Request& r, MethodP body.len = msg.length(); body.bytes=(void*)msg.cstr(); - int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory , 0, &props, body); + int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body); if(ret!=AMQP_STATUS_OK) throw Exception("amqp", 0, "publish failed"); @@ -230,7 +313,6 @@ static void _publish(Request& r, MethodP if(props._flags & AMQP_BASIC_HEADERS_FLAG){ // delete [] props.headers.entries; } - (void)have_queue_sugar; } static void _release(Request& r, MethodParams&) { @@ -247,8 +329,8 @@ static void _release(Request& r, MethodP static void _ack(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); - const String &tag_s=params.as_string(0, "delivery tag must not be code"); - int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0); + double tag=params.as_double(0, "delivery tag must be number", r); + int ret = amqp_basic_ack(self.connection(), self.channel(), (uint64_t)tag, 0); if(ret!=AMQP_STATUS_OK) throw Exception("amqp", 0, "ack failed"); } @@ -340,7 +422,7 @@ static void _declare_exchange(Request& r if(!name_c) throw Exception("amqp", 0, "name is required"); amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table); - check("declare exchange", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _delete_exchange(Request& r, MethodParams& params) { @@ -365,7 +447,7 @@ static void _delete_exchange(Request& r, } if(!name_c) throw Exception("amqp", 0, "exchange is required"); amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused); - check("delete exchange", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _declare_queue(Request& r, MethodParams& params) { @@ -392,7 +474,7 @@ static void _declare_queue(Request& r, M } } amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table); - check("declare queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); if(!queue_c && ok){ r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len)); } @@ -421,7 +503,7 @@ static void _delete_queue(Request& r, Me } if(!queue_c) throw Exception("amqp", 0, "queue is required"); amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty); - check("delete queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _bind_queue(Request& r, MethodParams& params) { @@ -447,7 +529,7 @@ static void _bind_queue(Request& r, Meth } if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required"); amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table); - check("bind queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _unbind_queue(Request& r, MethodParams& params) { @@ -473,7 +555,7 @@ static void _unbind_queue(Request& r, Me } if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required"); amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table); - check("unbind queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _consume(Request& r, MethodParams& params) { @@ -487,7 +569,9 @@ static void _consume(Request& r, MethodP for(HashStringValue::Iterator i(*options); i; i.next()){ String::Body key=i.key(); Value* value=i.value(); - if(key=="queue"){ + if(key=="callback"){ + callback=value->get_junction(); + } else if(key=="queue"){ queue_c=value->as_string().cstr(); } else if(key=="consumer_tag"){ consumer_tag_c=value->as_string().cstr(); @@ -495,8 +579,6 @@ static void _consume(Request& r, MethodP no_ack=r.process(*value).as_bool(); } else if(key=="nowait"){ nowait=r.process(*value).as_bool(); - } else if(key=="callback"){ - callback=value->get_junction(); } else throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } @@ -508,9 +590,7 @@ static void _consume(Request& r, MethodP amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes, 0 /*no_local*/, no_ack, nowait, amqp_empty_table); - amqp_rpc_reply_t rr = amqp_get_rpc_reply(self.connection()); - if(rr.reply_type != AMQP_RESPONSE_NORMAL) - throw Exception("amqp", 0, "consume failed"); + check(amqp_get_rpc_reply(self.connection())); self.fstop=false; while(!self.fstop){ @@ -521,7 +601,7 @@ static void _consume(Request& r, MethodP if(res.reply_type == AMQP_RESPONSE_NORMAL){ VHash &vh=*new VHash; HashStringValue* h=vh.get_hash(); h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len)); - h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag))); + h->put("delivery_tag", new VDouble((double)envelope.delivery_tag)); h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len)); h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len)); @@ -545,7 +625,7 @@ static void _stop_consume(Request& r, Me self.fstop=true; } -#endif +#endif // WITH_AMQP // constructor MAmqp::MAmqp(): Methoded("amqp") { @@ -565,7 +645,5 @@ MAmqp::MAmqp(): Methoded("amqp") { add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1); add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1); add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0); -#endif +#endif // WITH_AMQP } - -