Annotation of parser3/src/classes/amqp.C, revision 1.7

1.1       moko        1: /** @file
                      2:        Parser: @b amqp parser class.
                      3: 
                      4:        Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
                      5:        Authors: Konstantin Morshnev <moko@design.ru>
                      6: */
                      7: 
                      8: #include "pa_vmethod_frame.h"
                      9: 
                     10: #include "pa_request.h"
                     11: #include "pa_vstring.h"
                     12: #include "pa_vhash.h"
                     13: #include "pa_vbool.h"
                     14: #include "pa_vamqp.h"
                     15: 
                     16: #ifdef WITH_AMQP
                     17: #include <amqp.h>
                     18: #include <amqp_tcp_socket.h>
1.5       moko       19: #include <amqp_ssl_socket.h>
1.1       moko       20: #include <amqp_framing.h>
                     21: #include <stdlib.h>
                     22: #include <string.h>
                     23: #endif
                     24: 
1.7     ! moko       25: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.6 2025/11/08 00:27:08 moko Exp $" IDENT_PA_VAMQP_H;
1.1       moko       26: 
                     27: class MAmqp: public Methoded {
                     28: public: // VStateless_class
                     29:        Value* create_new_value(Pool&) { return new VAmqp(); }
                     30: public:
                     31:        MAmqp();
                     32: };
                     33: 
                     34: DECLARE_CLASS_VAR(amqp, new MAmqp);
                     35: 
1.6       moko       36: #ifdef WITH_AMQP
                     37: 
                     38: static void status_check(int ret, const char *detail=""){
                     39:        if(ret == AMQP_STATUS_OK)
                     40:                return;
                     41: 
                     42:        const char* error_str = amqp_error_string2(ret);
                     43:        if(error_str) {
                     44:                throw Exception("amqp", 0, "%sfailed: %s", detail, error_str);
                     45:        } else {
                     46:                throw Exception("amqp", 0, "%sfailed: error %d", detail, ret);
                     47:        }
                     48: }
                     49: 
                     50: static void check(amqp_rpc_reply_t rr, const char *detail=""){
                     51:        if(rr.reply_type == AMQP_RESPONSE_NORMAL)
                     52:                return;
                     53: 
                     54:        // Extract error message from reply
                     55:        const char* error_msg = 0;
                     56:        size_t error_len = 0;
                     57:        if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
                     58:                if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
                     59:                        amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
                     60:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                     61:                                error_msg = (const char*)m->reply_text.bytes;
                     62:                                error_len = m->reply_text.len;
                     63:                        }
                     64:                } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                     65:                        amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
                     66:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                     67:                                error_msg = (const char*)m->reply_text.bytes;
                     68:                                error_len = m->reply_text.len;
                     69:                        }
                     70:                }
                     71:        }
                     72:        
                     73:        if(error_msg) {
                     74:                throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg);
                     75:        } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
                     76:                status_check(rr.library_error, detail);
                     77:        }
                     78: 
                     79:        throw Exception("amqp", 0, "%sfailed", detail);
                     80: }
                     81: 
                     82: #endif // WITH_AMQP
                     83: 
                     84: 
1.1       moko       85: static void _create(Request& r, MethodParams& params) {
                     86: VAmqp& self=GET_SELF(r, VAmqp);
                     87: 
                     88: #ifdef WITH_AMQP
                     89:        const char* host_c = "localhost";
                     90:        int port = 5672;
                     91:        const char* user_c = "guest";
                     92:        const char* pass_c = "guest";
                     93:        const char* vhost_c = "/";
                     94:        const char* locale_c = "en_US";
                     95:        int heartbeat = 30; // seconds
1.5       moko       96:        const char* tls_ca = 0;
                     97:        const char* tls_cert = 0;
                     98:        const char* tls_key = 0;
                     99:        bool tls_specified = false;
                    100:        bool tls_verify = true;
1.1       moko      101: 
                    102:        if(params.count()>0){
                    103:                if(HashStringValue* options=params.as_hash(0)){
                    104:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    105:                                String::Body key=i.key();
                    106:                                Value* value=i.value();
                    107:                                if(key=="host"){
                    108:                                        host_c=value->as_string().cstr();
                    109:                                } else if(key=="port"){
                    110:                                        port=r.process(*value).as_int();
                    111:                                } else if(key=="user"){
                    112:                                        user_c=value->as_string().cstr();
                    113:                                } else if(key=="password"){
                    114:                                        pass_c=value->as_string().cstr();
                    115:                                } else if(key=="vhost"){
                    116:                                        vhost_c=value->as_string().cstr();
                    117:                                } else if(key=="locale"){
                    118:                                        locale_c=value->as_string().cstr();
                    119:                                } else if(key=="heartbeat"){
                    120:                                        heartbeat=r.process(*value).as_int();
1.5       moko      121:                                } else if(key=="tls"){
                    122:                                        tls_specified = true;
                    123:                                        if(HashStringValue* tls_options=value->get_hash()){
                    124:                                                for(HashStringValue::Iterator t(*tls_options); t; t.next()){
                    125:                                                        String::Body tkey=t.key();
                    126:                                                        Value* tval=t.value();
                    127:                                                        if(tkey=="ca"){
                    128:                                                                tls_ca=tval->as_string().cstr();
                    129:                                                        } else if(tkey=="cert"){
                    130:                                                                tls_cert=tval->as_string().cstr();
                    131:                                                        } else if(tkey=="key"){
                    132:                                                                tls_key=tval->as_string().cstr();
                    133:                                                        } else if(tkey=="verify"){
                    134:                                                                tls_verify=r.process(*tval).as_bool();
                    135:                                                        } else
                    136:                                                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    137:                                                }
                    138:                                        }
1.1       moko      139:                                } else
                    140:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    141:                        }
                    142:                }
                    143:        }
                    144: 
                    145:        amqp_connection_state_t conn = amqp_new_connection();
1.5       moko      146:        amqp_socket_t* socket = 0;
                    147:        
                    148:        if(tls_specified) {
                    149:                socket = amqp_ssl_socket_new(conn);
                    150:                if(!socket)
                    151:                        throw Exception("amqp", 0, "failed to create SSL socket");
                    152:                
                    153:                // Set CA certificate if provided
                    154:                if(tls_ca)
                    155:                        if(amqp_ssl_socket_set_cacert(socket, tls_ca))
                    156:                                throw Exception("amqp", 0, "failed to set CA certificate");
1.6       moko      157: 
1.5       moko      158:                // Set client certificate and key if provided
                    159:                if(tls_cert && tls_key) {
                    160:                        if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
                    161:                                throw Exception("amqp", 0, "failed to set client certificate/key");
                    162:                } else if(tls_cert || tls_key) {
                    163:                        throw Exception("amqp", 0, "both cert and key must be specified for TLS");
                    164:                }
1.6       moko      165: 
1.5       moko      166:                // If CA is provided, peer verification will use it
                    167:                amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
                    168:                // If verify=true, enable hostname verification
                    169:                amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
                    170:        } else {
                    171:                socket = amqp_tcp_socket_new(conn);
                    172:                if(!socket)
                    173:                        throw Exception("amqp", 0, "failed to create TCP socket");
                    174:        }
                    175:        
1.6       moko      176:        status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket ");
1.1       moko      177: 
                    178:        amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
                    179:        if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
                    180:                amqp_destroy_connection(conn);
1.6       moko      181:                check(rlogin, "login ");
1.1       moko      182:        }
                    183: 
                    184:        int channel = 1;
                    185:        amqp_channel_open(conn, channel);
                    186:        amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
                    187:        if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
                    188:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    189:                amqp_destroy_connection(conn);
1.6       moko      190:                check(ropen, "open channel ");
1.1       moko      191:        }
                    192: 
                    193:        self.fconnection = conn;
                    194:        self.fchannel = channel;
                    195: #else
                    196:        (void)params; (void)self;
                    197:        throw Exception("amqp", 0, "compiled without amqp support");
1.6       moko      198: #endif // WITH_AMQP
1.1       moko      199: }
                    200: 
                    201: #ifdef WITH_AMQP
                    202: 
1.2       moko      203: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
                    204: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
                    205: 
1.1       moko      206: static void _publish(Request& r, MethodParams& params) {
                    207:        VAmqp& self=GET_SELF(r, VAmqp);
                    208:        const String &msg=params.as_string(0, "msg must be string");
                    209:        const char* exchange_c = ""; // default exchange
                    210:        const char* routing_key_c = 0;
                    211:        bool mandatory=false;
                    212: 
                    213:        amqp_basic_properties_t props;
                    214:        props._flags = 0;
                    215: 
                    216:        if(params.count()>1){
                    217:                if(HashStringValue* options=params.as_hash(1)){
                    218:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    219:                                String::Body key=i.key();
                    220:                                Value* value=i.value();
                    221:                                if(key=="exchange"){
                    222:                                        exchange_c=value->as_string().cstr();
                    223:                                } else if(key=="routing_key"){
                    224:                                        routing_key_c=value->as_string().cstr();
                    225:                                } else if(key=="queue"){
1.3       moko      226:                                        routing_key_c=value->as_string().cstr();
1.1       moko      227:                                } else if(key=="mandatory"){
                    228:                                        mandatory=r.process(*value).as_bool();
1.3       moko      229:                                } else if(key=="content_type"){
                    230:                                        const char* v=value->as_string().cstr();
                    231:                                        props.content_type=amqp_cstring_bytes(v);
                    232:                                        props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
                    233:                                } else if(key=="content_encoding"){
                    234:                                        const char* v=value->as_string().cstr();
                    235:                                        props.content_encoding=amqp_cstring_bytes(v);
                    236:                                        props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
                    237:                                } else if(key=="delivery_mode"){
                    238:                                        uint8_t dm=(uint8_t)value->as_int();
                    239:                                        props.delivery_mode=dm;
                    240:                                        props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
                    241:                                } else if(key=="priority"){
                    242:                                        uint8_t pr=(uint8_t)value->as_int();
                    243:                                        props.priority=pr;
                    244:                                        props._flags|=AMQP_BASIC_PRIORITY_FLAG;
                    245:                                } else if(key=="correlation_id"){
                    246:                                        const char* v=value->as_string().cstr();
                    247:                                        props.correlation_id=amqp_cstring_bytes(v);
                    248:                                        props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
                    249:                                } else if(key=="reply_to"){
                    250:                                        const char* v=value->as_string().cstr();
                    251:                                        props.reply_to=amqp_cstring_bytes(v);
                    252:                                        props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
                    253:                                } else if(key=="expiration"){
                    254:                                        const char* v=value->as_string().cstr();
                    255:                                        props.expiration=amqp_cstring_bytes(v);
                    256:                                        props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
                    257:                                } else if(key=="message_id"){
                    258:                                        const char* v=value->as_string().cstr();
                    259:                                        props.message_id=amqp_cstring_bytes(v);
                    260:                                        props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
                    261:                                } else if(key=="timestamp"){
                    262:                                        uint64_t ts=(uint64_t)value->as_double();
                    263:                                        props.timestamp=ts;
                    264:                                        props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
                    265:                                } else if(key=="type"){
                    266:                                        const char* v=value->as_string().cstr();
                    267:                                        props.type=amqp_cstring_bytes(v);
                    268:                                        props._flags|=AMQP_BASIC_TYPE_FLAG;
                    269:                                } else if(key=="user_id"){
                    270:                                        const char* v=value->as_string().cstr();
                    271:                                        props.user_id=amqp_cstring_bytes(v);
                    272:                                        props._flags|=AMQP_BASIC_USER_ID_FLAG;
                    273:                                } else if(key=="app_id"){
                    274:                                        const char* v=value->as_string().cstr();
                    275:                                        props.app_id=amqp_cstring_bytes(v);
                    276:                                        props._flags|=AMQP_BASIC_APP_ID_FLAG;
                    277:                                } else if(key=="headers"){
                    278: /*                                     if(HashStringValue* hh=pval->get_hash()){
                    279:                                                size_t count=hh->count();
                    280:                                                amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
                    281:                                                size_t idx=0;
                    282:                                                for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
                    283:                                                        String::Body hkey=hi.key();
                    284:                                                        const char* hv=hi.value()->as_string().cstr();
                    285:                                                        entries[idx].key=amqp_cstring_bytes(hkey.cstr());
                    286:                                                        entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
                    287:                                                        entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
                    288:                                                        idx++;
1.1       moko      289:                                                }
1.3       moko      290:                                                props.headers.num_entries=(int)count;
                    291:                                                props.headers.entries=entries;
                    292:                                                props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1       moko      293:                                        }
1.3       moko      294: */                             } else
1.1       moko      295:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    296:                        }
                    297:                }
                    298:        }
                    299: 
                    300:        if(!routing_key_c)
                    301:                throw Exception("amqp", 0, "routing_key or queue must be specified");
                    302: 
                    303:        amqp_bytes_t body;
                    304:        body.len = msg.length();
                    305:        body.bytes=(void*)msg.cstr();
                    306: 
1.3       moko      307:        int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1       moko      308: 
                    309:        if(ret!=AMQP_STATUS_OK)
                    310:                throw Exception("amqp", 0, "publish failed");
                    311: 
                    312:        // free temporary headers entries if allocated
                    313:        if(props._flags & AMQP_BASIC_HEADERS_FLAG){
                    314: //             delete [] props.headers.entries;
                    315:        }
                    316: }
                    317: 
                    318: static void _release(Request& r, MethodParams&) {
                    319:        VAmqp& self=GET_SELF(r, VAmqp);
                    320:        if(self.fconnection){
                    321:                amqp_connection_state_t conn=self.fconnection;
                    322:                amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
                    323:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    324:                amqp_destroy_connection(conn);
                    325:                self.fconnection=0;
                    326:                self.fchannel=0;
                    327:        }
                    328: }
                    329: 
                    330: static void _ack(Request& r, MethodParams& params) {
                    331:        VAmqp& self=GET_SELF(r, VAmqp);
1.7     ! moko      332:        double tag=params.as_double(0, "delivery tag must be number", r);
        !           333:        int ret = amqp_basic_ack(self.connection(), self.channel(), (uint64_t)tag, 0);
1.1       moko      334:        if(ret!=AMQP_STATUS_OK)
                    335:                throw Exception("amqp", 0, "ack failed");
                    336: }
                    337: 
                    338: static void _nack(Request& r, MethodParams& params) {
                    339:        VAmqp& self=GET_SELF(r, VAmqp);
                    340:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    341:        bool requeue=false;
                    342:        if(params.count()>1){
                    343:                if(HashStringValue* options=params.as_hash(1)){
                    344:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    345:                                if(i.key()=="requeue"){
                    346:                                        requeue=r.process(*i.value()).as_bool();
                    347:                                } else
                    348:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    349:                        }
                    350:                }
                    351:        }
                    352:        int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
                    353:        if(ret!=AMQP_STATUS_OK)
                    354:                throw Exception("amqp", 0, "nack failed");
                    355: }
                    356: 
                    357: static void _qos(Request& r, MethodParams& params) {
                    358:        VAmqp& self=GET_SELF(r, VAmqp);
                    359:        uint16_t prefetch_count=0;
                    360:        if(params.count()>0){
                    361:                if(HashStringValue* options=params.as_hash(0)){
                    362:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    363:                                if(i.key()=="prefetch_count"){
                    364:                                        int pc=r.process(*i.value()).as_int();
                    365:                                        prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
                    366:                                } else
                    367:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    368:                        }
                    369:                }
                    370:        }
                    371:        amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
                    372:        if(!ret)
                    373:                throw Exception("amqp", 0, "qos failed");
                    374: }
                    375: 
                    376: static void _reject(Request& r, MethodParams& params) {
                    377:        VAmqp& self=GET_SELF(r, VAmqp);
                    378:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    379:        bool requeue=true; // by default return to queue
                    380:        if(params.count()>1){
                    381:                if(HashStringValue* options = params.as_hash(1)){
                    382:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    383:                                if(i.key() == "requeue"){
                    384:                                        requeue=r.process(*i.value()).as_bool();
                    385:                                } else
                    386:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    387:                        }
                    388:                }
                    389:        }
                    390:        int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
                    391:        if(ret!=AMQP_STATUS_OK)
                    392:                throw Exception("amqp", 0, "reject failed");
                    393: }
                    394: 
                    395: static void _declare_exchange(Request& r, MethodParams& params) {
                    396:        VAmqp& self=GET_SELF(r, VAmqp);
                    397:        const char* name_c = 0;
                    398:        const char* type_c = "direct";
                    399:        bool passive=false, durable=false, auto_delete=true, nowait=false;
                    400:        if(params.count()>0){
                    401:                if(HashStringValue* options=params.as_hash(0)){
                    402:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    403:                                String::Body key=i.key();
                    404:                                Value* value=i.value();
                    405:                                if(key=="name"){
                    406:                                        name_c=value->as_string().cstr();
                    407:                                } else if(key=="type"){
                    408:                                        type_c=value->as_string().cstr();
                    409:                                } else if(key=="passive"){
                    410:                                        passive=r.process(*value).as_bool();
                    411:                                } else if(key=="durable"){
                    412:                                        durable=r.process(*value).as_bool();
                    413:                                } else if(key=="auto_delete"){
                    414:                                        auto_delete=r.process(*value).as_bool();
                    415:                                } else if(key=="nowait"){
                    416:                                        nowait=r.process(*value).as_bool();
                    417:                                } else
                    418:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    419:                        }
                    420:                }
                    421:        }
                    422:        if(!name_c)
                    423:                throw Exception("amqp", 0, "name is required");
                    424:        amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
1.4       moko      425:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      426: }
                    427: 
                    428: static void _delete_exchange(Request& r, MethodParams& params) {
                    429:        VAmqp& self=GET_SELF(r, VAmqp);
                    430:        const char* name_c = 0;
                    431:        bool if_unused=false, nowait=false;
                    432:        if(params.count()>0){
                    433:                if(HashStringValue* options=params.as_hash(0)){
                    434:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    435:                                String::Body key=i.key();
                    436:                                Value* value=i.value();
                    437:                                if(key=="exchange"){
                    438:                                        name_c=value->as_string().cstr();
                    439:                                } else if(key=="if_unused"){
                    440:                                        if_unused=r.process(*value).as_bool();
                    441:                                } else if(key=="nowait"){
                    442:                                        nowait=r.process(*value).as_bool();
                    443:                                } else
                    444:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    445:                        }
                    446:                }
                    447:        }
                    448:        if(!name_c) throw Exception("amqp", 0, "exchange is required");
                    449:        amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
1.4       moko      450:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      451: }
                    452: 
                    453: static void _declare_queue(Request& r, MethodParams& params) {
                    454:        VAmqp& self=GET_SELF(r, VAmqp);
                    455:        const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
                    456:        if(params.count()>0){
                    457:                if(HashStringValue* options=params.as_hash(0)){
                    458:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    459:                                String::Body key=i.key();
                    460:                                Value* value=i.value();
                    461:                                if(key=="queue"){
                    462:                                        queue_c=value->as_string().cstr();
                    463:                                } else if(key=="passive"){
                    464:                                        passive=r.process(*value).as_bool();
                    465:                                } else if(key=="durable"){
                    466:                                        durable=r.process(*value).as_bool();
                    467:                                } else if(key=="auto_delete"){
                    468:                                        auto_delete=r.process(*value).as_bool();
                    469:                                } else if(key=="nowait"){
                    470:                                        nowait=r.process(*value).as_bool();
                    471:                                } else
                    472:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    473:                        }
                    474:                }
                    475:        }
                    476:        amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
1.4       moko      477:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      478:        if(!queue_c && ok){
1.2       moko      479:                r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1       moko      480:        }
                    481: }
                    482: 
                    483: static void _delete_queue(Request& r, MethodParams& params) {
                    484:        VAmqp& self=GET_SELF(r, VAmqp);
                    485:        const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
                    486:        if(params.count()>0){
                    487:                if(HashStringValue* options=params.as_hash(0)){
                    488:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    489:                                String::Body key=i.key();
                    490:                                Value* value=i.value();
                    491:                                if(key=="queue"){
                    492:                                        queue_c=value->as_string().cstr();
                    493:                                } else if(key=="if_unused"){
                    494:                                        if_unused=r.process(*value).as_bool();
                    495:                                } else if(key=="if_empty"){
                    496:                                        if_empty=r.process(*value).as_bool();
                    497:                                } else if(key=="nowait"){
                    498:                                        nowait=r.process(*value).as_bool();
                    499:                                } else
                    500:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    501:                        }
                    502:                }
                    503:        }
                    504:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
                    505:        amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
1.4       moko      506:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      507: }
                    508: 
                    509: static void _bind_queue(Request& r, MethodParams& params) {
                    510:        VAmqp& self=GET_SELF(r, VAmqp);
                    511:        const char* exchange_c=0;
                    512:        const char* queue_c=0;
                    513:        const char* routing_key_c="";
                    514:        if(params.count()>0){
                    515:                if(HashStringValue* options=params.as_hash(0)){
                    516:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    517:                                String::Body key=i.key();
                    518:                                Value* value=i.value();
                    519:                                if(key=="exchange"){
                    520:                                        exchange_c=value->as_string().cstr();
                    521:                                } else if(key=="queue"){
                    522:                                        queue_c=value->as_string().cstr();
                    523:                                } else if(key=="routing_key"){
                    524:                                        routing_key_c=value->as_string().cstr();
                    525:                                } else
                    526:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    527:                        }
                    528:                }
                    529:        }
                    530:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    531:        amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      532:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      533: }
                    534: 
                    535: static void _unbind_queue(Request& r, MethodParams& params) {
                    536:        VAmqp& self=GET_SELF(r, VAmqp);
                    537:        const char* exchange_c=0;
                    538:        const char* queue_c=0;
                    539:        const char* routing_key_c="";
                    540:        if(params.count()>0){
                    541:                if(HashStringValue* options=params.as_hash(0)){
                    542:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    543:                                String::Body key=i.key();
                    544:                                Value* value=i.value();
                    545:                                if(key=="exchange"){
                    546:                                        exchange_c=value->as_string().cstr();
                    547:                                } else if(key=="queue"){
                    548:                                        queue_c=value->as_string().cstr();
                    549:                                } else if(key=="routing_key"){
                    550:                                        routing_key_c=value->as_string().cstr();
                    551:                                } else
                    552:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    553:                        }
                    554:                }
                    555:        }
                    556:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    557:        amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      558:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      559: }
                    560: 
                    561: static void _consume(Request& r, MethodParams& params) {
                    562:        VAmqp& self=GET_SELF(r, VAmqp);
                    563:        const char* queue_c=0;
                    564:        const char* consumer_tag_c=0;
                    565:        bool no_ack=true, nowait=false;
                    566:        Junction* callback=0;
                    567: 
1.2       moko      568:        if(HashStringValue* options=params.as_hash(0)){
                    569:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    570:                        String::Body key=i.key();
                    571:                        Value* value=i.value();
1.3       moko      572:                        if(key=="callback"){
                    573:                                callback=value->get_junction();
                    574:                        } else if(key=="queue"){
1.2       moko      575:                                queue_c=value->as_string().cstr();
                    576:                        } else if(key=="consumer_tag"){
                    577:                                consumer_tag_c=value->as_string().cstr();
                    578:                        } else if(key=="no_ack"){
                    579:                                no_ack=r.process(*value).as_bool();
                    580:                        } else if(key=="nowait"){
                    581:                                nowait=r.process(*value).as_bool();
                    582:                        } else
                    583:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      584:                }
                    585:        }
                    586: 
                    587:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
                    588:        if(!callback) throw Exception("amqp", 0, "callback is required");
                    589: 
                    590:        amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
                    591:                consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
                    592:                0 /*no_local*/, no_ack, nowait, amqp_empty_table);
1.4       moko      593:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      594: 
                    595:        self.fstop=false;
                    596:        while(!self.fstop){
1.2       moko      597:                amqp_envelope_t envelope;
                    598:                memset(&envelope, 0, sizeof(envelope));
                    599:                amqp_maybe_release_buffers(self.connection());
1.1       moko      600:                amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
                    601:                if(res.reply_type == AMQP_RESPONSE_NORMAL){
                    602:                        VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2       moko      603:                        h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.7     ! moko      604:                        h->put("delivery_tag", new VDouble((double)envelope.delivery_tag));
1.2       moko      605:                        h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
                    606:                        h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1       moko      607: 
                    608:                        Value *params_cb[]={&vh};
                    609:                        METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
                    610:                                frame.store_params(params_cb, 1);
                    611:                                r.call(frame);
                    612:                        });
                    613: 
                    614:                        amqp_destroy_envelope(&envelope);
                    615:                } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
                    616:                        continue;
                    617:                } else {
                    618:                        break;
                    619:                }
                    620:        }
                    621: }
                    622: 
                    623: static void _stop_consume(Request& r, MethodParams&) {
                    624:        VAmqp& self=GET_SELF(r, VAmqp);
                    625:        self.fstop=true;
                    626: }
                    627: 
1.6       moko      628: #endif // WITH_AMQP
1.1       moko      629: 
                    630: // constructor
                    631: MAmqp::MAmqp(): Methoded("amqp") {
                    632:        add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
                    633: #ifdef WITH_AMQP
                    634:        add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
                    635:        add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
                    636:        add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
                    637:        add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
                    638:        add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
                    639:        add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
                    640:        add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
                    641:        add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
                    642:        add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
                    643:        add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
                    644:        add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
                    645:        add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2       moko      646:        add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1       moko      647:        add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
1.6       moko      648: #endif // WITH_AMQP
1.1       moko      649: }

E-mail: