Annotation of parser3/src/classes/amqp.C, revision 1.4
1.1 moko 1: /** @file
2: Parser: @b amqp parser class.
3:
4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
5: Authors: Konstantin Morshnev <moko@design.ru>
6: */
7:
8: #include "pa_vmethod_frame.h"
9:
10: #include "pa_request.h"
11: #include "pa_vstring.h"
12: #include "pa_vhash.h"
13: #include "pa_vbool.h"
14: #include "pa_vamqp.h"
15:
16: #ifdef WITH_AMQP
17: #include <amqp.h>
18: #include <amqp_tcp_socket.h>
19: #include <amqp_framing.h>
20: #include <stdlib.h>
21: #include <string.h>
22: #endif
23:
1.4 ! moko 24: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.3 2025/11/07 22:36:35 moko Exp $" IDENT_PA_VAMQP_H;
1.1 moko 25:
26: class MAmqp: public Methoded {
27: public: // VStateless_class
28: Value* create_new_value(Pool&) { return new VAmqp(); }
29: public:
30: MAmqp();
31: };
32:
33: DECLARE_CLASS_VAR(amqp, new MAmqp);
34:
35: static void _create(Request& r, MethodParams& params) {
36: VAmqp& self=GET_SELF(r, VAmqp);
37:
38: #ifdef WITH_AMQP
39: const char* host_c = "localhost";
40: int port = 5672;
41: const char* user_c = "guest";
42: const char* pass_c = "guest";
43: const char* vhost_c = "/";
44: const char* locale_c = "en_US";
45: int heartbeat = 30; // seconds
46:
47: if(params.count()>0){
48: if(HashStringValue* options=params.as_hash(0)){
49: for(HashStringValue::Iterator i(*options); i; i.next()){
50: String::Body key=i.key();
51: Value* value=i.value();
52: if(key=="host"){
53: host_c=value->as_string().cstr();
54: } else if(key=="port"){
55: port=r.process(*value).as_int();
56: } else if(key=="user"){
57: user_c=value->as_string().cstr();
58: } else if(key=="password"){
59: pass_c=value->as_string().cstr();
60: } else if(key=="vhost"){
61: vhost_c=value->as_string().cstr();
62: } else if(key=="locale"){
63: locale_c=value->as_string().cstr();
64: } else if(key=="heartbeat"){
65: heartbeat=r.process(*value).as_int();
66: } else
67: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
68: }
69: }
70: }
71:
72: amqp_connection_state_t conn = amqp_new_connection();
73: amqp_socket_t* socket = amqp_tcp_socket_new(conn);
74: if(!socket)
75: throw Exception("amqp", 0, "failed to create TCP socket");
76: if(amqp_socket_open(socket, host_c, port))
77: throw Exception("amqp", 0, "failed to open TCP socket");
78:
79: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
80: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
81: amqp_destroy_connection(conn);
82: throw Exception("amqp", 0, "login failed");
83: }
84:
85: int channel = 1;
86: amqp_channel_open(conn, channel);
87: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
88: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
89: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
90: amqp_destroy_connection(conn);
91: throw Exception("amqp", 0, "channel open failed");
92: }
93:
94: self.fconnection = conn;
95: self.fchannel = channel;
96: #else
97: (void)params; (void)self;
98: throw Exception("amqp", 0, "compiled without amqp support");
99: #endif
100: }
101:
102: #ifdef WITH_AMQP
103:
1.4 ! moko 104: static void check(amqp_rpc_reply_t rr){
! 105: if(rr.reply_type == AMQP_RESPONSE_NORMAL)
! 106: return;
! 107:
! 108: // Extract error message from reply
! 109: const char* error_msg = 0;
! 110: size_t error_len = 0;
! 111: if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
! 112: if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
! 113: amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
! 114: if(m->reply_text.len > 0 && m->reply_text.bytes) {
! 115: error_msg = (const char*)m->reply_text.bytes;
! 116: error_len = m->reply_text.len;
! 117: }
! 118: } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
! 119: amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
! 120: if(m->reply_text.len > 0 && m->reply_text.bytes) {
! 121: error_msg = (const char*)m->reply_text.bytes;
! 122: error_len = m->reply_text.len;
! 123: }
! 124: }
! 125: }
! 126:
! 127: if(error_msg) {
! 128: throw Exception("amqp", 0, "failed: %.*s", (int)error_len, error_msg);
! 129: } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
! 130: throw Exception("amqp", 0, "failed: library error %d", rr.library_error);
! 131: } else {
! 132: throw Exception("amqp", 0, "failed");
! 133: }
1.1 moko 134: }
135:
1.2 moko 136: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
137: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
138:
1.1 moko 139: static void _publish(Request& r, MethodParams& params) {
140: VAmqp& self=GET_SELF(r, VAmqp);
141: const String &msg=params.as_string(0, "msg must be string");
142: const char* exchange_c = ""; // default exchange
143: const char* routing_key_c = 0;
144: bool mandatory=false;
145:
146: amqp_basic_properties_t props;
147: props._flags = 0;
148:
149: if(params.count()>1){
150: if(HashStringValue* options=params.as_hash(1)){
151: for(HashStringValue::Iterator i(*options); i; i.next()){
152: String::Body key=i.key();
153: Value* value=i.value();
154: if(key=="exchange"){
155: exchange_c=value->as_string().cstr();
156: } else if(key=="routing_key"){
157: routing_key_c=value->as_string().cstr();
158: } else if(key=="queue"){
1.3 moko 159: routing_key_c=value->as_string().cstr();
1.1 moko 160: } else if(key=="mandatory"){
161: mandatory=r.process(*value).as_bool();
1.3 moko 162: } else if(key=="content_type"){
163: const char* v=value->as_string().cstr();
164: props.content_type=amqp_cstring_bytes(v);
165: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
166: } else if(key=="content_encoding"){
167: const char* v=value->as_string().cstr();
168: props.content_encoding=amqp_cstring_bytes(v);
169: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
170: } else if(key=="delivery_mode"){
171: uint8_t dm=(uint8_t)value->as_int();
172: props.delivery_mode=dm;
173: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
174: } else if(key=="priority"){
175: uint8_t pr=(uint8_t)value->as_int();
176: props.priority=pr;
177: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
178: } else if(key=="correlation_id"){
179: const char* v=value->as_string().cstr();
180: props.correlation_id=amqp_cstring_bytes(v);
181: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
182: } else if(key=="reply_to"){
183: const char* v=value->as_string().cstr();
184: props.reply_to=amqp_cstring_bytes(v);
185: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
186: } else if(key=="expiration"){
187: const char* v=value->as_string().cstr();
188: props.expiration=amqp_cstring_bytes(v);
189: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
190: } else if(key=="message_id"){
191: const char* v=value->as_string().cstr();
192: props.message_id=amqp_cstring_bytes(v);
193: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
194: } else if(key=="timestamp"){
195: uint64_t ts=(uint64_t)value->as_double();
196: props.timestamp=ts;
197: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
198: } else if(key=="type"){
199: const char* v=value->as_string().cstr();
200: props.type=amqp_cstring_bytes(v);
201: props._flags|=AMQP_BASIC_TYPE_FLAG;
202: } else if(key=="user_id"){
203: const char* v=value->as_string().cstr();
204: props.user_id=amqp_cstring_bytes(v);
205: props._flags|=AMQP_BASIC_USER_ID_FLAG;
206: } else if(key=="app_id"){
207: const char* v=value->as_string().cstr();
208: props.app_id=amqp_cstring_bytes(v);
209: props._flags|=AMQP_BASIC_APP_ID_FLAG;
210: } else if(key=="headers"){
211: /* if(HashStringValue* hh=pval->get_hash()){
212: size_t count=hh->count();
213: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
214: size_t idx=0;
215: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
216: String::Body hkey=hi.key();
217: const char* hv=hi.value()->as_string().cstr();
218: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
219: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
220: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
221: idx++;
1.1 moko 222: }
1.3 moko 223: props.headers.num_entries=(int)count;
224: props.headers.entries=entries;
225: props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1 moko 226: }
1.3 moko 227: */ } else
1.1 moko 228: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
229: }
230: }
231: }
232:
233: if(!routing_key_c)
234: throw Exception("amqp", 0, "routing_key or queue must be specified");
235:
236: amqp_bytes_t body;
237: body.len = msg.length();
238: body.bytes=(void*)msg.cstr();
239:
1.3 moko 240: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1 moko 241:
242: if(ret!=AMQP_STATUS_OK)
243: throw Exception("amqp", 0, "publish failed");
244:
245: // free temporary headers entries if allocated
246: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
247: // delete [] props.headers.entries;
248: }
249: }
250:
251: static void _release(Request& r, MethodParams&) {
252: VAmqp& self=GET_SELF(r, VAmqp);
253: if(self.fconnection){
254: amqp_connection_state_t conn=self.fconnection;
255: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
256: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
257: amqp_destroy_connection(conn);
258: self.fconnection=0;
259: self.fchannel=0;
260: }
261: }
262:
263: static void _ack(Request& r, MethodParams& params) {
264: VAmqp& self=GET_SELF(r, VAmqp);
265: const String &tag_s=params.as_string(0, "delivery tag must not be code");
266: int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
267: if(ret!=AMQP_STATUS_OK)
268: throw Exception("amqp", 0, "ack failed");
269: }
270:
271: static void _nack(Request& r, MethodParams& params) {
272: VAmqp& self=GET_SELF(r, VAmqp);
273: const String &tag_s=params.as_string(0, "delivery tag must not be code");
274: bool requeue=false;
275: if(params.count()>1){
276: if(HashStringValue* options=params.as_hash(1)){
277: for(HashStringValue::Iterator i(*options); i; i.next()){
278: if(i.key()=="requeue"){
279: requeue=r.process(*i.value()).as_bool();
280: } else
281: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
282: }
283: }
284: }
285: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
286: if(ret!=AMQP_STATUS_OK)
287: throw Exception("amqp", 0, "nack failed");
288: }
289:
290: static void _qos(Request& r, MethodParams& params) {
291: VAmqp& self=GET_SELF(r, VAmqp);
292: uint16_t prefetch_count=0;
293: if(params.count()>0){
294: if(HashStringValue* options=params.as_hash(0)){
295: for(HashStringValue::Iterator i(*options); i; i.next()){
296: if(i.key()=="prefetch_count"){
297: int pc=r.process(*i.value()).as_int();
298: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
299: } else
300: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
301: }
302: }
303: }
304: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
305: if(!ret)
306: throw Exception("amqp", 0, "qos failed");
307: }
308:
309: static void _reject(Request& r, MethodParams& params) {
310: VAmqp& self=GET_SELF(r, VAmqp);
311: const String &tag_s=params.as_string(0, "delivery tag must not be code");
312: bool requeue=true; // by default return to queue
313: if(params.count()>1){
314: if(HashStringValue* options = params.as_hash(1)){
315: for(HashStringValue::Iterator i(*options); i; i.next()){
316: if(i.key() == "requeue"){
317: requeue=r.process(*i.value()).as_bool();
318: } else
319: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
320: }
321: }
322: }
323: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
324: if(ret!=AMQP_STATUS_OK)
325: throw Exception("amqp", 0, "reject failed");
326: }
327:
328: static void _declare_exchange(Request& r, MethodParams& params) {
329: VAmqp& self=GET_SELF(r, VAmqp);
330: const char* name_c = 0;
331: const char* type_c = "direct";
332: bool passive=false, durable=false, auto_delete=true, nowait=false;
333: if(params.count()>0){
334: if(HashStringValue* options=params.as_hash(0)){
335: for(HashStringValue::Iterator i(*options); i; i.next()){
336: String::Body key=i.key();
337: Value* value=i.value();
338: if(key=="name"){
339: name_c=value->as_string().cstr();
340: } else if(key=="type"){
341: type_c=value->as_string().cstr();
342: } else if(key=="passive"){
343: passive=r.process(*value).as_bool();
344: } else if(key=="durable"){
345: durable=r.process(*value).as_bool();
346: } else if(key=="auto_delete"){
347: auto_delete=r.process(*value).as_bool();
348: } else if(key=="nowait"){
349: nowait=r.process(*value).as_bool();
350: } else
351: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
352: }
353: }
354: }
355: if(!name_c)
356: throw Exception("amqp", 0, "name is required");
357: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
1.4 ! moko 358: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 359: }
360:
361: static void _delete_exchange(Request& r, MethodParams& params) {
362: VAmqp& self=GET_SELF(r, VAmqp);
363: const char* name_c = 0;
364: bool if_unused=false, nowait=false;
365: if(params.count()>0){
366: if(HashStringValue* options=params.as_hash(0)){
367: for(HashStringValue::Iterator i(*options); i; i.next()){
368: String::Body key=i.key();
369: Value* value=i.value();
370: if(key=="exchange"){
371: name_c=value->as_string().cstr();
372: } else if(key=="if_unused"){
373: if_unused=r.process(*value).as_bool();
374: } else if(key=="nowait"){
375: nowait=r.process(*value).as_bool();
376: } else
377: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
378: }
379: }
380: }
381: if(!name_c) throw Exception("amqp", 0, "exchange is required");
382: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
1.4 ! moko 383: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 384: }
385:
386: static void _declare_queue(Request& r, MethodParams& params) {
387: VAmqp& self=GET_SELF(r, VAmqp);
388: const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
389: if(params.count()>0){
390: if(HashStringValue* options=params.as_hash(0)){
391: for(HashStringValue::Iterator i(*options); i; i.next()){
392: String::Body key=i.key();
393: Value* value=i.value();
394: if(key=="queue"){
395: queue_c=value->as_string().cstr();
396: } else if(key=="passive"){
397: passive=r.process(*value).as_bool();
398: } else if(key=="durable"){
399: durable=r.process(*value).as_bool();
400: } else if(key=="auto_delete"){
401: auto_delete=r.process(*value).as_bool();
402: } else if(key=="nowait"){
403: nowait=r.process(*value).as_bool();
404: } else
405: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
406: }
407: }
408: }
409: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
1.4 ! moko 410: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 411: if(!queue_c && ok){
1.2 moko 412: r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1 moko 413: }
414: }
415:
416: static void _delete_queue(Request& r, MethodParams& params) {
417: VAmqp& self=GET_SELF(r, VAmqp);
418: const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
419: if(params.count()>0){
420: if(HashStringValue* options=params.as_hash(0)){
421: for(HashStringValue::Iterator i(*options); i; i.next()){
422: String::Body key=i.key();
423: Value* value=i.value();
424: if(key=="queue"){
425: queue_c=value->as_string().cstr();
426: } else if(key=="if_unused"){
427: if_unused=r.process(*value).as_bool();
428: } else if(key=="if_empty"){
429: if_empty=r.process(*value).as_bool();
430: } else if(key=="nowait"){
431: nowait=r.process(*value).as_bool();
432: } else
433: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
434: }
435: }
436: }
437: if(!queue_c) throw Exception("amqp", 0, "queue is required");
438: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
1.4 ! moko 439: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 440: }
441:
442: static void _bind_queue(Request& r, MethodParams& params) {
443: VAmqp& self=GET_SELF(r, VAmqp);
444: const char* exchange_c=0;
445: const char* queue_c=0;
446: const char* routing_key_c="";
447: if(params.count()>0){
448: if(HashStringValue* options=params.as_hash(0)){
449: for(HashStringValue::Iterator i(*options); i; i.next()){
450: String::Body key=i.key();
451: Value* value=i.value();
452: if(key=="exchange"){
453: exchange_c=value->as_string().cstr();
454: } else if(key=="queue"){
455: queue_c=value->as_string().cstr();
456: } else if(key=="routing_key"){
457: routing_key_c=value->as_string().cstr();
458: } else
459: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
460: }
461: }
462: }
463: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
464: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 ! moko 465: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 466: }
467:
468: static void _unbind_queue(Request& r, MethodParams& params) {
469: VAmqp& self=GET_SELF(r, VAmqp);
470: const char* exchange_c=0;
471: const char* queue_c=0;
472: const char* routing_key_c="";
473: if(params.count()>0){
474: if(HashStringValue* options=params.as_hash(0)){
475: for(HashStringValue::Iterator i(*options); i; i.next()){
476: String::Body key=i.key();
477: Value* value=i.value();
478: if(key=="exchange"){
479: exchange_c=value->as_string().cstr();
480: } else if(key=="queue"){
481: queue_c=value->as_string().cstr();
482: } else if(key=="routing_key"){
483: routing_key_c=value->as_string().cstr();
484: } else
485: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
486: }
487: }
488: }
489: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
490: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 ! moko 491: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 492: }
493:
494: static void _consume(Request& r, MethodParams& params) {
495: VAmqp& self=GET_SELF(r, VAmqp);
496: const char* queue_c=0;
497: const char* consumer_tag_c=0;
498: bool no_ack=true, nowait=false;
499: Junction* callback=0;
500:
1.2 moko 501: if(HashStringValue* options=params.as_hash(0)){
502: for(HashStringValue::Iterator i(*options); i; i.next()){
503: String::Body key=i.key();
504: Value* value=i.value();
1.3 moko 505: if(key=="callback"){
506: callback=value->get_junction();
507: } else if(key=="queue"){
1.2 moko 508: queue_c=value->as_string().cstr();
509: } else if(key=="consumer_tag"){
510: consumer_tag_c=value->as_string().cstr();
511: } else if(key=="no_ack"){
512: no_ack=r.process(*value).as_bool();
513: } else if(key=="nowait"){
514: nowait=r.process(*value).as_bool();
515: } else
516: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 517: }
518: }
519:
520: if(!queue_c) throw Exception("amqp", 0, "queue is required");
521: if(!callback) throw Exception("amqp", 0, "callback is required");
522:
523: amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
524: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
525: 0 /*no_local*/, no_ack, nowait, amqp_empty_table);
1.4 ! moko 526: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 527:
528: self.fstop=false;
529: while(!self.fstop){
1.2 moko 530: amqp_envelope_t envelope;
531: memset(&envelope, 0, sizeof(envelope));
532: amqp_maybe_release_buffers(self.connection());
1.1 moko 533: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
534: if(res.reply_type == AMQP_RESPONSE_NORMAL){
535: VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2 moko 536: h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.1 moko 537: h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
1.2 moko 538: h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
539: h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1 moko 540:
541: Value *params_cb[]={&vh};
542: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
543: frame.store_params(params_cb, 1);
544: r.call(frame);
545: });
546:
547: amqp_destroy_envelope(&envelope);
548: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
549: continue;
550: } else {
551: break;
552: }
553: }
554: }
555:
556: static void _stop_consume(Request& r, MethodParams&) {
557: VAmqp& self=GET_SELF(r, VAmqp);
558: self.fstop=true;
559: }
560:
561: #endif
562:
563: // constructor
564: MAmqp::MAmqp(): Methoded("amqp") {
565: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
566: #ifdef WITH_AMQP
567: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
568: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
569: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
570: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
571: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
572: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
573: add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
574: add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
575: add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
576: add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
577: add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
578: add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2 moko 579: add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1 moko 580: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
581: #endif
582: }
E-mail: