Annotation of parser3/src/classes/amqp.C, revision 1.5

1.1       moko        1: /** @file
                      2:        Parser: @b amqp parser class.
                      3: 
                      4:        Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
                      5:        Authors: Konstantin Morshnev <moko@design.ru>
                      6: */
                      7: 
                      8: #include "pa_vmethod_frame.h"
                      9: 
                     10: #include "pa_request.h"
                     11: #include "pa_vstring.h"
                     12: #include "pa_vhash.h"
                     13: #include "pa_vbool.h"
                     14: #include "pa_vamqp.h"
                     15: 
                     16: #ifdef WITH_AMQP
                     17: #include <amqp.h>
                     18: #include <amqp_tcp_socket.h>
1.5     ! moko       19: #include <amqp_ssl_socket.h>
1.1       moko       20: #include <amqp_framing.h>
                     21: #include <stdlib.h>
                     22: #include <string.h>
                     23: #endif
                     24: 
1.5     ! moko       25: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.4 2025/11/07 23:00:11 moko Exp $" IDENT_PA_VAMQP_H;
1.1       moko       26: 
                     27: class MAmqp: public Methoded {
                     28: public: // VStateless_class
                     29:        Value* create_new_value(Pool&) { return new VAmqp(); }
                     30: public:
                     31:        MAmqp();
                     32: };
                     33: 
                     34: DECLARE_CLASS_VAR(amqp, new MAmqp);
                     35: 
                     36: static void _create(Request& r, MethodParams& params) {
                     37: VAmqp& self=GET_SELF(r, VAmqp);
                     38: 
                     39: #ifdef WITH_AMQP
                     40:        const char* host_c = "localhost";
                     41:        int port = 5672;
                     42:        const char* user_c = "guest";
                     43:        const char* pass_c = "guest";
                     44:        const char* vhost_c = "/";
                     45:        const char* locale_c = "en_US";
                     46:        int heartbeat = 30; // seconds
1.5     ! moko       47:        const char* tls_ca = 0;
        !            48:        const char* tls_cert = 0;
        !            49:        const char* tls_key = 0;
        !            50:        bool tls_specified = false;
        !            51:        bool tls_verify = true;
1.1       moko       52: 
                     53:        if(params.count()>0){
                     54:                if(HashStringValue* options=params.as_hash(0)){
                     55:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                     56:                                String::Body key=i.key();
                     57:                                Value* value=i.value();
                     58:                                if(key=="host"){
                     59:                                        host_c=value->as_string().cstr();
                     60:                                } else if(key=="port"){
                     61:                                        port=r.process(*value).as_int();
                     62:                                } else if(key=="user"){
                     63:                                        user_c=value->as_string().cstr();
                     64:                                } else if(key=="password"){
                     65:                                        pass_c=value->as_string().cstr();
                     66:                                } else if(key=="vhost"){
                     67:                                        vhost_c=value->as_string().cstr();
                     68:                                } else if(key=="locale"){
                     69:                                        locale_c=value->as_string().cstr();
                     70:                                } else if(key=="heartbeat"){
                     71:                                        heartbeat=r.process(*value).as_int();
1.5     ! moko       72:                                } else if(key=="tls"){
        !            73:                                        tls_specified = true;
        !            74:                                        if(HashStringValue* tls_options=value->get_hash()){
        !            75:                                                for(HashStringValue::Iterator t(*tls_options); t; t.next()){
        !            76:                                                        String::Body tkey=t.key();
        !            77:                                                        Value* tval=t.value();
        !            78:                                                        if(tkey=="ca"){
        !            79:                                                                tls_ca=tval->as_string().cstr();
        !            80:                                                        } else if(tkey=="cert"){
        !            81:                                                                tls_cert=tval->as_string().cstr();
        !            82:                                                        } else if(tkey=="key"){
        !            83:                                                                tls_key=tval->as_string().cstr();
        !            84:                                                        } else if(tkey=="verify"){
        !            85:                                                                tls_verify=r.process(*tval).as_bool();
        !            86:                                                        } else
        !            87:                                                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !            88:                                                }
        !            89:                                        }
1.1       moko       90:                                } else
                     91:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                     92:                        }
                     93:                }
                     94:        }
                     95: 
                     96:        amqp_connection_state_t conn = amqp_new_connection();
1.5     ! moko       97:        amqp_socket_t* socket = 0;
        !            98:        
        !            99:        if(tls_specified) {
        !           100:                socket = amqp_ssl_socket_new(conn);
        !           101:                if(!socket)
        !           102:                        throw Exception("amqp", 0, "failed to create SSL socket");
        !           103:                
        !           104:                // Set CA certificate if provided
        !           105:                if(tls_ca)
        !           106:                        if(amqp_ssl_socket_set_cacert(socket, tls_ca))
        !           107:                                throw Exception("amqp", 0, "failed to set CA certificate");
        !           108:                
        !           109:                // Set client certificate and key if provided
        !           110:                if(tls_cert && tls_key) {
        !           111:                        if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
        !           112:                                throw Exception("amqp", 0, "failed to set client certificate/key");
        !           113:                } else if(tls_cert || tls_key) {
        !           114:                        throw Exception("amqp", 0, "both cert and key must be specified for TLS");
        !           115:                }
        !           116:                
        !           117:                // If CA is provided, peer verification will use it
        !           118:                amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
        !           119:                // If verify=true, enable hostname verification
        !           120:                amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
        !           121:        } else {
        !           122:                socket = amqp_tcp_socket_new(conn);
        !           123:                if(!socket)
        !           124:                        throw Exception("amqp", 0, "failed to create TCP socket");
        !           125:        }
        !           126:        
1.1       moko      127:        if(amqp_socket_open(socket, host_c, port))
1.5     ! moko      128:                throw Exception("amqp", 0, "failed to open socket");
1.1       moko      129: 
                    130:        amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
                    131:        if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
                    132:                amqp_destroy_connection(conn);
                    133:                throw Exception("amqp", 0, "login failed");
                    134:        }
                    135: 
                    136:        int channel = 1;
                    137:        amqp_channel_open(conn, channel);
                    138:        amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
                    139:        if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
                    140:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    141:                amqp_destroy_connection(conn);
                    142:                throw Exception("amqp", 0, "channel open failed");
                    143:        }
                    144: 
                    145:        self.fconnection = conn;
                    146:        self.fchannel = channel;
                    147: #else
                    148:        (void)params; (void)self;
                    149:        throw Exception("amqp", 0, "compiled without amqp support");
                    150: #endif
                    151: }
                    152: 
                    153: #ifdef WITH_AMQP
                    154: 
1.4       moko      155: static void check(amqp_rpc_reply_t rr){
                    156:        if(rr.reply_type == AMQP_RESPONSE_NORMAL)
                    157:                return;
                    158:        
                    159:        // Extract error message from reply
                    160:        const char* error_msg = 0;
                    161:        size_t error_len = 0;
                    162:        if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
                    163:                if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
                    164:                        amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
                    165:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                    166:                                error_msg = (const char*)m->reply_text.bytes;
                    167:                                error_len = m->reply_text.len;
                    168:                        }
                    169:                } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                    170:                        amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
                    171:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                    172:                                error_msg = (const char*)m->reply_text.bytes;
                    173:                                error_len = m->reply_text.len;
                    174:                        }
                    175:                }
                    176:        }
                    177:        
                    178:        if(error_msg) {
                    179:                throw Exception("amqp", 0, "failed: %.*s", (int)error_len, error_msg);
                    180:        } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
                    181:                throw Exception("amqp", 0, "failed: library error %d", rr.library_error);
                    182:        } else {
                    183:                throw Exception("amqp", 0, "failed");
                    184:        }
1.1       moko      185: }
                    186: 
1.2       moko      187: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
                    188: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
                    189: 
1.1       moko      190: static void _publish(Request& r, MethodParams& params) {
                    191:        VAmqp& self=GET_SELF(r, VAmqp);
                    192:        const String &msg=params.as_string(0, "msg must be string");
                    193:        const char* exchange_c = ""; // default exchange
                    194:        const char* routing_key_c = 0;
                    195:        bool mandatory=false;
                    196: 
                    197:        amqp_basic_properties_t props;
                    198:        props._flags = 0;
                    199: 
                    200:        if(params.count()>1){
                    201:                if(HashStringValue* options=params.as_hash(1)){
                    202:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    203:                                String::Body key=i.key();
                    204:                                Value* value=i.value();
                    205:                                if(key=="exchange"){
                    206:                                        exchange_c=value->as_string().cstr();
                    207:                                } else if(key=="routing_key"){
                    208:                                        routing_key_c=value->as_string().cstr();
                    209:                                } else if(key=="queue"){
1.3       moko      210:                                        routing_key_c=value->as_string().cstr();
1.1       moko      211:                                } else if(key=="mandatory"){
                    212:                                        mandatory=r.process(*value).as_bool();
1.3       moko      213:                                } else if(key=="content_type"){
                    214:                                        const char* v=value->as_string().cstr();
                    215:                                        props.content_type=amqp_cstring_bytes(v);
                    216:                                        props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
                    217:                                } else if(key=="content_encoding"){
                    218:                                        const char* v=value->as_string().cstr();
                    219:                                        props.content_encoding=amqp_cstring_bytes(v);
                    220:                                        props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
                    221:                                } else if(key=="delivery_mode"){
                    222:                                        uint8_t dm=(uint8_t)value->as_int();
                    223:                                        props.delivery_mode=dm;
                    224:                                        props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
                    225:                                } else if(key=="priority"){
                    226:                                        uint8_t pr=(uint8_t)value->as_int();
                    227:                                        props.priority=pr;
                    228:                                        props._flags|=AMQP_BASIC_PRIORITY_FLAG;
                    229:                                } else if(key=="correlation_id"){
                    230:                                        const char* v=value->as_string().cstr();
                    231:                                        props.correlation_id=amqp_cstring_bytes(v);
                    232:                                        props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
                    233:                                } else if(key=="reply_to"){
                    234:                                        const char* v=value->as_string().cstr();
                    235:                                        props.reply_to=amqp_cstring_bytes(v);
                    236:                                        props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
                    237:                                } else if(key=="expiration"){
                    238:                                        const char* v=value->as_string().cstr();
                    239:                                        props.expiration=amqp_cstring_bytes(v);
                    240:                                        props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
                    241:                                } else if(key=="message_id"){
                    242:                                        const char* v=value->as_string().cstr();
                    243:                                        props.message_id=amqp_cstring_bytes(v);
                    244:                                        props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
                    245:                                } else if(key=="timestamp"){
                    246:                                        uint64_t ts=(uint64_t)value->as_double();
                    247:                                        props.timestamp=ts;
                    248:                                        props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
                    249:                                } else if(key=="type"){
                    250:                                        const char* v=value->as_string().cstr();
                    251:                                        props.type=amqp_cstring_bytes(v);
                    252:                                        props._flags|=AMQP_BASIC_TYPE_FLAG;
                    253:                                } else if(key=="user_id"){
                    254:                                        const char* v=value->as_string().cstr();
                    255:                                        props.user_id=amqp_cstring_bytes(v);
                    256:                                        props._flags|=AMQP_BASIC_USER_ID_FLAG;
                    257:                                } else if(key=="app_id"){
                    258:                                        const char* v=value->as_string().cstr();
                    259:                                        props.app_id=amqp_cstring_bytes(v);
                    260:                                        props._flags|=AMQP_BASIC_APP_ID_FLAG;
                    261:                                } else if(key=="headers"){
                    262: /*                                     if(HashStringValue* hh=pval->get_hash()){
                    263:                                                size_t count=hh->count();
                    264:                                                amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
                    265:                                                size_t idx=0;
                    266:                                                for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
                    267:                                                        String::Body hkey=hi.key();
                    268:                                                        const char* hv=hi.value()->as_string().cstr();
                    269:                                                        entries[idx].key=amqp_cstring_bytes(hkey.cstr());
                    270:                                                        entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
                    271:                                                        entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
                    272:                                                        idx++;
1.1       moko      273:                                                }
1.3       moko      274:                                                props.headers.num_entries=(int)count;
                    275:                                                props.headers.entries=entries;
                    276:                                                props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1       moko      277:                                        }
1.3       moko      278: */                             } else
1.1       moko      279:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    280:                        }
                    281:                }
                    282:        }
                    283: 
                    284:        if(!routing_key_c)
                    285:                throw Exception("amqp", 0, "routing_key or queue must be specified");
                    286: 
                    287:        amqp_bytes_t body;
                    288:        body.len = msg.length();
                    289:        body.bytes=(void*)msg.cstr();
                    290: 
1.3       moko      291:        int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1       moko      292: 
                    293:        if(ret!=AMQP_STATUS_OK)
                    294:                throw Exception("amqp", 0, "publish failed");
                    295: 
                    296:        // free temporary headers entries if allocated
                    297:        if(props._flags & AMQP_BASIC_HEADERS_FLAG){
                    298: //             delete [] props.headers.entries;
                    299:        }
                    300: }
                    301: 
                    302: static void _release(Request& r, MethodParams&) {
                    303:        VAmqp& self=GET_SELF(r, VAmqp);
                    304:        if(self.fconnection){
                    305:                amqp_connection_state_t conn=self.fconnection;
                    306:                amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
                    307:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    308:                amqp_destroy_connection(conn);
                    309:                self.fconnection=0;
                    310:                self.fchannel=0;
                    311:        }
                    312: }
                    313: 
                    314: static void _ack(Request& r, MethodParams& params) {
                    315:        VAmqp& self=GET_SELF(r, VAmqp);
                    316:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    317:        int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
                    318:        if(ret!=AMQP_STATUS_OK)
                    319:                throw Exception("amqp", 0, "ack failed");
                    320: }
                    321: 
                    322: static void _nack(Request& r, MethodParams& params) {
                    323:        VAmqp& self=GET_SELF(r, VAmqp);
                    324:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    325:        bool requeue=false;
                    326:        if(params.count()>1){
                    327:                if(HashStringValue* options=params.as_hash(1)){
                    328:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    329:                                if(i.key()=="requeue"){
                    330:                                        requeue=r.process(*i.value()).as_bool();
                    331:                                } else
                    332:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    333:                        }
                    334:                }
                    335:        }
                    336:        int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
                    337:        if(ret!=AMQP_STATUS_OK)
                    338:                throw Exception("amqp", 0, "nack failed");
                    339: }
                    340: 
                    341: static void _qos(Request& r, MethodParams& params) {
                    342:        VAmqp& self=GET_SELF(r, VAmqp);
                    343:        uint16_t prefetch_count=0;
                    344:        if(params.count()>0){
                    345:                if(HashStringValue* options=params.as_hash(0)){
                    346:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    347:                                if(i.key()=="prefetch_count"){
                    348:                                        int pc=r.process(*i.value()).as_int();
                    349:                                        prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
                    350:                                } else
                    351:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    352:                        }
                    353:                }
                    354:        }
                    355:        amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
                    356:        if(!ret)
                    357:                throw Exception("amqp", 0, "qos failed");
                    358: }
                    359: 
                    360: static void _reject(Request& r, MethodParams& params) {
                    361:        VAmqp& self=GET_SELF(r, VAmqp);
                    362:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    363:        bool requeue=true; // by default return to queue
                    364:        if(params.count()>1){
                    365:                if(HashStringValue* options = params.as_hash(1)){
                    366:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    367:                                if(i.key() == "requeue"){
                    368:                                        requeue=r.process(*i.value()).as_bool();
                    369:                                } else
                    370:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    371:                        }
                    372:                }
                    373:        }
                    374:        int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
                    375:        if(ret!=AMQP_STATUS_OK)
                    376:                throw Exception("amqp", 0, "reject failed");
                    377: }
                    378: 
                    379: static void _declare_exchange(Request& r, MethodParams& params) {
                    380:        VAmqp& self=GET_SELF(r, VAmqp);
                    381:        const char* name_c = 0;
                    382:        const char* type_c = "direct";
                    383:        bool passive=false, durable=false, auto_delete=true, nowait=false;
                    384:        if(params.count()>0){
                    385:                if(HashStringValue* options=params.as_hash(0)){
                    386:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    387:                                String::Body key=i.key();
                    388:                                Value* value=i.value();
                    389:                                if(key=="name"){
                    390:                                        name_c=value->as_string().cstr();
                    391:                                } else if(key=="type"){
                    392:                                        type_c=value->as_string().cstr();
                    393:                                } else if(key=="passive"){
                    394:                                        passive=r.process(*value).as_bool();
                    395:                                } else if(key=="durable"){
                    396:                                        durable=r.process(*value).as_bool();
                    397:                                } else if(key=="auto_delete"){
                    398:                                        auto_delete=r.process(*value).as_bool();
                    399:                                } else if(key=="nowait"){
                    400:                                        nowait=r.process(*value).as_bool();
                    401:                                } else
                    402:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    403:                        }
                    404:                }
                    405:        }
                    406:        if(!name_c)
                    407:                throw Exception("amqp", 0, "name is required");
                    408:        amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
1.4       moko      409:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      410: }
                    411: 
                    412: static void _delete_exchange(Request& r, MethodParams& params) {
                    413:        VAmqp& self=GET_SELF(r, VAmqp);
                    414:        const char* name_c = 0;
                    415:        bool if_unused=false, nowait=false;
                    416:        if(params.count()>0){
                    417:                if(HashStringValue* options=params.as_hash(0)){
                    418:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    419:                                String::Body key=i.key();
                    420:                                Value* value=i.value();
                    421:                                if(key=="exchange"){
                    422:                                        name_c=value->as_string().cstr();
                    423:                                } else if(key=="if_unused"){
                    424:                                        if_unused=r.process(*value).as_bool();
                    425:                                } else if(key=="nowait"){
                    426:                                        nowait=r.process(*value).as_bool();
                    427:                                } else
                    428:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    429:                        }
                    430:                }
                    431:        }
                    432:        if(!name_c) throw Exception("amqp", 0, "exchange is required");
                    433:        amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
1.4       moko      434:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      435: }
                    436: 
                    437: static void _declare_queue(Request& r, MethodParams& params) {
                    438:        VAmqp& self=GET_SELF(r, VAmqp);
                    439:        const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
                    440:        if(params.count()>0){
                    441:                if(HashStringValue* options=params.as_hash(0)){
                    442:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    443:                                String::Body key=i.key();
                    444:                                Value* value=i.value();
                    445:                                if(key=="queue"){
                    446:                                        queue_c=value->as_string().cstr();
                    447:                                } else if(key=="passive"){
                    448:                                        passive=r.process(*value).as_bool();
                    449:                                } else if(key=="durable"){
                    450:                                        durable=r.process(*value).as_bool();
                    451:                                } else if(key=="auto_delete"){
                    452:                                        auto_delete=r.process(*value).as_bool();
                    453:                                } else if(key=="nowait"){
                    454:                                        nowait=r.process(*value).as_bool();
                    455:                                } else
                    456:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    457:                        }
                    458:                }
                    459:        }
                    460:        amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
1.4       moko      461:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      462:        if(!queue_c && ok){
1.2       moko      463:                r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1       moko      464:        }
                    465: }
                    466: 
                    467: static void _delete_queue(Request& r, MethodParams& params) {
                    468:        VAmqp& self=GET_SELF(r, VAmqp);
                    469:        const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
                    470:        if(params.count()>0){
                    471:                if(HashStringValue* options=params.as_hash(0)){
                    472:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    473:                                String::Body key=i.key();
                    474:                                Value* value=i.value();
                    475:                                if(key=="queue"){
                    476:                                        queue_c=value->as_string().cstr();
                    477:                                } else if(key=="if_unused"){
                    478:                                        if_unused=r.process(*value).as_bool();
                    479:                                } else if(key=="if_empty"){
                    480:                                        if_empty=r.process(*value).as_bool();
                    481:                                } else if(key=="nowait"){
                    482:                                        nowait=r.process(*value).as_bool();
                    483:                                } else
                    484:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    485:                        }
                    486:                }
                    487:        }
                    488:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
                    489:        amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
1.4       moko      490:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      491: }
                    492: 
                    493: static void _bind_queue(Request& r, MethodParams& params) {
                    494:        VAmqp& self=GET_SELF(r, VAmqp);
                    495:        const char* exchange_c=0;
                    496:        const char* queue_c=0;
                    497:        const char* routing_key_c="";
                    498:        if(params.count()>0){
                    499:                if(HashStringValue* options=params.as_hash(0)){
                    500:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    501:                                String::Body key=i.key();
                    502:                                Value* value=i.value();
                    503:                                if(key=="exchange"){
                    504:                                        exchange_c=value->as_string().cstr();
                    505:                                } else if(key=="queue"){
                    506:                                        queue_c=value->as_string().cstr();
                    507:                                } else if(key=="routing_key"){
                    508:                                        routing_key_c=value->as_string().cstr();
                    509:                                } else
                    510:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    511:                        }
                    512:                }
                    513:        }
                    514:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    515:        amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      516:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      517: }
                    518: 
                    519: static void _unbind_queue(Request& r, MethodParams& params) {
                    520:        VAmqp& self=GET_SELF(r, VAmqp);
                    521:        const char* exchange_c=0;
                    522:        const char* queue_c=0;
                    523:        const char* routing_key_c="";
                    524:        if(params.count()>0){
                    525:                if(HashStringValue* options=params.as_hash(0)){
                    526:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    527:                                String::Body key=i.key();
                    528:                                Value* value=i.value();
                    529:                                if(key=="exchange"){
                    530:                                        exchange_c=value->as_string().cstr();
                    531:                                } else if(key=="queue"){
                    532:                                        queue_c=value->as_string().cstr();
                    533:                                } else if(key=="routing_key"){
                    534:                                        routing_key_c=value->as_string().cstr();
                    535:                                } else
                    536:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    537:                        }
                    538:                }
                    539:        }
                    540:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    541:        amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      542:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      543: }
                    544: 
                    545: static void _consume(Request& r, MethodParams& params) {
                    546:        VAmqp& self=GET_SELF(r, VAmqp);
                    547:        const char* queue_c=0;
                    548:        const char* consumer_tag_c=0;
                    549:        bool no_ack=true, nowait=false;
                    550:        Junction* callback=0;
                    551: 
1.2       moko      552:        if(HashStringValue* options=params.as_hash(0)){
                    553:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    554:                        String::Body key=i.key();
                    555:                        Value* value=i.value();
1.3       moko      556:                        if(key=="callback"){
                    557:                                callback=value->get_junction();
                    558:                        } else if(key=="queue"){
1.2       moko      559:                                queue_c=value->as_string().cstr();
                    560:                        } else if(key=="consumer_tag"){
                    561:                                consumer_tag_c=value->as_string().cstr();
                    562:                        } else if(key=="no_ack"){
                    563:                                no_ack=r.process(*value).as_bool();
                    564:                        } else if(key=="nowait"){
                    565:                                nowait=r.process(*value).as_bool();
                    566:                        } else
                    567:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      568:                }
                    569:        }
                    570: 
                    571:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
                    572:        if(!callback) throw Exception("amqp", 0, "callback is required");
                    573: 
                    574:        amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
                    575:                consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
                    576:                0 /*no_local*/, no_ack, nowait, amqp_empty_table);
1.4       moko      577:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      578: 
                    579:        self.fstop=false;
                    580:        while(!self.fstop){
1.2       moko      581:                amqp_envelope_t envelope;
                    582:                memset(&envelope, 0, sizeof(envelope));
                    583:                amqp_maybe_release_buffers(self.connection());
1.1       moko      584:                amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
                    585:                if(res.reply_type == AMQP_RESPONSE_NORMAL){
                    586:                        VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2       moko      587:                        h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.1       moko      588:                        h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
1.2       moko      589:                        h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
                    590:                        h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1       moko      591: 
                    592:                        Value *params_cb[]={&vh};
                    593:                        METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
                    594:                                frame.store_params(params_cb, 1);
                    595:                                r.call(frame);
                    596:                        });
                    597: 
                    598:                        amqp_destroy_envelope(&envelope);
                    599:                } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
                    600:                        continue;
                    601:                } else {
                    602:                        break;
                    603:                }
                    604:        }
                    605: }
                    606: 
                    607: static void _stop_consume(Request& r, MethodParams&) {
                    608:        VAmqp& self=GET_SELF(r, VAmqp);
                    609:        self.fstop=true;
                    610: }
                    611: 
                    612: #endif
                    613: 
                    614: // constructor
                    615: MAmqp::MAmqp(): Methoded("amqp") {
                    616:        add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
                    617: #ifdef WITH_AMQP
                    618:        add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
                    619:        add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
                    620:        add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
                    621:        add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
                    622:        add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
                    623:        add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
                    624:        add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
                    625:        add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
                    626:        add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
                    627:        add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
                    628:        add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
                    629:        add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2       moko      630:        add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1       moko      631:        add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
                    632: #endif
                    633: }

E-mail: