Annotation of parser3/src/classes/amqp.C, revision 1.6
1.1 moko 1: /** @file
2: Parser: @b amqp parser class.
3:
4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
5: Authors: Konstantin Morshnev <moko@design.ru>
6: */
7:
8: #include "pa_vmethod_frame.h"
9:
10: #include "pa_request.h"
11: #include "pa_vstring.h"
12: #include "pa_vhash.h"
13: #include "pa_vbool.h"
14: #include "pa_vamqp.h"
15:
16: #ifdef WITH_AMQP
17: #include <amqp.h>
18: #include <amqp_tcp_socket.h>
1.5 moko 19: #include <amqp_ssl_socket.h>
1.1 moko 20: #include <amqp_framing.h>
21: #include <stdlib.h>
22: #include <string.h>
23: #endif
24:
// Version identification string of this file; the IDENT_PA_VAMQP_H string literal is appended to it.
1.6 ! moko 25: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.5 2025/11/07 23:53:42 moko Exp $" IDENT_PA_VAMQP_H;
1.1 moko 26:
27: class MAmqp: public Methoded {
28: public: // VStateless_class
29: Value* create_new_value(Pool&) { return new VAmqp(); }
30: public:
31: MAmqp();
32: };
33:
// Declares the class variable for 'amqp', initialised with a new MAmqp instance.
34: DECLARE_CLASS_VAR(amqp, new MAmqp);
35:
1.6 ! moko 36: #ifdef WITH_AMQP
! 37:
! 38: static void status_check(int ret, const char *detail=""){
! 39: if(ret == AMQP_STATUS_OK)
! 40: return;
! 41:
! 42: const char* error_str = amqp_error_string2(ret);
! 43: if(error_str) {
! 44: throw Exception("amqp", 0, "%sfailed: %s", detail, error_str);
! 45: } else {
! 46: throw Exception("amqp", 0, "%sfailed: error %d", detail, ret);
! 47: }
! 48: }
! 49:
! 50: static void check(amqp_rpc_reply_t rr, const char *detail=""){
! 51: if(rr.reply_type == AMQP_RESPONSE_NORMAL)
! 52: return;
! 53:
! 54: // Extract error message from reply
! 55: const char* error_msg = 0;
! 56: size_t error_len = 0;
! 57: if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
! 58: if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
! 59: amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
! 60: if(m->reply_text.len > 0 && m->reply_text.bytes) {
! 61: error_msg = (const char*)m->reply_text.bytes;
! 62: error_len = m->reply_text.len;
! 63: }
! 64: } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
! 65: amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
! 66: if(m->reply_text.len > 0 && m->reply_text.bytes) {
! 67: error_msg = (const char*)m->reply_text.bytes;
! 68: error_len = m->reply_text.len;
! 69: }
! 70: }
! 71: }
! 72:
! 73: if(error_msg) {
! 74: throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg);
! 75: } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
! 76: status_check(rr.library_error, detail);
! 77: }
! 78:
! 79: throw Exception("amqp", 0, "%sfailed", detail);
! 80: }
! 81:
! 82: #endif // WITH_AMQP
! 83:
! 84:
1.1 moko 85: static void _create(Request& r, MethodParams& params) {
86: VAmqp& self=GET_SELF(r, VAmqp);
87:
88: #ifdef WITH_AMQP
89: const char* host_c = "localhost";
90: int port = 5672;
91: const char* user_c = "guest";
92: const char* pass_c = "guest";
93: const char* vhost_c = "/";
94: const char* locale_c = "en_US";
95: int heartbeat = 30; // seconds
1.5 moko 96: const char* tls_ca = 0;
97: const char* tls_cert = 0;
98: const char* tls_key = 0;
99: bool tls_specified = false;
100: bool tls_verify = true;
1.1 moko 101:
102: if(params.count()>0){
103: if(HashStringValue* options=params.as_hash(0)){
104: for(HashStringValue::Iterator i(*options); i; i.next()){
105: String::Body key=i.key();
106: Value* value=i.value();
107: if(key=="host"){
108: host_c=value->as_string().cstr();
109: } else if(key=="port"){
110: port=r.process(*value).as_int();
111: } else if(key=="user"){
112: user_c=value->as_string().cstr();
113: } else if(key=="password"){
114: pass_c=value->as_string().cstr();
115: } else if(key=="vhost"){
116: vhost_c=value->as_string().cstr();
117: } else if(key=="locale"){
118: locale_c=value->as_string().cstr();
119: } else if(key=="heartbeat"){
120: heartbeat=r.process(*value).as_int();
1.5 moko 121: } else if(key=="tls"){
122: tls_specified = true;
123: if(HashStringValue* tls_options=value->get_hash()){
124: for(HashStringValue::Iterator t(*tls_options); t; t.next()){
125: String::Body tkey=t.key();
126: Value* tval=t.value();
127: if(tkey=="ca"){
128: tls_ca=tval->as_string().cstr();
129: } else if(tkey=="cert"){
130: tls_cert=tval->as_string().cstr();
131: } else if(tkey=="key"){
132: tls_key=tval->as_string().cstr();
133: } else if(tkey=="verify"){
134: tls_verify=r.process(*tval).as_bool();
135: } else
136: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
137: }
138: }
1.1 moko 139: } else
140: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
141: }
142: }
143: }
144:
145: amqp_connection_state_t conn = amqp_new_connection();
1.5 moko 146: amqp_socket_t* socket = 0;
147:
148: if(tls_specified) {
149: socket = amqp_ssl_socket_new(conn);
150: if(!socket)
151: throw Exception("amqp", 0, "failed to create SSL socket");
152:
153: // Set CA certificate if provided
154: if(tls_ca)
155: if(amqp_ssl_socket_set_cacert(socket, tls_ca))
156: throw Exception("amqp", 0, "failed to set CA certificate");
1.6 ! moko 157:
1.5 moko 158: // Set client certificate and key if provided
159: if(tls_cert && tls_key) {
160: if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
161: throw Exception("amqp", 0, "failed to set client certificate/key");
162: } else if(tls_cert || tls_key) {
163: throw Exception("amqp", 0, "both cert and key must be specified for TLS");
164: }
1.6 ! moko 165:
1.5 moko 166: // If CA is provided, peer verification will use it
167: amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
168: // If verify=true, enable hostname verification
169: amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
170: } else {
171: socket = amqp_tcp_socket_new(conn);
172: if(!socket)
173: throw Exception("amqp", 0, "failed to create TCP socket");
174: }
175:
1.6 ! moko 176: status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket ");
1.1 moko 177:
178: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
179: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
180: amqp_destroy_connection(conn);
1.6 ! moko 181: check(rlogin, "login ");
1.1 moko 182: }
183:
184: int channel = 1;
185: amqp_channel_open(conn, channel);
186: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
187: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
188: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
189: amqp_destroy_connection(conn);
1.6 ! moko 190: check(ropen, "open channel ");
1.1 moko 191: }
192:
193: self.fconnection = conn;
194: self.fchannel = channel;
195: #else
196: (void)params; (void)self;
197: throw Exception("amqp", 0, "compiled without amqp support");
1.6 ! moko 198: #endif // WITH_AMQP
1.1 moko 199: }
200:
201: #ifdef WITH_AMQP
202:
// AMQP_STRING(s,l): builds a new String from a pa_strdup() copy of the l bytes at s.
1.2 moko 203: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
// AMQP_VSTRING(s,l): same as AMQP_STRING, wrapped into a new VString.
204: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
205:
1.1 moko 206: static void _publish(Request& r, MethodParams& params) {
207: VAmqp& self=GET_SELF(r, VAmqp);
208: const String &msg=params.as_string(0, "msg must be string");
209: const char* exchange_c = ""; // default exchange
210: const char* routing_key_c = 0;
211: bool mandatory=false;
212:
213: amqp_basic_properties_t props;
214: props._flags = 0;
215:
216: if(params.count()>1){
217: if(HashStringValue* options=params.as_hash(1)){
218: for(HashStringValue::Iterator i(*options); i; i.next()){
219: String::Body key=i.key();
220: Value* value=i.value();
221: if(key=="exchange"){
222: exchange_c=value->as_string().cstr();
223: } else if(key=="routing_key"){
224: routing_key_c=value->as_string().cstr();
225: } else if(key=="queue"){
1.3 moko 226: routing_key_c=value->as_string().cstr();
1.1 moko 227: } else if(key=="mandatory"){
228: mandatory=r.process(*value).as_bool();
1.3 moko 229: } else if(key=="content_type"){
230: const char* v=value->as_string().cstr();
231: props.content_type=amqp_cstring_bytes(v);
232: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
233: } else if(key=="content_encoding"){
234: const char* v=value->as_string().cstr();
235: props.content_encoding=amqp_cstring_bytes(v);
236: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
237: } else if(key=="delivery_mode"){
238: uint8_t dm=(uint8_t)value->as_int();
239: props.delivery_mode=dm;
240: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
241: } else if(key=="priority"){
242: uint8_t pr=(uint8_t)value->as_int();
243: props.priority=pr;
244: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
245: } else if(key=="correlation_id"){
246: const char* v=value->as_string().cstr();
247: props.correlation_id=amqp_cstring_bytes(v);
248: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
249: } else if(key=="reply_to"){
250: const char* v=value->as_string().cstr();
251: props.reply_to=amqp_cstring_bytes(v);
252: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
253: } else if(key=="expiration"){
254: const char* v=value->as_string().cstr();
255: props.expiration=amqp_cstring_bytes(v);
256: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
257: } else if(key=="message_id"){
258: const char* v=value->as_string().cstr();
259: props.message_id=amqp_cstring_bytes(v);
260: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
261: } else if(key=="timestamp"){
262: uint64_t ts=(uint64_t)value->as_double();
263: props.timestamp=ts;
264: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
265: } else if(key=="type"){
266: const char* v=value->as_string().cstr();
267: props.type=amqp_cstring_bytes(v);
268: props._flags|=AMQP_BASIC_TYPE_FLAG;
269: } else if(key=="user_id"){
270: const char* v=value->as_string().cstr();
271: props.user_id=amqp_cstring_bytes(v);
272: props._flags|=AMQP_BASIC_USER_ID_FLAG;
273: } else if(key=="app_id"){
274: const char* v=value->as_string().cstr();
275: props.app_id=amqp_cstring_bytes(v);
276: props._flags|=AMQP_BASIC_APP_ID_FLAG;
277: } else if(key=="headers"){
278: /* if(HashStringValue* hh=pval->get_hash()){
279: size_t count=hh->count();
280: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
281: size_t idx=0;
282: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
283: String::Body hkey=hi.key();
284: const char* hv=hi.value()->as_string().cstr();
285: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
286: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
287: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
288: idx++;
1.1 moko 289: }
1.3 moko 290: props.headers.num_entries=(int)count;
291: props.headers.entries=entries;
292: props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1 moko 293: }
1.3 moko 294: */ } else
1.1 moko 295: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
296: }
297: }
298: }
299:
300: if(!routing_key_c)
301: throw Exception("amqp", 0, "routing_key or queue must be specified");
302:
303: amqp_bytes_t body;
304: body.len = msg.length();
305: body.bytes=(void*)msg.cstr();
306:
1.3 moko 307: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1 moko 308:
309: if(ret!=AMQP_STATUS_OK)
310: throw Exception("amqp", 0, "publish failed");
311:
312: // free temporary headers entries if allocated
313: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
314: // delete [] props.headers.entries;
315: }
316: }
317:
318: static void _release(Request& r, MethodParams&) {
319: VAmqp& self=GET_SELF(r, VAmqp);
320: if(self.fconnection){
321: amqp_connection_state_t conn=self.fconnection;
322: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
323: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
324: amqp_destroy_connection(conn);
325: self.fconnection=0;
326: self.fchannel=0;
327: }
328: }
329:
330: static void _ack(Request& r, MethodParams& params) {
331: VAmqp& self=GET_SELF(r, VAmqp);
332: const String &tag_s=params.as_string(0, "delivery tag must not be code");
333: int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
334: if(ret!=AMQP_STATUS_OK)
335: throw Exception("amqp", 0, "ack failed");
336: }
337:
338: static void _nack(Request& r, MethodParams& params) {
339: VAmqp& self=GET_SELF(r, VAmqp);
340: const String &tag_s=params.as_string(0, "delivery tag must not be code");
341: bool requeue=false;
342: if(params.count()>1){
343: if(HashStringValue* options=params.as_hash(1)){
344: for(HashStringValue::Iterator i(*options); i; i.next()){
345: if(i.key()=="requeue"){
346: requeue=r.process(*i.value()).as_bool();
347: } else
348: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
349: }
350: }
351: }
352: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
353: if(ret!=AMQP_STATUS_OK)
354: throw Exception("amqp", 0, "nack failed");
355: }
356:
357: static void _qos(Request& r, MethodParams& params) {
358: VAmqp& self=GET_SELF(r, VAmqp);
359: uint16_t prefetch_count=0;
360: if(params.count()>0){
361: if(HashStringValue* options=params.as_hash(0)){
362: for(HashStringValue::Iterator i(*options); i; i.next()){
363: if(i.key()=="prefetch_count"){
364: int pc=r.process(*i.value()).as_int();
365: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
366: } else
367: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
368: }
369: }
370: }
371: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
372: if(!ret)
373: throw Exception("amqp", 0, "qos failed");
374: }
375:
376: static void _reject(Request& r, MethodParams& params) {
377: VAmqp& self=GET_SELF(r, VAmqp);
378: const String &tag_s=params.as_string(0, "delivery tag must not be code");
379: bool requeue=true; // by default return to queue
380: if(params.count()>1){
381: if(HashStringValue* options = params.as_hash(1)){
382: for(HashStringValue::Iterator i(*options); i; i.next()){
383: if(i.key() == "requeue"){
384: requeue=r.process(*i.value()).as_bool();
385: } else
386: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
387: }
388: }
389: }
390: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
391: if(ret!=AMQP_STATUS_OK)
392: throw Exception("amqp", 0, "reject failed");
393: }
394:
395: static void _declare_exchange(Request& r, MethodParams& params) {
396: VAmqp& self=GET_SELF(r, VAmqp);
397: const char* name_c = 0;
398: const char* type_c = "direct";
399: bool passive=false, durable=false, auto_delete=true, nowait=false;
400: if(params.count()>0){
401: if(HashStringValue* options=params.as_hash(0)){
402: for(HashStringValue::Iterator i(*options); i; i.next()){
403: String::Body key=i.key();
404: Value* value=i.value();
405: if(key=="name"){
406: name_c=value->as_string().cstr();
407: } else if(key=="type"){
408: type_c=value->as_string().cstr();
409: } else if(key=="passive"){
410: passive=r.process(*value).as_bool();
411: } else if(key=="durable"){
412: durable=r.process(*value).as_bool();
413: } else if(key=="auto_delete"){
414: auto_delete=r.process(*value).as_bool();
415: } else if(key=="nowait"){
416: nowait=r.process(*value).as_bool();
417: } else
418: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
419: }
420: }
421: }
422: if(!name_c)
423: throw Exception("amqp", 0, "name is required");
424: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
1.4 moko 425: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 426: }
427:
428: static void _delete_exchange(Request& r, MethodParams& params) {
429: VAmqp& self=GET_SELF(r, VAmqp);
430: const char* name_c = 0;
431: bool if_unused=false, nowait=false;
432: if(params.count()>0){
433: if(HashStringValue* options=params.as_hash(0)){
434: for(HashStringValue::Iterator i(*options); i; i.next()){
435: String::Body key=i.key();
436: Value* value=i.value();
437: if(key=="exchange"){
438: name_c=value->as_string().cstr();
439: } else if(key=="if_unused"){
440: if_unused=r.process(*value).as_bool();
441: } else if(key=="nowait"){
442: nowait=r.process(*value).as_bool();
443: } else
444: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
445: }
446: }
447: }
448: if(!name_c) throw Exception("amqp", 0, "exchange is required");
449: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
1.4 moko 450: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 451: }
452:
453: static void _declare_queue(Request& r, MethodParams& params) {
454: VAmqp& self=GET_SELF(r, VAmqp);
455: const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
456: if(params.count()>0){
457: if(HashStringValue* options=params.as_hash(0)){
458: for(HashStringValue::Iterator i(*options); i; i.next()){
459: String::Body key=i.key();
460: Value* value=i.value();
461: if(key=="queue"){
462: queue_c=value->as_string().cstr();
463: } else if(key=="passive"){
464: passive=r.process(*value).as_bool();
465: } else if(key=="durable"){
466: durable=r.process(*value).as_bool();
467: } else if(key=="auto_delete"){
468: auto_delete=r.process(*value).as_bool();
469: } else if(key=="nowait"){
470: nowait=r.process(*value).as_bool();
471: } else
472: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
473: }
474: }
475: }
476: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
1.4 moko 477: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 478: if(!queue_c && ok){
1.2 moko 479: r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1 moko 480: }
481: }
482:
483: static void _delete_queue(Request& r, MethodParams& params) {
484: VAmqp& self=GET_SELF(r, VAmqp);
485: const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
486: if(params.count()>0){
487: if(HashStringValue* options=params.as_hash(0)){
488: for(HashStringValue::Iterator i(*options); i; i.next()){
489: String::Body key=i.key();
490: Value* value=i.value();
491: if(key=="queue"){
492: queue_c=value->as_string().cstr();
493: } else if(key=="if_unused"){
494: if_unused=r.process(*value).as_bool();
495: } else if(key=="if_empty"){
496: if_empty=r.process(*value).as_bool();
497: } else if(key=="nowait"){
498: nowait=r.process(*value).as_bool();
499: } else
500: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
501: }
502: }
503: }
504: if(!queue_c) throw Exception("amqp", 0, "queue is required");
505: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
1.4 moko 506: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 507: }
508:
509: static void _bind_queue(Request& r, MethodParams& params) {
510: VAmqp& self=GET_SELF(r, VAmqp);
511: const char* exchange_c=0;
512: const char* queue_c=0;
513: const char* routing_key_c="";
514: if(params.count()>0){
515: if(HashStringValue* options=params.as_hash(0)){
516: for(HashStringValue::Iterator i(*options); i; i.next()){
517: String::Body key=i.key();
518: Value* value=i.value();
519: if(key=="exchange"){
520: exchange_c=value->as_string().cstr();
521: } else if(key=="queue"){
522: queue_c=value->as_string().cstr();
523: } else if(key=="routing_key"){
524: routing_key_c=value->as_string().cstr();
525: } else
526: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
527: }
528: }
529: }
530: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
531: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 532: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 533: }
534:
535: static void _unbind_queue(Request& r, MethodParams& params) {
536: VAmqp& self=GET_SELF(r, VAmqp);
537: const char* exchange_c=0;
538: const char* queue_c=0;
539: const char* routing_key_c="";
540: if(params.count()>0){
541: if(HashStringValue* options=params.as_hash(0)){
542: for(HashStringValue::Iterator i(*options); i; i.next()){
543: String::Body key=i.key();
544: Value* value=i.value();
545: if(key=="exchange"){
546: exchange_c=value->as_string().cstr();
547: } else if(key=="queue"){
548: queue_c=value->as_string().cstr();
549: } else if(key=="routing_key"){
550: routing_key_c=value->as_string().cstr();
551: } else
552: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
553: }
554: }
555: }
556: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
557: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 558: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 559: }
560:
561: static void _consume(Request& r, MethodParams& params) {
562: VAmqp& self=GET_SELF(r, VAmqp);
563: const char* queue_c=0;
564: const char* consumer_tag_c=0;
565: bool no_ack=true, nowait=false;
566: Junction* callback=0;
567:
1.2 moko 568: if(HashStringValue* options=params.as_hash(0)){
569: for(HashStringValue::Iterator i(*options); i; i.next()){
570: String::Body key=i.key();
571: Value* value=i.value();
1.3 moko 572: if(key=="callback"){
573: callback=value->get_junction();
574: } else if(key=="queue"){
1.2 moko 575: queue_c=value->as_string().cstr();
576: } else if(key=="consumer_tag"){
577: consumer_tag_c=value->as_string().cstr();
578: } else if(key=="no_ack"){
579: no_ack=r.process(*value).as_bool();
580: } else if(key=="nowait"){
581: nowait=r.process(*value).as_bool();
582: } else
583: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 584: }
585: }
586:
587: if(!queue_c) throw Exception("amqp", 0, "queue is required");
588: if(!callback) throw Exception("amqp", 0, "callback is required");
589:
590: amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
591: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
592: 0 /*no_local*/, no_ack, nowait, amqp_empty_table);
1.4 moko 593: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 594:
595: self.fstop=false;
596: while(!self.fstop){
1.2 moko 597: amqp_envelope_t envelope;
598: memset(&envelope, 0, sizeof(envelope));
599: amqp_maybe_release_buffers(self.connection());
1.1 moko 600: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
601: if(res.reply_type == AMQP_RESPONSE_NORMAL){
602: VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2 moko 603: h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.1 moko 604: h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
1.2 moko 605: h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
606: h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1 moko 607:
608: Value *params_cb[]={&vh};
609: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
610: frame.store_params(params_cb, 1);
611: r.call(frame);
612: });
613:
614: amqp_destroy_envelope(&envelope);
615: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
616: continue;
617: } else {
618: break;
619: }
620: }
621: }
622:
623: static void _stop_consume(Request& r, MethodParams&) {
624: VAmqp& self=GET_SELF(r, VAmqp);
625: self.fstop=true;
626: }
627:
1.6 ! moko 628: #endif // WITH_AMQP
1.1 moko 629:
630: // constructor
// Registers the native methods of the "amqp" class.
// "create" is always registered (without WITH_AMQP it only throws "compiled without amqp support");
// all other methods exist only in builds with WITH_AMQP.
// NOTE(review): the two trailing numbers look like the minimum and maximum parameter counts — confirm against add_native_method.
631: MAmqp::MAmqp(): Methoded("amqp") {
632: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
633: #ifdef WITH_AMQP
634: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
635: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
636: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
637: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
638: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
639: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
640: add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
641: add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
642: add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
643: add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
644: add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
645: add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2 moko 646: add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1 moko 647: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
1.6 ! moko 648: #endif // WITH_AMQP
1.1 moko 649: }
E-mail: