Annotation of parser3/src/classes/amqp.C, revision 1.1

1.1     ! moko        1: /** @file
        !             2:        Parser: @b amqp parser class.
        !             3: 
        !             4:        Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
        !             5:        Authors: Konstantin Morshnev <moko@design.ru>
        !             6: */
        !             7: 
        !             8: #include "pa_vmethod_frame.h"
        !             9: 
        !            10: #include "pa_request.h"
        !            11: #include "pa_vstring.h"
        !            12: #include "pa_vhash.h"
        !            13: #include "pa_vbool.h"
        !            14: #include "pa_vamqp.h"
        !            15: 
        !            16: #ifdef WITH_AMQP
        !            17: #include <amqp.h>
        !            18: #include <amqp_tcp_socket.h>
        !            19: #include <amqp_framing.h>
        !            20: #include <stdlib.h>
        !            21: #include <string.h>
        !            22: #endif
        !            23: 
        !            24: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.1 2025/10/05 00:00:00 moko Exp $" IDENT_PA_VAMQP_H;
        !            25: 
        !            26: class MAmqp: public Methoded {
        !            27: public: // VStateless_class
        !            28:        Value* create_new_value(Pool&) { return new VAmqp(); }
        !            29: public:
        !            30:        MAmqp();
        !            31: };
        !            32: 
        !            33: DECLARE_CLASS_VAR(amqp, new MAmqp);
        !            34: 
        !            35: static void _create(Request& r, MethodParams& params) {
        !            36: VAmqp& self=GET_SELF(r, VAmqp);
        !            37: 
        !            38: #ifdef WITH_AMQP
        !            39:        const char* host_c = "localhost";
        !            40:        int port = 5672;
        !            41:        const char* user_c = "guest";
        !            42:        const char* pass_c = "guest";
        !            43:        const char* vhost_c = "/";
        !            44:        const char* locale_c = "en_US";
        !            45:        int heartbeat = 30; // seconds
        !            46: 
        !            47:        if(params.count()>0){
        !            48:                if(HashStringValue* options=params.as_hash(0)){
        !            49:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !            50:                                String::Body key=i.key();
        !            51:                                Value* value=i.value();
        !            52:                                if(key=="host"){
        !            53:                                        host_c=value->as_string().cstr();
        !            54:                                } else if(key=="port"){
        !            55:                                        port=r.process(*value).as_int();
        !            56:                                } else if(key=="user"){
        !            57:                                        user_c=value->as_string().cstr();
        !            58:                                } else if(key=="password"){
        !            59:                                        pass_c=value->as_string().cstr();
        !            60:                                } else if(key=="vhost"){
        !            61:                                        vhost_c=value->as_string().cstr();
        !            62:                                } else if(key=="locale"){
        !            63:                                        locale_c=value->as_string().cstr();
        !            64:                                } else if(key=="heartbeat"){
        !            65:                                        heartbeat=r.process(*value).as_int();
        !            66:                                } else
        !            67:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !            68:                        }
        !            69:                }
        !            70:        }
        !            71: 
        !            72:        amqp_connection_state_t conn = amqp_new_connection();
        !            73:        amqp_socket_t* socket = amqp_tcp_socket_new(conn);
        !            74:        if(!socket)
        !            75:                throw Exception("amqp", 0, "failed to create TCP socket");
        !            76:        if(amqp_socket_open(socket, host_c, port))
        !            77:                throw Exception("amqp", 0, "failed to open TCP socket");
        !            78: 
        !            79:        amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
        !            80:        if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
        !            81:                amqp_destroy_connection(conn);
        !            82:                throw Exception("amqp", 0, "login failed");
        !            83:        }
        !            84: 
        !            85:        int channel = 1;
        !            86:        amqp_channel_open(conn, channel);
        !            87:        amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
        !            88:        if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
        !            89:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
        !            90:                amqp_destroy_connection(conn);
        !            91:                throw Exception("amqp", 0, "channel open failed");
        !            92:        }
        !            93: 
        !            94:        self.fconnection = conn;
        !            95:        self.fchannel = channel;
        !            96: #else
        !            97:        (void)params; (void)self;
        !            98:        throw Exception("amqp", 0, "compiled without amqp support");
        !            99: #endif
        !           100: }
        !           101: 
        !           102: #ifdef WITH_AMQP
        !           103: 
        !           104: static void check(const char* action, amqp_rpc_reply_t rr){
        !           105:        if(rr.reply_type != AMQP_RESPONSE_NORMAL)
        !           106:                throw Exception("amqp", 0, "%s failed", action);
        !           107: }
        !           108: 
        !           109: static void _publish(Request& r, MethodParams& params) {
        !           110:        VAmqp& self=GET_SELF(r, VAmqp);
        !           111:        const String &msg=params.as_string(0, "msg must be string");
        !           112:        const char* exchange_c = ""; // default exchange
        !           113:        const char* routing_key_c = 0;
        !           114:        bool have_queue_sugar=false;
        !           115:        bool mandatory=false;
        !           116: 
        !           117:        amqp_basic_properties_t props;
        !           118:        props._flags = 0;
        !           119: 
        !           120:        if(params.count()>1){
        !           121:                if(HashStringValue* options=params.as_hash(1)){
        !           122:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           123:                                String::Body key=i.key();
        !           124:                                Value* value=i.value();
        !           125:                                if(key=="exchange"){
        !           126:                                        exchange_c=value->as_string().cstr();
        !           127:                                } else if(key=="routing_key"){
        !           128:                                        routing_key_c=value->as_string().cstr();
        !           129:                                } else if(key=="queue"){
        !           130:                                        routing_key_c=value->as_string().cstr(); have_queue_sugar=true;
        !           131:                                } else if(key=="mandatory"){
        !           132:                                        mandatory=r.process(*value).as_bool();
        !           133:                                } else if(key=="properties"){
        !           134:                                        // parse message properties
        !           135:                                        if(HashStringValue* ph=value->get_hash()){
        !           136:                                                for(HashStringValue::Iterator p(*ph); p; p.next()){
        !           137:                                                        String::Body pkey=p.key();
        !           138:                                                        Value* pval=p.value();
        !           139:                                                        if(pkey=="content_type"){
        !           140:                                                                const char* v=pval->as_string().cstr();
        !           141:                                                                props.content_type=amqp_cstring_bytes(v);
        !           142:                                                                props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
        !           143:                                                        } else if(pkey=="content_encoding"){
        !           144:                                                                const char* v=pval->as_string().cstr();
        !           145:                                                                props.content_encoding=amqp_cstring_bytes(v);
        !           146:                                                                props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
        !           147:                                                        } else if(pkey=="delivery_mode"){
        !           148:                                                                uint8_t dm=(uint8_t)pval->as_int();
        !           149:                                                                props.delivery_mode=dm;
        !           150:                                                                props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
        !           151:                                                        } else if(pkey=="priority"){
        !           152:                                                                uint8_t pr=(uint8_t)pval->as_int();
        !           153:                                                                props.priority=pr;
        !           154:                                                                props._flags|=AMQP_BASIC_PRIORITY_FLAG;
        !           155:                                                        } else if(pkey=="correlation_id"){
        !           156:                                                                const char* v=pval->as_string().cstr();
        !           157:                                                                props.correlation_id=amqp_cstring_bytes(v);
        !           158:                                                                props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
        !           159:                                                        } else if(pkey=="reply_to"){
        !           160:                                                                const char* v=pval->as_string().cstr();
        !           161:                                                                props.reply_to=amqp_cstring_bytes(v);
        !           162:                                                                props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
        !           163:                                                        } else if(pkey=="expiration"){
        !           164:                                                                const char* v=pval->as_string().cstr();
        !           165:                                                                props.expiration=amqp_cstring_bytes(v);
        !           166:                                                                props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
        !           167:                                                        } else if(pkey=="message_id"){
        !           168:                                                                const char* v=pval->as_string().cstr();
        !           169:                                                                props.message_id=amqp_cstring_bytes(v);
        !           170:                                                                props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
        !           171:                                                        } else if(pkey=="timestamp"){
        !           172:                                                                uint64_t ts=(uint64_t)pval->as_double();
        !           173:                                                                props.timestamp=ts;
        !           174:                                                                props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
        !           175:                                                        } else if(pkey=="type"){
        !           176:                                                                const char* v=pval->as_string().cstr();
        !           177:                                                                props.type=amqp_cstring_bytes(v);
        !           178:                                                                props._flags|=AMQP_BASIC_TYPE_FLAG;
        !           179:                                                        } else if(pkey=="user_id"){
        !           180:                                                                const char* v=pval->as_string().cstr();
        !           181:                                                                props.user_id=amqp_cstring_bytes(v);
        !           182:                                                                props._flags|=AMQP_BASIC_USER_ID_FLAG;
        !           183:                                                        } else if(pkey=="app_id"){
        !           184:                                                                const char* v=pval->as_string().cstr();
        !           185:                                                                props.app_id=amqp_cstring_bytes(v);
        !           186:                                                                props._flags|=AMQP_BASIC_APP_ID_FLAG;
        !           187:                                                        } else if(pkey=="headers"){
        !           188: /*                                                             if(HashStringValue* hh=pval->get_hash()){
        !           189:                                                                        size_t count=hh->count();
        !           190:                                                                        amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
        !           191:                                                                        size_t idx=0;
        !           192:                                                                        for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
        !           193:                                                                                String::Body hkey=hi.key();
        !           194:                                                                                const char* hv=hi.value()->as_string().cstr();
        !           195:                                                                                entries[idx].key=amqp_cstring_bytes(hkey.cstr());
        !           196:                                                                                entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
        !           197:                                                                                entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
        !           198:                                                                                idx++;
        !           199:                                                                        }
        !           200:                                                                        props.headers.num_entries=(int)count;
        !           201:                                                                        props.headers.entries=entries;
        !           202:                                                                        props._flags|=AMQP_BASIC_HEADERS_FLAG;
        !           203:                                                                }
        !           204: */                                                     } else
        !           205:                                                                throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           206:                                                }
        !           207:                                        }
        !           208:                                } else
        !           209:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           210:                        }
        !           211:                }
        !           212:        }
        !           213: 
        !           214:        if(!routing_key_c)
        !           215:                throw Exception("amqp", 0, "routing_key or queue must be specified");
        !           216: 
        !           217:        amqp_bytes_t body;
        !           218:        body.len = msg.length();
        !           219:        body.bytes=(void*)msg.cstr();
        !           220: 
        !           221:        int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory , 0, &props, body);
        !           222: 
        !           223:        if(ret!=AMQP_STATUS_OK)
        !           224:                throw Exception("amqp", 0, "publish failed");
        !           225: 
        !           226:        // free temporary headers entries if allocated
        !           227:        if(props._flags & AMQP_BASIC_HEADERS_FLAG){
        !           228: //             delete [] props.headers.entries;
        !           229:        }
        !           230:        (void)have_queue_sugar;
        !           231: }
        !           232: 
        !           233: static void _release(Request& r, MethodParams&) {
        !           234:        VAmqp& self=GET_SELF(r, VAmqp);
        !           235:        if(self.fconnection){
        !           236:                amqp_connection_state_t conn=self.fconnection;
        !           237:                amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
        !           238:                amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
        !           239:                amqp_destroy_connection(conn);
        !           240:                self.fconnection=0;
        !           241:                self.fchannel=0;
        !           242:        }
        !           243: }
        !           244: 
        !           245: static void _ack(Request& r, MethodParams& params) {
        !           246:        VAmqp& self=GET_SELF(r, VAmqp);
        !           247:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
        !           248:        int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
        !           249:        if(ret!=AMQP_STATUS_OK)
        !           250:                throw Exception("amqp", 0, "ack failed");
        !           251: }
        !           252: 
        !           253: static void _nack(Request& r, MethodParams& params) {
        !           254:        VAmqp& self=GET_SELF(r, VAmqp);
        !           255:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
        !           256:        bool requeue=false;
        !           257:        if(params.count()>1){
        !           258:                if(HashStringValue* options=params.as_hash(1)){
        !           259:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           260:                                if(i.key()=="requeue"){
        !           261:                                        requeue=r.process(*i.value()).as_bool();
        !           262:                                } else
        !           263:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           264:                        }
        !           265:                }
        !           266:        }
        !           267:        int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
        !           268:        if(ret!=AMQP_STATUS_OK)
        !           269:                throw Exception("amqp", 0, "nack failed");
        !           270: }
        !           271: 
        !           272: static void _qos(Request& r, MethodParams& params) {
        !           273:        VAmqp& self=GET_SELF(r, VAmqp);
        !           274:        uint16_t prefetch_count=0;
        !           275:        if(params.count()>0){
        !           276:                if(HashStringValue* options=params.as_hash(0)){
        !           277:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           278:                                if(i.key()=="prefetch_count"){
        !           279:                                        int pc=r.process(*i.value()).as_int();
        !           280:                                        prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
        !           281:                                } else
        !           282:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           283:                        }
        !           284:                }
        !           285:        }
        !           286:        amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
        !           287:        if(!ret)
        !           288:                throw Exception("amqp", 0, "qos failed");
        !           289: }
        !           290: 
        !           291: static void _reject(Request& r, MethodParams& params) {
        !           292:        VAmqp& self=GET_SELF(r, VAmqp);
        !           293:        const String &tag_s=params.as_string(0, "delivery tag must not be code");
        !           294:        bool requeue=true; // by default return to queue
        !           295:        if(params.count()>1){
        !           296:                if(HashStringValue* options = params.as_hash(1)){
        !           297:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           298:                                if(i.key() == "requeue"){
        !           299:                                        requeue=r.process(*i.value()).as_bool();
        !           300:                                } else
        !           301:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           302:                        }
        !           303:                }
        !           304:        }
        !           305:        int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
        !           306:        if(ret!=AMQP_STATUS_OK)
        !           307:                throw Exception("amqp", 0, "reject failed");
        !           308: }
        !           309: 
        !           310: static void _declare_exchange(Request& r, MethodParams& params) {
        !           311:        VAmqp& self=GET_SELF(r, VAmqp);
        !           312:        const char* name_c = 0;
        !           313:        const char* type_c = "direct";
        !           314:        bool passive=false, durable=false, auto_delete=true, nowait=false;
        !           315:        if(params.count()>0){
        !           316:                if(HashStringValue* options=params.as_hash(0)){
        !           317:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           318:                                String::Body key=i.key();
        !           319:                                Value* value=i.value();
        !           320:                                if(key=="name"){
        !           321:                                        name_c=value->as_string().cstr();
        !           322:                                } else if(key=="type"){
        !           323:                                        type_c=value->as_string().cstr();
        !           324:                                } else if(key=="passive"){
        !           325:                                        passive=r.process(*value).as_bool();
        !           326:                                } else if(key=="durable"){
        !           327:                                        durable=r.process(*value).as_bool();
        !           328:                                } else if(key=="auto_delete"){
        !           329:                                        auto_delete=r.process(*value).as_bool();
        !           330:                                } else if(key=="nowait"){
        !           331:                                        nowait=r.process(*value).as_bool();
        !           332:                                } else
        !           333:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           334:                        }
        !           335:                }
        !           336:        }
        !           337:        if(!name_c)
        !           338:                throw Exception("amqp", 0, "name is required");
        !           339:        amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
        !           340:        check("declare exchange", amqp_get_rpc_reply(self.connection()));
        !           341: }
        !           342: 
        !           343: static void _delete_exchange(Request& r, MethodParams& params) {
        !           344:        VAmqp& self=GET_SELF(r, VAmqp);
        !           345:        const char* name_c = 0;
        !           346:        bool if_unused=false, nowait=false;
        !           347:        if(params.count()>0){
        !           348:                if(HashStringValue* options=params.as_hash(0)){
        !           349:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           350:                                String::Body key=i.key();
        !           351:                                Value* value=i.value();
        !           352:                                if(key=="exchange"){
        !           353:                                        name_c=value->as_string().cstr();
        !           354:                                } else if(key=="if_unused"){
        !           355:                                        if_unused=r.process(*value).as_bool();
        !           356:                                } else if(key=="nowait"){
        !           357:                                        nowait=r.process(*value).as_bool();
        !           358:                                } else
        !           359:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           360:                        }
        !           361:                }
        !           362:        }
        !           363:        if(!name_c) throw Exception("amqp", 0, "exchange is required");
        !           364:        amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
        !           365:        check("delete exchange", amqp_get_rpc_reply(self.connection()));
        !           366: }
        !           367: 
        !           368: static void _declare_queue(Request& r, MethodParams& params) {
        !           369:        VAmqp& self=GET_SELF(r, VAmqp);
        !           370:        const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
        !           371:        if(params.count()>0){
        !           372:                if(HashStringValue* options=params.as_hash(0)){
        !           373:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           374:                                String::Body key=i.key();
        !           375:                                Value* value=i.value();
        !           376:                                if(key=="queue"){
        !           377:                                        queue_c=value->as_string().cstr();
        !           378:                                } else if(key=="passive"){
        !           379:                                        passive=r.process(*value).as_bool();
        !           380:                                } else if(key=="durable"){
        !           381:                                        durable=r.process(*value).as_bool();
        !           382:                                } else if(key=="auto_delete"){
        !           383:                                        auto_delete=r.process(*value).as_bool();
        !           384:                                } else if(key=="nowait"){
        !           385:                                        nowait=r.process(*value).as_bool();
        !           386:                                } else
        !           387:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           388:                        }
        !           389:                }
        !           390:        }
        !           391:        amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
        !           392:        check("declare queue", amqp_get_rpc_reply(self.connection()));
        !           393:        if(!queue_c && ok){
        !           394:                r.write(*new String(String::C(pa_strdup((const char *)ok->queue.bytes, ok->queue.len), ok->queue.len)));
        !           395:        }
        !           396: }
        !           397: 
        !           398: static void _delete_queue(Request& r, MethodParams& params) {
        !           399:        VAmqp& self=GET_SELF(r, VAmqp);
        !           400:        const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
        !           401:        if(params.count()>0){
        !           402:                if(HashStringValue* options=params.as_hash(0)){
        !           403:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           404:                                String::Body key=i.key();
        !           405:                                Value* value=i.value();
        !           406:                                if(key=="queue"){
        !           407:                                        queue_c=value->as_string().cstr();
        !           408:                                } else if(key=="if_unused"){
        !           409:                                        if_unused=r.process(*value).as_bool();
        !           410:                                } else if(key=="if_empty"){
        !           411:                                        if_empty=r.process(*value).as_bool();
        !           412:                                } else if(key=="nowait"){
        !           413:                                        nowait=r.process(*value).as_bool();
        !           414:                                } else
        !           415:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           416:                        }
        !           417:                }
        !           418:        }
        !           419:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
        !           420:        amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
        !           421:        check("delete queue", amqp_get_rpc_reply(self.connection()));
        !           422: }
        !           423: 
        !           424: static void _bind_queue(Request& r, MethodParams& params) {
        !           425:        VAmqp& self=GET_SELF(r, VAmqp);
        !           426:        const char* exchange_c=0;
        !           427:        const char* queue_c=0;
        !           428:        const char* routing_key_c="";
        !           429:        if(params.count()>0){
        !           430:                if(HashStringValue* options=params.as_hash(0)){
        !           431:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           432:                                String::Body key=i.key();
        !           433:                                Value* value=i.value();
        !           434:                                if(key=="exchange"){
        !           435:                                        exchange_c=value->as_string().cstr();
        !           436:                                } else if(key=="queue"){
        !           437:                                        queue_c=value->as_string().cstr();
        !           438:                                } else if(key=="routing_key"){
        !           439:                                        routing_key_c=value->as_string().cstr();
        !           440:                                } else
        !           441:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           442:                        }
        !           443:                }
        !           444:        }
        !           445:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
        !           446:        amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
        !           447:        check("bind queue", amqp_get_rpc_reply(self.connection()));
        !           448: }
        !           449: 
        !           450: static void _unbind_queue(Request& r, MethodParams& params) {
        !           451:        VAmqp& self=GET_SELF(r, VAmqp);
        !           452:        const char* exchange_c=0;
        !           453:        const char* queue_c=0;
        !           454:        const char* routing_key_c="";
        !           455:        if(params.count()>0){
        !           456:                if(HashStringValue* options=params.as_hash(0)){
        !           457:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           458:                                String::Body key=i.key();
        !           459:                                Value* value=i.value();
        !           460:                                if(key=="exchange"){
        !           461:                                        exchange_c=value->as_string().cstr();
        !           462:                                } else if(key=="queue"){
        !           463:                                        queue_c=value->as_string().cstr();
        !           464:                                } else if(key=="routing_key"){
        !           465:                                        routing_key_c=value->as_string().cstr();
        !           466:                                } else
        !           467:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           468:                        }
        !           469:                }
        !           470:        }
        !           471:        if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
        !           472:        amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
        !           473:        check("unbind queue", amqp_get_rpc_reply(self.connection()));
        !           474: }
        !           475: 
        !           476: static void _consume(Request& r, MethodParams& params) {
        !           477:        VAmqp& self=GET_SELF(r, VAmqp);
        !           478:        const char* queue_c=0;
        !           479:        const char* consumer_tag_c=0;
        !           480:        bool no_ack=true, nowait=false;
        !           481:        Junction* callback=0;
        !           482: 
        !           483:        if(params.count()>0){
        !           484:                if(HashStringValue* options=params.as_hash(0)){
        !           485:                        for(HashStringValue::Iterator i(*options); i; i.next()){
        !           486:                                String::Body key=i.key();
        !           487:                                Value* value=i.value();
        !           488:                                if(key=="queue"){
        !           489:                                        queue_c=value->as_string().cstr();
        !           490:                                } else if(key=="consumer_tag"){
        !           491:                                        consumer_tag_c=value->as_string().cstr();
        !           492:                                } else if(key=="no_ack"){
        !           493:                                        no_ack=r.process(*value).as_bool();
        !           494:                                } else if(key=="nowait"){
        !           495:                                        nowait=r.process(*value).as_bool();
        !           496:                                } else if(key=="callback"){
        !           497:                                        callback=value->get_junction();
        !           498:                                } else
        !           499:                                        throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
        !           500:                        }
        !           501:                }
        !           502:        }
        !           503: 
        !           504:        if(!queue_c) throw Exception("amqp", 0, "queue is required");
        !           505:        if(!callback) throw Exception("amqp", 0, "callback is required");
        !           506: 
        !           507:        amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
        !           508:                consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
        !           509:                0 /*no_local*/, no_ack, nowait, amqp_empty_table);
        !           510:        amqp_rpc_reply_t rr = amqp_get_rpc_reply(self.connection());
        !           511:        if(rr.reply_type != AMQP_RESPONSE_NORMAL)
        !           512:                throw Exception("amqp", 0, "consume failed");
        !           513: 
        !           514:        self.fstop=false;
        !           515:        while(!self.fstop){
        !           516:                amqp_envelope_t envelope; amqp_maybe_release_buffers(self.connection());
        !           517:                amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
        !           518:                if(res.reply_type == AMQP_RESPONSE_NORMAL){
        !           519:                        VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
        !           520:                        String msg(String::Body((const unsigned char*)envelope.message.body.bytes, envelope.message.body.len), String::L_CLEAN);
        !           521:                        h->put("msg", new VString(msg));
        !           522:                        h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
        !           523:                        h->put("consumer_tag", new VString(String::Body((const unsigned char*)envelope.consumer_tag.bytes, envelope.consumer_tag.len)));
        !           524:                        h->put("exchange", new VString(String::Body((const unsigned char*)envelope.exchange.bytes, envelope.exchange.len)));
        !           525: 
        !           526:                        Value *params_cb[]={&vh};
        !           527:                        METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
        !           528:                                frame.store_params(params_cb, 1);
        !           529:                                r.call(frame);
        !           530:                        });
        !           531: 
        !           532:                        amqp_destroy_envelope(&envelope);
        !           533:                } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
        !           534:                        continue;
        !           535:                } else {
        !           536:                        break;
        !           537:                }
        !           538:        }
        !           539: }
        !           540: 
        !           541: static void _stop_consume(Request& r, MethodParams&) {
        !           542:        VAmqp& self=GET_SELF(r, VAmqp);
        !           543:        self.fstop=true;
        !           544: }
        !           545: 
        !           546: #endif
        !           547: 
        !           548: // constructor
        !           549: MAmqp::MAmqp(): Methoded("amqp") {
        !           550:        add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
        !           551: #ifdef WITH_AMQP
        !           552:        add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
        !           553:        add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
        !           554:        add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
        !           555:        add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
        !           556:        add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
        !           557:        add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
        !           558:        add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
        !           559:        add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
        !           560:        add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
        !           561:        add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
        !           562:        add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
        !           563:        add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
        !           564:        add_native_method("consume", Method::CT_DYNAMIC, _consume, 0, 1);
        !           565:        add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
        !           566: #endif
        !           567: }
        !           568: 
        !           569: 

E-mail: