Annotation of parser3/src/classes/amqp.C, revision 1.12
1.1 moko 1: /** @file
2: Parser: @b amqp parser class.
3:
4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
5: Authors: Konstantin Morshnev <moko@design.ru>
6: */
7:
8: #include "pa_vmethod_frame.h"
9:
10: #include "pa_request.h"
11: #include "pa_vstring.h"
12: #include "pa_vhash.h"
1.8 moko 13: #include "pa_varray.h"
1.1 moko 14: #include "pa_vbool.h"
15: #include "pa_vamqp.h"
16:
17: #ifdef WITH_AMQP
18: #include <amqp.h>
19: #include <amqp_tcp_socket.h>
1.5 moko 20: #include <amqp_ssl_socket.h>
1.1 moko 21: #include <amqp_framing.h>
22: #include <stdlib.h>
23: #include <string.h>
24: #endif
25:
1.12 ! moko 26: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.11 2026/01/09 03:30:39 moko Exp $" IDENT_PA_VAMQP_H;
1.1 moko 27:
28: class MAmqp: public Methoded {
29: public: // VStateless_class
30: Value* create_new_value(Pool&) { return new VAmqp(); }
31: public:
32: MAmqp();
33: };
34:
35: DECLARE_CLASS_VAR(amqp, new MAmqp);
36:
1.6 moko 37: #ifdef WITH_AMQP
38:
39: static void status_check(int ret, const char *detail=""){
40: if(ret == AMQP_STATUS_OK)
41: return;
42:
43: const char* error_str = amqp_error_string2(ret);
44: if(error_str) {
45: throw Exception("amqp", 0, "%sfailed: %s", detail, error_str);
46: } else {
47: throw Exception("amqp", 0, "%sfailed: error %d", detail, ret);
48: }
49: }
50:
51: static void check(amqp_rpc_reply_t rr, const char *detail=""){
52: if(rr.reply_type == AMQP_RESPONSE_NORMAL)
53: return;
54:
55: // Extract error message from reply
56: const char* error_msg = 0;
57: size_t error_len = 0;
58: if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
59: if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
60: amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
61: if(m->reply_text.len > 0 && m->reply_text.bytes) {
62: error_msg = (const char*)m->reply_text.bytes;
63: error_len = m->reply_text.len;
64: }
65: } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
66: amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
67: if(m->reply_text.len > 0 && m->reply_text.bytes) {
68: error_msg = (const char*)m->reply_text.bytes;
69: error_len = m->reply_text.len;
70: }
71: }
72: }
73:
74: if(error_msg) {
75: throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg);
76: } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
77: status_check(rr.library_error, detail);
78: }
79:
80: throw Exception("amqp", 0, "%sfailed", detail);
81: }
82:
83: #endif // WITH_AMQP
84:
85:
1.1 moko 86: static void _create(Request& r, MethodParams& params) {
87: VAmqp& self=GET_SELF(r, VAmqp);
88:
89: #ifdef WITH_AMQP
90: const char* host_c = "localhost";
91: int port = 5672;
92: const char* user_c = "guest";
93: const char* pass_c = "guest";
94: const char* vhost_c = "/";
95: const char* locale_c = "en_US";
96: int heartbeat = 30; // seconds
1.5 moko 97: const char* tls_ca = 0;
98: const char* tls_cert = 0;
99: const char* tls_key = 0;
100: bool tls_specified = false;
101: bool tls_verify = true;
1.1 moko 102:
103: if(params.count()>0){
104: if(HashStringValue* options=params.as_hash(0)){
105: for(HashStringValue::Iterator i(*options); i; i.next()){
106: String::Body key=i.key();
107: Value* value=i.value();
108: if(key=="host"){
109: host_c=value->as_string().cstr();
110: } else if(key=="port"){
111: port=r.process(*value).as_int();
112: } else if(key=="user"){
113: user_c=value->as_string().cstr();
114: } else if(key=="password"){
115: pass_c=value->as_string().cstr();
116: } else if(key=="vhost"){
117: vhost_c=value->as_string().cstr();
118: } else if(key=="locale"){
119: locale_c=value->as_string().cstr();
120: } else if(key=="heartbeat"){
121: heartbeat=r.process(*value).as_int();
1.5 moko 122: } else if(key=="tls"){
123: tls_specified = true;
124: if(HashStringValue* tls_options=value->get_hash()){
125: for(HashStringValue::Iterator t(*tls_options); t; t.next()){
126: String::Body tkey=t.key();
127: Value* tval=t.value();
128: if(tkey=="ca"){
129: tls_ca=tval->as_string().cstr();
130: } else if(tkey=="cert"){
131: tls_cert=tval->as_string().cstr();
132: } else if(tkey=="key"){
133: tls_key=tval->as_string().cstr();
134: } else if(tkey=="verify"){
135: tls_verify=r.process(*tval).as_bool();
136: } else
137: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
138: }
139: }
1.1 moko 140: } else
141: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
142: }
143: }
144: }
145:
146: amqp_connection_state_t conn = amqp_new_connection();
1.5 moko 147: amqp_socket_t* socket = 0;
148:
149: if(tls_specified) {
150: socket = amqp_ssl_socket_new(conn);
151: if(!socket)
152: throw Exception("amqp", 0, "failed to create SSL socket");
153:
154: // Set CA certificate if provided
155: if(tls_ca)
156: if(amqp_ssl_socket_set_cacert(socket, tls_ca))
157: throw Exception("amqp", 0, "failed to set CA certificate");
1.6 moko 158:
1.5 moko 159: // Set client certificate and key if provided
160: if(tls_cert && tls_key) {
161: if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
162: throw Exception("amqp", 0, "failed to set client certificate/key");
163: } else if(tls_cert || tls_key) {
164: throw Exception("amqp", 0, "both cert and key must be specified for TLS");
165: }
1.6 moko 166:
1.5 moko 167: // If CA is provided, peer verification will use it
168: amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
169: // If verify=true, enable hostname verification
170: amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
171: } else {
172: socket = amqp_tcp_socket_new(conn);
173: if(!socket)
174: throw Exception("amqp", 0, "failed to create TCP socket");
175: }
176:
1.6 moko 177: status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket ");
1.1 moko 178:
179: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
180: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
181: amqp_destroy_connection(conn);
1.6 moko 182: check(rlogin, "login ");
1.1 moko 183: }
184:
185: int channel = 1;
186: amqp_channel_open(conn, channel);
187: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
188: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
189: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
190: amqp_destroy_connection(conn);
1.6 moko 191: check(ropen, "open channel ");
1.1 moko 192: }
193:
194: self.fconnection = conn;
195: self.fchannel = channel;
196: #else
197: (void)params; (void)self;
198: throw Exception("amqp", 0, "compiled without amqp support");
1.6 moko 199: #endif // WITH_AMQP
1.1 moko 200: }
201:
202: #ifdef WITH_AMQP
203:
1.2 moko 204: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
205: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
206:
1.1 moko 207: static void _publish(Request& r, MethodParams& params) {
208: VAmqp& self=GET_SELF(r, VAmqp);
209: const String &msg=params.as_string(0, "msg must be string");
210: const char* exchange_c = ""; // default exchange
211: const char* routing_key_c = 0;
212: bool mandatory=false;
213:
214: amqp_basic_properties_t props;
215: props._flags = 0;
216:
217: if(params.count()>1){
218: if(HashStringValue* options=params.as_hash(1)){
219: for(HashStringValue::Iterator i(*options); i; i.next()){
220: String::Body key=i.key();
221: Value* value=i.value();
222: if(key=="exchange"){
223: exchange_c=value->as_string().cstr();
224: } else if(key=="routing_key"){
225: routing_key_c=value->as_string().cstr();
226: } else if(key=="queue"){
1.3 moko 227: routing_key_c=value->as_string().cstr();
1.1 moko 228: } else if(key=="mandatory"){
229: mandatory=r.process(*value).as_bool();
1.3 moko 230: } else if(key=="content_type"){
231: const char* v=value->as_string().cstr();
232: props.content_type=amqp_cstring_bytes(v);
233: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
234: } else if(key=="content_encoding"){
235: const char* v=value->as_string().cstr();
236: props.content_encoding=amqp_cstring_bytes(v);
237: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
238: } else if(key=="delivery_mode"){
239: uint8_t dm=(uint8_t)value->as_int();
240: props.delivery_mode=dm;
241: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
242: } else if(key=="priority"){
243: uint8_t pr=(uint8_t)value->as_int();
244: props.priority=pr;
245: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
246: } else if(key=="correlation_id"){
247: const char* v=value->as_string().cstr();
248: props.correlation_id=amqp_cstring_bytes(v);
249: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
250: } else if(key=="reply_to"){
251: const char* v=value->as_string().cstr();
252: props.reply_to=amqp_cstring_bytes(v);
253: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
254: } else if(key=="expiration"){
255: const char* v=value->as_string().cstr();
256: props.expiration=amqp_cstring_bytes(v);
257: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
258: } else if(key=="message_id"){
259: const char* v=value->as_string().cstr();
260: props.message_id=amqp_cstring_bytes(v);
261: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
262: } else if(key=="timestamp"){
263: uint64_t ts=(uint64_t)value->as_double();
264: props.timestamp=ts;
265: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
266: } else if(key=="type"){
267: const char* v=value->as_string().cstr();
268: props.type=amqp_cstring_bytes(v);
269: props._flags|=AMQP_BASIC_TYPE_FLAG;
270: } else if(key=="user_id"){
271: const char* v=value->as_string().cstr();
272: props.user_id=amqp_cstring_bytes(v);
273: props._flags|=AMQP_BASIC_USER_ID_FLAG;
274: } else if(key=="app_id"){
275: const char* v=value->as_string().cstr();
276: props.app_id=amqp_cstring_bytes(v);
277: props._flags|=AMQP_BASIC_APP_ID_FLAG;
278: } else if(key=="headers"){
279: /* if(HashStringValue* hh=pval->get_hash()){
280: size_t count=hh->count();
281: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
282: size_t idx=0;
283: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
284: String::Body hkey=hi.key();
285: const char* hv=hi.value()->as_string().cstr();
286: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
287: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
288: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
289: idx++;
1.1 moko 290: }
1.3 moko 291: props.headers.num_entries=(int)count;
292: props.headers.entries=entries;
293: props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1 moko 294: }
1.3 moko 295: */ } else
1.1 moko 296: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
297: }
298: }
299: }
300:
301: if(!routing_key_c)
302: throw Exception("amqp", 0, "routing_key or queue must be specified");
303:
304: amqp_bytes_t body;
305: body.len = msg.length();
306: body.bytes=(void*)msg.cstr();
307:
1.3 moko 308: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1 moko 309:
310: if(ret!=AMQP_STATUS_OK)
311: throw Exception("amqp", 0, "publish failed");
312:
313: // free temporary headers entries if allocated
314: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
315: // delete [] props.headers.entries;
316: }
317: }
318:
319: static void _release(Request& r, MethodParams&) {
320: VAmqp& self=GET_SELF(r, VAmqp);
321: if(self.fconnection){
322: amqp_connection_state_t conn=self.fconnection;
323: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
324: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
325: amqp_destroy_connection(conn);
326: self.fconnection=0;
327: self.fchannel=0;
328: }
329: }
330:
331: static void _ack(Request& r, MethodParams& params) {
332: VAmqp& self=GET_SELF(r, VAmqp);
1.7 moko 333: double tag=params.as_double(0, "delivery tag must be number", r);
334: int ret = amqp_basic_ack(self.connection(), self.channel(), (uint64_t)tag, 0);
1.1 moko 335: if(ret!=AMQP_STATUS_OK)
336: throw Exception("amqp", 0, "ack failed");
337: }
338:
339: static void _nack(Request& r, MethodParams& params) {
340: VAmqp& self=GET_SELF(r, VAmqp);
341: const String &tag_s=params.as_string(0, "delivery tag must not be code");
342: bool requeue=false;
343: if(params.count()>1){
344: if(HashStringValue* options=params.as_hash(1)){
345: for(HashStringValue::Iterator i(*options); i; i.next()){
346: if(i.key()=="requeue"){
347: requeue=r.process(*i.value()).as_bool();
348: } else
349: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
350: }
351: }
352: }
353: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
354: if(ret!=AMQP_STATUS_OK)
355: throw Exception("amqp", 0, "nack failed");
356: }
357:
358: static void _qos(Request& r, MethodParams& params) {
359: VAmqp& self=GET_SELF(r, VAmqp);
360: uint16_t prefetch_count=0;
361: if(params.count()>0){
362: if(HashStringValue* options=params.as_hash(0)){
363: for(HashStringValue::Iterator i(*options); i; i.next()){
364: if(i.key()=="prefetch_count"){
365: int pc=r.process(*i.value()).as_int();
366: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
367: } else
368: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
369: }
370: }
371: }
372: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
373: if(!ret)
374: throw Exception("amqp", 0, "qos failed");
375: }
376:
377: static void _reject(Request& r, MethodParams& params) {
378: VAmqp& self=GET_SELF(r, VAmqp);
379: const String &tag_s=params.as_string(0, "delivery tag must not be code");
380: bool requeue=true; // by default return to queue
381: if(params.count()>1){
382: if(HashStringValue* options = params.as_hash(1)){
383: for(HashStringValue::Iterator i(*options); i; i.next()){
384: if(i.key() == "requeue"){
385: requeue=r.process(*i.value()).as_bool();
386: } else
387: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
388: }
389: }
390: }
391: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
392: if(ret!=AMQP_STATUS_OK)
393: throw Exception("amqp", 0, "reject failed");
394: }
395:
1.8 moko 396: static void _declare(Request& r, MethodParams& params) {
1.1 moko 397: VAmqp& self=GET_SELF(r, VAmqp);
1.8 moko 398: const char* exchange_c = 0;
399: const char* queue_c = 0;
1.1 moko 400: const char* type_c = "direct";
1.12 ! moko 401: bool passive=false, durable=false, auto_delete=false, internal=false, exclusive=false;
1.8 moko 402: if(HashStringValue* options=params.as_hash(0)){
403: for(HashStringValue::Iterator i(*options); i; i.next()){
404: String::Body key=i.key();
405: Value* value=i.value();
406: if(key=="exchange"){
407: exchange_c=value->as_string().cstr();
408: } else if(key=="queue"){
409: queue_c=value->as_string().cstr();
410: } else if(key=="type"){
411: type_c=value->as_string().cstr();
412: } else if(key=="passive"){
413: passive=r.process(*value).as_bool();
414: } else if(key=="durable"){
415: durable=r.process(*value).as_bool();
416: } else if(key=="auto_delete"){
417: auto_delete=r.process(*value).as_bool();
1.11 moko 418: } else if(key=="internal"){
419: internal=r.process(*value).as_bool();
420: } else if(key=="exclusive"){
421: exclusive=r.process(*value).as_bool();
1.8 moko 422: } else
423: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 424: }
425: }
1.8 moko 426: if(!exchange_c && !queue_c)
427: throw Exception("amqp", 0, "exchange or queue must be specified");
428:
429: if(exchange_c){
1.11 moko 430: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, internal, amqp_empty_table);
1.8 moko 431: check(amqp_get_rpc_reply(self.connection()));
432: }
1.1 moko 433:
1.8 moko 434: if(queue_c){
1.11 moko 435: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), *queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, exclusive, auto_delete, amqp_empty_table);
1.8 moko 436: check(amqp_get_rpc_reply(self.connection()));
437: if(!*queue_c && ok){
438: r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1 moko 439: }
440: }
441: }
442:
1.8 moko 443: static void _delete(Request& r, MethodParams& params) {
1.1 moko 444: VAmqp& self=GET_SELF(r, VAmqp);
1.8 moko 445: const char* exchange_c = 0;
446: const char* queue_c = 0;
1.11 moko 447: bool if_unused=false, if_empty=false;
1.8 moko 448: if(HashStringValue* options=params.as_hash(0)){
449: for(HashStringValue::Iterator i(*options); i; i.next()){
450: String::Body key=i.key();
451: Value* value=i.value();
452: if(key=="exchange"){
453: exchange_c=value->as_string().cstr();
454: } else if(key=="queue"){
455: queue_c=value->as_string().cstr();
456: } else if(key=="if_unused"){
457: if_unused=r.process(*value).as_bool();
458: } else if(key=="if_empty"){
459: if_empty=r.process(*value).as_bool();
460: } else
461: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 462: }
463: }
1.8 moko 464: if(!exchange_c && !queue_c)
465: throw Exception("amqp", 0, "exchange or queue must be specified");
466:
467: if(exchange_c){
468: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), if_unused);
469: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 470: }
471:
1.8 moko 472: if(queue_c){
473: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
474: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 475: }
476: }
477:
1.8 moko 478: static void _bind(Request& r, MethodParams& params) {
1.1 moko 479: VAmqp& self=GET_SELF(r, VAmqp);
480: const char* exchange_c=0;
481: const char* queue_c=0;
482: const char* routing_key_c="";
1.8 moko 483: if(HashStringValue* options=params.as_hash(0)){
484: for(HashStringValue::Iterator i(*options); i; i.next()){
485: String::Body key=i.key();
486: Value* value=i.value();
487: if(key=="exchange"){
488: exchange_c=value->as_string().cstr();
489: } else if(key=="queue"){
490: queue_c=value->as_string().cstr();
491: } else if(key=="routing_key"){
492: routing_key_c=value->as_string().cstr();
493: } else
494: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 495: }
496: }
497: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
498: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 499: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 500: }
501:
1.8 moko 502: static void _unbind(Request& r, MethodParams& params) {
1.1 moko 503: VAmqp& self=GET_SELF(r, VAmqp);
504: const char* exchange_c=0;
505: const char* queue_c=0;
506: const char* routing_key_c="";
1.8 moko 507: if(HashStringValue* options=params.as_hash(0)){
508: for(HashStringValue::Iterator i(*options); i; i.next()){
509: String::Body key=i.key();
510: Value* value=i.value();
511: if(key=="exchange"){
512: exchange_c=value->as_string().cstr();
513: } else if(key=="queue"){
514: queue_c=value->as_string().cstr();
515: } else if(key=="routing_key"){
516: routing_key_c=value->as_string().cstr();
517: } else
518: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 519: }
520: }
521: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
522: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 523: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 524: }
525:
1.8 moko 526: static void _purge(Request& r, MethodParams& params) {
527: VAmqp& self=GET_SELF(r, VAmqp);
528: const char* queue_c = 0;
529: if(HashStringValue* options=params.as_hash(0)){
530: for(HashStringValue::Iterator i(*options); i; i.next()){
531: String::Body key=i.key();
532: Value* value=i.value();
533: if(key=="queue"){
534: queue_c=value->as_string().cstr();
535: } else
536: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
537: }
538: }
539: if(!queue_c)
540: throw Exception("amqp", 0, "queue must be specified");
541:
1.9 moko 542: amqp_queue_purge_ok_t *ok = amqp_queue_purge(self.connection(), self.channel(), amqp_cstring_bytes(queue_c));
1.8 moko 543: check(amqp_get_rpc_reply(self.connection()));
1.9 moko 544: r.write(*new VInt(ok ? ok->message_count : 0));
1.8 moko 545: }
546:
547: static void _info(Request& r, MethodParams& params) {
548: VAmqp& self = GET_SELF(r, VAmqp);
549: const char* queue_c = 0;
550:
551: if (HashStringValue* options=params.as_hash(0)) {
552: for (HashStringValue::Iterator i(*options); i; i.next()) {
553: String::Body key=i.key();
554: Value* value=i.value();
555: if(key=="queue"){
556: queue_c=value->as_string().cstr();
557: } else {
558: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
559: }
560: }
561: }
562:
563: if (!queue_c)
564: throw Exception("amqp", 0, "queue must be specified");
565:
566: amqp_queue_declare_ok_t* ok = amqp_queue_declare(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), /*passive*/ 1, 0, 0, 0, amqp_empty_table);
567: check(amqp_get_rpc_reply(self.connection()));
568:
569: Value& result=*new VHash;
1.9 moko 570: if(ok){
571: result.put_element(*new String("queue"), AMQP_VSTRING(ok->queue.bytes, ok->queue.len));
572: result.put_element(*new String("messages"), new VInt(ok->message_count));
573: result.put_element(*new String("consumers"), new VInt(ok->consumer_count));
574: }
1.8 moko 575: r.write(result);
576: }
577:
578: static VHash *amqp_message_hash(amqp_envelope_t &envelope) {
579: VHash *result=new VHash;
580: HashStringValue* h=result->get_hash();
581: h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
582: h->put("delivery_tag", new VInt(envelope.delivery_tag));
583: h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
584: h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
585: return result;
586: }
587:
1.1 moko 588: static void _consume(Request& r, MethodParams& params) {
589: VAmqp& self=GET_SELF(r, VAmqp);
590: const char* queue_c=0;
591: const char* consumer_tag_c=0;
1.10 moko 592: bool no_ack=true, exclusive=false;
1.8 moko 593: int count=1;
1.1 moko 594: Junction* callback=0;
595:
1.2 moko 596: if(HashStringValue* options=params.as_hash(0)){
597: for(HashStringValue::Iterator i(*options); i; i.next()){
598: String::Body key=i.key();
599: Value* value=i.value();
1.3 moko 600: if(key=="callback"){
601: callback=value->get_junction();
602: } else if(key=="queue"){
1.2 moko 603: queue_c=value->as_string().cstr();
604: } else if(key=="consumer_tag"){
605: consumer_tag_c=value->as_string().cstr();
606: } else if(key=="no_ack"){
607: no_ack=r.process(*value).as_bool();
1.10 moko 608: } else if(key=="exclusive"){
609: exclusive=r.process(*value).as_bool();
1.8 moko 610: } else if(key=="count"){
611: count=r.process(*value).as_int();
1.2 moko 612: } else
613: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 614: }
615: }
616:
1.8 moko 617: if(!queue_c) throw Exception("amqp", 0, "queue must be specified");
1.1 moko 618:
1.9 moko 619: amqp_basic_consume_ok_t *ok = amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
1.1 moko 620: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
1.10 moko 621: 0 /*no_local*/, no_ack, exclusive, amqp_empty_table);
1.4 moko 622: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 623:
1.8 moko 624: if(callback){
625: self.fstop=false;
626: while(!self.fstop){
627: amqp_envelope_t envelope;
628: memset(&envelope, 0, sizeof(envelope));
629: amqp_maybe_release_buffers(self.connection());
630: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
631: if(res.reply_type == AMQP_RESPONSE_NORMAL){
632: VHash *vh=amqp_message_hash(envelope);
633: Value *params_cb[]={vh};
634: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
635: frame.store_params(params_cb, 1);
636: r.call(frame);
637: });
638: amqp_destroy_envelope(&envelope);
639: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
640: continue;
641: } else {
642: break;
643: }
644: }
645: } else {
646: VArray& result=*new VArray();
647: ArrayValue& result_array=result.array();
648:
649: for(int i=0; i<count; i++){
650: amqp_envelope_t envelope;
651: memset(&envelope, 0, sizeof(envelope));
652: amqp_maybe_release_buffers(self.connection());
653: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
654: if(res.reply_type == AMQP_RESPONSE_NORMAL){
655: result_array+=amqp_message_hash(envelope);
656: amqp_destroy_envelope(&envelope);
657: } else {
658: check(res);
659: }
1.1 moko 660: }
1.8 moko 661: r.write(result);
1.1 moko 662: }
1.9 moko 663:
664: if(ok){
665: amqp_basic_cancel(self.connection(), self.channel(), ok->consumer_tag);
666: check(amqp_get_rpc_reply(self.connection()));
667: }
1.1 moko 668: }
669:
670: static void _stop_consume(Request& r, MethodParams&) {
671: VAmqp& self=GET_SELF(r, VAmqp);
672: self.fstop=true;
673: }
674:
1.6 moko 675: #endif // WITH_AMQP
1.1 moko 676:
677: // constructor
678: MAmqp::MAmqp(): Methoded("amqp") {
679: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
680: #ifdef WITH_AMQP
681: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
682: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
683: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
684: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
685: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
686: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
1.8 moko 687: add_native_method("declare", Method::CT_DYNAMIC, _declare, 1, 1);
688: add_native_method("delete", Method::CT_DYNAMIC, _delete, 1, 1);
689: add_native_method("bind", Method::CT_DYNAMIC, _bind, 1, 1);
690: add_native_method("unbind", Method::CT_DYNAMIC, _unbind, 1, 1);
691: add_native_method("purge", Method::CT_DYNAMIC, _purge, 1, 1);
692: add_native_method("info", Method::CT_DYNAMIC, _info, 1, 1);
1.2 moko 693: add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1 moko 694: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
1.6 moko 695: #endif // WITH_AMQP
1.1 moko 696: }
E-mail: