|
|
1.1 moko 1: /** @file
2: Parser: @b amqp parser class.
3:
4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
5: Authors: Konstantin Morshnev <moko@design.ru>
6: */
7:
8: #include "pa_vmethod_frame.h"
9:
10: #include "pa_request.h"
11: #include "pa_vstring.h"
12: #include "pa_vhash.h"
13: #include "pa_vbool.h"
14: #include "pa_vamqp.h"
15:
16: #ifdef WITH_AMQP
17: #include <amqp.h>
18: #include <amqp_tcp_socket.h>
19: #include <amqp_framing.h>
20: #include <stdlib.h>
21: #include <string.h>
22: #endif
23:
1.2 ! moko 24: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.1 2025/11/06 22:08:03 moko Exp $" IDENT_PA_VAMQP_H;
1.1 moko 25:
26: class MAmqp: public Methoded {
27: public: // VStateless_class
28: Value* create_new_value(Pool&) { return new VAmqp(); }
29: public:
30: MAmqp();
31: };
32:
33: DECLARE_CLASS_VAR(amqp, new MAmqp);
34:
35: static void _create(Request& r, MethodParams& params) {
36: VAmqp& self=GET_SELF(r, VAmqp);
37:
38: #ifdef WITH_AMQP
39: const char* host_c = "localhost";
40: int port = 5672;
41: const char* user_c = "guest";
42: const char* pass_c = "guest";
43: const char* vhost_c = "/";
44: const char* locale_c = "en_US";
45: int heartbeat = 30; // seconds
46:
47: if(params.count()>0){
48: if(HashStringValue* options=params.as_hash(0)){
49: for(HashStringValue::Iterator i(*options); i; i.next()){
50: String::Body key=i.key();
51: Value* value=i.value();
52: if(key=="host"){
53: host_c=value->as_string().cstr();
54: } else if(key=="port"){
55: port=r.process(*value).as_int();
56: } else if(key=="user"){
57: user_c=value->as_string().cstr();
58: } else if(key=="password"){
59: pass_c=value->as_string().cstr();
60: } else if(key=="vhost"){
61: vhost_c=value->as_string().cstr();
62: } else if(key=="locale"){
63: locale_c=value->as_string().cstr();
64: } else if(key=="heartbeat"){
65: heartbeat=r.process(*value).as_int();
66: } else
67: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
68: }
69: }
70: }
71:
72: amqp_connection_state_t conn = amqp_new_connection();
73: amqp_socket_t* socket = amqp_tcp_socket_new(conn);
74: if(!socket)
75: throw Exception("amqp", 0, "failed to create TCP socket");
76: if(amqp_socket_open(socket, host_c, port))
77: throw Exception("amqp", 0, "failed to open TCP socket");
78:
79: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
80: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
81: amqp_destroy_connection(conn);
82: throw Exception("amqp", 0, "login failed");
83: }
84:
85: int channel = 1;
86: amqp_channel_open(conn, channel);
87: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
88: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
89: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
90: amqp_destroy_connection(conn);
91: throw Exception("amqp", 0, "channel open failed");
92: }
93:
94: self.fconnection = conn;
95: self.fchannel = channel;
96: #else
97: (void)params; (void)self;
98: throw Exception("amqp", 0, "compiled without amqp support");
99: #endif
100: }
101:
102: #ifdef WITH_AMQP
103:
104: static void check(const char* action, amqp_rpc_reply_t rr){
105: if(rr.reply_type != AMQP_RESPONSE_NORMAL)
106: throw Exception("amqp", 0, "%s failed", action);
107: }
108:
// Duplicates len bytes starting at ptr with pa_strdup and wraps the copy into a new String
#define AMQP_STRING(ptr, len) \
	new String(String::C(pa_strdup((const char*)(ptr), (len)), (len)))
// Same as AMQP_STRING, with the result wrapped into a new VString
#define AMQP_VSTRING(ptr, len) \
	new VString(*AMQP_STRING(ptr, len))
! 111:
1.1 moko 112: static void _publish(Request& r, MethodParams& params) {
113: VAmqp& self=GET_SELF(r, VAmqp);
114: const String &msg=params.as_string(0, "msg must be string");
115: const char* exchange_c = ""; // default exchange
116: const char* routing_key_c = 0;
117: bool have_queue_sugar=false;
118: bool mandatory=false;
119:
120: amqp_basic_properties_t props;
121: props._flags = 0;
122:
123: if(params.count()>1){
124: if(HashStringValue* options=params.as_hash(1)){
125: for(HashStringValue::Iterator i(*options); i; i.next()){
126: String::Body key=i.key();
127: Value* value=i.value();
128: if(key=="exchange"){
129: exchange_c=value->as_string().cstr();
130: } else if(key=="routing_key"){
131: routing_key_c=value->as_string().cstr();
132: } else if(key=="queue"){
133: routing_key_c=value->as_string().cstr(); have_queue_sugar=true;
134: } else if(key=="mandatory"){
135: mandatory=r.process(*value).as_bool();
136: } else if(key=="properties"){
137: // parse message properties
138: if(HashStringValue* ph=value->get_hash()){
139: for(HashStringValue::Iterator p(*ph); p; p.next()){
140: String::Body pkey=p.key();
141: Value* pval=p.value();
142: if(pkey=="content_type"){
143: const char* v=pval->as_string().cstr();
144: props.content_type=amqp_cstring_bytes(v);
145: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
146: } else if(pkey=="content_encoding"){
147: const char* v=pval->as_string().cstr();
148: props.content_encoding=amqp_cstring_bytes(v);
149: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
150: } else if(pkey=="delivery_mode"){
151: uint8_t dm=(uint8_t)pval->as_int();
152: props.delivery_mode=dm;
153: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
154: } else if(pkey=="priority"){
155: uint8_t pr=(uint8_t)pval->as_int();
156: props.priority=pr;
157: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
158: } else if(pkey=="correlation_id"){
159: const char* v=pval->as_string().cstr();
160: props.correlation_id=amqp_cstring_bytes(v);
161: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
162: } else if(pkey=="reply_to"){
163: const char* v=pval->as_string().cstr();
164: props.reply_to=amqp_cstring_bytes(v);
165: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
166: } else if(pkey=="expiration"){
167: const char* v=pval->as_string().cstr();
168: props.expiration=amqp_cstring_bytes(v);
169: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
170: } else if(pkey=="message_id"){
171: const char* v=pval->as_string().cstr();
172: props.message_id=amqp_cstring_bytes(v);
173: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
174: } else if(pkey=="timestamp"){
175: uint64_t ts=(uint64_t)pval->as_double();
176: props.timestamp=ts;
177: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
178: } else if(pkey=="type"){
179: const char* v=pval->as_string().cstr();
180: props.type=amqp_cstring_bytes(v);
181: props._flags|=AMQP_BASIC_TYPE_FLAG;
182: } else if(pkey=="user_id"){
183: const char* v=pval->as_string().cstr();
184: props.user_id=amqp_cstring_bytes(v);
185: props._flags|=AMQP_BASIC_USER_ID_FLAG;
186: } else if(pkey=="app_id"){
187: const char* v=pval->as_string().cstr();
188: props.app_id=amqp_cstring_bytes(v);
189: props._flags|=AMQP_BASIC_APP_ID_FLAG;
190: } else if(pkey=="headers"){
191: /* if(HashStringValue* hh=pval->get_hash()){
192: size_t count=hh->count();
193: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
194: size_t idx=0;
195: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
196: String::Body hkey=hi.key();
197: const char* hv=hi.value()->as_string().cstr();
198: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
199: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
200: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
201: idx++;
202: }
203: props.headers.num_entries=(int)count;
204: props.headers.entries=entries;
205: props._flags|=AMQP_BASIC_HEADERS_FLAG;
206: }
207: */ } else
208: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
209: }
210: }
211: } else
212: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
213: }
214: }
215: }
216:
217: if(!routing_key_c)
218: throw Exception("amqp", 0, "routing_key or queue must be specified");
219:
220: amqp_bytes_t body;
221: body.len = msg.length();
222: body.bytes=(void*)msg.cstr();
223:
224: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory , 0, &props, body);
225:
226: if(ret!=AMQP_STATUS_OK)
227: throw Exception("amqp", 0, "publish failed");
228:
229: // free temporary headers entries if allocated
230: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
231: // delete [] props.headers.entries;
232: }
233: (void)have_queue_sugar;
234: }
235:
236: static void _release(Request& r, MethodParams&) {
237: VAmqp& self=GET_SELF(r, VAmqp);
238: if(self.fconnection){
239: amqp_connection_state_t conn=self.fconnection;
240: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
241: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
242: amqp_destroy_connection(conn);
243: self.fconnection=0;
244: self.fchannel=0;
245: }
246: }
247:
248: static void _ack(Request& r, MethodParams& params) {
249: VAmqp& self=GET_SELF(r, VAmqp);
250: const String &tag_s=params.as_string(0, "delivery tag must not be code");
251: int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
252: if(ret!=AMQP_STATUS_OK)
253: throw Exception("amqp", 0, "ack failed");
254: }
255:
256: static void _nack(Request& r, MethodParams& params) {
257: VAmqp& self=GET_SELF(r, VAmqp);
258: const String &tag_s=params.as_string(0, "delivery tag must not be code");
259: bool requeue=false;
260: if(params.count()>1){
261: if(HashStringValue* options=params.as_hash(1)){
262: for(HashStringValue::Iterator i(*options); i; i.next()){
263: if(i.key()=="requeue"){
264: requeue=r.process(*i.value()).as_bool();
265: } else
266: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
267: }
268: }
269: }
270: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
271: if(ret!=AMQP_STATUS_OK)
272: throw Exception("amqp", 0, "nack failed");
273: }
274:
275: static void _qos(Request& r, MethodParams& params) {
276: VAmqp& self=GET_SELF(r, VAmqp);
277: uint16_t prefetch_count=0;
278: if(params.count()>0){
279: if(HashStringValue* options=params.as_hash(0)){
280: for(HashStringValue::Iterator i(*options); i; i.next()){
281: if(i.key()=="prefetch_count"){
282: int pc=r.process(*i.value()).as_int();
283: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
284: } else
285: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
286: }
287: }
288: }
289: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
290: if(!ret)
291: throw Exception("amqp", 0, "qos failed");
292: }
293:
294: static void _reject(Request& r, MethodParams& params) {
295: VAmqp& self=GET_SELF(r, VAmqp);
296: const String &tag_s=params.as_string(0, "delivery tag must not be code");
297: bool requeue=true; // by default return to queue
298: if(params.count()>1){
299: if(HashStringValue* options = params.as_hash(1)){
300: for(HashStringValue::Iterator i(*options); i; i.next()){
301: if(i.key() == "requeue"){
302: requeue=r.process(*i.value()).as_bool();
303: } else
304: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
305: }
306: }
307: }
308: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
309: if(ret!=AMQP_STATUS_OK)
310: throw Exception("amqp", 0, "reject failed");
311: }
312:
313: static void _declare_exchange(Request& r, MethodParams& params) {
314: VAmqp& self=GET_SELF(r, VAmqp);
315: const char* name_c = 0;
316: const char* type_c = "direct";
317: bool passive=false, durable=false, auto_delete=true, nowait=false;
318: if(params.count()>0){
319: if(HashStringValue* options=params.as_hash(0)){
320: for(HashStringValue::Iterator i(*options); i; i.next()){
321: String::Body key=i.key();
322: Value* value=i.value();
323: if(key=="name"){
324: name_c=value->as_string().cstr();
325: } else if(key=="type"){
326: type_c=value->as_string().cstr();
327: } else if(key=="passive"){
328: passive=r.process(*value).as_bool();
329: } else if(key=="durable"){
330: durable=r.process(*value).as_bool();
331: } else if(key=="auto_delete"){
332: auto_delete=r.process(*value).as_bool();
333: } else if(key=="nowait"){
334: nowait=r.process(*value).as_bool();
335: } else
336: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
337: }
338: }
339: }
340: if(!name_c)
341: throw Exception("amqp", 0, "name is required");
342: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
343: check("declare exchange", amqp_get_rpc_reply(self.connection()));
344: }
345:
346: static void _delete_exchange(Request& r, MethodParams& params) {
347: VAmqp& self=GET_SELF(r, VAmqp);
348: const char* name_c = 0;
349: bool if_unused=false, nowait=false;
350: if(params.count()>0){
351: if(HashStringValue* options=params.as_hash(0)){
352: for(HashStringValue::Iterator i(*options); i; i.next()){
353: String::Body key=i.key();
354: Value* value=i.value();
355: if(key=="exchange"){
356: name_c=value->as_string().cstr();
357: } else if(key=="if_unused"){
358: if_unused=r.process(*value).as_bool();
359: } else if(key=="nowait"){
360: nowait=r.process(*value).as_bool();
361: } else
362: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
363: }
364: }
365: }
366: if(!name_c) throw Exception("amqp", 0, "exchange is required");
367: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
368: check("delete exchange", amqp_get_rpc_reply(self.connection()));
369: }
370:
371: static void _declare_queue(Request& r, MethodParams& params) {
372: VAmqp& self=GET_SELF(r, VAmqp);
373: const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
374: if(params.count()>0){
375: if(HashStringValue* options=params.as_hash(0)){
376: for(HashStringValue::Iterator i(*options); i; i.next()){
377: String::Body key=i.key();
378: Value* value=i.value();
379: if(key=="queue"){
380: queue_c=value->as_string().cstr();
381: } else if(key=="passive"){
382: passive=r.process(*value).as_bool();
383: } else if(key=="durable"){
384: durable=r.process(*value).as_bool();
385: } else if(key=="auto_delete"){
386: auto_delete=r.process(*value).as_bool();
387: } else if(key=="nowait"){
388: nowait=r.process(*value).as_bool();
389: } else
390: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
391: }
392: }
393: }
394: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
395: check("declare queue", amqp_get_rpc_reply(self.connection()));
396: if(!queue_c && ok){
1.2 ! moko 397: r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1 moko 398: }
399: }
400:
401: static void _delete_queue(Request& r, MethodParams& params) {
402: VAmqp& self=GET_SELF(r, VAmqp);
403: const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
404: if(params.count()>0){
405: if(HashStringValue* options=params.as_hash(0)){
406: for(HashStringValue::Iterator i(*options); i; i.next()){
407: String::Body key=i.key();
408: Value* value=i.value();
409: if(key=="queue"){
410: queue_c=value->as_string().cstr();
411: } else if(key=="if_unused"){
412: if_unused=r.process(*value).as_bool();
413: } else if(key=="if_empty"){
414: if_empty=r.process(*value).as_bool();
415: } else if(key=="nowait"){
416: nowait=r.process(*value).as_bool();
417: } else
418: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
419: }
420: }
421: }
422: if(!queue_c) throw Exception("amqp", 0, "queue is required");
423: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
424: check("delete queue", amqp_get_rpc_reply(self.connection()));
425: }
426:
427: static void _bind_queue(Request& r, MethodParams& params) {
428: VAmqp& self=GET_SELF(r, VAmqp);
429: const char* exchange_c=0;
430: const char* queue_c=0;
431: const char* routing_key_c="";
432: if(params.count()>0){
433: if(HashStringValue* options=params.as_hash(0)){
434: for(HashStringValue::Iterator i(*options); i; i.next()){
435: String::Body key=i.key();
436: Value* value=i.value();
437: if(key=="exchange"){
438: exchange_c=value->as_string().cstr();
439: } else if(key=="queue"){
440: queue_c=value->as_string().cstr();
441: } else if(key=="routing_key"){
442: routing_key_c=value->as_string().cstr();
443: } else
444: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
445: }
446: }
447: }
448: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
449: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
450: check("bind queue", amqp_get_rpc_reply(self.connection()));
451: }
452:
453: static void _unbind_queue(Request& r, MethodParams& params) {
454: VAmqp& self=GET_SELF(r, VAmqp);
455: const char* exchange_c=0;
456: const char* queue_c=0;
457: const char* routing_key_c="";
458: if(params.count()>0){
459: if(HashStringValue* options=params.as_hash(0)){
460: for(HashStringValue::Iterator i(*options); i; i.next()){
461: String::Body key=i.key();
462: Value* value=i.value();
463: if(key=="exchange"){
464: exchange_c=value->as_string().cstr();
465: } else if(key=="queue"){
466: queue_c=value->as_string().cstr();
467: } else if(key=="routing_key"){
468: routing_key_c=value->as_string().cstr();
469: } else
470: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
471: }
472: }
473: }
474: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
475: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
476: check("unbind queue", amqp_get_rpc_reply(self.connection()));
477: }
478:
479: static void _consume(Request& r, MethodParams& params) {
480: VAmqp& self=GET_SELF(r, VAmqp);
481: const char* queue_c=0;
482: const char* consumer_tag_c=0;
483: bool no_ack=true, nowait=false;
484: Junction* callback=0;
485:
1.2 ! moko 486: if(HashStringValue* options=params.as_hash(0)){
! 487: for(HashStringValue::Iterator i(*options); i; i.next()){
! 488: String::Body key=i.key();
! 489: Value* value=i.value();
! 490: if(key=="queue"){
! 491: queue_c=value->as_string().cstr();
! 492: } else if(key=="consumer_tag"){
! 493: consumer_tag_c=value->as_string().cstr();
! 494: } else if(key=="no_ack"){
! 495: no_ack=r.process(*value).as_bool();
! 496: } else if(key=="nowait"){
! 497: nowait=r.process(*value).as_bool();
! 498: } else if(key=="callback"){
! 499: callback=value->get_junction();
! 500: } else
! 501: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 502: }
503: }
504:
505: if(!queue_c) throw Exception("amqp", 0, "queue is required");
506: if(!callback) throw Exception("amqp", 0, "callback is required");
507:
508: amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
509: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
510: 0 /*no_local*/, no_ack, nowait, amqp_empty_table);
511: amqp_rpc_reply_t rr = amqp_get_rpc_reply(self.connection());
512: if(rr.reply_type != AMQP_RESPONSE_NORMAL)
513: throw Exception("amqp", 0, "consume failed");
514:
515: self.fstop=false;
516: while(!self.fstop){
1.2 ! moko 517: amqp_envelope_t envelope;
! 518: memset(&envelope, 0, sizeof(envelope));
! 519: amqp_maybe_release_buffers(self.connection());
1.1 moko 520: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
521: if(res.reply_type == AMQP_RESPONSE_NORMAL){
522: VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2 ! moko 523: h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.1 moko 524: h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
1.2 ! moko 525: h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
! 526: h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1 moko 527:
528: Value *params_cb[]={&vh};
529: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
530: frame.store_params(params_cb, 1);
531: r.call(frame);
532: });
533:
534: amqp_destroy_envelope(&envelope);
535: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
536: continue;
537: } else {
538: break;
539: }
540: }
541: }
542:
543: static void _stop_consume(Request& r, MethodParams&) {
544: VAmqp& self=GET_SELF(r, VAmqp);
545: self.fstop=true;
546: }
547:
548: #endif
549:
550: // constructor
551: MAmqp::MAmqp(): Methoded("amqp") {
552: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
553: #ifdef WITH_AMQP
554: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
555: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
556: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
557: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
558: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
559: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
560: add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
561: add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
562: add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
563: add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
564: add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
565: add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2 ! moko 566: add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1 moko 567: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
568: #endif
569: }
570:
571: