Annotation of parser3/src/classes/amqp.C, revision 1.5
1.1 moko 1: /** @file
2: Parser: @b amqp parser class.
3:
4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
5: Authors: Konstantin Morshnev <moko@design.ru>
6: */
7:
8: #include "pa_vmethod_frame.h"
9:
10: #include "pa_request.h"
11: #include "pa_vstring.h"
12: #include "pa_vhash.h"
13: #include "pa_vbool.h"
14: #include "pa_vamqp.h"
15:
16: #ifdef WITH_AMQP
17: #include <amqp.h>
18: #include <amqp_tcp_socket.h>
1.5 ! moko 19: #include <amqp_ssl_socket.h>
1.1 moko 20: #include <amqp_framing.h>
21: #include <stdlib.h>
22: #include <string.h>
23: #endif
24:
1.5 ! moko 25: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.4 2025/11/07 23:00:11 moko Exp $" IDENT_PA_VAMQP_H;
1.1 moko 26:
27: class MAmqp: public Methoded {
28: public: // VStateless_class
29: Value* create_new_value(Pool&) { return new VAmqp(); }
30: public:
31: MAmqp();
32: };
33:
34: DECLARE_CLASS_VAR(amqp, new MAmqp);
35:
36: static void _create(Request& r, MethodParams& params) {
37: VAmqp& self=GET_SELF(r, VAmqp);
38:
39: #ifdef WITH_AMQP
40: const char* host_c = "localhost";
41: int port = 5672;
42: const char* user_c = "guest";
43: const char* pass_c = "guest";
44: const char* vhost_c = "/";
45: const char* locale_c = "en_US";
46: int heartbeat = 30; // seconds
1.5 ! moko 47: const char* tls_ca = 0;
! 48: const char* tls_cert = 0;
! 49: const char* tls_key = 0;
! 50: bool tls_specified = false;
! 51: bool tls_verify = true;
1.1 moko 52:
53: if(params.count()>0){
54: if(HashStringValue* options=params.as_hash(0)){
55: for(HashStringValue::Iterator i(*options); i; i.next()){
56: String::Body key=i.key();
57: Value* value=i.value();
58: if(key=="host"){
59: host_c=value->as_string().cstr();
60: } else if(key=="port"){
61: port=r.process(*value).as_int();
62: } else if(key=="user"){
63: user_c=value->as_string().cstr();
64: } else if(key=="password"){
65: pass_c=value->as_string().cstr();
66: } else if(key=="vhost"){
67: vhost_c=value->as_string().cstr();
68: } else if(key=="locale"){
69: locale_c=value->as_string().cstr();
70: } else if(key=="heartbeat"){
71: heartbeat=r.process(*value).as_int();
1.5 ! moko 72: } else if(key=="tls"){
! 73: tls_specified = true;
! 74: if(HashStringValue* tls_options=value->get_hash()){
! 75: for(HashStringValue::Iterator t(*tls_options); t; t.next()){
! 76: String::Body tkey=t.key();
! 77: Value* tval=t.value();
! 78: if(tkey=="ca"){
! 79: tls_ca=tval->as_string().cstr();
! 80: } else if(tkey=="cert"){
! 81: tls_cert=tval->as_string().cstr();
! 82: } else if(tkey=="key"){
! 83: tls_key=tval->as_string().cstr();
! 84: } else if(tkey=="verify"){
! 85: tls_verify=r.process(*tval).as_bool();
! 86: } else
! 87: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 88: }
! 89: }
1.1 moko 90: } else
91: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
92: }
93: }
94: }
95:
96: amqp_connection_state_t conn = amqp_new_connection();
1.5 ! moko 97: amqp_socket_t* socket = 0;
! 98:
! 99: if(tls_specified) {
! 100: socket = amqp_ssl_socket_new(conn);
! 101: if(!socket)
! 102: throw Exception("amqp", 0, "failed to create SSL socket");
! 103:
! 104: // Set CA certificate if provided
! 105: if(tls_ca)
! 106: if(amqp_ssl_socket_set_cacert(socket, tls_ca))
! 107: throw Exception("amqp", 0, "failed to set CA certificate");
! 108:
! 109: // Set client certificate and key if provided
! 110: if(tls_cert && tls_key) {
! 111: if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
! 112: throw Exception("amqp", 0, "failed to set client certificate/key");
! 113: } else if(tls_cert || tls_key) {
! 114: throw Exception("amqp", 0, "both cert and key must be specified for TLS");
! 115: }
! 116:
! 117: // If CA is provided, peer verification will use it
! 118: amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
! 119: // If verify=true, enable hostname verification
! 120: amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
! 121: } else {
! 122: socket = amqp_tcp_socket_new(conn);
! 123: if(!socket)
! 124: throw Exception("amqp", 0, "failed to create TCP socket");
! 125: }
! 126:
1.1 moko 127: if(amqp_socket_open(socket, host_c, port))
1.5 ! moko 128: throw Exception("amqp", 0, "failed to open socket");
1.1 moko 129:
130: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
131: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
132: amqp_destroy_connection(conn);
133: throw Exception("amqp", 0, "login failed");
134: }
135:
136: int channel = 1;
137: amqp_channel_open(conn, channel);
138: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
139: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
140: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
141: amqp_destroy_connection(conn);
142: throw Exception("amqp", 0, "channel open failed");
143: }
144:
145: self.fconnection = conn;
146: self.fchannel = channel;
147: #else
148: (void)params; (void)self;
149: throw Exception("amqp", 0, "compiled without amqp support");
150: #endif
151: }
152:
153: #ifdef WITH_AMQP
154:
1.4 moko 155: static void check(amqp_rpc_reply_t rr){
156: if(rr.reply_type == AMQP_RESPONSE_NORMAL)
157: return;
158:
159: // Extract error message from reply
160: const char* error_msg = 0;
161: size_t error_len = 0;
162: if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
163: if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
164: amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
165: if(m->reply_text.len > 0 && m->reply_text.bytes) {
166: error_msg = (const char*)m->reply_text.bytes;
167: error_len = m->reply_text.len;
168: }
169: } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
170: amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
171: if(m->reply_text.len > 0 && m->reply_text.bytes) {
172: error_msg = (const char*)m->reply_text.bytes;
173: error_len = m->reply_text.len;
174: }
175: }
176: }
177:
178: if(error_msg) {
179: throw Exception("amqp", 0, "failed: %.*s", (int)error_len, error_msg);
180: } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
181: throw Exception("amqp", 0, "failed: library error %d", rr.library_error);
182: } else {
183: throw Exception("amqp", 0, "failed");
184: }
1.1 moko 185: }
186:
1.2 moko 187: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
188: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
189:
1.1 moko 190: static void _publish(Request& r, MethodParams& params) {
191: VAmqp& self=GET_SELF(r, VAmqp);
192: const String &msg=params.as_string(0, "msg must be string");
193: const char* exchange_c = ""; // default exchange
194: const char* routing_key_c = 0;
195: bool mandatory=false;
196:
197: amqp_basic_properties_t props;
198: props._flags = 0;
199:
200: if(params.count()>1){
201: if(HashStringValue* options=params.as_hash(1)){
202: for(HashStringValue::Iterator i(*options); i; i.next()){
203: String::Body key=i.key();
204: Value* value=i.value();
205: if(key=="exchange"){
206: exchange_c=value->as_string().cstr();
207: } else if(key=="routing_key"){
208: routing_key_c=value->as_string().cstr();
209: } else if(key=="queue"){
1.3 moko 210: routing_key_c=value->as_string().cstr();
1.1 moko 211: } else if(key=="mandatory"){
212: mandatory=r.process(*value).as_bool();
1.3 moko 213: } else if(key=="content_type"){
214: const char* v=value->as_string().cstr();
215: props.content_type=amqp_cstring_bytes(v);
216: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
217: } else if(key=="content_encoding"){
218: const char* v=value->as_string().cstr();
219: props.content_encoding=amqp_cstring_bytes(v);
220: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
221: } else if(key=="delivery_mode"){
222: uint8_t dm=(uint8_t)value->as_int();
223: props.delivery_mode=dm;
224: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
225: } else if(key=="priority"){
226: uint8_t pr=(uint8_t)value->as_int();
227: props.priority=pr;
228: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
229: } else if(key=="correlation_id"){
230: const char* v=value->as_string().cstr();
231: props.correlation_id=amqp_cstring_bytes(v);
232: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
233: } else if(key=="reply_to"){
234: const char* v=value->as_string().cstr();
235: props.reply_to=amqp_cstring_bytes(v);
236: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
237: } else if(key=="expiration"){
238: const char* v=value->as_string().cstr();
239: props.expiration=amqp_cstring_bytes(v);
240: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
241: } else if(key=="message_id"){
242: const char* v=value->as_string().cstr();
243: props.message_id=amqp_cstring_bytes(v);
244: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
245: } else if(key=="timestamp"){
246: uint64_t ts=(uint64_t)value->as_double();
247: props.timestamp=ts;
248: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
249: } else if(key=="type"){
250: const char* v=value->as_string().cstr();
251: props.type=amqp_cstring_bytes(v);
252: props._flags|=AMQP_BASIC_TYPE_FLAG;
253: } else if(key=="user_id"){
254: const char* v=value->as_string().cstr();
255: props.user_id=amqp_cstring_bytes(v);
256: props._flags|=AMQP_BASIC_USER_ID_FLAG;
257: } else if(key=="app_id"){
258: const char* v=value->as_string().cstr();
259: props.app_id=amqp_cstring_bytes(v);
260: props._flags|=AMQP_BASIC_APP_ID_FLAG;
261: } else if(key=="headers"){
262: /* if(HashStringValue* hh=pval->get_hash()){
263: size_t count=hh->count();
264: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
265: size_t idx=0;
266: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
267: String::Body hkey=hi.key();
268: const char* hv=hi.value()->as_string().cstr();
269: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
270: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
271: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
272: idx++;
1.1 moko 273: }
1.3 moko 274: props.headers.num_entries=(int)count;
275: props.headers.entries=entries;
276: props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1 moko 277: }
1.3 moko 278: */ } else
1.1 moko 279: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
280: }
281: }
282: }
283:
284: if(!routing_key_c)
285: throw Exception("amqp", 0, "routing_key or queue must be specified");
286:
287: amqp_bytes_t body;
288: body.len = msg.length();
289: body.bytes=(void*)msg.cstr();
290:
1.3 moko 291: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1 moko 292:
293: if(ret!=AMQP_STATUS_OK)
294: throw Exception("amqp", 0, "publish failed");
295:
296: // free temporary headers entries if allocated
297: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
298: // delete [] props.headers.entries;
299: }
300: }
301:
302: static void _release(Request& r, MethodParams&) {
303: VAmqp& self=GET_SELF(r, VAmqp);
304: if(self.fconnection){
305: amqp_connection_state_t conn=self.fconnection;
306: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
307: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
308: amqp_destroy_connection(conn);
309: self.fconnection=0;
310: self.fchannel=0;
311: }
312: }
313:
314: static void _ack(Request& r, MethodParams& params) {
315: VAmqp& self=GET_SELF(r, VAmqp);
316: const String &tag_s=params.as_string(0, "delivery tag must not be code");
317: int ret = amqp_basic_ack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0);
318: if(ret!=AMQP_STATUS_OK)
319: throw Exception("amqp", 0, "ack failed");
320: }
321:
322: static void _nack(Request& r, MethodParams& params) {
323: VAmqp& self=GET_SELF(r, VAmqp);
324: const String &tag_s=params.as_string(0, "delivery tag must not be code");
325: bool requeue=false;
326: if(params.count()>1){
327: if(HashStringValue* options=params.as_hash(1)){
328: for(HashStringValue::Iterator i(*options); i; i.next()){
329: if(i.key()=="requeue"){
330: requeue=r.process(*i.value()).as_bool();
331: } else
332: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
333: }
334: }
335: }
336: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
337: if(ret!=AMQP_STATUS_OK)
338: throw Exception("amqp", 0, "nack failed");
339: }
340:
341: static void _qos(Request& r, MethodParams& params) {
342: VAmqp& self=GET_SELF(r, VAmqp);
343: uint16_t prefetch_count=0;
344: if(params.count()>0){
345: if(HashStringValue* options=params.as_hash(0)){
346: for(HashStringValue::Iterator i(*options); i; i.next()){
347: if(i.key()=="prefetch_count"){
348: int pc=r.process(*i.value()).as_int();
349: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
350: } else
351: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
352: }
353: }
354: }
355: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
356: if(!ret)
357: throw Exception("amqp", 0, "qos failed");
358: }
359:
360: static void _reject(Request& r, MethodParams& params) {
361: VAmqp& self=GET_SELF(r, VAmqp);
362: const String &tag_s=params.as_string(0, "delivery tag must not be code");
363: bool requeue=true; // by default return to queue
364: if(params.count()>1){
365: if(HashStringValue* options = params.as_hash(1)){
366: for(HashStringValue::Iterator i(*options); i; i.next()){
367: if(i.key() == "requeue"){
368: requeue=r.process(*i.value()).as_bool();
369: } else
370: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
371: }
372: }
373: }
374: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
375: if(ret!=AMQP_STATUS_OK)
376: throw Exception("amqp", 0, "reject failed");
377: }
378:
379: static void _declare_exchange(Request& r, MethodParams& params) {
380: VAmqp& self=GET_SELF(r, VAmqp);
381: const char* name_c = 0;
382: const char* type_c = "direct";
383: bool passive=false, durable=false, auto_delete=true, nowait=false;
384: if(params.count()>0){
385: if(HashStringValue* options=params.as_hash(0)){
386: for(HashStringValue::Iterator i(*options); i; i.next()){
387: String::Body key=i.key();
388: Value* value=i.value();
389: if(key=="name"){
390: name_c=value->as_string().cstr();
391: } else if(key=="type"){
392: type_c=value->as_string().cstr();
393: } else if(key=="passive"){
394: passive=r.process(*value).as_bool();
395: } else if(key=="durable"){
396: durable=r.process(*value).as_bool();
397: } else if(key=="auto_delete"){
398: auto_delete=r.process(*value).as_bool();
399: } else if(key=="nowait"){
400: nowait=r.process(*value).as_bool();
401: } else
402: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
403: }
404: }
405: }
406: if(!name_c)
407: throw Exception("amqp", 0, "name is required");
408: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(name_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
1.4 moko 409: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 410: }
411:
412: static void _delete_exchange(Request& r, MethodParams& params) {
413: VAmqp& self=GET_SELF(r, VAmqp);
414: const char* name_c = 0;
415: bool if_unused=false, nowait=false;
416: if(params.count()>0){
417: if(HashStringValue* options=params.as_hash(0)){
418: for(HashStringValue::Iterator i(*options); i; i.next()){
419: String::Body key=i.key();
420: Value* value=i.value();
421: if(key=="exchange"){
422: name_c=value->as_string().cstr();
423: } else if(key=="if_unused"){
424: if_unused=r.process(*value).as_bool();
425: } else if(key=="nowait"){
426: nowait=r.process(*value).as_bool();
427: } else
428: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
429: }
430: }
431: }
432: if(!name_c) throw Exception("amqp", 0, "exchange is required");
433: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(name_c), if_unused);
1.4 moko 434: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 435: }
436:
437: static void _declare_queue(Request& r, MethodParams& params) {
438: VAmqp& self=GET_SELF(r, VAmqp);
439: const char* queue_c = 0; bool passive=false, durable=false, auto_delete=true, nowait=false;
440: if(params.count()>0){
441: if(HashStringValue* options=params.as_hash(0)){
442: for(HashStringValue::Iterator i(*options); i; i.next()){
443: String::Body key=i.key();
444: Value* value=i.value();
445: if(key=="queue"){
446: queue_c=value->as_string().cstr();
447: } else if(key=="passive"){
448: passive=r.process(*value).as_bool();
449: } else if(key=="durable"){
450: durable=r.process(*value).as_bool();
451: } else if(key=="auto_delete"){
452: auto_delete=r.process(*value).as_bool();
453: } else if(key=="nowait"){
454: nowait=r.process(*value).as_bool();
455: } else
456: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
457: }
458: }
459: }
460: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
1.4 moko 461: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 462: if(!queue_c && ok){
1.2 moko 463: r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1 moko 464: }
465: }
466:
467: static void _delete_queue(Request& r, MethodParams& params) {
468: VAmqp& self=GET_SELF(r, VAmqp);
469: const char* queue_c = 0; bool if_unused=false, if_empty=false, nowait=false;
470: if(params.count()>0){
471: if(HashStringValue* options=params.as_hash(0)){
472: for(HashStringValue::Iterator i(*options); i; i.next()){
473: String::Body key=i.key();
474: Value* value=i.value();
475: if(key=="queue"){
476: queue_c=value->as_string().cstr();
477: } else if(key=="if_unused"){
478: if_unused=r.process(*value).as_bool();
479: } else if(key=="if_empty"){
480: if_empty=r.process(*value).as_bool();
481: } else if(key=="nowait"){
482: nowait=r.process(*value).as_bool();
483: } else
484: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
485: }
486: }
487: }
488: if(!queue_c) throw Exception("amqp", 0, "queue is required");
489: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
1.4 moko 490: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 491: }
492:
493: static void _bind_queue(Request& r, MethodParams& params) {
494: VAmqp& self=GET_SELF(r, VAmqp);
495: const char* exchange_c=0;
496: const char* queue_c=0;
497: const char* routing_key_c="";
498: if(params.count()>0){
499: if(HashStringValue* options=params.as_hash(0)){
500: for(HashStringValue::Iterator i(*options); i; i.next()){
501: String::Body key=i.key();
502: Value* value=i.value();
503: if(key=="exchange"){
504: exchange_c=value->as_string().cstr();
505: } else if(key=="queue"){
506: queue_c=value->as_string().cstr();
507: } else if(key=="routing_key"){
508: routing_key_c=value->as_string().cstr();
509: } else
510: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
511: }
512: }
513: }
514: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
515: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 516: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 517: }
518:
519: static void _unbind_queue(Request& r, MethodParams& params) {
520: VAmqp& self=GET_SELF(r, VAmqp);
521: const char* exchange_c=0;
522: const char* queue_c=0;
523: const char* routing_key_c="";
524: if(params.count()>0){
525: if(HashStringValue* options=params.as_hash(0)){
526: for(HashStringValue::Iterator i(*options); i; i.next()){
527: String::Body key=i.key();
528: Value* value=i.value();
529: if(key=="exchange"){
530: exchange_c=value->as_string().cstr();
531: } else if(key=="queue"){
532: queue_c=value->as_string().cstr();
533: } else if(key=="routing_key"){
534: routing_key_c=value->as_string().cstr();
535: } else
536: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
537: }
538: }
539: }
540: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
541: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 542: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 543: }
544:
545: static void _consume(Request& r, MethodParams& params) {
546: VAmqp& self=GET_SELF(r, VAmqp);
547: const char* queue_c=0;
548: const char* consumer_tag_c=0;
549: bool no_ack=true, nowait=false;
550: Junction* callback=0;
551:
1.2 moko 552: if(HashStringValue* options=params.as_hash(0)){
553: for(HashStringValue::Iterator i(*options); i; i.next()){
554: String::Body key=i.key();
555: Value* value=i.value();
1.3 moko 556: if(key=="callback"){
557: callback=value->get_junction();
558: } else if(key=="queue"){
1.2 moko 559: queue_c=value->as_string().cstr();
560: } else if(key=="consumer_tag"){
561: consumer_tag_c=value->as_string().cstr();
562: } else if(key=="no_ack"){
563: no_ack=r.process(*value).as_bool();
564: } else if(key=="nowait"){
565: nowait=r.process(*value).as_bool();
566: } else
567: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 568: }
569: }
570:
571: if(!queue_c) throw Exception("amqp", 0, "queue is required");
572: if(!callback) throw Exception("amqp", 0, "callback is required");
573:
574: amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
575: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
576: 0 /*no_local*/, no_ack, nowait, amqp_empty_table);
1.4 moko 577: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 578:
579: self.fstop=false;
580: while(!self.fstop){
1.2 moko 581: amqp_envelope_t envelope;
582: memset(&envelope, 0, sizeof(envelope));
583: amqp_maybe_release_buffers(self.connection());
1.1 moko 584: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
585: if(res.reply_type == AMQP_RESPONSE_NORMAL){
586: VHash &vh=*new VHash; HashStringValue* h=vh.get_hash();
1.2 moko 587: h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
1.1 moko 588: h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag)));
1.2 moko 589: h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
590: h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
1.1 moko 591:
592: Value *params_cb[]={&vh};
593: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
594: frame.store_params(params_cb, 1);
595: r.call(frame);
596: });
597:
598: amqp_destroy_envelope(&envelope);
599: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
600: continue;
601: } else {
602: break;
603: }
604: }
605: }
606:
607: static void _stop_consume(Request& r, MethodParams&) {
608: VAmqp& self=GET_SELF(r, VAmqp);
609: self.fstop=true;
610: }
611:
612: #endif
613:
614: // constructor
615: MAmqp::MAmqp(): Methoded("amqp") {
616: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
617: #ifdef WITH_AMQP
618: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
619: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
620: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
621: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
622: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
623: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
624: add_native_method("declare_exchange", Method::CT_DYNAMIC, _declare_exchange, 0, 1);
625: add_native_method("delete_exchange", Method::CT_DYNAMIC, _delete_exchange, 0, 1);
626: add_native_method("declare_queue", Method::CT_DYNAMIC, _declare_queue, 0, 1);
627: add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1);
628: add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1);
629: add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1);
1.2 moko 630: add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1 moko 631: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
632: #endif
633: }
E-mail: