--- parser3/src/classes/amqp.C 2025/11/22 15:36:40 1.7 +++ parser3/src/classes/amqp.C 2026/01/08 15:45:25 1.8 @@ -10,6 +10,7 @@ #include "pa_request.h" #include "pa_vstring.h" #include "pa_vhash.h" +#include "pa_varray.h" #include "pa_vbool.h" #include "pa_vamqp.h" @@ -22,7 +23,7 @@ #include #endif -volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.7 2025/11/22 15:36:40 moko Exp $" IDENT_PA_VAMQP_H; +volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.8 2026/01/08 15:45:25 moko Exp $" IDENT_PA_VAMQP_H; class MAmqp: public Methoded { public: // VStateless_class @@ -392,139 +393,105 @@ static void _reject(Request& r, MethodPa throw Exception("amqp", 0, "reject failed"); } -static void _declare_exchange(Request& r, MethodParams& params) { +static void _declare(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); - const char* name_c = 0; + const char* exchange_c = 0; + const char* queue_c = 0; const char* type_c = "direct"; bool passive=false, durable=false, auto_delete=true, nowait=false; - if(params.count()>0){ - if(HashStringValue* options=params.as_hash(0)){ - for(HashStringValue::Iterator i(*options); i; i.next()){ - String::Body key=i.key(); - Value* value=i.value(); - if(key=="name"){ - name_c=value->as_string().cstr(); - } else if(key=="type"){ - type_c=value->as_string().cstr(); - } else if(key=="passive"){ - passive=r.process(*value).as_bool(); - } else if(key=="durable"){ - durable=r.process(*value).as_bool(); - } else if(key=="auto_delete"){ - auto_delete=r.process(*value).as_bool(); - } else if(key=="nowait"){ - nowait=r.process(*value).as_bool(); - } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); - } + if(HashStringValue* options=params.as_hash(0)){ + for(HashStringValue::Iterator i(*options); i; i.next()){ + String::Body key=i.key(); + Value* value=i.value(); + if(key=="exchange"){ + exchange_c=value->as_string().cstr(); + } else if(key=="queue"){ + queue_c=value->as_string().cstr(); + } else if(key=="type"){ + type_c=value->as_string().cstr(); + } else if(key=="passive"){ + passive=r.process(*value).as_bool(); + } else if(key=="durable"){ + durable=r.process(*value).as_bool(); + } else if(key=="auto_delete"){ + auto_delete=r.process(*value).as_bool(); + } else if(key=="nowait"){ + nowait=r.process(*value).as_bool(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } - if(!name_c) - throw Exception("amqp", 0, "name is required"); - amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table); - check(amqp_get_rpc_reply(self.connection())); -} + if(!exchange_c && !queue_c) + throw Exception("amqp", 0, "exchange or queue must be specified"); -static void _delete_exchange(Request& r, MethodParams& params) { - VAmqp& self=GET_SELF(r, VAmqp); - const char* name_c = 0; - bool if_unused=false, nowait=false; - if(params.count()>0){ - if(HashStringValue* options=params.as_hash(0)){ - for(HashStringValue::Iterator i(*options); i; i.next()){ - String::Body key=i.key(); - Value* value=i.value(); - if(key=="exchange"){ - name_c=value->as_string().cstr(); - } else if(key=="if_unused"){ - if_unused=r.process(*value).as_bool(); - } else if(key=="nowait"){ - nowait=r.process(*value).as_bool(); - } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); - } + if(exchange_c){ + amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table); + check(amqp_get_rpc_reply(self.connection())); + } + + if(queue_c){ + amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), *queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table); + check(amqp_get_rpc_reply(self.connection())); + if(!*queue_c && ok){ + r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len)); } } - if(!name_c) throw Exception("amqp", 0, "exchange is required"); - amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused); - check(amqp_get_rpc_reply(self.connection())); } -static void _declare_queue(Request& r, MethodParams& params) { +static void _delete(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); - const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false; - if(params.count()>0){ - if(HashStringValue* options=params.as_hash(0)){ - for(HashStringValue::Iterator i(*options); i; i.next()){ - String::Body key=i.key(); - Value* value=i.value(); - if(key=="queue"){ - queue_c=value->as_string().cstr(); - } else if(key=="passive"){ - passive=r.process(*value).as_bool(); - } else if(key=="durable"){ - durable=r.process(*value).as_bool(); - } else if(key=="auto_delete"){ - auto_delete=r.process(*value).as_bool(); - } else if(key=="nowait"){ - nowait=r.process(*value).as_bool(); - } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); - } + const char* exchange_c = 0; + const char* queue_c = 0; + bool if_unused=false, if_empty=false, nowait=false; + if(HashStringValue* options=params.as_hash(0)){ + for(HashStringValue::Iterator i(*options); i; i.next()){ + String::Body key=i.key(); + Value* value=i.value(); + if(key=="exchange"){ + exchange_c=value->as_string().cstr(); + } else if(key=="queue"){ + queue_c=value->as_string().cstr(); + } else if(key=="if_unused"){ + if_unused=r.process(*value).as_bool(); + } else if(key=="if_empty"){ + if_empty=r.process(*value).as_bool(); + } else if(key=="nowait"){ + nowait=r.process(*value).as_bool(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } - amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table); - check(amqp_get_rpc_reply(self.connection())); - if(!queue_c && ok){ - r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len)); + if(!exchange_c && !queue_c) + throw Exception("amqp", 0, "exchange or queue must be specified"); + + if(exchange_c){ + amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), if_unused); + check(amqp_get_rpc_reply(self.connection())); } -} -static void _delete_queue(Request& r, MethodParams& params) { - VAmqp& self=GET_SELF(r, VAmqp); - const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false; - if(params.count()>0){ - if(HashStringValue* options=params.as_hash(0)){ - for(HashStringValue::Iterator i(*options); i; i.next()){ - String::Body key=i.key(); - Value* value=i.value(); - if(key=="queue"){ - queue_c=value->as_string().cstr(); - } else if(key=="if_unused"){ - if_unused=r.process(*value).as_bool(); - } else if(key=="if_empty"){ - if_empty=r.process(*value).as_bool(); - } else if(key=="nowait"){ - nowait=r.process(*value).as_bool(); - } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); - } - } + if(queue_c){ + amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty); + check(amqp_get_rpc_reply(self.connection())); } - if(!queue_c) throw Exception("amqp", 0, "queue is required"); - amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty); - check(amqp_get_rpc_reply(self.connection())); } -static void _bind_queue(Request& r, MethodParams& params) { +static void _bind(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); const char* exchange_c=0; const char* queue_c=0; const char* routing_key_c=""; - if(params.count()>0){ - if(HashStringValue* options=params.as_hash(0)){ - for(HashStringValue::Iterator i(*options); i; i.next()){ - String::Body key=i.key(); - Value* value=i.value(); - if(key=="exchange"){ - exchange_c=value->as_string().cstr(); - } else if(key=="queue"){ - queue_c=value->as_string().cstr(); - } else if(key=="routing_key"){ - routing_key_c=value->as_string().cstr(); - } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); - } + if(HashStringValue* options=params.as_hash(0)){ + for(HashStringValue::Iterator i(*options); i; i.next()){ + String::Body key=i.key(); + Value* value=i.value(); + if(key=="exchange"){ + exchange_c=value->as_string().cstr(); + } else if(key=="queue"){ + queue_c=value->as_string().cstr(); + } else if(key=="routing_key"){ + routing_key_c=value->as_string().cstr(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required"); @@ -532,25 +499,23 @@ static void _bind_queue(Request& r, Meth check(amqp_get_rpc_reply(self.connection())); } -static void _unbind_queue(Request& r, MethodParams& params) { +static void _unbind(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); const char* exchange_c=0; const char* queue_c=0; const char* routing_key_c=""; - if(params.count()>0){ - if(HashStringValue* options=params.as_hash(0)){ - for(HashStringValue::Iterator i(*options); i; i.next()){ - String::Body key=i.key(); - Value* value=i.value(); - if(key=="exchange"){ - exchange_c=value->as_string().cstr(); - } else if(key=="queue"){ - queue_c=value->as_string().cstr(); - } else if(key=="routing_key"){ - routing_key_c=value->as_string().cstr(); - } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); - } + if(HashStringValue* options=params.as_hash(0)){ + for(HashStringValue::Iterator i(*options); i; i.next()){ + String::Body key=i.key(); + Value* value=i.value(); + if(key=="exchange"){ + exchange_c=value->as_string().cstr(); + } else if(key=="queue"){ + queue_c=value->as_string().cstr(); + } else if(key=="routing_key"){ + routing_key_c=value->as_string().cstr(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required"); @@ -558,11 +523,71 @@ static void _unbind_queue(Request& r, Me check(amqp_get_rpc_reply(self.connection())); } +static void _purge(Request& r, MethodParams& params) { + VAmqp& self=GET_SELF(r, VAmqp); + const char* queue_c = 0; + if(HashStringValue* options=params.as_hash(0)){ + for(HashStringValue::Iterator i(*options); i; i.next()){ + String::Body key=i.key(); + Value* value=i.value(); + if(key=="queue"){ + queue_c=value->as_string().cstr(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); + } + } + if(!queue_c) + throw Exception("amqp", 0, "queue must be specified"); + + amqp_queue_purge(self.connection(), self.channel(), amqp_cstring_bytes(queue_c)); + check(amqp_get_rpc_reply(self.connection())); +} + +static void _info(Request& r, MethodParams& params) { + VAmqp& self = GET_SELF(r, VAmqp); + const char* queue_c = 0; + + if (HashStringValue* options=params.as_hash(0)) { + for (HashStringValue::Iterator i(*options); i; i.next()) { + String::Body key=i.key(); + Value* value=i.value(); + if(key=="queue"){ + queue_c=value->as_string().cstr(); + } else { + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); + } + } + } + + if (!queue_c) + throw Exception("amqp", 0, "queue must be specified"); + + amqp_queue_declare_ok_t* ok = amqp_queue_declare(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), /*passive*/ 1, 0, 0, 0, amqp_empty_table); + check(amqp_get_rpc_reply(self.connection())); + + Value& result=*new VHash; + result.put_element(*new String("queue"), AMQP_VSTRING(ok->queue.bytes, ok->queue.len)); + result.put_element(*new String("messages"), new VInt(ok->message_count)); + result.put_element(*new String("consumers"), new VInt(ok->consumer_count)); + r.write(result); +} + +static VHash *amqp_message_hash(amqp_envelope_t &envelope) { + VHash *result=new VHash; + HashStringValue* h=result->get_hash(); + h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len)); + h->put("delivery_tag", new VInt(envelope.delivery_tag)); + h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len)); + h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len)); + return result; +} + static void _consume(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); const char* queue_c=0; const char* consumer_tag_c=0; bool no_ack=true, nowait=false; + int count=1; Junction* callback=0; if(HashStringValue* options=params.as_hash(0)){ @@ -579,44 +604,58 @@ static void _consume(Request& r, MethodP no_ack=r.process(*value).as_bool(); } else if(key=="nowait"){ nowait=r.process(*value).as_bool(); + } else if(key=="count"){ + count=r.process(*value).as_int(); } else throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } - if(!queue_c) throw Exception("amqp", 0, "queue is required"); - if(!callback) throw Exception("amqp", 0, "callback is required"); + if(!queue_c) throw Exception("amqp", 0, "queue must be specified"); amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes, 0 /*no_local*/, no_ack, nowait, amqp_empty_table); check(amqp_get_rpc_reply(self.connection())); - self.fstop=false; - while(!self.fstop){ - amqp_envelope_t envelope; - memset(&envelope, 0, sizeof(envelope)); - amqp_maybe_release_buffers(self.connection()); - amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0); - if(res.reply_type == AMQP_RESPONSE_NORMAL){ - VHash &vh=*new VHash; HashStringValue* h=vh.get_hash(); - h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len)); - h->put("delivery_tag", new VDouble((double)envelope.delivery_tag)); - h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len)); - h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len)); - - Value *params_cb[]={&vh}; - METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, { - frame.store_params(params_cb, 1); - r.call(frame); - }); - - amqp_destroy_envelope(&envelope); - } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { - continue; - } else { - break; + if(callback){ + self.fstop=false; + while(!self.fstop){ + amqp_envelope_t envelope; + memset(&envelope, 0, sizeof(envelope)); + amqp_maybe_release_buffers(self.connection()); + amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0); + if(res.reply_type == AMQP_RESPONSE_NORMAL){ + VHash *vh=amqp_message_hash(envelope); + Value *params_cb[]={vh}; + METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, { + frame.store_params(params_cb, 1); + r.call(frame); + }); + amqp_destroy_envelope(&envelope); + } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) { + continue; + } else { + break; + } + } + } else { + VArray& result=*new VArray(); + ArrayValue& result_array=result.array(); + + for(int i=0; i