Erlang Mailing Lists

RabbitMQ mailing list ~ Erlang Client - Unable to Subscribe with basic.consume

Guest
Posted: Fri Jan 08, 2010 11:02 pm
I am trying to subscribe a queue to a Pid using:

amqp_channel:call(Channel, #'basic.consume'{queue=QueueName}, Pid)

and using all other default settings in the #'basic.consume' record. I
get the following exception stack and have been unable to discover why.
Any help would be greatly appreciated!


=ERROR REPORT==== 8-Jan-2010::13:52:43 ===
** Generic server <0.44.0> terminating
** Last message in was {call,
                           {'basic.consume',1,
                               <<"amq.gen-JKETIEiVgO9bzH5ZKfghkA==">>,<<>>,
                               false,false,false,false},
                           <0.47.0>}
** When Server state == {c_state,1,<0.37.0>,<0.45.0>,<0.46.0>,network,
                            {[],[]},
                            {[],[]},
                            {dict,0,16,16,8,80,48,
                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
                            false,none,false,none,
                            {dict,0,16,16,8,80,48,
                                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}}
** Reason for termination ==
** {function_clause,[{amqp_channel,build_content,[<0.47.0>]},
                     {amqp_channel,handle_call,3},
                     {gen_server,handle_msg,5},
                     {proc_lib,init_p_do_apply,3}]}

=ERROR REPORT==== 8-Jan-2010::13:52:43 ===
Connection (<0.37.0>) closing: channel (<0.44.0>) died. Reason:
{function_clause,[{amqp_channel,build_content,[<0.47.0>]},
                  {amqp_channel,handle_call,3},
                  {gen_server,handle_msg,5},
                  {proc_lib,init_p_do_apply,3}]}
** exception exit:
{{function_clause,[{amqp_channel,build_content,[<0.47.0>]},
                   {amqp_channel,handle_call,3},
                   {gen_server,handle_msg,5},
                   {proc_lib,init_p_do_apply,3}]},
 {gen_server,call,
     [<0.44.0>,
      {call,{'basic.consume',1,
                <<"amq.gen-JKETIEiVgO9bzH5ZKfghkA==">>,<<>>,
                false,false,false,false},
            <0.47.0>},
      infinity]}}
  in function gen_server:call/3
  in call from rabbitmq_test:consume/0
2>
=ERROR REPORT==== 8-Jan-2010::13:52:43 ===
** Generic server <0.37.0> terminating
** Last message in was {'$gen_cast',{method,{'connection.close_ok'},none}}
** When Server state == {nc_state,
                            {amqp_params,<<"guest">>,<<"guest">>,<<"/test">>,
                                "localhost",5672,0,0,0,none},
                            #Port<0.530>,<0.42.0>,<0.41.0>,<0.40.0>,0,0,
                            {nc_closing,internal_error,
                                {'connection.close',541,<<>>,0,0},
                                none,wait_close_ok},
                            {{0,nil},
                             {dict,0,16,16,8,80,48,
                                 {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                                 {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}}}}
** Reason for termination ==
** {internal_error,541,<<>>}

_______________________________________________
rabbitmq-discuss mailing list
rabbitmq-discuss@lists.rabbitmq.com
http://lists.rabbitmq.com/cgi-bin/mailman/listinfo/rabbitmq-discuss
Guest
Posted: Sat Jan 09, 2010 2:52 pm
Hi Loren,

It depends which version of the Erlang client you're using, because this
has recently changed.

The version that is tagged with rabbitmq_v1_7_0 has a "subscribe"
function. This should be used as:

#'basic.consume_ok'{ consumer_tag = Tag } =
amqp_channel:subscribe(Chan, #'basic.consume'{ queue = Q }, Pid),

However, I decided that I really didn't like this, as it's non-obvious
and doesn't match the rest of the API. Thus on the default branch, consume
exists as a normal call. However, there is no Pid argument: instead you
should pass the Chan to the pid that needs to receive the msgs and get it
to do the consume itself:

#'basic.consume_ok'{ consumer_tag = Tag } =
amqp_channel:call(Chan, #'basic.consume'{ queue = Q }),

By using the 3-arg version of call, the client thinks that the consume
method has content, which is why it's exploding: it can't encode a Pid
as content. This is why we don't want to have the situation where for
consume, the 3rd arg is treated as a Pid, whereas for all other methods
it's treated as method content.
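
To make the "get the pid to do the consume itself" advice concrete, here is a minimal sketch of a consumer process that is handed the channel and issues basic.consume with the two-argument call. It is an illustration only, not the client's documented usage: the module name consumer_sketch, the include path, the shape of the delivery message ({#'basic.deliver'{}, Content}) and the explicit basic.ack are all assumptions that may not match the client version you have checked out.

```erlang
-module(consumer_sketch).
-export([start/2]).

%% Assumed include path; adjust it to wherever amqp_client.hrl lives
%% in your checkout of the Erlang client.
-include_lib("amqp_client/include/amqp_client.hrl").

%% Spawn the process that should receive the messages and hand it the
%% channel, so that it issues basic.consume itself.
start(Chan, Q) ->
    spawn(fun() -> init(Chan, Q) end).

init(Chan, Q) ->
    %% Two-argument call: basic.consume carries no content, so nothing
    %% is passed as a third argument.
    #'basic.consume_ok'{consumer_tag = Tag} =
        amqp_channel:call(Chan, #'basic.consume'{queue = Q}),
    loop(Chan, Tag).

loop(Chan, Tag) ->
    receive
        %% Assumption: some client versions also send consume_ok to the
        %% consumer as a message; ignore it if it arrives.
        #'basic.consume_ok'{} ->
            loop(Chan, Tag);
        %% Assumption: deliveries arrive as a {Method, Content} pair.
        {#'basic.deliver'{consumer_tag = Tag, delivery_tag = DTag}, _Content} ->
            %% no_ack defaults to false in basic.consume, so acknowledge
            %% each delivery explicitly.
            amqp_channel:cast(Chan, #'basic.ack'{delivery_tag = DTag}),
            loop(Chan, Tag);
        %% Stop once the subscription has been cancelled.
        #'basic.cancel_ok'{} ->
            ok
    end.
```

With something like this in place, the caller would run consumer_sketch:start(Chan, Q) rather than passing a Pid to amqp_channel:call/3.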

Hopefully the API is now more consistent.

Matthew

On Fri, Jan 08, 2010 at 04:01:00PM -0700, Schlomer, Loren wrote:
> I am trying to subscribe a queue to a Pid using:
>
> amqp_channel:call(Channel, #'basic.consume'{queue=QueueName}, Pid)
>
> and using all other default settings in the #'basic.consume' record. I
> get the following exception stack and have been unable to discover why.
> Any help would be greatly appreciated!

Guest
Posted: Mon Jan 11, 2010 3:08 pm
Matthew, thank you for your help! The only documentation I could find for subscribing a queue to a pid using basic.consume was here:

http://hopper.squarespace.com/blog/2008/1/12/introducing-the-erlang-amqp-client.html

So, it appeared that the only API change made was the way the connection to the broker is established. Thanks again!

________________________________________
From: rabbitmq-discuss-bounces@lists.rabbitmq.com [rabbitmq-discuss-bounces@lists.rabbitmq.com] On Behalf Of Matthew Sackman [matthew@lshift.net]
Sent: Saturday, January 09, 2010 7:51 AM
To: rabbitmq-discuss@lists.rabbitmq.com
Subject: Re: [rabbitmq-discuss] Erlang Client - Unable to Subscribe with basic.consume


All times are GMT