Annotation of parser3/src/classes/amqp.C, revision 1.2

1.1       moko        1: /** @file
                      2:        Parser: @b amqp parser class.
                      3: 
                      4:        Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
                      5:        Authors: Konstantin Morshnev <moko@design.ru>
                      6: */
                      7: 
                      8: #include "pa_vmethod_frame.h"
                      9: 
                     10: #include "pa_request.h"
                     11: #include "pa_vstring.h"
                     12: #include "pa_vhash.h"
                     13: #include "pa_vbool.h"
                     14: #include "pa_vamqp.h"
                     15: 
                     16: #ifdef WITH_AMQP
                     17: #include <amqp.h>
                     18: #include <amqp_tcp_socket.h>
                     19: #include <amqp_framing.h>
                     20: #include <stdlib.h>
                     21: #include <string.h>
                     22: #endif
                     23: 
1.2     ! moko       24: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.1 2025/11/06 22:08:03 moko Exp $" IDENT_PA_VAMQP_H;
1.1       moko       25: 
                     26: class MAmqp: public Methoded {
                     27: public: // VStateless_class
                     28:        Value* create_new_value(Pool&) { return new VAmqp(); }
                     29: public:
                     30:        MAmqp();
                     31: };
                     32: 
                     33: DECLARE_CLASS_VAR(amqp, new MAmqp);
                     34: 
                     35: static void _create(Request& r, MethodParams& params) {
                     36: VAmqp& self=GET_SELF(r, VAmqp);
                     37: 
                     38: #ifdef WITH_AMQP
                     39:        const char* host_c = "localhost";
                     40:        int port = 5672;
                     41:        const char* user_c = "guest";
                     42:        const char* pass_c = "guest";
                     43:        const char* vhost_c = "/";
                     44:        const char* locale_c = "en_US";
                     45:        int heartbeat = 30; // seconds
                     46: 
                     47:        if(params.count()>0){
                     48:                if(HashStringValue* options=params.as_hash(0)){
                     49:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                     50:                                String::Body key=i.key();
                     51:                                Value* value=i.value();
                     52:                                if(key=="host"){
                     53:                                        host_c=value->as_string().cstr();
                     54:                                } else if(key=="port"){
                     55:                                        port=r.process(*value).as_int();
                     56:                                } else if(key=="user"){
                     57:                                        user_c=value->as_string().cstr();
                     58:                                } else if(key=="password"){
                     59:                                        pass_c=value->as_string().cstr();
                     60:                                } else if(key=="vhost"){
                     61:                                        vhost_c=value->as_string().cstr();
                     62:                                } else if(key=="locale"){
                     63:                                        locale_c=value->as_string().cstr();
                     64:                                } else if(key=="heartbeat"){
                     65:                                        heartbeat=r.process(*value).as_int();
                     66:                                } else
                     67:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                     68:                        }
                     69:                }
                     70:        }
                     71: 
                     72:        amqp_connection_state_t conn = amqp_new_connection();
                     73:        amqp_socket_t* socket = amqp_tcp_socket_new(conn);
                     74:        if(!socket)
                     75:                throw Exception("amqp", 0, "failed to create TCP socket");
                     76:        if(amqp_socket_open(socket, host_c, port))
                     77:                throw Exception("amqp", 0, "failed to open TCP socket");
                     78: 
                     79:        amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
                     80:        if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
                     81:                amqp_destroy_connection(conn);
                     82:                throw Exception("amqp", 0, "login failed");
                     83:        }
                     84: 
                     85:        int channel = 1;
                     86:        amqp_channel_open(conn, channel);
                     87:        amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
                     88:        if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
                     89:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                     90:                amqp_destroy_connection(conn);
                     91:                throw Exception("amqp", 0, "channel open failed");
                     92:        }
                     93: 
                     94:        self.fconnection = conn;
                     95:        self.fchannel = channel;
                     96: #else
                     97:        (void)params; (void)self;
                     98:        throw Exception("amqp", 0, "compiled without amqp support");
                     99: #endif
                    100: }
                    101: 
                    102: #ifdef WITH_AMQP
                    103: 
                    104: static void check(const char* action, amqp_rpc_reply_t rr){
                    105:        if(rr.reply_type != AMQP_RESPONSE_NORMAL)
                    106:                throw Exception("amqp", 0, "%s failed", action);
                    107: }
                    108: 
1.2     ! moko      109: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
        !           110: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
        !           111: 
1.1       moko      112: static void _publish(Request& r, MethodParams& params) {
                    113:        VAmqp& self=GET_SELF(r, VAmqp);
                    114:        const String &msg=params.as_string(0, "msg must be string");
                    115:        const char* exchange_c = ""; // default exchange
                    116:        const char* routing_key_c = 0;
                    117:        bool have_queue_sugar=false;
                    118:        bool mandatory=false;
                    119: 
                    120:        amqp_basic_properties_t props;
                    121:        props._flags = 0;
                    122: 
                    123:        if(params.count()>1){
                    124:                if(HashStringValue* options=params.as_hash(1)){
                    125:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    126:                                String::Body key=i.key();
                    127:                                Value* value=i.value();
                    128:                                if(key=="exchange"){
                    129:                                        exchange_c=value->as_string().cstr();
                    130:                                } else if(key=="routing_key"){
                    131:                                        routing_key_c=value->as_string().cstr();
                    132:                                } else if(key=="queue"){
                    133:                                        routing_key_c=value->as_string().cstr(); have_queue_sugar=true;
                    134:                                } else if(key=="mandatory"){
                    135:                                        mandatory=r.process(*value).as_bool();
                    136:                                } else if(key=="properties"){
                    137:                                        // parse message properties
                    138:                                        if(HashStringValue* ph=value->get_hash()){
                    139:                                                for(HashStringValue::Iterator p(*ph); p; p.next()){
                    140:                                                        String::Body pkey=p.key();
                    141:                                                        Value* pval=p.value();
                    142:                                                        if(pkey=="content_type"){
                    143:                                                                const char* v=pval->as_string().cstr();
                    144:                                                                props.content_type=amqp_cstring_bytes(v);
                    145:                                                                props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
                    146:                                                        } else if(pkey=="content_encoding"){
                    147:                                                                const char* v=pval->as_string().cstr();
                    148:                                                                props.content_encoding=amqp_cstring_bytes(v);
                    149:                                                                props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
                    150:                                                        } else if(pkey=="delivery_mode"){
                    151:                                                                uint8_t dm=(uint8_t)pval->as_int();
                    152:                                                                props.delivery_mode=dm;
                    153:                                                                props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
                    154:                                                        } else if(pkey=="priority"){
                    155:                                                                uint8_t pr=(uint8_t)pval->as_int();
                    156:                                                                props.priority=pr;
                    157:                                                                props._flags|=AMQP_BASIC_PRIORITY_FLAG;
                    158:                                                        } else if(pkey=="correlation_id"){
                    159:                                                                const char* v=pval->as_string().cstr();
                    160:                                                                props.correlation_id=amqp_cstring_bytes(v);
                    161:                                                                props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
                    162:                                                        } else if(pkey=="reply_to"){
                    163:                                                                const char* v=pval->as_string().cstr();
                    164:                                                                props.reply_to=amqp_cstring_bytes(v);
                    165:                                                                props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
                    166:                                                        } else if(pkey=="expiration"){
                    167:                                                                const char* v=pval->as_string().cstr();
                    168:                                                                props.expiration=amqp_cstring_bytes(v);
                    169:                                                                props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
                    170:                                                        } else if(pkey=="message_id"){
                    171:                                                                const char* v=pval->as_string().cstr();
                    172:                                                                props.message_id=amqp_cstring_bytes(v);
                    173:                                                                props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
                    174:                                                        } else if(pkey=="timestamp"){
                    175:                                                                uint64_t ts=(uint64_t)pval->as_double();
                    176:                                                                props.timestamp=ts;
                    177:                                                                props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
                    178:                                                        } else if(pkey=="type"){
                    179:                                                                const char* v=pval->as_string().cstr();
                    180:                                                                props.type=amqp_cstring_bytes(v);
                    181:                                                                props._flags|=AMQP_BASIC_TYPE_FLAG;
                    182:                                                        } else if(pkey=="user_id"){
                    183:                                                                const char* v=pval->as_string().cstr();
                    184:                                                                props.user_id=amqp_cstring_bytes(v);
                    185:                                                                props._flags|=AMQP_BASIC_USER_ID_FLAG;
                    186:                                                        } else if(pkey=="app_id"){
                    187:                                                                const char* v=pval->as_string().cstr();
                    188:                                                                props.app_id=amqp_cstring_bytes(v);
                    189:                                                                props._flags|=AMQP_BASIC_APP_ID_FLAG;
                    190:                                                        } else if(pkey=="headers"){
                    191: /*                                                             if(HashStringValue* hh=pval->get_hash()){
                    192:                                                                        size_t count=hh->count();
                    193:                                                                        amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
                    194:                                                                        size_t idx=0;
                    195:                                                                        for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
                    196:                                                                                String::Body hkey=hi.key();
                    197:                                                                                const char* hv=hi.value()->as_string().cstr();
                    198:                                                                                entries[idx].key=amqp_cstring_bytes(hkey.cstr());
                    199:                                                                                entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
                    200:                                                                                entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
                    201:                                                                                idx++;
                    202:                                                                        }
                    203:                                                                        props.headers.num_entries=(int)count;
                    204:                                                                        props.headers.entries=entries;
                    205:                                                                        props._flags|=AMQP_BASIC_HEADERS_FLAG;
                    206:                                                                }
                    207: */                                                     } else
                    208:                                                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    209:                                                }
                    210:                                        }
                    211:                                } else
                    212:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    213:                        }
                    214:                }
                    215:        }
                    216: 
                    217:        if(!routing_key_c)
                    218:                throw Exception("amqp", 0, "routing_key or queue must be specified");
                    219: 
                    220:        amqp_bytes_t body;
                    221:        body.len = msg.length();
                    222:        body.bytes=(void*)msg.cstr();
                    223: 
                    224:        int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory , 0, &props, body);
                    225: 
                    226:        if(ret!=AMQP_STATUS_OK)
                    227:                throw Exception("amqp", 0, "publish failed");
                    228: 
                    229:        // free temporary headers entries if allocated
                    230:        if(props._flags & AMQP_BASIC_HEADERS_FLAG){
                    231: //             delete [] props.headers.entries;
                    232:        }
                    233:        (void)have_queue_sugar;
                    234: }
                    235: 
                    236: static void _release(Request& r, MethodParams&) {
                    237:        VAmqp& self=GET_SELF(r, VAmqp);
                    238:        if(self.fconnection){
                    239:                amqp_connection_state_t conn=self.fconnection;
                    240:                amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
                    241:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
                    242:                amqp_destroy_connection(conn);
                    243:                self.fconnection=0;
                    244:                self.fchannel=0;
                    245:        }
                    246: }
                    247: 
                    248: static void _ack(Request& r, MethodParams& params) {
                    249:        VAmqp& self=GET_SELF(r, VAmqp);
                    250:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    251:        int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
                    252:        if(ret!=AMQP_STATUS_OK)
                    253:                throw Exception("amqp", 0, "ack failed");
                    254: }
                    255: 
                    256: static void _nack(Request& r, MethodParams& params) {
                    257:        VAmqp& self=GET_SELF(r, VAmqp);
                    258:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    259:        bool requeue=false;
                    260:        if(params.count()>1){
                    261:                if(HashStringValue* options=params.as_hash(1)){
                    262:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    263:                                if(i.key()=="requeue"){
                    264:                                        requeue=r.process(*i.value()).as_bool();
                    265:                                } else
                    266:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    267:                        }
                    268:                }
                    269:        }
                    270:        int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
                    271:        if(ret!=AMQP_STATUS_OK)
                    272:                throw Exception("amqp", 0, "nack failed");
                    273: }
                    274: 
                    275: static void _qos(Request& r, MethodParams& params) {
                    276:        VAmqp& self=GET_SELF(r, VAmqp);
                    277:        uint16_t prefetch_count=0;
                    278:        if(params.count()>0){
                    279:                if(HashStringValue* options=params.as_hash(0)){
                    280:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    281:                                if(i.key()=="prefetch_count"){
                    282:                                        int pc=r.process(*i.value()).as_int();
                    283:                                        prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
                    284:                                } else
                    285:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    286:                        }
                    287:                }
                    288:        }
                    289:        amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
                    290:        if(!ret)
                    291:                throw Exception("amqp", 0, "qos failed");
                    292: }
                    293: 
                    294: static void _reject(Request& r, MethodParams& params) {
                    295:        VAmqp& self=GET_SELF(r, VAmqp);
                    296:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
                    297:        bool requeue=true; // by default return to queue
                    298:        if(params.count()>1){
                    299:                if(HashStringValue* options = params.as_hash(1)){
                    300:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    301:                                if(i.key() == "requeue"){
                    302:                                        requeue=r.process(*i.value()).as_bool();
                    303:                                } else
                    304:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    305:                        }
                    306:                }
                    307:        }
                    308:        int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
                    309:        if(ret!=AMQP_STATUS_OK)
                    310:                throw Exception("amqp", 0, "reject failed");
                    311: }
                    312: 
                    313: static void _declare_exchange(Request& r, MethodParams& params) {
                    314:        VAmqp& self=GET_SELF(r, VAmqp);
                    315:        const char* name_c = 0;
                    316:        const char* type_c = "direct";
                    317:        bool passive=false, durable=false, auto_delete=true, nowait=false;
                    318:        if(params.count()>0){
                    319:                if(HashStringValue* options=params.as_hash(0)){
                    320:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    321:                                String::Body key=i.key();
                    322:                                Value* value=i.value();
                    323:                                if(key=="name"){
                    324:                                        name_c=value->as_string().cstr();
                    325:                                } else if(key=="type"){
                    326:                                        type_c=value->as_string().cstr();
                    327:                                } else if(key=="passive"){
                    328:                                        passive=r.process(*value).as_bool();
                    329:                                } else if(key=="durable"){
                    330:                                        durable=r.process(*value).as_bool();
                    331:                                } else if(key=="auto_delete"){
                    332:                                        auto_delete=r.process(*value).as_bool();
                    333:                                } else if(key=="nowait"){
                    334:                                        nowait=r.process(*value).as_bool();
                    335:                                } else
                    336:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    337:                        }
                    338:                }
                    339:        }
                    340:        if(!name_c)
                    341:                throw Exception("amqp", 0, "name is required");
                    342:        amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
                    343:        check("declare exchange", amqp_get_rpc_reply(self.connection()));
                    344: }
                    345: 
                    346: static void _delete_exchange(Request& r, MethodParams& params) {
                    347:        VAmqp& self=GET_SELF(r, VAmqp);
                    348:        const char* name_c = 0;
                    349:        bool if_unused=false, nowait=false;
                    350:        if(params.count()>0){
                    351:                if(HashStringValue* options=params.as_hash(0)){
                    352:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    353:                                String::Body key=i.key();
                    354:                                Value* value=i.value();
                    355:                                if(key=="exchange"){
                    356:                                        name_c=value->as_string().cstr();
                    357:                                } else if(key=="if_unused"){
                    358:                                        if_unused=r.process(*value).as_bool();
                    359:                                } else if(key=="nowait"){
                    360:                                        nowait=r.process(*value).as_bool();
                    361:                                } else
                    362:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    363:                        }
                    364:                }
                    365:        }
                    366:        if(!name_c) throw Exception("amqp", 0, "exchange is required");
                    367:        amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
                    368:        check("delete exchange", amqp_get_rpc_reply(self.connection()));
                    369: }
                    370: 
                    371: static void _declare_queue(Request& r, MethodParams& params) {
                    372:        VAmqp& self=GET_SELF(r, VAmqp);
                    373:        const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
                    374:        if(params.count()>0){
                    375:                if(HashStringValue* options=params.as_hash(0)){
                    376:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    377:                                String::Body key=i.key();
                    378:                                Value* value=i.value();
                    379:                                if(key=="queue"){
                    380:                                        queue_c=value->as_string().cstr();
                    381:                                } else if(key=="passive"){
                    382:                                        passive=r.process(*value).as_bool();
                    383:                                } else if(key=="durable"){
                    384:                                        durable=r.process(*value).as_bool();
                    385:                                } else if(key=="auto_delete"){
                    386:                                        auto_delete=r.process(*value).as_bool();
                    387:                                } else if(key=="nowait"){
                    388:                                        nowait=r.process(*value).as_bool();
                    389:                                } else
                    390:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    391:                        }
                    392:                }
                    393:        }
                    394:        amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
                    395:        check("declare queue", amqp_get_rpc_reply(self.connection()));
                    396:        if(!queue_c && ok){
1.2     ! moko      397:                r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1       moko      398:        }
                    399: }
                    400: 
                    401: static void _delete_queue(Request& r, MethodParams& params) {
                    402:        VAmqp& self=GET_SELF(r, VAmqp);
                    403:        const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
                    404:        if(params.count()>0){
                    405:                if(HashStringValue* options=params.as_hash(0)){
                    406:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    407:                                String::Body key=i.key();
                    408:                                Value* value=i.value();
                    409:                                if(key=="queue"){
                    410:                                        queue_c=value->as_string().cstr();
                    411:                                } else if(key=="if_unused"){
                    412:                                        if_unused=r.process(*value).as_bool();
                    413:                                } else if(key=="if_empty"){
                    414:                                        if_empty=r.process(*value).as_bool();
                    415:                                } else if(key=="nowait"){
                    416:                                        nowait=r.process(*value).as_bool();
                    417:                                } else
                    418:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    419:                        }
                    420:                }
                    421:        }
                    422:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
                    423:        amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
                    424:        check("delete queue", amqp_get_rpc_reply(self.connection()));
                    425: }
                    426: 
                    427: static void _bind_queue(Request& r, MethodParams& params) {
                    428:        VAmqp& self=GET_SELF(r, VAmqp);
                    429:        const char* exchange_c=0;
                    430:        const char* queue_c=0;
                    431:        const char* routing_key_c="";
                    432:        if(params.count()>0){
                    433:                if(HashStringValue* options=params.as_hash(0)){
                    434:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    435:                                String::Body key=i.key();
                    436:                                Value* value=i.value();
                    437:                                if(key=="exchange"){
                    438:                                        exchange_c=value->as_string().cstr();
                    439:                                } else if(key=="queue"){
                    440:                                        queue_c=value->as_string().cstr();
                    441:                                } else if(key=="routing_key"){
                    442:                                        routing_key_c=value->as_string().cstr();
                    443:                                } else
                    444:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    445:                        }
                    446:                }
                    447:        }
                    448:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    449:        amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
                    450:        check("bind queue", amqp_get_rpc_reply(self.connection()));
                    451: }
                    452: 
                    453: static void _unbind_queue(Request& r, MethodParams& params) {
                    454:        VAmqp& self=GET_SELF(r, VAmqp);
                    455:        const char* exchange_c=0;
                    456:        const char* queue_c=0;
                    457:        const char* routing_key_c="";
                    458:        if(params.count()>0){
                    459:                if(HashStringValue* options=params.as_hash(0)){
                    460:                        for(HashStringValue::Iterator i(*options); i; i.next()){
                    461:                                String::Body key=i.key();
                    462:                                Value* value=i.value();
                    463:                                if(key=="exchange"){
                    464:                                        exchange_c=value->as_string().cstr();
                    465:                                } else if(key=="queue"){
                    466:                                        queue_c=value->as_string().cstr();
                    467:                                } else if(key=="routing_key"){
                    468:                                        routing_key_c=value->as_string().cstr();
                    469:                                } else
                    470:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
                    471:                        }
                    472:                }
                    473:        }
                    474:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
                    475:        amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
                    476:        check("unbind queue", amqp_get_rpc_reply(self.connection()));
                    477: }
                    478: 
                    479: static void _consume(Request& r, MethodParams& params) {
                    480:        VAmqp& self=GET_SELF(r, VAmqp);
                    481:        const char* queue_c=0;
                    482:        const char* consumer_tag_c=0;
                    483:        bool no_ack=true, nowait=false;
                    484:        Junction* callback=0;
                    485: 
1.2     ! moko      486:        if(HashStringValue* options=params.as_hash(0)){
        !           487:                for(HashStringValue::Iterator i(*options); i; i.next()){
        !           488:                        String::Body key=i.key();
        !           489:                        Value* value=i.value();
        !           490:                        if(key=="queue"){
        !           491:                                queue_c=value->as_string().cstr();
        !           492:                        } else if(key=="consumer_tag"){
        !           493:                                consumer_tag_c=value->as_string().cstr();
        !           494:                        } else if(key=="no_ack"){
        !           495:                                no_ack=r.process(*value).as_bool();
        !           496:                        } else if(key=="nowait"){
        !           497:                                nowait=r.process(*value).as_bool();
        !           498:                        } else if(key=="callback"){
        !           499:                                callback=value->get_junction();
        !           500:                        } else
        !           501:                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1       moko      502:                }
                    503:        }
                    504: 
                    505:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
                    506:        if(!callback) throw Exception("amqp", 0, "callback is required");
                    507: 
                    508:        amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
                    509:                consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
                    510:                0 /*no_local*/, no_ack, nowait, amqp_empty_table);
                    511:        amqp_rpc_reply_t rr = amqp_get_rpc_reply(self.connection());
                    512:        if(rr.reply_type != AMQP_RESPONSE_NORMAL)
                    513:                throw Exception("amqp", 0, "consume failed");
                    514: 
                    515:        self.fstop=false;
                    516:        while(!self.fstop){
1.2     ! moko      517:                amqp_envelope_t envelope;
        !           518:                memset(&envelope, 0, sizeof(envelope));
        !           519:                amqp_maybe_release_buffers(self.connection());
1.1       moko      520:                amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
                    521:                if(res.reply_type == AMQP_RESPONSE_NORMAL){
                    522:                        VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2     ! moko      523:                        h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.1       moko      524:                        h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
1.2     ! moko      525:                        h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
        !           526:                        h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1       moko      527: 
                    528:                        Value *params_cb[]={&vh};
                    529:                        METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
                    530:                                frame.store_params(params_cb, 1);
                    531:                                r.call(frame);
                    532:                        });
                    533: 
                    534:                        amqp_destroy_envelope(&envelope);
                    535:                } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
                    536:                        continue;
                    537:                } else {
                    538:                        break;
                    539:                }
                    540:        }
                    541: }
                    542: 
                    543: static void _stop_consume(Request& r, MethodParams&) {
                    544:        VAmqp& self=GET_SELF(r, VAmqp);
                    545:        self.fstop=true;
                    546: }
                    547: 
                    548: #endif
                    549: 
                    550: // constructor
                    551: MAmqp::MAmqp(): Methoded("amqp") {
                    552:        add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
                    553: #ifdef WITH_AMQP
                    554:        add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
                    555:        add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
                    556:        add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
                    557:        add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
                    558:        add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
                    559:        add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
                    560:        add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
                    561:        add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
                    562:        add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
                    563:        add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
                    564:        add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
                    565:        add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2     ! moko      566:        add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1       moko      567:        add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
                    568: #endif
                    569: }
                    570: 
                    571: 

E-mail: