--- parser3/src/classes/amqp.C 2025/11/06 22:08:03 1.1 +++ parser3/src/classes/amqp.C 2025/11/07 22:36:35 1.3 @@ -21,7 +21,7 @@ #include #endif -volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.1 2025/11/06 22:08:03 moko Exp $" IDENT_PA_VAMQP_H; +volatile const char * IDENT_AMQP_C="$Id: amqp.C,v 1.3 2025/11/07 22:36:35 moko Exp $" IDENT_PA_VAMQP_H; class MAmqp: public Methoded { public: // VStateless_class @@ -106,12 +106,14 @@ static void check(const char* action, am throw Exception("amqp", 0, "%s failed", action); } +#define AMQP_STRING(s,l) new String(String::C(pa_strdup((const char*)(s), (l)), (l))) +#define AMQP_VSTRING(s,l) new VString(*AMQP_STRING(s,l)) + static void _publish(Request& r, MethodParams& params) { VAmqp& self=GET_SELF(r, VAmqp); const String &msg=params.as_string(0, "msg must be string"); const char* exchange_c = ""; // default exchange const char* routing_key_c = 0; - bool have_queue_sugar=false; bool mandatory=false; amqp_basic_properties_t props; @@ -127,85 +129,75 @@ static void _publish(Request& r, MethodP } else if(key=="routing_key"){ routing_key_c=value->as_string().cstr(); } else if(key=="queue"){ - routing_key_c=value->as_string().cstr(); have_queue_sugar=true; + routing_key_c=value->as_string().cstr(); } else if(key=="mandatory"){ mandatory=r.process(*value).as_bool(); - } else if(key=="properties"){ - // parse message properties - if(HashStringValue* ph=value->get_hash()){ - for(HashStringValue::Iterator p(*ph); p; p.next()){ - String::Body pkey=p.key(); - Value* pval=p.value(); - if(pkey=="content_type"){ - const char* v=pval->as_string().cstr(); - props.content_type=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG; - } else if(pkey=="content_encoding"){ - const char* v=pval->as_string().cstr(); - props.content_encoding=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG; - } else if(pkey=="delivery_mode"){ - uint8_t dm=(uint8_t)pval->as_int(); - props.delivery_mode=dm; - props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG; - } else if(pkey=="priority"){ - uint8_t pr=(uint8_t)pval->as_int(); - props.priority=pr; - props._flags|=AMQP_BASIC_PRIORITY_FLAG; - } else if(pkey=="correlation_id"){ - const char* v=pval->as_string().cstr(); - props.correlation_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG; - } else if(pkey=="reply_to"){ - const char* v=pval->as_string().cstr(); - props.reply_to=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_REPLY_TO_FLAG; - } else if(pkey=="expiration"){ - const char* v=pval->as_string().cstr(); - props.expiration=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_EXPIRATION_FLAG; - } else if(pkey=="message_id"){ - const char* v=pval->as_string().cstr(); - props.message_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG; - } else if(pkey=="timestamp"){ - uint64_t ts=(uint64_t)pval->as_double(); - props.timestamp=ts; - props._flags|=AMQP_BASIC_TIMESTAMP_FLAG; - } else if(pkey=="type"){ - const char* v=pval->as_string().cstr(); - props.type=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_TYPE_FLAG; - } else if(pkey=="user_id"){ - const char* v=pval->as_string().cstr(); - props.user_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_USER_ID_FLAG; - } else if(pkey=="app_id"){ - const char* v=pval->as_string().cstr(); - props.app_id=amqp_cstring_bytes(v); - props._flags|=AMQP_BASIC_APP_ID_FLAG; - } else if(pkey=="headers"){ -/* if(HashStringValue* hh=pval->get_hash()){ - size_t count=hh->count(); - amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0; - size_t idx=0; - for(HashStringValue::Iterator hi(*hh); hi; hi.next()){ - String::Body hkey=hi.key(); - const char* hv=hi.value()->as_string().cstr(); - entries[idx].key=amqp_cstring_bytes(hkey.cstr()); - entries[idx].value.kind=AMQP_FIELD_KIND_UTF8; - entries[idx].value.value.bytes=amqp_cstring_bytes(hv); - idx++; - } - props.headers.num_entries=(int)count; - props.headers.entries=entries; - props._flags|=AMQP_BASIC_HEADERS_FLAG; - } -*/ } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); + } else if(key=="content_type"){ + const char* v=value->as_string().cstr(); + props.content_type=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_CONTENT_TYPE_FLAG; + } else if(key=="content_encoding"){ + const char* v=value->as_string().cstr(); + props.content_encoding=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_CONTENT_ENCODING_FLAG; + } else if(key=="delivery_mode"){ + uint8_t dm=(uint8_t)value->as_int(); + props.delivery_mode=dm; + props._flags|=AMQP_BASIC_DELIVERY_MODE_FLAG; + } else if(key=="priority"){ + uint8_t pr=(uint8_t)value->as_int(); + props.priority=pr; + props._flags|=AMQP_BASIC_PRIORITY_FLAG; + } else if(key=="correlation_id"){ + const char* v=value->as_string().cstr(); + props.correlation_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_CORRELATION_ID_FLAG; + } else if(key=="reply_to"){ + const char* v=value->as_string().cstr(); + props.reply_to=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_REPLY_TO_FLAG; + } else if(key=="expiration"){ + const char* v=value->as_string().cstr(); + props.expiration=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_EXPIRATION_FLAG; + } else if(key=="message_id"){ + const char* v=value->as_string().cstr(); + props.message_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_MESSAGE_ID_FLAG; + } else if(key=="timestamp"){ + uint64_t ts=(uint64_t)value->as_double(); + props.timestamp=ts; + props._flags|=AMQP_BASIC_TIMESTAMP_FLAG; + } else if(key=="type"){ + const char* v=value->as_string().cstr(); + props.type=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_TYPE_FLAG; + } else if(key=="user_id"){ + const char* v=value->as_string().cstr(); + props.user_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_USER_ID_FLAG; + } else if(key=="app_id"){ + const char* v=value->as_string().cstr(); + props.app_id=amqp_cstring_bytes(v); + props._flags|=AMQP_BASIC_APP_ID_FLAG; + } else if(key=="headers"){ +/* if(HashStringValue* hh=pval->get_hash()){ + size_t count=hh->count(); + amqp_table_entry_t* entries=count ? new amqp_table_entry_t[count] : 0; + size_t idx=0; + for(HashStringValue::Iterator hi(*hh); hi; hi.next()){ + String::Body hkey=hi.key(); + const char* hv=hi.value()->as_string().cstr(); + entries[idx].key=amqp_cstring_bytes(hkey.cstr()); + entries[idx].value.kind=AMQP_FIELD_KIND_UTF8; + entries[idx].value.value.bytes=amqp_cstring_bytes(hv); + idx++; } + props.headers.num_entries=(int)count; + props.headers.entries=entries; + props._flags|=AMQP_BASIC_HEADERS_FLAG; } - } else +*/ } else throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } @@ -218,7 +210,7 @@ static void _publish(Request& r, MethodP body.len = msg.length(); body.bytes=(void*)msg.cstr(); - int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory , 0, &props, body); + int ret = amqp_basic_publish(self.connection(), self.channel(), amqp_cstring_bytes(exchange_c), amqp_cstring_bytes(routing_key_c), mandatory, 0, &props, body); if(ret!=AMQP_STATUS_OK) throw Exception("amqp", 0, "publish failed"); @@ -227,7 +219,6 @@ static void _publish(Request& r, MethodP if(props._flags & AMQP_BASIC_HEADERS_FLAG){ // delete [] props.headers.entries; } - (void)have_queue_sugar; } static void _release(Request& r, MethodParams&) { @@ -391,7 +382,7 @@ static void _declare_queue(Request& r, M amqp_queue_declare_ok_t *ok = amqp_queue_declare(self.connection(), self.channel(), queue_c ? amqp_cstring_bytes(queue_c) : amqp_empty_bytes, passive, durable, auto_delete, nowait, amqp_empty_table); check("declare queue", amqp_get_rpc_reply(self.connection())); if(!queue_c && ok){ - r.write(*new String(String::C(pa_strdup((const char *)ok->queue.bytes, ok->queue.len), ok->queue.len))); + r.write(*AMQP_STRING(ok->queue.bytes, ok->queue.len)); } } @@ -480,24 +471,22 @@ static void _consume(Request& r, MethodP bool no_ack=true, nowait=false; Junction* callback=0; - if(params.count()>0){ - if(HashStringValue* options=params.as_hash(0)){ - for(HashStringValue::Iterator i(*options); i; i.next()){ - String::Body key=i.key(); - Value* value=i.value(); - if(key=="queue"){ - queue_c=value->as_string().cstr(); - } else if(key=="consumer_tag"){ - consumer_tag_c=value->as_string().cstr(); - } else if(key=="no_ack"){ - no_ack=r.process(*value).as_bool(); - } else if(key=="nowait"){ - nowait=r.process(*value).as_bool(); - } else if(key=="callback"){ - callback=value->get_junction(); - } else - throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); - } + if(HashStringValue* options=params.as_hash(0)){ + for(HashStringValue::Iterator i(*options); i; i.next()){ + String::Body key=i.key(); + Value* value=i.value(); + if(key=="callback"){ + callback=value->get_junction(); + } else if(key=="queue"){ + queue_c=value->as_string().cstr(); + } else if(key=="consumer_tag"){ + consumer_tag_c=value->as_string().cstr(); + } else if(key=="no_ack"){ + no_ack=r.process(*value).as_bool(); + } else if(key=="nowait"){ + nowait=r.process(*value).as_bool(); + } else + throw Exception(PARSER_RUNTIME, 0, CALLED_WITH_INVALID_OPTION); } } @@ -513,15 +502,16 @@ static void _consume(Request& r, MethodP self.fstop=false; while(!self.fstop){ - amqp_envelope_t envelope; amqp_maybe_release_buffers(self.connection()); + amqp_envelope_t envelope; + memset(&envelope, 0, sizeof(envelope)); + amqp_maybe_release_buffers(self.connection()); amqp_rpc_reply_t res = amqp_consume_message(self.connection(), &envelope, NULL, 0); if(res.reply_type == AMQP_RESPONSE_NORMAL){ VHash &vh=*new VHash; HashStringValue* h=vh.get_hash(); - String msg(String::Body((const unsigned char*)envelope.message.body.bytes, envelope.message.body.len), String::L_CLEAN); - h->put("msg", new VString(msg)); + h->put("msg", AMQP_VSTRING(envelope.message.body.bytes, envelope.message.body.len)); h->put("delivery_tag", new VString(String::Body::uitoa((unsigned long long)envelope.delivery_tag))); - h->put("consumer_tag", new VString(String::Body((const unsigned char*)envelope.consumer_tag.bytes, envelope.consumer_tag.len))); - h->put("exchange", new VString(String::Body((const unsigned char*)envelope.exchange.bytes, envelope.exchange.len))); + h->put("consumer_tag", AMQP_VSTRING(envelope.consumer_tag.bytes, envelope.consumer_tag.len)); + h->put("exchange", AMQP_VSTRING(envelope.exchange.bytes, envelope.exchange.len)); Value *params_cb[]={&vh}; METHOD_FRAME_ACTION(*callback->method, r.method_frame, callback->self, { @@ -561,9 +551,7 @@ MAmqp::MAmqp(): Methoded("amqp") { add_native_method("delete_queue", Method::CT_DYNAMIC, _delete_queue, 0, 1); add_native_method("bind_queue", Method::CT_DYNAMIC, _bind_queue, 0, 1); add_native_method("unbind_queue", Method::CT_DYNAMIC, _unbind_queue, 0, 1); - add_native_method("consume", Method::CT_DYNAMIC, _consume, 0, 1); + add_native_method("consume", Method::CT_DYNAMIC, _consume, 1, 1); add_native_method("stop_consume", Method::CT_DYNAMIC, _stop_consume, 0, 0); #endif } - -