Annotation of parser3/src/classes/amqp.C, revision 1.8

1.1       moko        1: /** @file
                      2:        Parser: @b amqp parser class.
                      3: 
                      4:        Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
                      5:        Authors: Konstantin Morshnev <moko@design.ru>
                      6: */
                      7: 
                      8: #include "pa_vmethod_frame.h"
                      9: 
                     10: #include "pa_request.h"
                     11: #include "pa_vstring.h"
                     12: #include "pa_vhash.h"
1.8     ! moko       13: #include "pa_varray.h"
1.1       moko       14: #include "pa_vbool.h"
                     15: #include "pa_vamqp.h"
                     16: 
                     17: #ifdef WITH_AMQP
                     18: #include <amqp.h>
                     19: #include <amqp_tcp_socket.h>
1.5       moko       20: #include <amqp_ssl_socket.h>
1.1       moko       21: #include <amqp_framing.h>
                     22: #include <stdlib.h>
                     23: #include <string.h>
                     24: #endif
                     25: 
1.8     ! moko       26: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.7 2025/11/22 15:36:40 moko Exp $" IDENT_PA_VAMQP_H;
1.1       moko       27: 
                     28: class MAmqp: public Methoded {
                     29: public: // VStateless_class
                     30:        Value* create_new_value(Pool&) { return new VAmqp(); }
                     31: public:
                     32:        MAmqp();
                     33: };
                     34: 
                     35: DECLARE_CLASS_VAR(amqp, new MAmqp);
                     36: 
1.6       moko       37: #ifdef WITH_AMQP
                     38: 
                     39: static void status_check(int ret, const char *detail=""){
                     40:        if(ret == AMQP_STATUS_OK)
                     41:                return;
                     42: 
                     43:        const char* error_str = amqp_error_string2(ret);
                     44:        if(error_str) {
                     45:                throw Exception("amqp", 0, "%sfailed: %s", detail, error_str);
                     46:        } else {
                     47:                throw Exception("amqp", 0, "%sfailed: error %d", detail, ret);
                     48:        }
                     49: }
                     50: 
                     51: static void check(amqp_rpc_reply_t rr, const char *detail=""){
                     52:        if(rr.reply_type == AMQP_RESPONSE_NORMAL)
                     53:                return;
                     54: 
                     55:        // Extract error message from reply
                     56:        const char* error_msg = 0;
                     57:        size_t error_len = 0;
                     58:        if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
                     59:                if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
                     60:                        amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
                     61:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                     62:                                error_msg = (const char*)m->reply_text.bytes;
                     63:                                error_len = m->reply_text.len;
                     64:                        }
                     65:                } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
                     66:                        amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
                     67:                        if(m->reply_text.len > 0 && m->reply_text.bytes) {
                     68:                                error_msg = (const char*)m->reply_text.bytes;
                     69:                                error_len = m->reply_text.len;
                     70:                        }
                     71:                }
                     72:        }
                     73:        
                     74:        if(error_msg) {
                     75:                throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg);
                     76:        } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
                     77:                status_check(rr.library_error, detail);
                     78:        }
                     79: 
                     80:        throw Exception("amqp", 0, "%sfailed", detail);
                     81: }
                     82: 
                     83: #endif // WITH_AMQP
                     84: 
                     85: 
1.1       moko       86: static void _create(Request& r, MethodParams& params) {
                     87: VAmqp& self=GET_SELF(r, VAmqp);
                     88: 
                     89: #ifdef WITH_AMQP
                     90:        const char* host_c = "localhost";
                     91:        int port = 5672;
                     92:        const char* user_c = "guest";
                     93:        const char* pass_c = "guest";
                     94:        const char* vhost_c = "/";
                     95:        const char* locale_c = "en_US";
                     96:        int heartbeat = 30; // seconds
1.5       moko       97:        const char* tls_ca = 0;
                     98:        const char* tls_cert = 0;
                     99:        const char* tls_key = 0;
                    100:        bool tls_specified = false;
                    101:        bool tls_verify = true;
1.1       moko      102: 
                    103:        if(params.count()>0){
                    104:                if(HashStringValue* options=params.as_hash(0)){
                    105:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    106:                                String::Body key=i.key();
                    107:                                Value* value=i.value();
                    108:                                if(key=="host"){
                    109:                                        host_c=value->as_string().cstr();
                    110:                                } else if(key=="port"){
                    111:                                        port=r.process(*value).as_int();
                    112:                                } else if(key=="user"){
                    113:                                        user_c=value->as_string().cstr();
                    114:                                } else if(key=="password"){
                    115:                                        pass_c=value->as_string().cstr();
                    116:                                } else if(key=="vhost"){
                    117:                                        vhost_c=value->as_string().cstr();
                    118:                                } else if(key=="locale"){
                    119:                                        locale_c=value->as_string().cstr();
                    120:                                } else if(key=="heartbeat"){
                    121:                                        heartbeat=r.process(*value).as_int();
1.5       moko      122:                                } else if(key=="tls"){
                    123:                                        tls_specified = true;
                    124:                                        if(HashStringValue* tls_options=value->get_hash()){
                    125:                                                for(HashStringValue::Iterator t(*tls_options); t; t.next()){
                    126:                                                        String::Body tkey=t.key();
                    127:                                                        Value* tval=t.value();
                    128:                                                        if(tkey=="ca"){
                    129:                                                                tls_ca=tval->as_string().cstr();
                    130:                                                        } else if(tkey=="cert"){
                    131:                                                                tls_cert=tval->as_string().cstr();
                    132:                                                        } else if(tkey=="key"){
                    133:                                                                tls_key=tval->as_string().cstr();
                    134:                                                        } else if(tkey=="verify"){
                    135:                                                                tls_verify=r.process(*tval).as_bool();
                    136:                                                        } else
                    137:                                                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    138:                                                }
                    139:                                        }
1.1       moko      140:                                } else
                    141:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    142:                        }
                    143:                }
                    144:        }
                    145: 
                    146:        amqp_connection_state_t conn = amqp_new_connection();
1.5       moko      147:        amqp_socket_t* socket = 0;
                    148:        
                    149:        if(tls_specified) {
                    150:                socket = amqp_ssl_socket_new(conn);
                    151:                if(!socket)
                    152:                        throw Exception("amqp", 0, "failed to create SSL socket");
                    153:                
                    154:                // Set CA certificate if provided
                    155:                if(tls_ca)
                    156:                        if(amqp_ssl_socket_set_cacert(socket, tls_ca))
                    157:                                throw Exception("amqp", 0, "failed to set CA certificate");
1.6       moko      158: 
1.5       moko      159:                // Set client certificate and key if provided
                    160:                if(tls_cert && tls_key) {
                    161:                        if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
                    162:                                throw Exception("amqp", 0, "failed to set client certificate/key");
                    163:                } else if(tls_cert || tls_key) {
                    164:                        throw Exception("amqp", 0, "both cert and key must be specified for TLS");
                    165:                }
1.6       moko      166: 
1.5       moko      167:                // If CA is provided, peer verification will use it
                    168:                amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
                    169:                // If verify=true, enable hostname verification
                    170:                amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
                    171:        } else {
                    172:                socket = amqp_tcp_socket_new(conn);
                    173:                if(!socket)
                    174:                        throw Exception("amqp", 0, "failed to create TCP socket");
                    175:        }
                    176:        
1.6       moko      177:        status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket ");
1.1       moko      178: 
                    179:        amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
                    180:        if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
                    181:                amqp_destroy_connection(conn);
1.6       moko      182:                check(rlogin, "login ");
1.1       moko      183:        }
                    184: 
                    185:        int channel = 1;
                    186:        amqp_channel_open(conn, channel);
                    187:        amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
                    188:        if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
                    189:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    190:                amqp_destroy_connection(conn);
1.6       moko      191:                check(ropen, "open channel ");
1.1       moko      192:        }
                    193: 
                    194:        self.fconnection = conn;
                    195:        self.fchannel = channel;
                    196: #else
                    197:        (void)params; (void)self;
                    198:        throw Exception("amqp", 0, "compiled without amqp support");
1.6       moko      199: #endif // WITH_AMQP
1.1       moko      200: }
                    201: 
                    202: #ifdef WITH_AMQP
                    203: 
1.2       moko      204: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
                    205: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
                    206: 
1.1       moko      207: static void _publish(Request& r, MethodParams& params) {
                    208:        VAmqp& self=GET_SELF(r, VAmqp);
                    209:        const String &msg=params.as_string(0, "msg must be string");
                    210:        const char* exchange_c = ""; // default exchange
                    211:        const char* routing_key_c = 0;
                    212:        bool mandatory=false;
                    213: 
                    214:        amqp_basic_properties_t props;
                    215:        props._flags = 0;
                    216: 
                    217:        if(params.count()>1){
                    218:                if(HashStringValue* options=params.as_hash(1)){
                    219:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    220:                                String::Body key=i.key();
                    221:                                Value* value=i.value();
                    222:                                if(key=="exchange"){
                    223:                                        exchange_c=value->as_string().cstr();
                    224:                                } else if(key=="routing_key"){
                    225:                                        routing_key_c=value->as_string().cstr();
                    226:                                } else if(key=="queue"){
1.3       moko      227:                                        routing_key_c=value->as_string().cstr();
1.1       moko      228:                                } else if(key=="mandatory"){
                    229:                                        mandatory=r.process(*value).as_bool();
1.3       moko      230:                                } else if(key=="content_type"){
                    231:                                        const char* v=value->as_string().cstr();
                    232:                                        props.content_type=amqp_cstring_bytes(v);
                    233:                                        props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
                    234:                                } else if(key=="content_encoding"){
                    235:                                        const char* v=value->as_string().cstr();
                    236:                                        props.content_encoding=amqp_cstring_bytes(v);
                    237:                                        props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
                    238:                                } else if(key=="delivery_mode"){
                    239:                                        uint8_t dm=(uint8_t)value->as_int();
                    240:                                        props.delivery_mode=dm;
                    241:                                        props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
                    242:                                } else if(key=="priority"){
                    243:                                        uint8_t pr=(uint8_t)value->as_int();
                    244:                                        props.priority=pr;
                    245:                                        props._flags|=AMQP_BASIC_PRIORITY_FLAG;
                    246:                                } else if(key=="correlation_id"){
                    247:                                        const char* v=value->as_string().cstr();
                    248:                                        props.correlation_id=amqp_cstring_bytes(v);
                    249:                                        props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
                    250:                                } else if(key=="reply_to"){
                    251:                                        const char* v=value->as_string().cstr();
                    252:                                        props.reply_to=amqp_cstring_bytes(v);
                    253:                                        props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
                    254:                                } else if(key=="expiration"){
                    255:                                        const char* v=value->as_string().cstr();
                    256:                                        props.expiration=amqp_cstring_bytes(v);
                    257:                                        props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
                    258:                                } else if(key=="message_id"){
                    259:                                        const char* v=value->as_string().cstr();
                    260:                                        props.message_id=amqp_cstring_bytes(v);
                    261:                                        props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
                    262:                                } else if(key=="timestamp"){
                    263:                                        uint64_t ts=(uint64_t)value->as_double();
                    264:                                        props.timestamp=ts;
                    265:                                        props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
                    266:                                } else if(key=="type"){
                    267:                                        const char* v=value->as_string().cstr();
                    268:                                        props.type=amqp_cstring_bytes(v);
                    269:                                        props._flags|=AMQP_BASIC_TYPE_FLAG;
                    270:                                } else if(key=="user_id"){
                    271:                                        const char* v=value->as_string().cstr();
                    272:                                        props.user_id=amqp_cstring_bytes(v);
                    273:                                        props._flags|=AMQP_BASIC_USER_ID_FLAG;
                    274:                                } else if(key=="app_id"){
                    275:                                        const char* v=value->as_string().cstr();
                    276:                                        props.app_id=amqp_cstring_bytes(v);
                    277:                                        props._flags|=AMQP_BASIC_APP_ID_FLAG;
                    278:                                } else if(key=="headers"){
                    279: /*                                     if(HashStringValue* hh=pval->get_hash()){
                    280:                                                size_t count=hh->count();
                    281:                                                amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
                    282:                                                size_t idx=0;
                    283:                                                for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
                    284:                                                        String::Body hkey=hi.key();
                    285:                                                        const char* hv=hi.value()->as_string().cstr();
                    286:                                                        entries[idx].key=amqp_cstring_bytes(hkey.cstr());
                    287:                                                        entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
                    288:                                                        entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
                    289:                                                        idx++;
1.1       moko      290:                                                }
1.3       moko      291:                                                props.headers.num_entries=(int)count;
                    292:                                                props.headers.entries=entries;
                    293:                                                props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1       moko      294:                                        }
1.3       moko      295: */                             } else
1.1       moko      296:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    297:                        }
                    298:                }
                    299:        }
                    300: 
                    301:        if(!routing_key_c)
                    302:                throw Exception("amqp", 0, "routing_key or queue must be specified");
                    303: 
                    304:        amqp_bytes_t body;
                    305:        body.len = msg.length();
                    306:        body.bytes=(void*)msg.cstr();
                    307: 
1.3       moko      308:        int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1       moko      309: 
                    310:        if(ret!=AMQP_STATUS_OK)
                    311:                throw Exception("amqp", 0, "publish failed");
                    312: 
                    313:        // free temporary headers entries if allocated
                    314:        if(props._flags & AMQP_BASIC_HEADERS_FLAG){
                    315: //             delete [] props.headers.entries;
                    316:        }
                    317: }
                    318: 
                    319: static void _release(Request& r, MethodParams&) {
                    320:        VAmqp& self=GET_SELF(r, VAmqp);
                    321:        if(self.fconnection){
                    322:                amqp_connection_state_t conn=self.fconnection;
                    323:                amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
                    324:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    325:                amqp_destroy_connection(conn);
                    326:                self.fconnection=0;
                    327:                self.fchannel=0;
                    328:        }
                    329: }
                    330: 
                    331: static void _ack(Request& r, MethodParams& params) {
                    332:        VAmqp& self=GET_SELF(r, VAmqp);
1.7       moko      333:        double tag=params.as_double(0, "delivery tag must be number", r);
                    334:        int ret = amqp_basic_ack(self.connection(), self.channel(), (uint64_t)tag, 0);
1.1       moko      335:        if(ret!=AMQP_STATUS_OK)
                    336:                throw Exception("amqp", 0, "ack failed");
                    337: }
                    338: 
                    339: static void _nack(Request& r, MethodParams& params) {
                    340:        VAmqp& self=GET_SELF(r, VAmqp);
                    341:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    342:        bool requeue=false;
                    343:        if(params.count()>1){
                    344:                if(HashStringValue* options=params.as_hash(1)){
                    345:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    346:                                if(i.key()=="requeue"){
                    347:                                        requeue=r.process(*i.value()).as_bool();
                    348:                                } else
                    349:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    350:                        }
                    351:                }
                    352:        }
                    353:        int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
                    354:        if(ret!=AMQP_STATUS_OK)
                    355:                throw Exception("amqp", 0, "nack failed");
                    356: }
                    357: 
                    358: static void _qos(Request& r, MethodParams& params) {
                    359:        VAmqp& self=GET_SELF(r, VAmqp);
                    360:        uint16_t prefetch_count=0;
                    361:        if(params.count()>0){
                    362:                if(HashStringValue* options=params.as_hash(0)){
                    363:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    364:                                if(i.key()=="prefetch_count"){
                    365:                                        int pc=r.process(*i.value()).as_int();
                    366:                                        prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
                    367:                                } else
                    368:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    369:                        }
                    370:                }
                    371:        }
                    372:        amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
                    373:        if(!ret)
                    374:                throw Exception("amqp", 0, "qos failed");
                    375: }
                    376: 
                    377: static void _reject(Request& r, MethodParams& params) {
                    378:        VAmqp& self=GET_SELF(r, VAmqp);
                    379:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    380:        bool requeue=true; // by default return to queue
                    381:        if(params.count()>1){
                    382:                if(HashStringValue* options = params.as_hash(1)){
                    383:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    384:                                if(i.key() == "requeue"){
                    385:                                        requeue=r.process(*i.value()).as_bool();
                    386:                                } else
                    387:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    388:                        }
                    389:                }
                    390:        }
                    391:        int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
                    392:        if(ret!=AMQP_STATUS_OK)
                    393:                throw Exception("amqp", 0, "reject failed");
                    394: }
                    395: 
1.8     ! moko      396: static void _declare(Request& r, MethodParams& params) {
1.1       moko      397:        VAmqp& self=GET_SELF(r, VAmqp);
1.8     ! moko      398:        const char* exchange_c = 0;
        !           399:        const char* queue_c = 0;
1.1       moko      400:        const char* type_c = "direct";
                    401:        bool passive=false, durable=false, auto_delete=true, nowait=false;
1.8     ! moko      402:        if(HashStringValue* options=params.as_hash(0)){
        !           403:                for(HashStringValue::Iterator i(*options); i; i.next()){
        !           404:                        String::Body key=i.key();
        !           405:                        Value* value=i.value();
        !           406:                        if(key=="exchange"){
        !           407:                                exchange_c=value->as_string().cstr();
        !           408:                        } else if(key=="queue"){
        !           409:                                queue_c=value->as_string().cstr();
        !           410:                        } else if(key=="type"){
        !           411:                                type_c=value->as_string().cstr();
        !           412:                        } else if(key=="passive"){
        !           413:                                passive=r.process(*value).as_bool();
        !           414:                        } else if(key=="durable"){
        !           415:                                durable=r.process(*value).as_bool();
        !           416:                        } else if(key=="auto_delete"){
        !           417:                                auto_delete=r.process(*value).as_bool();
        !           418:                        } else if(key=="nowait"){
        !           419:                                nowait=r.process(*value).as_bool();
        !           420:                        } else
        !           421:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      422:                }
                    423:        }
1.8     ! moko      424:        if(!exchange_c && !queue_c)
        !           425:                throw Exception("amqp", 0, "exchange or queue must be specified");
        !           426: 
        !           427:        if(exchange_c){
        !           428:                amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
        !           429:                check(amqp_get_rpc_reply(self.connection()));
        !           430:        }
1.1       moko      431: 
1.8     ! moko      432:        if(queue_c){
        !           433:                amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), *queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
        !           434:                check(amqp_get_rpc_reply(self.connection()));
        !           435:                if(!*queue_c && ok){
        !           436:                        r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1       moko      437:                }
                    438:        }
                    439: }
                    440: 
1.8     ! moko      441: static void _delete(Request& r, MethodParams& params) {
1.1       moko      442:        VAmqp& self=GET_SELF(r, VAmqp);
1.8     ! moko      443:        const char* exchange_c = 0;
        !           444:        const char* queue_c = 0;
        !           445:        bool if_unused=false, if_empty=false, nowait=false;
        !           446:        if(HashStringValue* options=params.as_hash(0)){
        !           447:                for(HashStringValue::Iterator i(*options); i; i.next()){
        !           448:                        String::Body key=i.key();
        !           449:                        Value* value=i.value();
        !           450:                        if(key=="exchange"){
        !           451:                                exchange_c=value->as_string().cstr();
        !           452:                        } else if(key=="queue"){
        !           453:                                queue_c=value->as_string().cstr();
        !           454:                        } else if(key=="if_unused"){
        !           455:                                if_unused=r.process(*value).as_bool();
        !           456:                        } else if(key=="if_empty"){
        !           457:                                if_empty=r.process(*value).as_bool();
        !           458:                        } else if(key=="nowait"){
        !           459:                                nowait=r.process(*value).as_bool();
        !           460:                        } else
        !           461:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      462:                }
                    463:        }
1.8     ! moko      464:        if(!exchange_c && !queue_c)
        !           465:                throw Exception("amqp", 0, "exchange or queue must be specified");
        !           466: 
        !           467:        if(exchange_c){
        !           468:                amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), if_unused);
        !           469:                check(amqp_get_rpc_reply(self.connection()));
1.1       moko      470:        }
                    471: 
1.8     ! moko      472:        if(queue_c){
        !           473:                amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
        !           474:                check(amqp_get_rpc_reply(self.connection()));
1.1       moko      475:        }
                    476: }
                    477: 
1.8     ! moko      478: static void _bind(Request& r, MethodParams& params) {
1.1       moko      479:        VAmqp& self=GET_SELF(r, VAmqp);
                    480:        const char* exchange_c=0;
                    481:        const char* queue_c=0;
                    482:        const char* routing_key_c="";
1.8     ! moko      483:        if(HashStringValue* options=params.as_hash(0)){
        !           484:                for(HashStringValue::Iterator i(*options); i; i.next()){
        !           485:                        String::Body key=i.key();
        !           486:                        Value* value=i.value();
        !           487:                        if(key=="exchange"){
        !           488:                                exchange_c=value->as_string().cstr();
        !           489:                        } else if(key=="queue"){
        !           490:                                queue_c=value->as_string().cstr();
        !           491:                        } else if(key=="routing_key"){
        !           492:                                routing_key_c=value->as_string().cstr();
        !           493:                        } else
        !           494:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      495:                }
                    496:        }
                    497:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    498:        amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      499:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      500: }
                    501: 
1.8     ! moko      502: static void _unbind(Request& r, MethodParams& params) {
1.1       moko      503:        VAmqp& self=GET_SELF(r, VAmqp);
                    504:        const char* exchange_c=0;
                    505:        const char* queue_c=0;
                    506:        const char* routing_key_c="";
1.8     ! moko      507:        if(HashStringValue* options=params.as_hash(0)){
        !           508:                for(HashStringValue::Iterator i(*options); i; i.next()){
        !           509:                        String::Body key=i.key();
        !           510:                        Value* value=i.value();
        !           511:                        if(key=="exchange"){
        !           512:                                exchange_c=value->as_string().cstr();
        !           513:                        } else if(key=="queue"){
        !           514:                                queue_c=value->as_string().cstr();
        !           515:                        } else if(key=="routing_key"){
        !           516:                                routing_key_c=value->as_string().cstr();
        !           517:                        } else
        !           518:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      519:                }
                    520:        }
                    521:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    522:        amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4       moko      523:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      524: }
                    525: 
1.8     ! moko      526: static void _purge(Request& r, MethodParams& params) {
        !           527:        VAmqp& self=GET_SELF(r, VAmqp);
        !           528:        const char* queue_c = 0;
        !           529:        if(HashStringValue* options=params.as_hash(0)){
        !           530:                for(HashStringValue::Iterator i(*options); i; i.next()){
        !           531:                        String::Body key=i.key();
        !           532:                        Value* value=i.value();
        !           533:                        if(key=="queue"){
        !           534:                                queue_c=value->as_string().cstr();
        !           535:                        } else
        !           536:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           537:                }
        !           538:        }
        !           539:        if(!queue_c)
        !           540:                throw Exception("amqp", 0, "queue must be specified");
        !           541: 
        !           542:        amqp_queue_purge(self.connection(), self.channel(), amqp_cstring_bytes(queue_c));
        !           543:        check(amqp_get_rpc_reply(self.connection()));
        !           544: }
        !           545: 
        !           546: static void _info(Request& r, MethodParams& params) {
        !           547:        VAmqp& self = GET_SELF(r, VAmqp);
        !           548:        const char* queue_c = 0;
        !           549: 
        !           550:        if (HashStringValue* options=params.as_hash(0)) {
        !           551:                for (HashStringValue::Iterator i(*options); i; i.next()) {
        !           552:                        String::Body key=i.key();
        !           553:                        Value* value=i.value();
        !           554:                        if(key=="queue"){
        !           555:                                queue_c=value->as_string().cstr();
        !           556:                        } else {
        !           557:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           558:                        }
        !           559:                }
        !           560:        }
        !           561: 
        !           562:        if (!queue_c)
        !           563:                throw Exception("amqp", 0, "queue must be specified");
        !           564: 
        !           565:        amqp_queue_declare_ok_t* ok = amqp_queue_declare(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), /*passive*/ 1, 0, 0, 0, amqp_empty_table);
        !           566:        check(amqp_get_rpc_reply(self.connection()));
        !           567: 
        !           568:        Value& result=*new VHash;
        !           569:        result.put_element(*new String("queue"), AMQP_VSTRING(ok->queue.bytes, ok->queue.len));
        !           570:        result.put_element(*new String("messages"), new VInt(ok->message_count));
        !           571:        result.put_element(*new String("consumers"), new VInt(ok->consumer_count));
        !           572:        r.write(result);
        !           573: }
        !           574: 
        !           575: static VHash *amqp_message_hash(amqp_envelope_t &envelope) {
        !           576:        VHash *result=new VHash;
        !           577:        HashStringValue* h=result->get_hash();
        !           578:        h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
        !           579:        h->put("delivery_tag", new VInt(envelope.delivery_tag));
        !           580:        h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
        !           581:        h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
        !           582:        return result;
        !           583: }
        !           584: 
1.1       moko      585: static void _consume(Request& r, MethodParams& params) {
                    586:        VAmqp& self=GET_SELF(r, VAmqp);
                    587:        const char* queue_c=0;
                    588:        const char* consumer_tag_c=0;
                    589:        bool no_ack=true, nowait=false;
1.8     ! moko      590:        int count=1;
1.1       moko      591:        Junction* callback=0;
                    592: 
1.2       moko      593:        if(HashStringValue* options=params.as_hash(0)){
                    594:                for(HashStringValue::Iterator i(*options); i; i.next()){
                    595:                        String::Body key=i.key();
                    596:                        Value* value=i.value();
1.3       moko      597:                        if(key=="callback"){
                    598:                                callback=value->get_junction();
                    599:                        } else if(key=="queue"){
1.2       moko      600:                                queue_c=value->as_string().cstr();
                    601:                        } else if(key=="consumer_tag"){
                    602:                                consumer_tag_c=value->as_string().cstr();
                    603:                        } else if(key=="no_ack"){
                    604:                                no_ack=r.process(*value).as_bool();
                    605:                        } else if(key=="nowait"){
                    606:                                nowait=r.process(*value).as_bool();
1.8     ! moko      607:                        } else if(key=="count"){
        !           608:                                count=r.process(*value).as_int();
1.2       moko      609:                        } else
                    610:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      611:                }
                    612:        }
                    613: 
1.8     ! moko      614:        if(!queue_c) throw Exception("amqp", 0, "queue must be specified");
1.1       moko      615: 
                    616:        amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
                    617:                consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
                    618:                0 /*no_local*/, no_ack, nowait, amqp_empty_table);
1.4       moko      619:        check(amqp_get_rpc_reply(self.connection()));
1.1       moko      620: 
1.8     ! moko      621:        if(callback){
        !           622:                self.fstop=false;
        !           623:                while(!self.fstop){
        !           624:                        amqp_envelope_t envelope;
        !           625:                        memset(&envelope, 0, sizeof(envelope));
        !           626:                        amqp_maybe_release_buffers(self.connection());
        !           627:                        amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
        !           628:                        if(res.reply_type == AMQP_RESPONSE_NORMAL){
        !           629:                                VHash *vh=amqp_message_hash(envelope);
        !           630:                                Value *params_cb[]={vh};
        !           631:                                METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
        !           632:                                        frame.store_params(params_cb, 1);
        !           633:                                        r.call(frame);
        !           634:                                });
        !           635:                                amqp_destroy_envelope(&envelope);
        !           636:                        } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
        !           637:                                continue;
        !           638:                        } else {
        !           639:                                break;
        !           640:                        }
        !           641:                }
        !           642:        } else {
        !           643:                VArray& result=*new VArray();
        !           644:                ArrayValue& result_array=result.array();
        !           645: 
        !           646:                for(int i=0; i<count; i++){
        !           647:                        amqp_envelope_t envelope;
        !           648:                        memset(&envelope, 0, sizeof(envelope));
        !           649:                        amqp_maybe_release_buffers(self.connection());
        !           650:                        amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
        !           651:                        if(res.reply_type == AMQP_RESPONSE_NORMAL){
        !           652:                                result_array+=amqp_message_hash(envelope);
        !           653:                                amqp_destroy_envelope(&envelope);
        !           654:                        } else {
        !           655:                                check(res);
        !           656:                        }
1.1       moko      657:                }
1.8     ! moko      658:                r.write(result);
1.1       moko      659:        }
                    660: }
                    661: 
                    662: static void _stop_consume(Request& r, MethodParams&) {
                    663:        VAmqp& self=GET_SELF(r, VAmqp);
                    664:        self.fstop=true;
                    665: }
                    666: 
1.6       moko      667: #endif // WITH_AMQP
1.1       moko      668: 
                    669: // constructor
                    670: MAmqp::MAmqp(): Methoded("amqp") {
                    671:        add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
                    672: #ifdef WITH_AMQP
                    673:        add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
                    674:        add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
                    675:        add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
                    676:        add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
                    677:        add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
                    678:        add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
1.8     ! moko      679:        add_native_method("declare", Method::CT_DYNAMIC, _declare, 1, 1);
        !           680:        add_native_method("delete", Method::CT_DYNAMIC, _delete, 1, 1);
        !           681:        add_native_method("bind", Method::CT_DYNAMIC, _bind, 1, 1);
        !           682:        add_native_method("unbind", Method::CT_DYNAMIC, _unbind, 1, 1);
        !           683:        add_native_method("purge", Method::CT_DYNAMIC, _purge, 1, 1);
        !           684:        add_native_method("info", Method::CT_DYNAMIC, _info, 1, 1);
1.2       moko      685:        add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1       moko      686:        add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
1.6       moko      687: #endif // WITH_AMQP
1.1       moko      688: }

E-mail: