Annotation of parser3/src/classes/amqp.C, revision 1.8
1.1 moko 1: /** @file
2: Parser: @b amqp parser class.
3:
4: Copyright (c) 2001-2025 Art. Lebedev Studio (http://www.artlebedev.com)
5: Authors: Konstantin Morshnev <moko@design.ru>
6: */
7:
8: #include "pa_vmethod_frame.h"
9:
10: #include "pa_request.h"
11: #include "pa_vstring.h"
12: #include "pa_vhash.h"
1.8 ! moko 13: #include "pa_varray.h"
1.1 moko 14: #include "pa_vbool.h"
15: #include "pa_vamqp.h"
16:
17: #ifdef WITH_AMQP
18: #include <amqp.h>
19: #include <amqp_tcp_socket.h>
1.5 moko 20: #include <amqp_ssl_socket.h>
1.1 moko 21: #include <amqp_framing.h>
22: #include <stdlib.h>
23: #include <string.h>
24: #endif
25:
1.8 ! moko 26: volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.7 2025/11/22 15:36:40 moko Exp $" IDENT_PA_VAMQP_H;
1.1 moko 27:
28: class MAmqp: public Methoded {
29: public: // VStateless_class
30: Value* create_new_value(Pool&) { return new VAmqp(); }
31: public:
32: MAmqp();
33: };
34:
35: DECLARE_CLASS_VAR(amqp, new MAmqp);
36:
1.6 moko 37: #ifdef WITH_AMQP
38:
39: static void status_check(int ret, const char *detail=""){
40: if(ret == AMQP_STATUS_OK)
41: return;
42:
43: const char* error_str = amqp_error_string2(ret);
44: if(error_str) {
45: throw Exception("amqp", 0, "%sfailed: %s", detail, error_str);
46: } else {
47: throw Exception("amqp", 0, "%sfailed: error %d", detail, ret);
48: }
49: }
50:
51: static void check(amqp_rpc_reply_t rr, const char *detail=""){
52: if(rr.reply_type == AMQP_RESPONSE_NORMAL)
53: return;
54:
55: // Extract error message from reply
56: const char* error_msg = 0;
57: size_t error_len = 0;
58: if(rr.reply_type == AMQP_RESPONSE_SERVER_EXCEPTION) {
59: if(rr.reply.id == AMQP_CHANNEL_CLOSE_METHOD) {
60: amqp_channel_close_t *m = (amqp_channel_close_t *)rr.reply.decoded;
61: if(m->reply_text.len > 0 && m->reply_text.bytes) {
62: error_msg = (const char*)m->reply_text.bytes;
63: error_len = m->reply_text.len;
64: }
65: } else if(rr.reply.id == AMQP_CONNECTION_CLOSE_METHOD) {
66: amqp_connection_close_t *m = (amqp_connection_close_t *)rr.reply.decoded;
67: if(m->reply_text.len > 0 && m->reply_text.bytes) {
68: error_msg = (const char*)m->reply_text.bytes;
69: error_len = m->reply_text.len;
70: }
71: }
72: }
73:
74: if(error_msg) {
75: throw Exception("amqp", 0, "%sfailed: %.*s", detail, (int)error_len, error_msg);
76: } else if(rr.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION) {
77: status_check(rr.library_error, detail);
78: }
79:
80: throw Exception("amqp", 0, "%sfailed", detail);
81: }
82:
83: #endif // WITH_AMQP
84:
85:
1.1 moko 86: static void _create(Request& r, MethodParams& params) {
87: VAmqp& self=GET_SELF(r, VAmqp);
88:
89: #ifdef WITH_AMQP
90: const char* host_c = "localhost";
91: int port = 5672;
92: const char* user_c = "guest";
93: const char* pass_c = "guest";
94: const char* vhost_c = "/";
95: const char* locale_c = "en_US";
96: int heartbeat = 30; // seconds
1.5 moko 97: const char* tls_ca = 0;
98: const char* tls_cert = 0;
99: const char* tls_key = 0;
100: bool tls_specified = false;
101: bool tls_verify = true;
1.1 moko 102:
103: if(params.count()>0){
104: if(HashStringValue* options=params.as_hash(0)){
105: for(HashStringValue::Iterator i(*options); i; i.next()){
106: String::Body key=i.key();
107: Value* value=i.value();
108: if(key=="host"){
109: host_c=value->as_string().cstr();
110: } else if(key=="port"){
111: port=r.process(*value).as_int();
112: } else if(key=="user"){
113: user_c=value->as_string().cstr();
114: } else if(key=="password"){
115: pass_c=value->as_string().cstr();
116: } else if(key=="vhost"){
117: vhost_c=value->as_string().cstr();
118: } else if(key=="locale"){
119: locale_c=value->as_string().cstr();
120: } else if(key=="heartbeat"){
121: heartbeat=r.process(*value).as_int();
1.5 moko 122: } else if(key=="tls"){
123: tls_specified = true;
124: if(HashStringValue* tls_options=value->get_hash()){
125: for(HashStringValue::Iterator t(*tls_options); t; t.next()){
126: String::Body tkey=t.key();
127: Value* tval=t.value();
128: if(tkey=="ca"){
129: tls_ca=tval->as_string().cstr();
130: } else if(tkey=="cert"){
131: tls_cert=tval->as_string().cstr();
132: } else if(tkey=="key"){
133: tls_key=tval->as_string().cstr();
134: } else if(tkey=="verify"){
135: tls_verify=r.process(*tval).as_bool();
136: } else
137: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
138: }
139: }
1.1 moko 140: } else
141: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
142: }
143: }
144: }
145:
146: amqp_connection_state_t conn = amqp_new_connection();
1.5 moko 147: amqp_socket_t* socket = 0;
148:
149: if(tls_specified) {
150: socket = amqp_ssl_socket_new(conn);
151: if(!socket)
152: throw Exception("amqp", 0, "failed to create SSL socket");
153:
154: // Set CA certificate if provided
155: if(tls_ca)
156: if(amqp_ssl_socket_set_cacert(socket, tls_ca))
157: throw Exception("amqp", 0, "failed to set CA certificate");
1.6 moko 158:
1.5 moko 159: // Set client certificate and key if provided
160: if(tls_cert && tls_key) {
161: if(amqp_ssl_socket_set_key(socket, tls_cert, tls_key))
162: throw Exception("amqp", 0, "failed to set client certificate/key");
163: } else if(tls_cert || tls_key) {
164: throw Exception("amqp", 0, "both cert and key must be specified for TLS");
165: }
1.6 moko 166:
1.5 moko 167: // If CA is provided, peer verification will use it
168: amqp_ssl_socket_set_verify_peer(socket, tls_verify && tls_ca);
169: // If verify=true, enable hostname verification
170: amqp_ssl_socket_set_verify_hostname(socket, tls_verify);
171: } else {
172: socket = amqp_tcp_socket_new(conn);
173: if(!socket)
174: throw Exception("amqp", 0, "failed to create TCP socket");
175: }
176:
1.6 moko 177: status_check(amqp_socket_open(socket, host_c, port), tls_specified ? "open SSL socket " : "open TCP socket ");
1.1 moko 178:
179: amqp_rpc_reply_t rlogin = amqp_login(conn, vhost_c, 0, 131072, heartbeat, AMQP_SASL_METHOD_PLAIN, user_c, pass_c);
180: if(rlogin.reply_type != AMQP_RESPONSE_NORMAL){
181: amqp_destroy_connection(conn);
1.6 moko 182: check(rlogin, "login ");
1.1 moko 183: }
184:
185: int channel = 1;
186: amqp_channel_open(conn, channel);
187: amqp_rpc_reply_t ropen = amqp_get_rpc_reply(conn);
188: if(ropen.reply_type != AMQP_RESPONSE_NORMAL){
189: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
190: amqp_destroy_connection(conn);
1.6 moko 191: check(ropen, "open channel ");
1.1 moko 192: }
193:
194: self.fconnection = conn;
195: self.fchannel = channel;
196: #else
197: (void)params; (void)self;
198: throw Exception("amqp", 0, "compiled without amqp support");
1.6 moko 199: #endif // WITH_AMQP
1.1 moko 200: }
201:
202: #ifdef WITH_AMQP
203:
1.2 moko 204: #define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l)))
205: #define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l))
206:
1.1 moko 207: static void _publish(Request& r, MethodParams& params) {
208: VAmqp& self=GET_SELF(r, VAmqp);
209: const String &msg=params.as_string(0, "msg must be string");
210: const char* exchange_c = ""; // default exchange
211: const char* routing_key_c = 0;
212: bool mandatory=false;
213:
214: amqp_basic_properties_t props;
215: props._flags = 0;
216:
217: if(params.count()>1){
218: if(HashStringValue* options=params.as_hash(1)){
219: for(HashStringValue::Iterator i(*options); i; i.next()){
220: String::Body key=i.key();
221: Value* value=i.value();
222: if(key=="exchange"){
223: exchange_c=value->as_string().cstr();
224: } else if(key=="routing_key"){
225: routing_key_c=value->as_string().cstr();
226: } else if(key=="queue"){
1.3 moko 227: routing_key_c=value->as_string().cstr();
1.1 moko 228: } else if(key=="mandatory"){
229: mandatory=r.process(*value).as_bool();
1.3 moko 230: } else if(key=="content_type"){
231: const char* v=value->as_string().cstr();
232: props.content_type=amqp_cstring_bytes(v);
233: props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG;
234: } else if(key=="content_encoding"){
235: const char* v=value->as_string().cstr();
236: props.content_encoding=amqp_cstring_bytes(v);
237: props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG;
238: } else if(key=="delivery_mode"){
239: uint8_t dm=(uint8_t)value->as_int();
240: props.delivery_mode=dm;
241: props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG;
242: } else if(key=="priority"){
243: uint8_t pr=(uint8_t)value->as_int();
244: props.priority=pr;
245: props._flags|=AMQP_BASIC_PRIORITY_FLAG;
246: } else if(key=="correlation_id"){
247: const char* v=value->as_string().cstr();
248: props.correlation_id=amqp_cstring_bytes(v);
249: props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG;
250: } else if(key=="reply_to"){
251: const char* v=value->as_string().cstr();
252: props.reply_to=amqp_cstring_bytes(v);
253: props._flags|=AMQP_BASIC_REPLY_TO_FLAG;
254: } else if(key=="expiration"){
255: const char* v=value->as_string().cstr();
256: props.expiration=amqp_cstring_bytes(v);
257: props._flags|=AMQP_BASIC_EXPIRATION_FLAG;
258: } else if(key=="message_id"){
259: const char* v=value->as_string().cstr();
260: props.message_id=amqp_cstring_bytes(v);
261: props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG;
262: } else if(key=="timestamp"){
263: uint64_t ts=(uint64_t)value->as_double();
264: props.timestamp=ts;
265: props._flags|=AMQP_BASIC_TIMESTAMP_FLAG;
266: } else if(key=="type"){
267: const char* v=value->as_string().cstr();
268: props.type=amqp_cstring_bytes(v);
269: props._flags|=AMQP_BASIC_TYPE_FLAG;
270: } else if(key=="user_id"){
271: const char* v=value->as_string().cstr();
272: props.user_id=amqp_cstring_bytes(v);
273: props._flags|=AMQP_BASIC_USER_ID_FLAG;
274: } else if(key=="app_id"){
275: const char* v=value->as_string().cstr();
276: props.app_id=amqp_cstring_bytes(v);
277: props._flags|=AMQP_BASIC_APP_ID_FLAG;
278: } else if(key=="headers"){
279: /* if(HashStringValue* hh=pval->get_hash()){
280: size_t count=hh->count();
281: amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0;
282: size_t idx=0;
283: for(HashStringValue::Iterator hi(*hh); hi; hi.next()){
284: String::Body hkey=hi.key();
285: const char* hv=hi.value()->as_string().cstr();
286: entries[idx].key=amqp_cstring_bytes(hkey.cstr());
287: entries[idx].value.kind=AMQP_FIELD_KIND_UTF8;
288: entries[idx].value.value.bytes=amqp_cstring_bytes(hv);
289: idx++;
1.1 moko 290: }
1.3 moko 291: props.headers.num_entries=(int)count;
292: props.headers.entries=entries;
293: props._flags|=AMQP_BASIC_HEADERS_FLAG;
1.1 moko 294: }
1.3 moko 295: */ } else
1.1 moko 296: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
297: }
298: }
299: }
300:
301: if(!routing_key_c)
302: throw Exception("amqp", 0, "routing_key or queue must be specified");
303:
304: amqp_bytes_t body;
305: body.len = msg.length();
306: body.bytes=(void*)msg.cstr();
307:
1.3 moko 308: int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body);
1.1 moko 309:
310: if(ret!=AMQP_STATUS_OK)
311: throw Exception("amqp", 0, "publish failed");
312:
313: // free temporary headers entries if allocated
314: if(props._flags & AMQP_BASIC_HEADERS_FLAG){
315: // delete [] props.headers.entries;
316: }
317: }
318:
319: static void _release(Request& r, MethodParams&) {
320: VAmqp& self=GET_SELF(r, VAmqp);
321: if(self.fconnection){
322: amqp_connection_state_t conn=self.fconnection;
323: amqp_channel_close(conn, self.fchannel, AMQP_REPLY_SUCCESS);
324: amqp_connection_close(conn, AMQP_REPLY_SUCCESS);
325: amqp_destroy_connection(conn);
326: self.fconnection=0;
327: self.fchannel=0;
328: }
329: }
330:
331: static void _ack(Request& r, MethodParams& params) {
332: VAmqp& self=GET_SELF(r, VAmqp);
1.7 moko 333: double tag=params.as_double(0, "delivery tag must be number", r);
334: int ret = amqp_basic_ack(self.connection(), self.channel(), (uint64_t)tag, 0);
1.1 moko 335: if(ret!=AMQP_STATUS_OK)
336: throw Exception("amqp", 0, "ack failed");
337: }
338:
339: static void _nack(Request& r, MethodParams& params) {
340: VAmqp& self=GET_SELF(r, VAmqp);
341: const String &tag_s=params.as_string(0, "delivery tag must not be code");
342: bool requeue=false;
343: if(params.count()>1){
344: if(HashStringValue* options=params.as_hash(1)){
345: for(HashStringValue::Iterator i(*options); i; i.next()){
346: if(i.key()=="requeue"){
347: requeue=r.process(*i.value()).as_bool();
348: } else
349: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
350: }
351: }
352: }
353: int ret = amqp_basic_nack(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), 0, requeue);
354: if(ret!=AMQP_STATUS_OK)
355: throw Exception("amqp", 0, "nack failed");
356: }
357:
358: static void _qos(Request& r, MethodParams& params) {
359: VAmqp& self=GET_SELF(r, VAmqp);
360: uint16_t prefetch_count=0;
361: if(params.count()>0){
362: if(HashStringValue* options=params.as_hash(0)){
363: for(HashStringValue::Iterator i(*options); i; i.next()){
364: if(i.key()=="prefetch_count"){
365: int pc=r.process(*i.value()).as_int();
366: prefetch_count= pc<0 ? 0 : (pc>65535 ? 65535 : (uint16_t)pc);
367: } else
368: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
369: }
370: }
371: }
372: amqp_basic_qos_ok_t *ret = amqp_basic_qos(self.connection(), self.channel(), 0, prefetch_count, 0);
373: if(!ret)
374: throw Exception("amqp", 0, "qos failed");
375: }
376:
377: static void _reject(Request& r, MethodParams& params) {
378: VAmqp& self=GET_SELF(r, VAmqp);
379: const String &tag_s=params.as_string(0, "delivery tag must not be code");
380: bool requeue=true; // by default return to queue
381: if(params.count()>1){
382: if(HashStringValue* options = params.as_hash(1)){
383: for(HashStringValue::Iterator i(*options); i; i.next()){
384: if(i.key() == "requeue"){
385: requeue=r.process(*i.value()).as_bool();
386: } else
387: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
388: }
389: }
390: }
391: int ret = amqp_basic_reject(self.connection(), self.channel(), pa_atoul(tag_s.cstr()), requeue);
392: if(ret!=AMQP_STATUS_OK)
393: throw Exception("amqp", 0, "reject failed");
394: }
395:
1.8 ! moko 396: static void _declare(Request& r, MethodParams& params) {
1.1 moko 397: VAmqp& self=GET_SELF(r, VAmqp);
1.8 ! moko 398: const char* exchange_c = 0;
! 399: const char* queue_c = 0;
1.1 moko 400: const char* type_c = "direct";
401: bool passive=false, durable=false, auto_delete=true, nowait=false;
1.8 ! moko 402: if(HashStringValue* options=params.as_hash(0)){
! 403: for(HashStringValue::Iterator i(*options); i; i.next()){
! 404: String::Body key=i.key();
! 405: Value* value=i.value();
! 406: if(key=="exchange"){
! 407: exchange_c=value->as_string().cstr();
! 408: } else if(key=="queue"){
! 409: queue_c=value->as_string().cstr();
! 410: } else if(key=="type"){
! 411: type_c=value->as_string().cstr();
! 412: } else if(key=="passive"){
! 413: passive=r.process(*value).as_bool();
! 414: } else if(key=="durable"){
! 415: durable=r.process(*value).as_bool();
! 416: } else if(key=="auto_delete"){
! 417: auto_delete=r.process(*value).as_bool();
! 418: } else if(key=="nowait"){
! 419: nowait=r.process(*value).as_bool();
! 420: } else
! 421: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 422: }
423: }
1.8 ! moko 424: if(!exchange_c && !queue_c)
! 425: throw Exception("amqp", 0, "exchange or queue must be specified");
! 426:
! 427: if(exchange_c){
! 428: amqp_exchange_declare(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(type_c), passive, durable, auto_delete, nowait, amqp_empty_table);
! 429: check(amqp_get_rpc_reply(self.connection()));
! 430: }
1.1 moko 431:
1.8 ! moko 432: if(queue_c){
! 433: amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), *queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table);
! 434: check(amqp_get_rpc_reply(self.connection()));
! 435: if(!*queue_c && ok){
! 436: r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len));
1.1 moko 437: }
438: }
439: }
440:
1.8 ! moko 441: static void _delete(Request& r, MethodParams& params) {
1.1 moko 442: VAmqp& self=GET_SELF(r, VAmqp);
1.8 ! moko 443: const char* exchange_c = 0;
! 444: const char* queue_c = 0;
! 445: bool if_unused=false, if_empty=false, nowait=false;
! 446: if(HashStringValue* options=params.as_hash(0)){
! 447: for(HashStringValue::Iterator i(*options); i; i.next()){
! 448: String::Body key=i.key();
! 449: Value* value=i.value();
! 450: if(key=="exchange"){
! 451: exchange_c=value->as_string().cstr();
! 452: } else if(key=="queue"){
! 453: queue_c=value->as_string().cstr();
! 454: } else if(key=="if_unused"){
! 455: if_unused=r.process(*value).as_bool();
! 456: } else if(key=="if_empty"){
! 457: if_empty=r.process(*value).as_bool();
! 458: } else if(key=="nowait"){
! 459: nowait=r.process(*value).as_bool();
! 460: } else
! 461: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 462: }
463: }
1.8 ! moko 464: if(!exchange_c && !queue_c)
! 465: throw Exception("amqp", 0, "exchange or queue must be specified");
! 466:
! 467: if(exchange_c){
! 468: amqp_exchange_delete(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), if_unused);
! 469: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 470: }
471:
1.8 ! moko 472: if(queue_c){
! 473: amqp_queue_delete(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), if_unused, if_empty);
! 474: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 475: }
476: }
477:
1.8 ! moko 478: static void _bind(Request& r, MethodParams& params) {
1.1 moko 479: VAmqp& self=GET_SELF(r, VAmqp);
480: const char* exchange_c=0;
481: const char* queue_c=0;
482: const char* routing_key_c="";
1.8 ! moko 483: if(HashStringValue* options=params.as_hash(0)){
! 484: for(HashStringValue::Iterator i(*options); i; i.next()){
! 485: String::Body key=i.key();
! 486: Value* value=i.value();
! 487: if(key=="exchange"){
! 488: exchange_c=value->as_string().cstr();
! 489: } else if(key=="queue"){
! 490: queue_c=value->as_string().cstr();
! 491: } else if(key=="routing_key"){
! 492: routing_key_c=value->as_string().cstr();
! 493: } else
! 494: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 495: }
496: }
497: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
498: amqp_queue_bind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 499: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 500: }
501:
1.8 ! moko 502: static void _unbind(Request& r, MethodParams& params) {
1.1 moko 503: VAmqp& self=GET_SELF(r, VAmqp);
504: const char* exchange_c=0;
505: const char* queue_c=0;
506: const char* routing_key_c="";
1.8 ! moko 507: if(HashStringValue* options=params.as_hash(0)){
! 508: for(HashStringValue::Iterator i(*options); i; i.next()){
! 509: String::Body key=i.key();
! 510: Value* value=i.value();
! 511: if(key=="exchange"){
! 512: exchange_c=value->as_string().cstr();
! 513: } else if(key=="queue"){
! 514: queue_c=value->as_string().cstr();
! 515: } else if(key=="routing_key"){
! 516: routing_key_c=value->as_string().cstr();
! 517: } else
! 518: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 519: }
520: }
521: if(!exchange_c || !queue_c) throw Exception("amqp", 0, "exchange and queue are required");
522: amqp_queue_unbind(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), amqp_empty_table);
1.4 moko 523: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 524: }
525:
1.8 ! moko 526: static void _purge(Request& r, MethodParams& params) {
! 527: VAmqp& self=GET_SELF(r, VAmqp);
! 528: const char* queue_c = 0;
! 529: if(HashStringValue* options=params.as_hash(0)){
! 530: for(HashStringValue::Iterator i(*options); i; i.next()){
! 531: String::Body key=i.key();
! 532: Value* value=i.value();
! 533: if(key=="queue"){
! 534: queue_c=value->as_string().cstr();
! 535: } else
! 536: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 537: }
! 538: }
! 539: if(!queue_c)
! 540: throw Exception("amqp", 0, "queue must be specified");
! 541:
! 542: amqp_queue_purge(self.connection(), self.channel(), amqp_cstring_bytes(queue_c));
! 543: check(amqp_get_rpc_reply(self.connection()));
! 544: }
! 545:
! 546: static void _info(Request& r, MethodParams& params) {
! 547: VAmqp& self = GET_SELF(r, VAmqp);
! 548: const char* queue_c = 0;
! 549:
! 550: if (HashStringValue* options=params.as_hash(0)) {
! 551: for (HashStringValue::Iterator i(*options); i; i.next()) {
! 552: String::Body key=i.key();
! 553: Value* value=i.value();
! 554: if(key=="queue"){
! 555: queue_c=value->as_string().cstr();
! 556: } else {
! 557: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
! 558: }
! 559: }
! 560: }
! 561:
! 562: if (!queue_c)
! 563: throw Exception("amqp", 0, "queue must be specified");
! 564:
! 565: amqp_queue_declare_ok_t* ok = amqp_queue_declare(self.connection(), self.channel(), amqp_cstring_bytes(queue_c), /*passive*/ 1, 0, 0, 0, amqp_empty_table);
! 566: check(amqp_get_rpc_reply(self.connection()));
! 567:
! 568: Value& result=*new VHash;
! 569: result.put_element(*new String("queue"), AMQP_VSTRING(ok->queue.bytes, ok->queue.len));
! 570: result.put_element(*new String("messages"), new VInt(ok->message_count));
! 571: result.put_element(*new String("consumers"), new VInt(ok->consumer_count));
! 572: r.write(result);
! 573: }
! 574:
! 575: static VHash *amqp_message_hash(amqp_envelope_t &envelope) {
! 576: VHash *result=new VHash;
! 577: HashStringValue* h=result->get_hash();
! 578: h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len));
! 579: h->put("delivery_tag", new VInt(envelope.delivery_tag));
! 580: h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len));
! 581: h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len));
! 582: return result;
! 583: }
! 584:
1.1 moko 585: static void _consume(Request& r, MethodParams& params) {
586: VAmqp& self=GET_SELF(r, VAmqp);
587: const char* queue_c=0;
588: const char* consumer_tag_c=0;
589: bool no_ack=true, nowait=false;
1.8 ! moko 590: int count=1;
1.1 moko 591: Junction* callback=0;
592:
1.2 moko 593: if(HashStringValue* options=params.as_hash(0)){
594: for(HashStringValue::Iterator i(*options); i; i.next()){
595: String::Body key=i.key();
596: Value* value=i.value();
1.3 moko 597: if(key=="callback"){
598: callback=value->get_junction();
599: } else if(key=="queue"){
1.2 moko 600: queue_c=value->as_string().cstr();
601: } else if(key=="consumer_tag"){
602: consumer_tag_c=value->as_string().cstr();
603: } else if(key=="no_ack"){
604: no_ack=r.process(*value).as_bool();
605: } else if(key=="nowait"){
606: nowait=r.process(*value).as_bool();
1.8 ! moko 607: } else if(key=="count"){
! 608: count=r.process(*value).as_int();
1.2 moko 609: } else
610: throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION);
1.1 moko 611: }
612: }
613:
1.8 ! moko 614: if(!queue_c) throw Exception("amqp", 0, "queue must be specified");
1.1 moko 615:
616: amqp_basic_consume(self.connection(), self.channel(), amqp_cstring_bytes(queue_c),
617: consumer_tag_c ? amqp_cstring_bytes(consumer_tag_c) : amqp_empty_bytes,
618: 0 /*no_local*/, no_ack, nowait, amqp_empty_table);
1.4 moko 619: check(amqp_get_rpc_reply(self.connection()));
1.1 moko 620:
1.8 ! moko 621: if(callback){
! 622: self.fstop=false;
! 623: while(!self.fstop){
! 624: amqp_envelope_t envelope;
! 625: memset(&envelope, 0, sizeof(envelope));
! 626: amqp_maybe_release_buffers(self.connection());
! 627: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
! 628: if(res.reply_type == AMQP_RESPONSE_NORMAL){
! 629: VHash *vh=amqp_message_hash(envelope);
! 630: Value *params_cb[]={vh};
! 631: METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, {
! 632: frame.store_params(params_cb, 1);
! 633: r.call(frame);
! 634: });
! 635: amqp_destroy_envelope(&envelope);
! 636: } else if(res.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION && res.library_error == AMQP_STATUS_TIMEOUT) {
! 637: continue;
! 638: } else {
! 639: break;
! 640: }
! 641: }
! 642: } else {
! 643: VArray& result=*new VArray();
! 644: ArrayValue& result_array=result.array();
! 645:
! 646: for(int i=0; i<count; i++){
! 647: amqp_envelope_t envelope;
! 648: memset(&envelope, 0, sizeof(envelope));
! 649: amqp_maybe_release_buffers(self.connection());
! 650: amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0);
! 651: if(res.reply_type == AMQP_RESPONSE_NORMAL){
! 652: result_array+=amqp_message_hash(envelope);
! 653: amqp_destroy_envelope(&envelope);
! 654: } else {
! 655: check(res);
! 656: }
1.1 moko 657: }
1.8 ! moko 658: r.write(result);
1.1 moko 659: }
660: }
661:
662: static void _stop_consume(Request& r, MethodParams&) {
663: VAmqp& self=GET_SELF(r, VAmqp);
664: self.fstop=true;
665: }
666:
1.6 moko 667: #endif // WITH_AMQP
1.1 moko 668:
669: // constructor
670: MAmqp::MAmqp(): Methoded("amqp") {
671: add_native_method("create", Method::CT_DYNAMIC, _create, 0, 1);
672: #ifdef WITH_AMQP
673: add_native_method("publish", Method::CT_DYNAMIC, _publish, 1, 2);
674: add_native_method("release", Method::CT_DYNAMIC, _release, 0, 0);
675: add_native_method("ack", Method::CT_DYNAMIC, _ack, 1, 1);
676: add_native_method("nack", Method::CT_DYNAMIC, _nack, 1, 2);
677: add_native_method("reject", Method::CT_DYNAMIC, _reject, 1, 2);
678: add_native_method("qos", Method::CT_DYNAMIC, _qos, 0, 1);
1.8 ! moko 679: add_native_method("declare", Method::CT_DYNAMIC, _declare, 1, 1);
! 680: add_native_method("delete", Method::CT_DYNAMIC, _delete, 1, 1);
! 681: add_native_method("bind", Method::CT_DYNAMIC, _bind, 1, 1);
! 682: add_native_method("unbind", Method::CT_DYNAMIC, _unbind, 1, 1);
! 683: add_native_method("purge", Method::CT_DYNAMIC, _purge, 1, 1);
! 684: add_native_method("info", Method::CT_DYNAMIC, _info, 1, 1);
1.2 moko 685: add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1);
1.1 moko 686: add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0);
1.6 moko 687: #endif // WITH_AMQP
1.1 moko 688: }
E-mail: