Annotation of parser3/src/classes/amqp.C, revision 1.3
1.1 moko 1: /** @file
2: Parser: @b amqp parser class.
3:
4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
5: Authors: Konstantin Morshnev <moko@design.ru>
6: */
7:
8: #include "pa_vmethod_frame.h"
9:
10: #include "pa_request.h"
11: #include "pa_vstring.h"
12: #include "pa_vhash.h"
13: #include "pa_vbool.h"
14: #include "pa_vamqp.h"
15:
16: #ifdef WITH_AMQP
17: #include <amqp.h>
18: #include <amqp_tcp_socket.h>
19: #include <amqp_framing.h>
20: #include <stdlib.h>
21: #include <string.h>
22: #endif
23:
1.3 ! moko 24: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.2 2025/11/07 01:26:13 moko Exp $" IDENT_PA_VAMQP_H;
1.1 moko 25:
26: class MAmqp: public Methoded {
27: public: // VStateless_class
28: Value* create_new_value(Pool&) { return new VAmqp(); }
29: public:
30: MAmqp();
31: };
32:
33: DECLARE_CLASS_VAR(amqp, new MAmqp);
34:
35: static void _create(Request& r, MethodParams& params) {
36: VAmqp& self=GET_SELF(r, VAmqp);
37:
38: #ifdef WITH_AMQP
39: const char* host_c = "localhost";
40: int port = 5672;
41: const char* user_c = "guest";
42: const char* pass_c = "guest";
43: const char* vhost_c = "/";
44: const char* locale_c = "en_US";
45: int heartbeat = 30; // seconds
46:
47: if(params.count()>0){
48: if(HashStringValue* options=params.as_hash(0)){
49: for(HashStringValue::Iterator i(*options); i; i.next()){
50: String::Body key=i.key();
51: Value* value=i.value();
52: if(key=="host"){
53: host_c=value->as_string().cstr();
54: } else if(key=="port"){
55: port=r.process(*value).as_int();
56: } else if(key=="user"){
57: user_c=value->as_string().cstr();
58: } else if(key=="password"){
59: pass_c=value->as_string().cstr();
60: } else if(key=="vhost"){
61: vhost_c=value->as_string().cstr();
62: } else if(key=="locale"){
63: locale_c=value->as_string().cstr();
64: } else if(key=="heartbeat"){
65: heartbeat=r.process(*value).as_int();
66: } else
67: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
68: }
69: }
70: }
71:
72: amqp_connection_state_t conn = amqp_new_connection();
73: amqp_socket_t* socket = amqp_tcp_socket_new(conn);
74: if(!socket)
75: throw Exception("amqp", 0, "failed to create TCP socket");
76: if(amqp_socket_open(socket, host_c, port))
77: throw Exception("amqp", 0, "failed to open TCP socket");
78:
79: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
80: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
81: amqp_destroy_connection(conn);
82: throw Exception("amqp", 0, "login failed");
83: }
84:
85: int channel = 1;
86: amqp_channel_open(conn, channel);
87: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
88: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
89: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
90: amqp_destroy_connection(conn);
91: throw Exception("amqp", 0, "channel open failed");
92: }
93:
94: self.fconnection = conn;
95: self.fchannel = channel;
96: #else
97: (void)params; (void)self;
98: throw Exception("amqp", 0, "compiled without amqp support");
99: #endif
100: }
101:
102: #ifdef WITH_AMQP
103:
104: static void check(const char* action, amqp_rpc_reply_t rr){
105: if(rr.reply_type != AMQP_RESPONSE_NORMAL)
106: throw Exception("amqp", 0, "%s failed", action);
107: }
108:
1.2 moko 109: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
110: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
111:
1.1 moko 112: static void _publish(Request& r, MethodParams& params) {
113: VAmqp& self=GET_SELF(r, VAmqp);
114: const String &msg=params.as_string(0, "msg must be string");
115: const char* exchange_c = ""; // default exchange
116: const char* routing_key_c = 0;
117: bool mandatory=false;
118:
119: amqp_basic_properties_t props;
120: props._flags = 0;
121:
122: if(params.count()>1){
123: if(HashStringValue* options=params.as_hash(1)){
124: for(HashStringValue::Iterator i(*options); i; i.next()){
125: String::Body key=i.key();
126: Value* value=i.value();
127: if(key=="exchange"){
128: exchange_c=value->as_string().cstr();
129: } else if(key=="routing_key"){
130: routing_key_c=value->as_string().cstr();
131: } else if(key=="queue"){
1.3 ! moko 132: routing_key_c=value->as_string().cstr();
1.1 moko 133: } else if(key=="mandatory"){
134: mandatory=r.process(*value).as_bool();
1.3 ! moko 135: } else if(key=="content_type"){
! 136: const char* v=value->as_string().cstr();
! 137: props.content_type=amqp_cstring_bytes(v);
! 138: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
! 139: } else if(key=="content_encoding"){
! 140: const char* v=value->as_string().cstr();
! 141: props.content_encoding=amqp_cstring_bytes(v);
! 142: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
! 143: } else if(key=="delivery_mode"){
! 144: uint8_t dm=(uint8_t)value->as_int();
! 145: props.delivery_mode=dm;
! 146: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
! 147: } else if(key=="priority"){
! 148: uint8_t pr=(uint8_t)value->as_int();
! 149: props.priority=pr;
! 150: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
! 151: } else if(key=="correlation_id"){
! 152: const char* v=value->as_string().cstr();
! 153: props.correlation_id=amqp_cstring_bytes(v);
! 154: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
! 155: } else if(key=="reply_to"){
! 156: const char* v=value->as_string().cstr();
! 157: props.reply_to=amqp_cstring_bytes(v);
! 158: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
! 159: } else if(key=="expiration"){
! 160: const char* v=value->as_string().cstr();
! 161: props.expiration=amqp_cstring_bytes(v);
! 162: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
! 163: } else if(key=="message_id"){
! 164: const char* v=value->as_string().cstr();
! 165: props.message_id=amqp_cstring_bytes(v);
! 166: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
! 167: } else if(key=="timestamp"){
! 168: uint64_t ts=(uint64_t)value->as_double();
! 169: props.timestamp=ts;
! 170: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
! 171: } else if(key=="type"){
! 172: const char* v=value->as_string().cstr();
! 173: props.type=amqp_cstring_bytes(v);
! 174: props._flags|=AMQP_BASIC_TYPE_FLAG;
! 175: } else if(key=="user_id"){
! 176: const char* v=value->as_string().cstr();
! 177: props.user_id=amqp_cstring_bytes(v);
! 178: props._flags|=AMQP_BASIC_USER_ID_FLAG;
! 179: } else if(key=="app_id"){
! 180: const char* v=value->as_string().cstr();
! 181: props.app_id=amqp_cstring_bytes(v);
! 182: props._flags|=AMQP_BASIC_APP_ID_FLAG;
! 183: } else if(key=="headers"){
! 184: /* if(HashStringValue* hh=pval->get_hash()){
! 185: size_t count=hh->count();
! 186: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
! 187: size_t idx=0;
! 188: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
! 189: String::Body hkey=hi.key();
! 190: const char* hv=hi.value()->as_string().cstr();
! 191: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
! 192: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
! 193: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
! 194: idx++;
1.1 moko 195: }
1.3 ! moko 196: props.headers.num_entries=(int)count;
! 197: props.headers.entries=entries;
! 198: props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1 moko 199: }
1.3 ! moko 200: */ } else
1.1 moko 201: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
202: }
203: }
204: }
205:
206: if(!routing_key_c)
207: throw Exception("amqp", 0, "routing_key or queue must be specified");
208:
209: amqp_bytes_t body;
210: body.len = msg.length();
211: body.bytes=(void*)msg.cstr();
212:
1.3 ! moko 213: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1 moko 214:
215: if(ret!=AMQP_STATUS_OK)
216: throw Exception("amqp", 0, "publish failed");
217:
218: // free temporary headers entries if allocated
219: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
220: // delete [] props.headers.entries;
221: }
222: }
223:
224: static void _release(Request& r, MethodParams&) {
225: VAmqp& self=GET_SELF(r, VAmqp);
226: if(self.fconnection){
227: amqp_connection_state_t conn=self.fconnection;
228: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
229: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
230: amqp_destroy_connection(conn);
231: self.fconnection=0;
232: self.fchannel=0;
233: }
234: }
235:
236: static void _ack(Request& r, MethodParams& params) {
237: VAmqp& self=GET_SELF(r, VAmqp);
238: const String &tag_s=params.as_string(0, "delivery tag must not be code");
239: int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
240: if(ret!=AMQP_STATUS_OK)
241: throw Exception("amqp", 0, "ack failed");
242: }
243:
244: static void _nack(Request& r, MethodParams& params) {
245: VAmqp& self=GET_SELF(r, VAmqp);
246: const String &tag_s=params.as_string(0, "delivery tag must not be code");
247: bool requeue=false;
248: if(params.count()>1){
249: if(HashStringValue* options=params.as_hash(1)){
250: for(HashStringValue::Iterator i(*options); i; i.next()){
251: if(i.key()=="requeue"){
252: requeue=r.process(*i.value()).as_bool();
253: } else
254: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
255: }
256: }
257: }
258: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
259: if(ret!=AMQP_STATUS_OK)
260: throw Exception("amqp", 0, "nack failed");
261: }
262:
263: static void _qos(Request& r, MethodParams& params) {
264: VAmqp& self=GET_SELF(r, VAmqp);
265: uint16_t prefetch_count=0;
266: if(params.count()>0){
267: if(HashStringValue* options=params.as_hash(0)){
268: for(HashStringValue::Iterator i(*options); i; i.next()){
269: if(i.key()=="prefetch_count"){
270: int pc=r.process(*i.value()).as_int();
271: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
272: } else
273: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
274: }
275: }
276: }
277: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
278: if(!ret)
279: throw Exception("amqp", 0, "qos failed");
280: }
281:
282: static void _reject(Request& r, MethodParams& params) {
283: VAmqp& self=GET_SELF(r, VAmqp);
284: const String &tag_s=params.as_string(0, "delivery tag must not be code");
285: bool requeue=true; // by default return to queue
286: if(params.count()>1){
287: if(HashStringValue* options = params.as_hash(1)){
288: for(HashStringValue::Iterator i(*options); i; i.next()){
289: if(i.key() == "requeue"){
290: requeue=r.process(*i.value()).as_bool();
291: } else
292: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
293: }
294: }
295: }
296: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
297: if(ret!=AMQP_STATUS_OK)
298: throw Exception("amqp", 0, "reject failed");
299: }
300:
301: static void _declare_exchange(Request& r, MethodParams& params) {
302: VAmqp& self=GET_SELF(r, VAmqp);
303: const char* name_c = 0;
304: const char* type_c = "direct";
305: bool passive=false, durable=false, auto_delete=true, nowait=false;
306: if(params.count()>0){
307: if(HashStringValue* options=params.as_hash(0)){
308: for(HashStringValue::Iterator i(*options); i; i.next()){
309: String::Body key=i.key();
310: Value* value=i.value();
311: if(key=="name"){
312: name_c=value->as_string().cstr();
313: } else if(key=="type"){
314: type_c=value->as_string().cstr();
315: } else if(key=="passive"){
316: passive=r.process(*value).as_bool();
317: } else if(key=="durable"){
318: durable=r.process(*value).as_bool();
319: } else if(key=="auto_delete"){
320: auto_delete=r.process(*value).as_bool();
321: } else if(key=="nowait"){
322: nowait=r.process(*value).as_bool();
323: } else
324: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
325: }
326: }
327: }
328: if(!name_c)
329: throw Exception("amqp", 0, "name is required");
330: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
331: check("declare exchange", amqp_get_rpc_reply(self.connection()));
332: }
333:
334: static void _delete_exchange(Request& r, MethodParams& params) {
335: VAmqp& self=GET_SELF(r, VAmqp);
336: const char* name_c = 0;
337: bool if_unused=false, nowait=false;
338: if(params.count()>0){
339: if(HashStringValue* options=params.as_hash(0)){
340: for(HashStringValue::Iterator i(*options); i; i.next()){
341: String::Body key=i.key();
342: Value* value=i.value();
343: if(key=="exchange"){
344: name_c=value->as_string().cstr();
345: } else if(key=="if_unused"){
346: if_unused=r.process(*value).as_bool();
347: } else if(key=="nowait"){
348: nowait=r.process(*value).as_bool();
349: } else
350: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
351: }
352: }
353: }
354: if(!name_c) throw Exception("amqp", 0, "exchange is required");
355: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
356: check("delete exchange", amqp_get_rpc_reply(self.connection()));
357: }
358:
359: static void _declare_queue(Request& r, MethodParams& params) {
360: VAmqp& self=GET_SELF(r, VAmqp);
361: const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
362: if(params.count()>0){
363: if(HashStringValue* options=params.as_hash(0)){
364: for(HashStringValue::Iterator i(*options); i; i.next()){
365: String::Body key=i.key();
366: Value* value=i.value();
367: if(key=="queue"){
368: queue_c=value->as_string().cstr();
369: } else if(key=="passive"){
370: passive=r.process(*value).as_bool();
371: } else if(key=="durable"){
372: durable=r.process(*value).as_bool();
373: } else if(key=="auto_delete"){
374: auto_delete=r.process(*value).as_bool();
375: } else if(key=="nowait"){
376: nowait=r.process(*value).as_bool();
377: } else
378: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
379: }
380: }
381: }
382: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
383: check("declare queue", amqp_get_rpc_reply(self.connection()));
384: if(!queue_c && ok){
1.2 moko 385: r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1 moko 386: }
387: }
388:
389: static void _delete_queue(Request& r, MethodParams& params) {
390: VAmqp& self=GET_SELF(r, VAmqp);
391: const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
392: if(params.count()>0){
393: if(HashStringValue* options=params.as_hash(0)){
394: for(HashStringValue::Iterator i(*options); i; i.next()){
395: String::Body key=i.key();
396: Value* value=i.value();
397: if(key=="queue"){
398: queue_c=value->as_string().cstr();
399: } else if(key=="if_unused"){
400: if_unused=r.process(*value).as_bool();
401: } else if(key=="if_empty"){
402: if_empty=r.process(*value).as_bool();
403: } else if(key=="nowait"){
404: nowait=r.process(*value).as_bool();
405: } else
406: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
407: }
408: }
409: }
410: if(!queue_c) throw Exception("amqp", 0, "queue is required");
411: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
412: check("delete queue", amqp_get_rpc_reply(self.connection()));
413: }
414:
415: static void _bind_queue(Request& r, MethodParams& params) {
416: VAmqp& self=GET_SELF(r, VAmqp);
417: const char* exchange_c=0;
418: const char* queue_c=0;
419: const char* routing_key_c="";
420: if(params.count()>0){
421: if(HashStringValue* options=params.as_hash(0)){
422: for(HashStringValue::Iterator i(*options); i; i.next()){
423: String::Body key=i.key();
424: Value* value=i.value();
425: if(key=="exchange"){
426: exchange_c=value->as_string().cstr();
427: } else if(key=="queue"){
428: queue_c=value->as_string().cstr();
429: } else if(key=="routing_key"){
430: routing_key_c=value->as_string().cstr();
431: } else
432: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
433: }
434: }
435: }
436: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
437: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
438: check("bind queue", amqp_get_rpc_reply(self.connection()));
439: }
440:
441: static void _unbind_queue(Request& r, MethodParams& params) {
442: VAmqp& self=GET_SELF(r, VAmqp);
443: const char* exchange_c=0;
444: const char* queue_c=0;
445: const char* routing_key_c="";
446: if(params.count()>0){
447: if(HashStringValue* options=params.as_hash(0)){
448: for(HashStringValue::Iterator i(*options); i; i.next()){
449: String::Body key=i.key();
450: Value* value=i.value();
451: if(key=="exchange"){
452: exchange_c=value->as_string().cstr();
453: } else if(key=="queue"){
454: queue_c=value->as_string().cstr();
455: } else if(key=="routing_key"){
456: routing_key_c=value->as_string().cstr();
457: } else
458: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
459: }
460: }
461: }
462: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
463: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
464: check("unbind queue", amqp_get_rpc_reply(self.connection()));
465: }
466:
467: static void _consume(Request& r, MethodParams& params) {
468: VAmqp& self=GET_SELF(r, VAmqp);
469: const char* queue_c=0;
470: const char* consumer_tag_c=0;
471: bool no_ack=true, nowait=false;
472: Junction* callback=0;
473:
1.2 moko 474: if(HashStringValue* options=params.as_hash(0)){
475: for(HashStringValue::Iterator i(*options); i; i.next()){
476: String::Body key=i.key();
477: Value* value=i.value();
1.3 ! moko 478: if(key=="callback"){
! 479: callback=value->get_junction();
! 480: } else if(key=="queue"){
1.2 moko 481: queue_c=value->as_string().cstr();
482: } else if(key=="consumer_tag"){
483: consumer_tag_c=value->as_string().cstr();
484: } else if(key=="no_ack"){
485: no_ack=r.process(*value).as_bool();
486: } else if(key=="nowait"){
487: nowait=r.process(*value).as_bool();
488: } else
489: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 490: }
491: }
492:
493: if(!queue_c) throw Exception("amqp", 0, "queue is required");
494: if(!callback) throw Exception("amqp", 0, "callback is required");
495:
496: amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
497: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
498: 0 /*no_local*/, no_ack, nowait, amqp_empty_table);
499: amqp_rpc_reply_t rr = amqp_get_rpc_reply(self.connection());
500: if(rr.reply_type != AMQP_RESPONSE_NORMAL)
501: throw Exception("amqp", 0, "consume failed");
502:
503: self.fstop=false;
504: while(!self.fstop){
1.2 moko 505: amqp_envelope_t envelope;
506: memset(&envelope, 0, sizeof(envelope));
507: amqp_maybe_release_buffers(self.connection());
1.1 moko 508: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
509: if(res.reply_type == AMQP_RESPONSE_NORMAL){
510: VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2 moko 511: h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.1 moko 512: h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
1.2 moko 513: h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
514: h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1 moko 515:
516: Value *params_cb[]={&vh};
517: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
518: frame.store_params(params_cb, 1);
519: r.call(frame);
520: });
521:
522: amqp_destroy_envelope(&envelope);
523: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
524: continue;
525: } else {
526: break;
527: }
528: }
529: }
530:
531: static void _stop_consume(Request& r, MethodParams&) {
532: VAmqp& self=GET_SELF(r, VAmqp);
533: self.fstop=true;
534: }
535:
536: #endif
537:
538: // constructor
539: MAmqp::MAmqp(): Methoded("amqp") {
540: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
541: #ifdef WITH_AMQP
542: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
543: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
544: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
545: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
546: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
547: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
548: add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
549: add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
550: add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
551: add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
552: add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
553: add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2 moko 554: add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1 moko 555: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
556: #endif
557: }
E-mail: