Annotation of parser3/src/classes/amqp.C, revision 1.1
1.1 ! moko 1: /** @file
! 2: Parser: @b amqp parser class.
! 3:
! 4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
! 5: Authors: Konstantin Morshnev <moko@design.ru>
! 6: */
! 7:
! 8: #include "pa_vmethod_frame.h"
! 9:
! 10: #include "pa_request.h"
! 11: #include "pa_vstring.h"
! 12: #include "pa_vhash.h"
! 13: #include "pa_vbool.h"
! 14: #include "pa_vamqp.h"
! 15:
! 16: #ifdef WITH_AMQP
! 17: #include <amqp.h>
! 18: #include <amqp_tcp_socket.h>
! 19: #include <amqp_framing.h>
! 20: #include <stdlib.h>
! 21: #include <string.h>
! 22: #endif
! 23:
! 24: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.1 2025/10/05 00:00:00 moko Exp $" IDENT_PA_VAMQP_H;
! 25:
! 26: class MAmqp: public Methoded {
! 27: public: // VStateless_class
! 28: Value* create_new_value(Pool&) { return new VAmqp(); }
! 29: public:
! 30: MAmqp();
! 31: };
! 32:
! 33: DECLARE_CLASS_VAR(amqp, new MAmqp);
! 34:
! 35: static void _create(Request& r, MethodParams& params) {
! 36: VAmqp& self=GET_SELF(r, VAmqp);
! 37:
! 38: #ifdef WITH_AMQP
! 39: const char* host_c = "localhost";
! 40: int port = 5672;
! 41: const char* user_c = "guest";
! 42: const char* pass_c = "guest";
! 43: const char* vhost_c = "/";
! 44: const char* locale_c = "en_US";
! 45: int heartbeat = 30; // seconds
! 46:
! 47: if(params.count()>0){
! 48: if(HashStringValue* options=params.as_hash(0)){
! 49: for(HashStringValue::Iterator i(*options); i; i.next()){
! 50: String::Body key=i.key();
! 51: Value* value=i.value();
! 52: if(key=="host"){
! 53: host_c=value->as_string().cstr();
! 54: } else if(key=="port"){
! 55: port=r.process(*value).as_int();
! 56: } else if(key=="user"){
! 57: user_c=value->as_string().cstr();
! 58: } else if(key=="password"){
! 59: pass_c=value->as_string().cstr();
! 60: } else if(key=="vhost"){
! 61: vhost_c=value->as_string().cstr();
! 62: } else if(key=="locale"){
! 63: locale_c=value->as_string().cstr();
! 64: } else if(key=="heartbeat"){
! 65: heartbeat=r.process(*value).as_int();
! 66: } else
! 67: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 68: }
! 69: }
! 70: }
! 71:
! 72: amqp_connection_state_t conn = amqp_new_connection();
! 73: amqp_socket_t* socket = amqp_tcp_socket_new(conn);
! 74: if(!socket)
! 75: throw Exception("amqp", 0, "failed to create TCP socket");
! 76: if(amqp_socket_open(socket, host_c, port))
! 77: throw Exception("amqp", 0, "failed to open TCP socket");
! 78:
! 79: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
! 80: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
! 81: amqp_destroy_connection(conn);
! 82: throw Exception("amqp", 0, "login failed");
! 83: }
! 84:
! 85: int channel = 1;
! 86: amqp_channel_open(conn, channel);
! 87: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
! 88: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
! 89: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
! 90: amqp_destroy_connection(conn);
! 91: throw Exception("amqp", 0, "channel open failed");
! 92: }
! 93:
! 94: self.fconnection = conn;
! 95: self.fchannel = channel;
! 96: #else
! 97: (void)params; (void)self;
! 98: throw Exception("amqp", 0, "compiled without amqp support");
! 99: #endif
! 100: }
! 101:
! 102: #ifdef WITH_AMQP
! 103:
! 104: static void check(const char* action, amqp_rpc_reply_t rr){
! 105: if(rr.reply_type != AMQP_RESPONSE_NORMAL)
! 106: throw Exception("amqp", 0, "%s failed", action);
! 107: }
! 108:
! 109: static void _publish(Request& r, MethodParams& params) {
! 110: VAmqp& self=GET_SELF(r, VAmqp);
! 111: const String &msg=params.as_string(0, "msg must be string");
! 112: const char* exchange_c = ""; // default exchange
! 113: const char* routing_key_c = 0;
! 114: bool have_queue_sugar=false;
! 115: bool mandatory=false;
! 116:
! 117: amqp_basic_properties_t props;
! 118: props._flags = 0;
! 119:
! 120: if(params.count()>1){
! 121: if(HashStringValue* options=params.as_hash(1)){
! 122: for(HashStringValue::Iterator i(*options); i; i.next()){
! 123: String::Body key=i.key();
! 124: Value* value=i.value();
! 125: if(key=="exchange"){
! 126: exchange_c=value->as_string().cstr();
! 127: } else if(key=="routing_key"){
! 128: routing_key_c=value->as_string().cstr();
! 129: } else if(key=="queue"){
! 130: routing_key_c=value->as_string().cstr(); have_queue_sugar=true;
! 131: } else if(key=="mandatory"){
! 132: mandatory=r.process(*value).as_bool();
! 133: } else if(key=="properties"){
! 134: // parse message properties
! 135: if(HashStringValue* ph=value->get_hash()){
! 136: for(HashStringValue::Iterator p(*ph); p; p.next()){
! 137: String::Body pkey=p.key();
! 138: Value* pval=p.value();
! 139: if(pkey=="content_type"){
! 140: const char* v=pval->as_string().cstr();
! 141: props.content_type=amqp_cstring_bytes(v);
! 142: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
! 143: } else if(pkey=="content_encoding"){
! 144: const char* v=pval->as_string().cstr();
! 145: props.content_encoding=amqp_cstring_bytes(v);
! 146: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
! 147: } else if(pkey=="delivery_mode"){
! 148: uint8_t dm=(uint8_t)pval->as_int();
! 149: props.delivery_mode=dm;
! 150: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
! 151: } else if(pkey=="priority"){
! 152: uint8_t pr=(uint8_t)pval->as_int();
! 153: props.priority=pr;
! 154: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
! 155: } else if(pkey=="correlation_id"){
! 156: const char* v=pval->as_string().cstr();
! 157: props.correlation_id=amqp_cstring_bytes(v);
! 158: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
! 159: } else if(pkey=="reply_to"){
! 160: const char* v=pval->as_string().cstr();
! 161: props.reply_to=amqp_cstring_bytes(v);
! 162: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
! 163: } else if(pkey=="expiration"){
! 164: const char* v=pval->as_string().cstr();
! 165: props.expiration=amqp_cstring_bytes(v);
! 166: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
! 167: } else if(pkey=="message_id"){
! 168: const char* v=pval->as_string().cstr();
! 169: props.message_id=amqp_cstring_bytes(v);
! 170: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
! 171: } else if(pkey=="timestamp"){
! 172: uint64_t ts=(uint64_t)pval->as_double();
! 173: props.timestamp=ts;
! 174: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
! 175: } else if(pkey=="type"){
! 176: const char* v=pval->as_string().cstr();
! 177: props.type=amqp_cstring_bytes(v);
! 178: props._flags|=AMQP_BASIC_TYPE_FLAG;
! 179: } else if(pkey=="user_id"){
! 180: const char* v=pval->as_string().cstr();
! 181: props.user_id=amqp_cstring_bytes(v);
! 182: props._flags|=AMQP_BASIC_USER_ID_FLAG;
! 183: } else if(pkey=="app_id"){
! 184: const char* v=pval->as_string().cstr();
! 185: props.app_id=amqp_cstring_bytes(v);
! 186: props._flags|=AMQP_BASIC_APP_ID_FLAG;
! 187: } else if(pkey=="headers"){
! 188: /* if(HashStringValue* hh=pval->get_hash()){
! 189: size_t count=hh->count();
! 190: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
! 191: size_t idx=0;
! 192: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
! 193: String::Body hkey=hi.key();
! 194: const char* hv=hi.value()->as_string().cstr();
! 195: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
! 196: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
! 197: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
! 198: idx++;
! 199: }
! 200: props.headers.num_entries=(int)count;
! 201: props.headers.entries=entries;
! 202: props._flags|=AMQP_BASIC_HEADERS_FLAG;
! 203: }
! 204: */ } else
! 205: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 206: }
! 207: }
! 208: } else
! 209: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 210: }
! 211: }
! 212: }
! 213:
! 214: if(!routing_key_c)
! 215: throw Exception("amqp", 0, "routing_key or queue must be specified");
! 216:
! 217: amqp_bytes_t body;
! 218: body.len = msg.length();
! 219: body.bytes=(void*)msg.cstr();
! 220:
! 221: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory , 0, &props, body);
! 222:
! 223: if(ret!=AMQP_STATUS_OK)
! 224: throw Exception("amqp", 0, "publish failed");
! 225:
! 226: // free temporary headers entries if allocated
! 227: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
! 228: // delete [] props.headers.entries;
! 229: }
! 230: (void)have_queue_sugar;
! 231: }
! 232:
! 233: static void _release(Request& r, MethodParams&) {
! 234: VAmqp& self=GET_SELF(r, VAmqp);
! 235: if(self.fconnection){
! 236: amqp_connection_state_t conn=self.fconnection;
! 237: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
! 238: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
! 239: amqp_destroy_connection(conn);
! 240: self.fconnection=0;
! 241: self.fchannel=0;
! 242: }
! 243: }
! 244:
! 245: static void _ack(Request& r, MethodParams& params) {
! 246: VAmqp& self=GET_SELF(r, VAmqp);
! 247: const String &tag_s=params.as_string(0, "delivery tag must not be code");
! 248: int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
! 249: if(ret!=AMQP_STATUS_OK)
! 250: throw Exception("amqp", 0, "ack failed");
! 251: }
! 252:
! 253: static void _nack(Request& r, MethodParams& params) {
! 254: VAmqp& self=GET_SELF(r, VAmqp);
! 255: const String &tag_s=params.as_string(0, "delivery tag must not be code");
! 256: bool requeue=false;
! 257: if(params.count()>1){
! 258: if(HashStringValue* options=params.as_hash(1)){
! 259: for(HashStringValue::Iterator i(*options); i; i.next()){
! 260: if(i.key()=="requeue"){
! 261: requeue=r.process(*i.value()).as_bool();
! 262: } else
! 263: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 264: }
! 265: }
! 266: }
! 267: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
! 268: if(ret!=AMQP_STATUS_OK)
! 269: throw Exception("amqp", 0, "nack failed");
! 270: }
! 271:
! 272: static void _qos(Request& r, MethodParams& params) {
! 273: VAmqp& self=GET_SELF(r, VAmqp);
! 274: uint16_t prefetch_count=0;
! 275: if(params.count()>0){
! 276: if(HashStringValue* options=params.as_hash(0)){
! 277: for(HashStringValue::Iterator i(*options); i; i.next()){
! 278: if(i.key()=="prefetch_count"){
! 279: int pc=r.process(*i.value()).as_int();
! 280: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
! 281: } else
! 282: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 283: }
! 284: }
! 285: }
! 286: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
! 287: if(!ret)
! 288: throw Exception("amqp", 0, "qos failed");
! 289: }
! 290:
! 291: static void _reject(Request& r, MethodParams& params) {
! 292: VAmqp& self=GET_SELF(r, VAmqp);
! 293: const String &tag_s=params.as_string(0, "delivery tag must not be code");
! 294: bool requeue=true; // by default return to queue
! 295: if(params.count()>1){
! 296: if(HashStringValue* options = params.as_hash(1)){
! 297: for(HashStringValue::Iterator i(*options); i; i.next()){
! 298: if(i.key() == "requeue"){
! 299: requeue=r.process(*i.value()).as_bool();
! 300: } else
! 301: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 302: }
! 303: }
! 304: }
! 305: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
! 306: if(ret!=AMQP_STATUS_OK)
! 307: throw Exception("amqp", 0, "reject failed");
! 308: }
! 309:
! 310: static void _declare_exchange(Request& r, MethodParams& params) {
! 311: VAmqp& self=GET_SELF(r, VAmqp);
! 312: const char* name_c = 0;
! 313: const char* type_c = "direct";
! 314: bool passive=false, durable=false, auto_delete=true, nowait=false;
! 315: if(params.count()>0){
! 316: if(HashStringValue* options=params.as_hash(0)){
! 317: for(HashStringValue::Iterator i(*options); i; i.next()){
! 318: String::Body key=i.key();
! 319: Value* value=i.value();
! 320: if(key=="name"){
! 321: name_c=value->as_string().cstr();
! 322: } else if(key=="type"){
! 323: type_c=value->as_string().cstr();
! 324: } else if(key=="passive"){
! 325: passive=r.process(*value).as_bool();
! 326: } else if(key=="durable"){
! 327: durable=r.process(*value).as_bool();
! 328: } else if(key=="auto_delete"){
! 329: auto_delete=r.process(*value).as_bool();
! 330: } else if(key=="nowait"){
! 331: nowait=r.process(*value).as_bool();
! 332: } else
! 333: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 334: }
! 335: }
! 336: }
! 337: if(!name_c)
! 338: throw Exception("amqp", 0, "name is required");
! 339: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
! 340: check("declare exchange", amqp_get_rpc_reply(self.connection()));
! 341: }
! 342:
! 343: static void _delete_exchange(Request& r, MethodParams& params) {
! 344: VAmqp& self=GET_SELF(r, VAmqp);
! 345: const char* name_c = 0;
! 346: bool if_unused=false, nowait=false;
! 347: if(params.count()>0){
! 348: if(HashStringValue* options=params.as_hash(0)){
! 349: for(HashStringValue::Iterator i(*options); i; i.next()){
! 350: String::Body key=i.key();
! 351: Value* value=i.value();
! 352: if(key=="exchange"){
! 353: name_c=value->as_string().cstr();
! 354: } else if(key=="if_unused"){
! 355: if_unused=r.process(*value).as_bool();
! 356: } else if(key=="nowait"){
! 357: nowait=r.process(*value).as_bool();
! 358: } else
! 359: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 360: }
! 361: }
! 362: }
! 363: if(!name_c) throw Exception("amqp", 0, "exchange is required");
! 364: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
! 365: check("delete exchange", amqp_get_rpc_reply(self.connection()));
! 366: }
! 367:
! 368: static void _declare_queue(Request& r, MethodParams& params) {
! 369: VAmqp& self=GET_SELF(r, VAmqp);
! 370: const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
! 371: if(params.count()>0){
! 372: if(HashStringValue* options=params.as_hash(0)){
! 373: for(HashStringValue::Iterator i(*options); i; i.next()){
! 374: String::Body key=i.key();
! 375: Value* value=i.value();
! 376: if(key=="queue"){
! 377: queue_c=value->as_string().cstr();
! 378: } else if(key=="passive"){
! 379: passive=r.process(*value).as_bool();
! 380: } else if(key=="durable"){
! 381: durable=r.process(*value).as_bool();
! 382: } else if(key=="auto_delete"){
! 383: auto_delete=r.process(*value).as_bool();
! 384: } else if(key=="nowait"){
! 385: nowait=r.process(*value).as_bool();
! 386: } else
! 387: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 388: }
! 389: }
! 390: }
! 391: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
! 392: check("declare queue", amqp_get_rpc_reply(self.connection()));
! 393: if(!queue_c && ok){
! 394: r.write(*new String(String::C(pa_strdup((const char *)ok->queue.bytes, ok->queue.len), ok->queue.len)));
! 395: }
! 396: }
! 397:
! 398: static void _delete_queue(Request& r, MethodParams& params) {
! 399: VAmqp& self=GET_SELF(r, VAmqp);
! 400: const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
! 401: if(params.count()>0){
! 402: if(HashStringValue* options=params.as_hash(0)){
! 403: for(HashStringValue::Iterator i(*options); i; i.next()){
! 404: String::Body key=i.key();
! 405: Value* value=i.value();
! 406: if(key=="queue"){
! 407: queue_c=value->as_string().cstr();
! 408: } else if(key=="if_unused"){
! 409: if_unused=r.process(*value).as_bool();
! 410: } else if(key=="if_empty"){
! 411: if_empty=r.process(*value).as_bool();
! 412: } else if(key=="nowait"){
! 413: nowait=r.process(*value).as_bool();
! 414: } else
! 415: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 416: }
! 417: }
! 418: }
! 419: if(!queue_c) throw Exception("amqp", 0, "queue is required");
! 420: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
! 421: check("delete queue", amqp_get_rpc_reply(self.connection()));
! 422: }
! 423:
! 424: static void _bind_queue(Request& r, MethodParams& params) {
! 425: VAmqp& self=GET_SELF(r, VAmqp);
! 426: const char* exchange_c=0;
! 427: const char* queue_c=0;
! 428: const char* routing_key_c="";
! 429: if(params.count()>0){
! 430: if(HashStringValue* options=params.as_hash(0)){
! 431: for(HashStringValue::Iterator i(*options); i; i.next()){
! 432: String::Body key=i.key();
! 433: Value* value=i.value();
! 434: if(key=="exchange"){
! 435: exchange_c=value->as_string().cstr();
! 436: } else if(key=="queue"){
! 437: queue_c=value->as_string().cstr();
! 438: } else if(key=="routing_key"){
! 439: routing_key_c=value->as_string().cstr();
! 440: } else
! 441: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 442: }
! 443: }
! 444: }
! 445: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
! 446: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
! 447: check("bind queue", amqp_get_rpc_reply(self.connection()));
! 448: }
! 449:
! 450: static void _unbind_queue(Request& r, MethodParams& params) {
! 451: VAmqp& self=GET_SELF(r, VAmqp);
! 452: const char* exchange_c=0;
! 453: const char* queue_c=0;
! 454: const char* routing_key_c="";
! 455: if(params.count()>0){
! 456: if(HashStringValue* options=params.as_hash(0)){
! 457: for(HashStringValue::Iterator i(*options); i; i.next()){
! 458: String::Body key=i.key();
! 459: Value* value=i.value();
! 460: if(key=="exchange"){
! 461: exchange_c=value->as_string().cstr();
! 462: } else if(key=="queue"){
! 463: queue_c=value->as_string().cstr();
! 464: } else if(key=="routing_key"){
! 465: routing_key_c=value->as_string().cstr();
! 466: } else
! 467: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 468: }
! 469: }
! 470: }
! 471: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
! 472: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
! 473: check("unbind queue", amqp_get_rpc_reply(self.connection()));
! 474: }
! 475:
! 476: static void _consume(Request& r, MethodParams& params) {
! 477: VAmqp& self=GET_SELF(r, VAmqp);
! 478: const char* queue_c=0;
! 479: const char* consumer_tag_c=0;
! 480: bool no_ack=true, nowait=false;
! 481: Junction* callback=0;
! 482:
! 483: if(params.count()>0){
! 484: if(HashStringValue* options=params.as_hash(0)){
! 485: for(HashStringValue::Iterator i(*options); i; i.next()){
! 486: String::Body key=i.key();
! 487: Value* value=i.value();
! 488: if(key=="queue"){
! 489: queue_c=value->as_string().cstr();
! 490: } else if(key=="consumer_tag"){
! 491: consumer_tag_c=value->as_string().cstr();
! 492: } else if(key=="no_ack"){
! 493: no_ack=r.process(*value).as_bool();
! 494: } else if(key=="nowait"){
! 495: nowait=r.process(*value).as_bool();
! 496: } else if(key=="callback"){
! 497: callback=value->get_junction();
! 498: } else
! 499: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 500: }
! 501: }
! 502: }
! 503:
! 504: if(!queue_c) throw Exception("amqp", 0, "queue is required");
! 505: if(!callback) throw Exception("amqp", 0, "callback is required");
! 506:
! 507: amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
! 508: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
! 509: 0 /*no_local*/, no_ack, nowait, amqp_empty_table);
! 510: amqp_rpc_reply_t rr = amqp_get_rpc_reply(self.connection());
! 511: if(rr.reply_type != AMQP_RESPONSE_NORMAL)
! 512: throw Exception("amqp", 0, "consume failed");
! 513:
! 514: self.fstop=false;
! 515: while(!self.fstop){
! 516: amqp_envelope_t envelope; amqp_maybe_release_buffers(self.connection());
! 517: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
! 518: if(res.reply_type == AMQP_RESPONSE_NORMAL){
! 519: VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
! 520: String msg(String::Body((const unsigned char*)envelope.message.body.bytes, envelope.message.body.len), String::L_CLEAN);
! 521: h->put("msg", new VString(msg));
! 522: h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
! 523: h->put("consumer_tag", new VString(String::Body((const unsigned char*)envelope.consumer_tag.bytes, envelope.consumer_tag.len)));
! 524: h->put("exchange", new VString(String::Body((const unsigned char*)envelope.exchange.bytes, envelope.exchange.len)));
! 525:
! 526: Value *params_cb[]={&vh};
! 527: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
! 528: frame.store_params(params_cb, 1);
! 529: r.call(frame);
! 530: });
! 531:
! 532: amqp_destroy_envelope(&envelope);
! 533: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
! 534: continue;
! 535: } else {
! 536: break;
! 537: }
! 538: }
! 539: }
! 540:
! 541: static void _stop_consume(Request& r, MethodParams&) {
! 542: VAmqp& self=GET_SELF(r, VAmqp);
! 543: self.fstop=true;
! 544: }
! 545:
! 546: #endif
! 547:
! 548: // constructor
! 549: MAmqp::MAmqp(): Methoded("amqp") {
! 550: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
! 551: #ifdef WITH_AMQP
! 552: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
! 553: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
! 554: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
! 555: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
! 556: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
! 557: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
! 558: add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
! 559: add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
! 560: add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
! 561: add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
! 562: add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
! 563: add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
! 564: add_native_method("consume", Method::CT_DYNAMIC, _consume, 0, 1);
! 565: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
! 566: #endif
! 567: }
! 568:
! 569:
E-mail: