--- parser3/src/classes/amqp.C 2025/11/07 22:36:35 1.3 +++ parser3/src/classes/amqp.C 2025/11/07 23:00:11 1.4 @@ -21,7 +21,7 @@ #include #endif -volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.3 2025/11/07 22:36:35 moko Exp $" IDENT_PA_VAMQP_H; +volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.4 2025/11/07 23:00:11 moko Exp $" IDENT_PA_VAMQP_H; class MAmqp: public Methoded { public: // VStateless_class @@ -101,9 +101,36 @@ VAmqp& self=GET_SELF(r, VAmqp); #ifdef WITH_AMQP -static void check(const char* action, amqp_rpc_reply_t rr){ - if(rr.reply_type != AMQP_RESPONSE_NORMAL) - throw Exception("amqp", 0, "%s failed", action); +static void check(amqp_rpc_reply_t rr){ + if(rr.reply_type == AMQP_RESPONSE_NORMAL) + return; + + // Extract error message from reply + const char* error_msg = 0; + size_t error_len = 0; + if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) { + if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) { + amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded; + if(m->reply_text.len > 0 && m->reply_text.bytes) { + error_msg = (const char*)m->reply_text.bytes; + error_len = m->reply_text.len; + } + } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) { + amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded; + if(m->reply_text.len > 0 && m->reply_text.bytes) { + error_msg = (const char*)m->reply_text.bytes; + error_len = m->reply_text.len; + } + } + } + + if(error_msg) { + throw Exception("amqp", 0, "failed: %.*s", (int)error_len, error_msg); + } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) { + throw Exception("amqp", 0, "failed: library error %d", rr.library_error); + } else { + throw Exception("amqp", 0, "failed"); + } } #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l))) @@ -328,7 +355,7 @@ static void _declare_exchange(Request& r if(!name_c) throw Exception("amqp", 0, "name is required"); amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table); - check("declare exchange", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _delete_exchange(Request& r, MethodParams& params) { @@ -353,7 +380,7 @@ static void _delete_exchange(Request& r, } if(!name_c) throw Exception("amqp", 0, "exchange is required"); amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused); - check("delete exchange", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _declare_queue(Request& r, MethodParams& params) { @@ -380,7 +407,7 @@ static void _declare_queue(Request& r, M } } amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table); - check("declare queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); if(!queue_c && ok){ r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len)); } @@ -409,7 +436,7 @@ static void _delete_queue(Request& r, Me } if(!queue_c) throw Exception("amqp", 0, "queue is required"); amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty); - check("delete queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _bind_queue(Request& r, MethodParams& params) { @@ -435,7 +462,7 @@ static void _bind_queue(Request& r, Meth } if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required"); amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table); - check("bind queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _unbind_queue(Request& r, MethodParams& params) { @@ -461,7 +488,7 @@ static void _unbind_queue(Request& r, Me } if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required"); amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table); - check("unbind queue", amqp_get_rpc_reply(self.connection())); + check(amqp_get_rpc_reply(self.connection())); } static void _consume(Request& r, MethodParams& params) { @@ -496,9 +523,7 @@ static void _consume(Request& r, MethodP amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes, 0 /*no_local*/, no_ack, nowait, amqp_empty_table); - amqp_rpc_reply_t rr = amqp_get_rpc_reply(self.connection()); - if(rr.reply_type != AMQP_RESPONSE_NORMAL) - throw Exception("amqp", 0, "consume failed"); + check(amqp_get_rpc_reply(self.connection())); self.fstop=false; while(!self.fstop){