Annotation of parser3/src/classes/amqp.C, revision 1.12

1.1       moko        1: /** @file
                      2:        Parser: @b amqp parser class.
                      3: 
                      4:        Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
                      5:        Authors: Konstantin Morshnev <moko@design.ru>
                      6: */
                      7: 
                      8: #include "pa_vmethod_frame.h"
                      9: 
                     10: #include "pa_request.h"
                     11: #include "pa_vstring.h"
                     12: #include "pa_vhash.h"
1.8       moko       13: #include "pa_varray.h"
1.1       moko       14: #include "pa_vbool.h"
                     15: #include "pa_vamqp.h"
                     16: 
                     17: #ifdef WITH_AMQP
                     18: #include <amqp.h>
                     19: #include <amqp_tcp_socket.h>
1.5       moko       20: #include <amqp_ssl_socket.h>
1.1       moko       21: #include <amqp_framing.h>
                     22: #include <stdlib.h>
                     23: #include <string.h>
                     24: #endif
                     25: 
1.12    ! moko       26: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.11 2026/01/09 03:30:39 moko Exp $" IDENT_PA_VAMQP_H;
1.1       moko       27: 
                     28: class MAmqp: public Methoded {
                     29: public: // VStateless_class
                     30:        Value* create_new_value(Pool&) { return new VAmqp(); }
                     31: public:
                     32:        MAmqp();
                     33: };
                     34: 
                     35: DECLARE_CLASS_VAR(amqp, new MAmqp);
                     36: 
1.6       moko       37: #ifdef WITH_AMQP
                     38: 
                     39: static void status_check(int ret, const char *detail=""){
                     40:        if(ret == AMQP_STATUS_OK)
                     41:                return;
                     42: 
                     43:        const char* error_str = amqp_error_string2(ret);
                     44:        if(error_str) {
                     45:                throw Exception("amqp", 0, "%sfailed: %s", detail, error_str);
                     46:        } else {
                     47:                throw Exception("amqp", 0, "%sfailed: error %d", detail, ret);
                     48:        }
                     49: }
                     50: 
                     51: static void check(amqp_rpc_reply_t rr, const char *detail=""){
                     52:        if(rr.reply_type == AMQP_RESPONSE_NORMAL)
                     53:                return;
                     54: 
                     55:        // Extract error message from reply
                     56:        const char* error_msg = 0;
                     57:        size_t error_len = 0;
                     58:        if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
                     59:                if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
                     60:                        amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
                     61:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                     62:                                error_msg = (const char*)m->reply_text.bytes;
                     63:                                error_len = m->reply_text.len;
                     64:                        }
                     65:                } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                     66:                        amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
                     67:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                     68:                                error_msg = (const char*)m->reply_text.bytes;
                     69:                                error_len = m->reply_text.len;
                     70:                        }
                     71:                }
                     72:        }
                     73:        
                     74:        if(error_msg) {
                     75:                throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg);
                     76:        } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
                     77:                status_check(rr.library_error, detail);
                     78:        }
                     79: 
                     80:        throw Exception("amqp", 0, "%sfailed", detail);
                     81: }
                     82: 
                     83: #endif // WITH_AMQP
                     84: 
                     85: 
1.1       moko       86: static void _create(Request& r, MethodParams& params) {
                     87: VAmqp& self=GET_SELF(r, VAmqp);
                     88: 
                     89: #ifdef WITH_AMQP
                     90:        const char* host_c = "localhost";
                     91:        int port = 5672;
                     92:        const char* user_c = "guest";
                     93:        const char* pass_c = "guest";
                     94:        const char* vhost_c = "/";
                     95:        const char* locale_c = "en_US";
                     96:        int heartbeat = 30; // seconds
1.5       moko       97:        const char* tls_ca = 0;
                     98:        const char* tls_cert = 0;
                     99:        const char* tls_key = 0;
                    100:        bool tls_specified = false;
                    101:        bool tls_verify = true;
1.1       moko      102: 
                    103:        if(params.count()>0){
                    104:                if(HashStringValue* options=params.as_hash(0)){
                    105:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    106:                                String::Body key=i.key();
                    107:                                Value* value=i.value();
                    108:                                if(key=="host"){
                    109:                                        host_c=value->as_string().cstr();
                    110:                                } else if(key=="port"){
                    111:                                        port=r.process(*value).as_int();
                    112:                                } else if(key=="user"){
                    113:                                        user_c=value->as_string().cstr();
                    114:                                } else if(key=="password"){
                    115:                                        pass_c=value->as_string().cstr();
                    116:                                } else if(key=="vhost"){
                    117:                                        vhost_c=value->as_string().cstr();
                    118:                                } else if(key=="locale"){
                    119:                                        locale_c=value->as_string().cstr();
                    120:                                } else if(key=="heartbeat"){
                    121:                                        heartbeat=r.process(*value).as_int();
1.5       moko      122:                                } else if(key=="tls"){
                    123:                                        tls_specified = true;
                    124:                                        if(HashStringValue* tls_options=value->get_hash()){
                    125:                                                for(HashStringValue::Iterator t(*tls_options); t; t.next()){
                    126:                                                        String::Body tkey=t.key();
                    127:                                                        Value* tval=t.value();
                    128:                                                        if(tkey=="ca"){
                    129:                                                                tls_ca=tval->as_string().cstr();
                    130:                                                        } else if(tkey=="cert"){
                    131:                                                                tls_cert=tval->as_string().cstr();
                    132:                                                        } else if(tkey=="key"){
                    133:                                                                tls_key=tval->as_string().cstr();
                    134:                                                        } else if(tkey=="verify"){
                    135:                                                                tls_verify=r.process(*tval).as_bool();
                    136:                                                        } else
                    137:                                                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    138:                                                }
                    139:                                        }
1.1       moko      140:                                } else
                    141:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    142:                        }
                    143:                }
                    144:        }
                    145: 
                    146:        amqp_connection_state_t conn = amqp_new_connection();
1.5       moko      147:        amqp_socket_t* socket = 0;
                    148:        
                    149:        if(tls_specified) {
                    150:                socket = amqp_ssl_socket_new(conn);
                    151:                if(!socket)
                    152:                        throw Exception("amqp", 0, "failed to create SSL socket");
                    153:                
                    154:                // Set CA certificate if provided
                    155:                if(tls_ca)
                    156:                        if(amqp_ssl_socket_set_cacert(socket, tls_ca))
                    157:                                throw Exception("amqp", 0, "failed to set CA certificate");
1.6       moko      158: 
1.5       moko      159:                // Set client certificate and key if provided
                    160:                if(tls_cert && tls_key) {
                    161:                        if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
                    162:                                throw Exception("amqp", 0, "failed to set client certificate/key");
                    163:                } else if(tls_cert || tls_key) {
                    164:                        throw Exception("amqp", 0, "both cert and key must be specified for TLS");
                    165:                }
1.6       moko      166: 
1.5       moko      167:                // If CA is provided, peer verification will use it
                    168:                amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
                    169:                // If verify=true, enable hostname verification
                    170:                amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
                    171:        } else {
                    172:                socket = amqp_tcp_socket_new(conn);
                    173:                if(!socket)
                    174:                        throw Exception("amqp", 0, "failed to create TCP socket");
                    175:        }
                    176:        
1.6       moko      177:        status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket ");
1.1       moko      178: 
                    179:        amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
                    180:        if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
                    181:                amqp_destroy_connection(conn);
1.6       moko      182:                check(rlogin, "login ");
1.1       moko      183:        }
                    184: 
                    185:        int channel = 1;
                    186:        amqp_channel_open(conn, channel);
                    187:        amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
                    188:        if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
                    189:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    190:                amqp_destroy_connection(conn);
1.6       moko      191:                check(ropen, "open channel ");
1.1       moko      192:        }
                    193: 
                    194:        self.fconnection = conn;
                    195:        self.fchannel = channel;
                    196: #else
                    197:        (void)params; (void)self;
                    198:        throw Exception("amqp", 0, "compiled without amqp support");
1.6       moko      199: #endif // WITH_AMQP
1.1       moko      200: }
                    201: 
                    202: #ifdef WITH_AMQP
                    203: 
1.2       moko      204: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
                    205: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
                    206: 
1.1       moko      207: static void _publish(Request& r, MethodParams& params) {
                    208:        VAmqp& self=GET_SELF(r, VAmqp);
                    209:        const String &msg=params.as_string(0, "msg must be string");
                    210:        const char* exchange_c = ""; // default exchange
                    211:        const char* routing_key_c = 0;
                    212:        bool mandatory=false;
                    213: 
                    214:        amqp_basic_properties_t props;
                    215:        props._flags = 0;
                    216: 
                    217:        if(params.count()>1){
                    218:                if(HashStringValue* options=params.as_hash(1)){
                    219:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    220:                                String::Body key=i.key();
                    221:                                Value* value=i.value();
                    222:                                if(key=="exchange"){
                    223:                                        exchange_c=value->as_string().cstr();
                    224:                                } else if(key=="routing_key"){
                    225:                                        routing_key_c=value->as_string().cstr();
                    226:                                } else if(key=="queue"){
1.3       moko      227:                                        routing_key_c=value->as_string().cstr();
1.1       moko      228:                                } else if(key=="mandatory"){
                    229:                                        mandatory=r.process(*value).as_bool();
1.3       moko      230:                                } else if(key=="content_type"){
                    231:                                        const char* v=value->as_string().cstr();
                    232:                                        props.content_type=amqp_cstring_bytes(v);
                    233:                                        props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
                    234:                                } else if(key=="content_encoding"){
                    235:                                        const char* v=value->as_string().cstr();
                    236:                                        props.content_encoding=amqp_cstring_bytes(v);
                    237:                                        props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
                    238:                                } else if(key=="delivery_mode"){
                    239:                                        uint8_t dm=(uint8_t)value->as_int();
                    240:                                        props.delivery_mode=dm;
                    241:                                        props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
                    242:                                } else if(key=="priority"){
                    243:                                        uint8_t pr=(uint8_t)value->as_int();
                    244:                                        props.priority=pr;
                    245:                                        props._flags|=AMQP_BASIC_PRIORITY_FLAG;
                    246:                                } else if(key=="correlation_id"){
                    247:                                        const char* v=value->as_string().cstr();
                    248:                                        props.correlation_id=amqp_cstring_bytes(v);
                    249:                                        props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
                    250:                                } else if(key=="reply_to"){
                    251:                                        const char* v=value->as_string().cstr();
                    252:                                        props.reply_to=amqp_cstring_bytes(v);
                    253:                                        props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
                    254:                                } else if(key=="expiration"){
                    255:                                        const char* v=value->as_string().cstr();
                    256:                                        props.expiration=amqp_cstring_bytes(v);
                    257:                                        props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
                    258:                                } else if(key=="message_id"){
                    259:                                        const char* v=value->as_string().cstr();
                    260:                                        props.message_id=amqp_cstring_bytes(v);
                    261:                                        props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
                    262:                                } else if(key=="timestamp"){
                    263:                                        uint64_t ts=(uint64_t)value->as_double();
                    264:                                        props.timestamp=ts;
                    265:                                        props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
                    266:                                } else if(key=="type"){
                    267:                                        const char* v=value->as_string().cstr();
                    268:                                        props.type=amqp_cstring_bytes(v);
                    269:                                        props._flags|=AMQP_BASIC_TYPE_FLAG;
                    270:                                } else if(key=="user_id"){
                    271:                                        const char* v=value->as_string().cstr();
                    272:                                        props.user_id=amqp_cstring_bytes(v);
                    273:                                        props._flags|=AMQP_BASIC_USER_ID_FLAG;
                    274:                                } else if(key=="app_id"){
                    275:                                        const char* v=value->as_string().cstr();
                    276:                                        props.app_id=amqp_cstring_bytes(v);
                    277:                                        props._flags|=AMQP_BASIC_APP_ID_FLAG;
                    278:                                } else if(key=="headers"){
                    279: /*                                     if(HashStringValue* hh=pval->get_hash()){
                    280:                                                size_t count=hh->count();
                    281:                                                amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
                    282:                                                size_t idx=0;
                    283:                                                for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
                    284:                                                        String::Body hkey=hi.key();
                    285:                                                        const char* hv=hi.value()->as_string().cstr();
                    286:                                                        entries[idx].key=amqp_cstring_bytes(hkey.cstr());
                    287:                                                        entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
                    288:                                                        entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
                    289:                                                        idx++;
1.1       moko      290:                                                }
1.3       moko      291:                                                props.headers.num_entries=(int)count;
                    292:                                                props.headers.entries=entries;
                    293:                                                props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1       moko      294:                                        }
1.3       moko      295: */                             } else
1.1       moko      296:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    297:                        }
                    298:                }
                    299:        }
                    300: 
                    301:        if(!routing_key_c)
                    302:                throw Exception("amqp", 0, "routing_key or queue must be specified");
                    303: 
                    304:        amqp_bytes_t body;
                    305:        body.len = msg.length();
                    306:        body.bytes=(void*)msg.cstr();
                    307: 
1.3       moko      308:        int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1       moko      309: 
                    310:        if(ret!=AMQP_STATUS_OK)
                    311:                throw Exception("amqp", 0, "publish failed");
                    312: 
                    313:        // free temporary headers entries if allocated
                    314:        if(props._flags & AMQP_BASIC_HEADERS_FLAG){
                    315: //             delete [] props.headers.entries;
                    316:        }
                    317: }
                    318: 
                    319: static void _release(Request& r, MethodParams&) {
                    320:        VAmqp& self=GET_SELF(r, VAmqp);
                    321:        if(self.fconnection){
                    322:                amqp_connection_state_t conn=self.fconnection;
                    323:                amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
                    324:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    325:                amqp_destroy_connection(conn);
                    326:                self.fconnection=0;
                    327:                self.fchannel=0;
                    328:        }
                    329: }
                    330: 
                    331: static void _ack(Request& r, MethodParams& params) {
                    332:        VAmqp& self=GET_SELF(r, VAmqp);
1.7       moko      333:        double tag=params.as_double(0, "delivery tag must be number", r);
                    334:        int ret = amqp_basic_ack(self.connection(), self.channel(), (uint64_t)tag, 0);
1.1       moko      335:        if(ret!=AMQP_STATUS_OK)
                    336:                throw Exception("amqp", 0, "ack failed");
                    337: }
                    338: 
                    339: static void _nack(Request& r, MethodParams& params) {
                    340:        VAmqp& self=GET_SELF(r, VAmqp);
                    341:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    342:        bool requeue=false;
                    343:        if(params.count()>1){
                    344:                if(HashStringValue* options=params.as_hash(1)){
                    345:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    346:                                if(i.key()=="requeue"){
                    347:                                        requeue=r.process(*i.value()).as_bool();
                    348:                                } else
                    349:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    350:                        }
                    351:                }
                    352:        }
                    353:        int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
                    354:        if(ret!=AMQP_STATUS_OK)
                    355:                throw Exception("amqp", 0, "nack failed");
                    356: }
                    357: 
                    358: static void _qos(Request& r, MethodParams& params) {
                    359:        VAmqp& self=GET_SELF(r, VAmqp);
                    360:        uint16_t prefetch_count=0;
                    361:        if(params.count()>0){
                    362:                if(HashStringValue* options=params.as_hash(0)){
                    363:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    364:                                if(i.key()=="prefetch_count"){
                    365:                                        int pc=r.process(*i.value()).as_int();
                    366:                                        prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
                    367:                                } else
                    368:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    369:                        }
                    370:                }
                    371:        }
                    372:        amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
                    373:        if(!ret)
                    374:                throw Exception("amqp", 0, "qos failed");
                    375: }
                    376: 
                    377: static void _reject(Request& r, MethodParams& params) {
                    378:        VAmqp& self=GET_SELF(r, VAmqp);
                    379:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    380:        bool requeue=true; // by default return to queue
                    381:        if(params.count()>1){
                    382:                if(HashStringValue* options = params.as_hash(1)){
                    383:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    384:                                if(i.key() == "requeue"){
                    385:                                        requeue=r.process(*i.value()).as_bool();
                    386:                                } else
                    387:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    388:                        }
                    389:                }
                    390:        }
                    391:        int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
                    392:        if(ret!=AMQP_STATUS_OK)
                    393:                throw Exception("amqp", 0, "reject failed");
                    394: }
                    395: 
1.8       moko      396: static void _declare(Request& r, MethodParams& params) {
1.1       moko      397:        VAmqp& self=GET_SELF(r, VAmqp);
1.8       moko      398:        const char* exchange_c = 0;
                    399:        const char* queue_c = 0;
1.1       moko      400:        const char* type_c = "direct";
1.12    ! moko      401:        bool passive=false, durable=false, auto_delete=false, internal=false, exclusive=false;
1.8       moko      402:        if(HashStringValue* options=params.as_hash(0)){
                    403:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    404:                        String::Body key=i.key();
                    405:                        Value* value=i.value();
                    406:                        if(key=="exchange"){
                    407:                                exchange_c=value->as_string().cstr();
                    408:                        } else if(key=="queue"){
                    409:                                queue_c=value->as_string().cstr();
                    410:                        } else if(key=="type"){
                    411:                                type_c=value->as_string().cstr();
                    412:                        } else if(key=="passive"){
                    413:                                passive=r.process(*value).as_bool();
                    414:                        } else if(key=="durable"){
                    415:                                durable=r.process(*value).as_bool();
                    416:                        } else if(key=="auto_delete"){
                    417:                                auto_delete=r.process(*value).as_bool();
1.11      moko      418:                        } else if(key=="internal"){
                    419:                                internal=r.process(*value).as_bool();
                    420:                        } else if(key=="exclusive"){
                    421:                                exclusive=r.process(*value).as_bool();
1.8       moko      422:                        } else
                    423:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      424:                }
                    425:        }
1.8       moko      426:        if(!exchange_c && !queue_c)
                    427:                throw Exception("amqp", 0, "exchange or queue must be specified");
                    428: 
                    429:        if(exchange_c){
1.11      moko      430:                amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, internal, amqp_empty_table);
1.8       moko      431:                check(amqp_get_rpc_reply(self.connection()));
                    432:        }
1.1       moko      433: 
1.8       moko      434:        if(queue_c){
1.11      moko      435:                amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), *queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, exclusive, auto_delete, amqp_empty_table);
1.8       moko      436:                check(amqp_get_rpc_reply(self.connection()));
                    437:                if(!*queue_c && ok){
                    438:                        r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1       moko      439:                }
                    440:        }
                    441: }
                    442: 
1.8       moko      443: static void _delete(Request& r, MethodParams& params) {
1.1       moko      444:        VAmqp& self=GET_SELF(r, VAmqp);
1.8       moko      445:        const char* exchange_c = 0;
                    446:        const char* queue_c = 0;
1.11      moko      447:        bool if_unused=false, if_empty=false;
1.8       moko      448:        if(HashStringValue* options=params.as_hash(0)){
                    449:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    450:                        String::Body key=i.key();
                    451:                        Value* value=i.value();
                    452:                        if(key=="exchange"){
                    453:                                exchange_c=value->as_string().cstr();
                    454:                        } else if(key=="queue"){
                    455:                                queue_c=value->as_string().cstr();
                    456:                        } else if(key=="if_unused"){
                    457:                                if_unused=r.process(*value).as_bool();
                    458:                        } else if(key=="if_empty"){
                    459:                                if_empty=r.process(*value).as_bool();
                    460:                        } else
                    461:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      462:                }
                    463:        }
1.8       moko      464:        if(!exchange_c && !queue_c)
                    465:                throw Exception("amqp", 0, "exchange or queue must be specified");
                    466: 
                    467:        if(exchange_c){
                    468:                amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), if_unused);
                    469:                check(amqp_get_rpc_reply(self.connection()));
1.1       moko      470:        }
                    471: 
1.8       moko      472:        if(queue_c){
                    473:                amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
                    474:                check(amqp_get_rpc_reply(self.connection()));
1.1       moko      475:        }
                    476: }
                    477: 
1.8       moko      478: static void _bind(Request& r, MethodParams& params) {
1.1       moko      479:        VAmqp& self=GET_SELF(r, VAmqp);
                    480:        const char* exchange_c=0;
                    481:        const char* queue_c=0;
                    482:        const char* routing_key_c="";
1.8       moko      483:        if(HashStringValue* options=params.as_hash(0)){
                    484:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    485:                        String::Body key=i.key();
                    486:                        Value* value=i.value();
                    487:                        if(key=="exchange"){
                    488:                                exchange_c=value->as_string().cstr();
                    489:                        } else if(key=="queue"){
                    490:                                queue_c=value->as_string().cstr();
                    491:                        } else if(key=="routing_key"){
                    492:                                routing_key_c=value->as_string().cstr();
                    493:                        } else
                    494:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      495:                }
                    496:        }
                    497:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    498:        amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      499:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      500: }
                    501: 
1.8       moko      502: static void _unbind(Request& r, MethodParams& params) {
1.1       moko      503:        VAmqp& self=GET_SELF(r, VAmqp);
                    504:        const char* exchange_c=0;
                    505:        const char* queue_c=0;
                    506:        const char* routing_key_c="";
1.8       moko      507:        if(HashStringValue* options=params.as_hash(0)){
                    508:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    509:                        String::Body key=i.key();
                    510:                        Value* value=i.value();
                    511:                        if(key=="exchange"){
                    512:                                exchange_c=value->as_string().cstr();
                    513:                        } else if(key=="queue"){
                    514:                                queue_c=value->as_string().cstr();
                    515:                        } else if(key=="routing_key"){
                    516:                                routing_key_c=value->as_string().cstr();
                    517:                        } else
                    518:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      519:                }
                    520:        }
                    521:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    522:        amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      523:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      524: }
                    525: 
1.8       moko      526: static void _purge(Request& r, MethodParams& params) {
                    527:        VAmqp& self=GET_SELF(r, VAmqp);
                    528:        const char* queue_c = 0;
                    529:        if(HashStringValue* options=params.as_hash(0)){
                    530:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    531:                        String::Body key=i.key();
                    532:                        Value* value=i.value();
                    533:                        if(key=="queue"){
                    534:                                queue_c=value->as_string().cstr();
                    535:                        } else
                    536:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    537:                }
                    538:        }
                    539:        if(!queue_c)
                    540:                throw Exception("amqp", 0, "queue must be specified");
                    541: 
1.9       moko      542:        amqp_queue_purge_ok_t *ok = amqp_queue_purge(self.connection(), self.channel(), amqp_cstring_bytes(queue_c));
1.8       moko      543:        check(amqp_get_rpc_reply(self.connection()));
1.9       moko      544:        r.write(*new VInt(ok ? ok->message_count : 0));
1.8       moko      545: }
                    546: 
                    547: static void _info(Request& r, MethodParams& params) {
                    548:        VAmqp& self = GET_SELF(r, VAmqp);
                    549:        const char* queue_c = 0;
                    550: 
                    551:        if (HashStringValue* options=params.as_hash(0)) {
                    552:                for (HashStringValue::Iterator i(*options); i; i.next()) {
                    553:                        String::Body key=i.key();
                    554:                        Value* value=i.value();
                    555:                        if(key=="queue"){
                    556:                                queue_c=value->as_string().cstr();
                    557:                        } else {
                    558:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    559:                        }
                    560:                }
                    561:        }
                    562: 
                    563:        if (!queue_c)
                    564:                throw Exception("amqp", 0, "queue must be specified");
                    565: 
                    566:        amqp_queue_declare_ok_t* ok = amqp_queue_declare(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), /*passive*/ 1, 0, 0, 0, amqp_empty_table);
                    567:        check(amqp_get_rpc_reply(self.connection()));
                    568: 
                    569:        Value& result=*new VHash;
1.9       moko      570:        if(ok){
                    571:                result.put_element(*new String("queue"), AMQP_VSTRING(ok->queue.bytes, ok->queue.len));
                    572:                result.put_element(*new String("messages"), new VInt(ok->message_count));
                    573:                result.put_element(*new String("consumers"), new VInt(ok->consumer_count));
                    574:        }
1.8       moko      575:        r.write(result);
                    576: }
                    577: 
                    578: static VHash *amqp_message_hash(amqp_envelope_t &envelope) {
                    579:        VHash *result=new VHash;
                    580:        HashStringValue* h=result->get_hash();
                    581:        h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
                    582:        h->put("delivery_tag", new VInt(envelope.delivery_tag));
                    583:        h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
                    584:        h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
                    585:        return result;
                    586: }
                    587: 
1.1       moko      588: static void _consume(Request& r, MethodParams& params) {
                    589:        VAmqp& self=GET_SELF(r, VAmqp);
                    590:        const char* queue_c=0;
                    591:        const char* consumer_tag_c=0;
1.10      moko      592:        bool no_ack=true, exclusive=false;
1.8       moko      593:        int count=1;
1.1       moko      594:        Junction* callback=0;
                    595: 
1.2       moko      596:        if(HashStringValue* options=params.as_hash(0)){
                    597:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    598:                        String::Body key=i.key();
                    599:                        Value* value=i.value();
1.3       moko      600:                        if(key=="callback"){
                    601:                                callback=value->get_junction();
                    602:                        } else if(key=="queue"){
1.2       moko      603:                                queue_c=value->as_string().cstr();
                    604:                        } else if(key=="consumer_tag"){
                    605:                                consumer_tag_c=value->as_string().cstr();
                    606:                        } else if(key=="no_ack"){
                    607:                                no_ack=r.process(*value).as_bool();
1.10      moko      608:                        } else if(key=="exclusive"){
                    609:                                exclusive=r.process(*value).as_bool();
1.8       moko      610:                        } else if(key=="count"){
                    611:                                count=r.process(*value).as_int();
1.2       moko      612:                        } else
                    613:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      614:                }
                    615:        }
                    616: 
1.8       moko      617:        if(!queue_c) throw Exception("amqp", 0, "queue must be specified");
1.1       moko      618: 
1.9       moko      619:        amqp_basic_consume_ok_t *ok = amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
1.1       moko      620:                consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
1.10      moko      621:                0 /*no_local*/, no_ack, exclusive, amqp_empty_table);
1.4       moko      622:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      623: 
1.8       moko      624:        if(callback){
                    625:                self.fstop=false;
                    626:                while(!self.fstop){
                    627:                        amqp_envelope_t envelope;
                    628:                        memset(&envelope, 0, sizeof(envelope));
                    629:                        amqp_maybe_release_buffers(self.connection());
                    630:                        amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
                    631:                        if(res.reply_type == AMQP_RESPONSE_NORMAL){
                    632:                                VHash *vh=amqp_message_hash(envelope);
                    633:                                Value *params_cb[]={vh};
                    634:                                METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
                    635:                                        frame.store_params(params_cb, 1);
                    636:                                        r.call(frame);
                    637:                                });
                    638:                                amqp_destroy_envelope(&envelope);
                    639:                        } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
                    640:                                continue;
                    641:                        } else {
                    642:                                break;
                    643:                        }
                    644:                }
                    645:        } else {
                    646:                VArray& result=*new VArray();
                    647:                ArrayValue& result_array=result.array();
                    648: 
                    649:                for(int i=0; i<count; i++){
                    650:                        amqp_envelope_t envelope;
                    651:                        memset(&envelope, 0, sizeof(envelope));
                    652:                        amqp_maybe_release_buffers(self.connection());
                    653:                        amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
                    654:                        if(res.reply_type == AMQP_RESPONSE_NORMAL){
                    655:                                result_array+=amqp_message_hash(envelope);
                    656:                                amqp_destroy_envelope(&envelope);
                    657:                        } else {
                    658:                                check(res);
                    659:                        }
1.1       moko      660:                }
1.8       moko      661:                r.write(result);
1.1       moko      662:        }
1.9       moko      663: 
                    664:        if(ok){
                    665:                amqp_basic_cancel(self.connection(), self.channel(), ok->consumer_tag);
                    666:                check(amqp_get_rpc_reply(self.connection()));
                    667:        }
1.1       moko      668: }
                    669: 
                    670: static void _stop_consume(Request& r, MethodParams&) {
                    671:        VAmqp& self=GET_SELF(r, VAmqp);
                    672:        self.fstop=true;
                    673: }
                    674: 
1.6       moko      675: #endif // WITH_AMQP
1.1       moko      676: 
                    677: // constructor
                    678: MAmqp::MAmqp(): Methoded("amqp") {
                    679:        add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
                    680: #ifdef WITH_AMQP
                    681:        add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
                    682:        add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
                    683:        add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
                    684:        add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
                    685:        add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
                    686:        add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
1.8       moko      687:        add_native_method("declare", Method::CT_DYNAMIC, _declare, 1, 1);
                    688:        add_native_method("delete", Method::CT_DYNAMIC, _delete, 1, 1);
                    689:        add_native_method("bind", Method::CT_DYNAMIC, _bind, 1, 1);
                    690:        add_native_method("unbind", Method::CT_DYNAMIC, _unbind, 1, 1);
                    691:        add_native_method("purge", Method::CT_DYNAMIC, _purge, 1, 1);
                    692:        add_native_method("info", Method::CT_DYNAMIC, _info, 1, 1);
1.2       moko      693:        add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1       moko      694:        add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
1.6       moko      695: #endif // WITH_AMQP
1.1       moko      696: }

E-mail: