--- parser3/src/classes/amqp.C 2025/11/07 23:53:42 1.5 +++ parser3/src/classes/amqp.C 2025/11/22 15:36:40 1.7 @@ -22,7 +22,7 @@ #include #endif -volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.5 2025/11/07 23:53:42 moko Exp $" IDENT_PA_VAMQP_H; +volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.7 2025/11/22 15:36:40 moko Exp $" IDENT_PA_VAMQP_H; class MAmqp: public Methoded { public: // VStateless_class @@ -33,6 +33,55 @@ public: DECLARE_CLASS_VAR(amqp, new MAmqp); +#ifdef WITH_AMQP + +static void status_check(int ret, const char *detail=""){ + if(ret == AMQP_STATUS_OK) + return; + + const char* error_str = amqp_error_string2(ret); + if(error_str) { + throw Exception("amqp", 0, "%sfailed: %s", detail, error_str); + } else { + throw Exception("amqp", 0, "%sfailed: error %d", detail, ret); + } +} + +static void check(amqp_rpc_reply_t rr, const char *detail=""){ + if(rr.reply_type == AMQP_RESPONSE_NORMAL) + return; + + // Extract error message from reply + const char* error_msg = 0; + size_t error_len = 0; + if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) { + if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { + amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded; + if(m->reply_text.len > 0 && m->reply_text.bytes) { + error_msg = (const char*)m->reply_text.bytes; + error_len = m->reply_text.len; + } + } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) { + amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded; + if(m->reply_text.len > 0 && m->reply_text.bytes) { + error_msg = (const char*)m->reply_text.bytes; + error_len = m->reply_text.len; + } + } + } + + if(error_msg) { + throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg); + } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { + status_check(rr.library_error, detail); + } + + throw Exception("amqp", 0, "%sfailed", detail); +} + +#endif // WITH_AMQP + + static void _create(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); @@ -105,7 +154,7 @@ VAmqp& self=GET_SELF(r, VAmqp); if(tls_ca) if(amqp_ssl_socket_set_cacert(socket, tls_ca)) throw Exception("amqp", 0, "failed to set CA certificate"); - + // Set client certificate and key if provided if(tls_cert && tls_key) { if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key)) @@ -113,7 +162,7 @@ VAmqp& self=GET_SELF(r, VAmqp); } else if(tls_cert || tls_key) { throw Exception("amqp", 0, "both cert and key must be specified for TLS"); } - + // If CA is provided, peer verification will use it amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca); // If verify=true, enable hostname verification @@ -124,13 +173,12 @@ VAmqp& self=GET_SELF(r, VAmqp); throw Exception("amqp", 0, "failed to create TCP socket"); } - if(amqp_socket_open(socket, host_c, port)) - throw Exception("amqp", 0, "failed to open socket"); + status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket "); amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c); if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){ amqp_destroy_connection(conn); - throw Exception("amqp", 0, "login failed"); + check(rlogin, "login "); } int channel = 1; @@ -139,7 +187,7 @@ VAmqp& self=GET_SELF(r, VAmqp); if(ropen.reply_type != AMQP_RESPONSE_NORMAL){ amqp_connection_close(conn, AMQP_REPLY_SUCCESS); amqp_destroy_connection(conn); - throw Exception("amqp", 0, "channel open failed"); + check(ropen, "open channel "); } self.fconnection = conn; @@ -147,43 +195,11 @@ VAmqp& self=GET_SELF(r, VAmqp); #else (void)params; (void)self; throw Exception("amqp", 0, "compiled without amqp support"); -#endif +#endif // WITH_AMQP } #ifdef WITH_AMQP -static void check(amqp_rpc_reply_t rr){ - if(rr.reply_type == AMQP_RESPONSE_NORMAL) - return; - - // Extract error message from reply - const char* error_msg = 0; - size_t error_len = 0; - if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) { - if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { - amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded; - if(m->reply_text.len > 0 && m->reply_text.bytes) { - error_msg = (const char*)m->reply_text.bytes; - error_len = m->reply_text.len; - } - } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) { - amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded; - if(m->reply_text.len > 0 && m->reply_text.bytes) { - error_msg = (const char*)m->reply_text.bytes; - error_len = m->reply_text.len; - } - } - } - - if(error_msg) { - throw Exception("amqp", 0, "failed: %.*s", (int)error_len, error_msg); - } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { - throw Exception("amqp", 0, "failed: library error %d", rr.library_error); - } else { - throw Exception("amqp", 0, "failed"); - } -} - #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l))) #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l)) @@ -313,8 +329,8 @@ static void _release(Request& r, MethodP static void _ack(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); - const String &tag_s=params.as_string(0, "delivery tag must not be code"); - int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0); + double tag=params.as_double(0, "delivery tag must be number", r); + int ret = amqp_basic_ack(self.connection(), self.channel(), (uint64_t)tag, 0); if(ret!=AMQP_STATUS_OK) throw Exception("amqp", 0, "ack failed"); } @@ -585,7 +601,7 @@ static void _consume(Request& r, MethodP if(res.reply_type == AMQP_RESPONSE_NORMAL){ VHash &vh=*new VHash; HashStringValue* h=vh.get_hash(); h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len)); - h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag))); + h->put("delivery_tag", new VDouble((double)envelope.delivery_tag)); h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len)); h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len)); @@ -609,7 +625,7 @@ static void _stop_consume(Request& r, Me self.fstop=true; } -#endif +#endif // WITH_AMQP // constructor MAmqp::MAmqp(): Methoded("amqp") { @@ -629,5 +645,5 @@ MAmqp::MAmqp(): Methoded("amqp") { add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1); add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1); add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0); -#endif +#endif // WITH_AMQP }