Erlang Mailing Lists

Erlang questions mailing list ~ tcp connections dropped in gen_server

Guest
Posted: Mon Sep 05, 2011 4:59 pm
I have a running application that consists of a supervisor and two
generic servers. One of them wraps odbc and the other handles
TCP connections. A fragment of the relevant code:


init([]) ->
    process_flag(trap_exit, true),
    {ok, ListenSocket} = gen_tcp:listen(Port, [binary, {packet, 0},
                                               {reuseaddr, true},
                                               {active, true}]),
    proc_lib:spawn_link(?MODULE, acceptor, [ListenSocket])

acceptor(ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    error_logger:info_msg("New connection from ~p~n", [Socket]),
    _Pid = proc_lib:spawn(?MODULE, acceptor, [ListenSocket]),
    inet:setopts(Socket, [binary, {nodelay, true}, {active, true}]),
    loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            error_logger:info_msg("Message received from ~p: ~p~n", [Socket, Data]),
            comm_lib:handle_message(Socket, Data),
            loop(Socket);
        {tcp_closed, Socket} ->
            error_logger:info_msg("Device at ~p disconnected~n", [Socket]);
        _Any ->
            %% skip this
            loop(Socket)
    end.

So, I basically start a new unlinked process for every new TCP
connection. It works fine for a couple of hours, but then the TCP
connections are gradually dropped, each with the message "Device at ~p
disconnected". The client tries to reconnect whenever its connection is
closed. A TCP connection should only terminate if the remote end closes
it or the spawned process in the server crashes.

After all connections have been dropped, inet:i() still shows
established connections, but nothing is logged!

Can anyone give some insight or point me in the right direction to debug this?
_______________________________________________
erlang-questions mailing list
erlang-questions@erlang.org
http://erlang.org/mailman/listinfo/erlang-questions
Post received from mailinglist
Guest
Posted: Mon Sep 05, 2011 6:52 pm
Hello.

I think the missing log messages (and the hanging connection
processes) may be caused by the fact that you don't handle messages
of the form {tcp_error, Socket, Reason} in your loop/1 function.
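
A minimal sketch of such a clause, assuming the loop/1 from the original post. The module name conn_loop and the handle/2 stub (standing in for comm_lib:handle_message/2) are hypothetical, and the return values closed and {error, Reason} are only there to make the outcome visible:

```erlang
-module(conn_loop).
-export([loop/1]).

%% Receive loop for one connection, with an explicit tcp_error clause.
loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            handle(Socket, Data),
            loop(Socket);
        {tcp_closed, Socket} ->
            error_logger:info_msg("Device at ~p disconnected~n", [Socket]),
            closed;
        {tcp_error, Socket, Reason} ->
            %% Log the reason and stop looping instead of silently waiting.
            error_logger:error_msg("Socket ~p error: ~p~n", [Socket, Reason]),
            gen_tcp:close(Socket),
            {error, Reason};
        _Any ->
            %% Unrelated messages are still skipped, as in the original.
            loop(Socket)
    end.

%% Hypothetical stand-in for comm_lib:handle_message/2.
handle(_Socket, _Data) ->
    ok.
```

With this clause the process logs the error and terminates, so a failing socket should show up in the log rather than leave a silent process behind.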

Also, if I understand the code correctly, the newly created connection
processes (acceptors) are not supervised. To prevent future problems
with this, I strongly recommend that you modify your code slightly, as
suggested in the book "Erlang and OTP in Action".

NOTE: I haven't even attempted to compile the following code (taken
from the book and adapted to your use case).

Modified process structure:
simple_one_for_one - one for each ListenSocket
    loop - one for each existing TCP connection on the ListenSocket
    acceptor - one on the ListenSocket


%%%%%%%%%%%%%%%%%%%%%%
%%% TCP supervisor %%%
%%%%%%%%%%%%%%%%%%%%%%
-module(tcp_sup).

-behaviour(supervisor).

-export([start_link/1, start_child/1]).
-export([init/1]).

start_link({port, Port}) ->
    {ok, ListenSocket} = gen_tcp:listen(Port, [binary, {packet, 0}, {reuseaddr, true}, {active, true}]),
    start_link({listen_socket, ListenSocket});
start_link({listen_socket, ListenSocket}) ->
    supervisor:start_link(?MODULE, [ListenSocket]).

start_child(SupPid) ->
    supervisor:start_child(SupPid, []).

init([ListenSocket]) ->
    Server = {tcp_srv, {tcp_srv, start_link, [self(), ListenSocket]},
              temporary, brutal_kill, worker, [tcp_srv]},
    RestartStrategy = {simple_one_for_one, 0, 1}, % <-- tune for production demands
    {ok, {RestartStrategy, [Server]}}.


%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% TCP server (acceptor + loop for one TCP connection) %%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-module(tcp_srv).

-export([start_link/2]).
-export([acceptor/2]).

start_link(SupPid, ListenSocket) ->
    proc_lib:spawn_link(?MODULE, acceptor, [SupPid, ListenSocket]).

acceptor(SupPid, ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    tcp_sup:start_child(SupPid), % <-- Instruct the tcp_sup SupPid to start a new acceptor process.
    error_logger:info_msg("New connection from ~p~n", [Socket]),
    inet:setopts(Socket, [binary, {nodelay, true}, {active, true}]),
    loop(Socket).

loop(Socket) ->
    %% As before.


You should also consider introducing flow control to limit
unbounded memory usage under heavy load, using {active, false}
for ListenSocket and {active, once} for Socket:
1. [in tcp_sup:start_link/1] {ok, ListenSocket} = gen_tcp:listen(Port, [binary, {packet, 0}, {reuseaddr, true}, {active, false}]),
2. [in tcp_srv:acceptor/2] inet:setopts(Socket, [binary, {nodelay, true}, {active, once}]),
3. [modify tcp_srv:loop/1]
loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            inet:setopts(Socket, [{active, once}]), % <-- added line
            error_logger:info_msg("Message received from ~p: ~p~n", [Socket, Data]),
            comm_lib:handle_message(Socket, Data),
            loop(Socket);
        {tcp_closed, Socket} ->
            error_logger:info_msg("Device at ~p disconnected~n", [Socket]);
        _Any ->
            %% skip this
            loop(Socket)
    end.


HTH,

Ladislav Lenart


On 5.9.2011 18:59, Reynaldo Baquerizo wrote:
> [snip]

Guest
Posted: Mon Sep 05, 2011 9:41 pm
Hi,

I've been testing some code I found on the internet and it works smoothly. The code can be found at:

http://20bits.com/articles/erlang-a-generalized-tcp-server/

Take a look at it and compare it with your code. It will help in solving your problem.

About your code: it may be that it gets confused about which connection belongs to which spawned process and then raises an error, which generates the tcp_closed message. I might be wrong, though. Better to dump your TCP traffic and see where the problem in the connections is.

Cheers,
CGS



On Mon, Sep 5, 2011 at 6:59 PM, Reynaldo Baquerizo <reynaldomic@gmail.com> wrote:
> [snip]

rolphin
Posted: Mon Sep 05, 2011 10:05 pm
On Mon, Sep 5, 2011 at 11:40 PM, George Catalin Serbanut <cgsmcmlxxv@gmail.com> wrote:
> Hi,
>
> I've been testing one code found on internet and it works smoothly. The code can be found at:
>
> http://20bits.com/articles/erlang-a-generalized-tcp-server/
>
> Take a look at the code and compare it with your code. It will help in solving your problem.

I find that
Guest
Posted: Tue Sep 06, 2011 11:15 am
Hello.

On 5.9.2011 23:40, Reynaldo Baquerizo wrote:
>> [snip]
>>
>> Also, if I understand the code correctly, the newly created connection
>> processes (acceptors) are not supervised. To prevent future problems
>> with this I strongly recommend you to modify your code slightly as
>> suggested in the book "Erlang and OTP in action".
>
> I didn't feel the need to supervise those connections. I fail to see
> the difference between leaving them unattended and simple_one_for_one
> with no restart.

There was a comment in tcp_sup:init/1 saying "tune for production demands",
meant as a hint to change the restart strategy according to your needs. The
advantage of having the connection processes supervised (with a suitable
restart strategy) is that the supervisor should log all unexpected crashes
of the connection processes. It is also nice to look at appmon and see where
the processes belong.


> Thanks for the feedback !

You're welcome :)


NOTE (to anyone who should later read this thread): I've made a few
mistakes in my previous code sample:
* The supervisor should start its first child (acceptor) as part of
  its own init (hidden behind the tcp_sup:start_link/1 API call).
  Otherwise no one will be listening on the ListenSocket.
* tcp_srv:start_link/2 should return {ok, pid()} instead of pid() to
  adhere to common expectations for start_link/X functions.

In module tcp_sup:
start_link({port, Port}) ->
    {ok, ListenSocket} = gen_tcp:listen(Port, [binary, {packet, 0}, {reuseaddr, true}, {active, true}]),
    start_link({listen_socket, ListenSocket});
start_link({listen_socket, ListenSocket}) ->
    {ok, SupPid} = supervisor:start_link(?MODULE, [ListenSocket]),
    {ok, _Pid} = tcp_sup:start_child(SupPid), % <-- Start the first acceptor.
    {ok, SupPid}.

In module tcp_srv:
start_link(SupPid, ListenSocket) ->
    {ok, proc_lib:spawn_link(?MODULE, acceptor, [SupPid, ListenSocket])}.


Ladislav Lenart


Guest
Posted: Tue Sep 06, 2011 5:55 pm
Hello.

[I hope it's ok that I reply to the list - someone else might find this
information useful as well. I am mentioning this only because you keep
replying to me privately.]

On 6.9.2011 17:52, Reynaldo Baquerizo wrote:
>>>> Also, if I understand the code correctly, the newly created connection
>>>> processes (acceptors) are not supervised. To prevent future problems
>>>> with this I strongly recommend you to modify your code slightly as
>>>> suggested in the book "Erlang and OTP in action".
>>>
>>> I didn't feel the need to supervised those connections. I fail to see
>>> the difference between leaving them unattended and simple_one_for_one
>>> with no restart.
>>
>> There was a comment tcp_sup:init/1 saying "tune for production demands" to
>
> Achh.. I read it as "tuned" :)

Good one! I have to be careful next time I invent "helpful comments" :)


>> give you a hint to change the restart strategy according to your needs. The
>> advantage of having the connection processes supervised (with suitable
>> restart
>
> Will those processes keep their state when restarted?
> For instance, will the socket be closed, or will the restart happen before that?

They won't be restarted. The socket will be closed because its controlling
process (the one that performed gen_tcp:accept/1) has just died (and the
socket was linked to it). My sole aim was to put the connection processes
under supervision. This way you will be informed about their abnormal crashes.
I proposed the following supervisor (in tcp_sup):

init([ListenSocket]) ->
    Server = {tcp_srv, {tcp_srv, start_link, [self(), ListenSocket]},
              temporary, brutal_kill, worker, [tcp_srv]},
    RestartStrategy = {simple_one_for_one, 0, 1}, % <-- tune for production demands
    {ok, {RestartStrategy, [Server]}}.

As you can see from the child specification (Server):
* temporary - Instructs the supervisor never to restart its terminated children.
* brutal_kill - Instructs the supervisor to kill its children (via
  exit(ChildPid, kill)) when it is itself about to terminate, without giving
  them any chance to react (e.g. to clean up), but also without any
  possibility that the operation fails. brutal_kill is the only way to
  terminate a process that is blocked indefinitely in a call to
  gen_tcp:accept(ListenSocket).
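
For readers on newer OTP releases, the same specification can also be written with maps (map-based child specs and supervisor flags arrived in OTP 18). This is only a sketch: the module name tcp_sup_map is hypothetical, and tcp_srv:start_link/2 is assumed to be the function discussed in this thread.

```erlang
-module(tcp_sup_map).
-behaviour(supervisor).

-export([start_link/1]).
-export([init/1]).

start_link(ListenSocket) ->
    supervisor:start_link(?MODULE, [ListenSocket]).

init([ListenSocket]) ->
    %% Same values as the tuple form {simple_one_for_one, 0, 1}.
    SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1},
    Server = #{id => tcp_srv,
               start => {tcp_srv, start_link, [self(), ListenSocket]},
               restart => temporary,     % never restart a terminated child
               shutdown => brutal_kill,  % exit(ChildPid, kill) on shutdown
               type => worker,
               modules => [tcp_srv]},
    {ok, {SupFlags, [Server]}}.
```

supervisor:check_childspecs/1 can be used to sanity-check such a specification before it is handed to a running supervisor.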

It makes very little sense to restart these processes (to me), because
the TCP connection will die as well. The external client can reconnect
and start anew. Or am I missing something?


> I use a gen_server to encapsulate some state, does it matter? your
> example of tcp_srv isn't a gen_server.

Well, neither was yours, I just kept it that way to minimize changes
in your original code :)

But gen_server it is...

NOTES:
* Again, I haven't even made an attempt to compile the following code.
* The tricky bit is to postpone the initialization (i.e. gen_tcp:accept/1).
* The server has no application specific state. However it should be
straightforward to add it, preferably with something like
-record(state, {supervisor, listen_socket, socket, ...}).


%%% TCP connection process
-module(tcp_srv).

-behaviour(gen_server).

-export([start_link/2, stop/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).


%%% API

start_link(SupPid, ListenSocket) ->
    gen_server:start_link(?MODULE, [SupPid, ListenSocket], []).

stop(Pid) ->
    gen_server:cast(Pid, stop).


%%% Callbacks

init([SupPid, ListenSocket]) ->
    %% Return from the init now because the supervisor is waiting for me.
    %% Postpone my initialization for later.
    %% accept will be the first message I will receive and I will block
    %% there indefinitely.
    self() ! accept,
    {ok, {SupPid, ListenSocket}}.

handle_call(Msg, _From, State) ->
    {reply, {error, {unknown_request, Msg}}, State}.

handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(accept, {SupPid, ListenSocket}) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    tcp_sup:start_child(SupPid), % <-- Instruct the tcp_sup SupPid to start a new acceptor process.
    error_logger:info_msg("New connection from ~p~n", [Socket]),
    inet:setopts(Socket, [binary, {nodelay, true}, {active, true}]),
    {noreply, Socket};
handle_info({tcp, Socket, Data}, State) ->
    error_logger:info_msg("Message received from ~p: ~p~n", [Socket, Data]),
    %% comm_lib:handle_message/3 is expected to return NewState.
    {noreply, comm_lib:handle_message(Socket, Data, State)};
handle_info({tcp_closed, Socket}, State) ->
    error_logger:info_msg("Device at ~p disconnected~n", [Socket]),
    {stop, normal, State};
handle_info(_Any, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

> I think I will restructure my supervision tree as follows: one root
> supervisor with a one_for_one strategy, a child supervisor
> (simple_one_for_one) for TCP client connections, and a child gen_server
> worker, pretty much like the example hanging out there.

Sounds good to me.


Ladislav Lenart

Guest
Posted: Wed Sep 07, 2011 10:55 am
> [I hope it's ok that I reply to the list - someone else might find
> this information useful as well. I am mentioning this only because
> you keep replying to me privately.]

Ooops, apologies ... I certainly didn't mean to write to you alone.

> It makes very little sense to restart these processes (to me), because
> the TCP connection will die as well. The external client can reconnect
> and start anew. Or am I missing something?

Indeed, the client will reconnect. But I think I found the problem. The
process that is listening for new connections in gen_tcp:accept/1 dies
at some point. All other processes with established connections are fine
at first but eventually crash (because of bad input), and from then on
no further reconnections are possible.

How can I isolate the listening process, or restructure things so that
it is restarted if it crashes?

In the end, I simplified it and now have this:

-module(comm_tcp).
-export([start_link/2, acceptor/2]).

start_link(SupPid, ListenSocket) ->
    {ok, proc_lib:spawn_link(?MODULE, acceptor, [SupPid, ListenSocket])}.

acceptor(SupPid, ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    {ok, Pid} = comm_client_sup:start_child(SupPid),
    %% gen_tcp:controlling_process(Socket, Pid),
    error_logger:info_msg("New connection from ~p~n", [Socket]),
    inet:setopts(Socket, [binary, {nodelay, true}, {active, true}]),
    loop(Socket).

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            error_logger:info_msg("Message received from ~p: ~p~n", [Socket, Data]),
            comm_lib:handle_message(Socket, Data),
            loop(Socket);
        {tcp_closed, Socket} ->
            error_logger:info_msg("Device at ~p disconnected~n", [Socket]);
        _Any ->
            loop(Socket)
    end.

--
Reynaldo

Guest
Posted: Wed Sep 07, 2011 2:16 pm
Hello.


On 7.9.2011 12:55, Reynaldo Baquerizo wrote:
>> [I hope it's ok that I reply to the list - someone else might find
>> this information useful as well. I am mentioning this only because
>> you keep replying to me privately.]
>
> Ooops, apologies ... I certainly didn't mean to write to you alone.

No problem :)


>> It makes very little sense to restart these processes (to me), because
>> the TCP connection will die as well. The external client can reconnect
>> and start anew. Or am I missing something?
>
> Indeed, the client will reconnect. But I think I found the problem. The
> process that is listening for new connections in gen_tcp:accept/1 dies
> at some point, all other processes with established connections are fine
> but eventually crash (cause of bad input), no further reconnections
> will be possible.

I see. Can you provide us with the exact cause of it (i.e. what
gen_tcp:accept/1 returned)? It seems to me that under normal
operation (if the accept was successful at least once before),
there should be no problem of this kind...


> How can I isolate the listening process? or restructure to restart it
> if it crashes?

You need to restructure the processes then:

port (service) supervisor - one_for_one
    acceptor - worker
    session supervisor - simple_one_for_one
        connection - worker

Notes:
* The above is a complete hierarchy of one TCP service listening on
a given port.
* There's exactly one acceptor under a port supervisor. The acceptor
creates a listening socket and calls gen_tcp:accept/1 on it. It's
terminated via brutal_kill (because of the blocking nature of the
gen_tcp:accept/1 call). If it crashes it will be restarted by the
port supervisor.
* session_supervisor is basically a tcp_sup.
* connection is essentially the loop(Socket) part of the previous
connection process. These don't have to be killed via brutal_kill
because they don't block.
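
The port supervisor itself is not spelled out here. A rough sketch of what it might look like, assuming a module name tcp_port_sup, a tcp_session_sup:start_link/0 function and a restart intensity of 5 restarts in 10 seconds (all three are placeholders, not taken from this thread):

```erlang
-module(tcp_port_sup).
-behaviour(supervisor).

-export([start_link/1]).
-export([init/1]).

start_link(Port) ->
    supervisor:start_link(?MODULE, [Port]).

init([Port]) ->
    %% Session supervisor first, so it exists before the acceptor looks it up.
    SessionSup = {tcp_session_sup, {tcp_session_sup, start_link, []},
                  permanent, infinity, supervisor, [tcp_session_sup]},
    %% permanent: the acceptor is restarted whenever it crashes.
    %% brutal_kill: it may be blocked in gen_tcp:accept/1 at shutdown.
    Acceptor = {tcp_acceptor, {tcp_acceptor, start_link, [self(), Port]},
                permanent, brutal_kill, worker, [tcp_acceptor]},
    {ok, {{one_for_one, 5, 10}, [SessionSup, Acceptor]}}.
```

The child id tcp_session_sup matters: it is the key the acceptor searches for in supervisor:which_children/1.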

All the "magic" happens in the acceptor process...


%%%%%%%%%%%%%%%%%%%%
%%% TCP acceptor %%%
%%%%%%%%%%%%%%%%%%%%
-module(tcp_acceptor).

-export([start_link/2]).
-export([init/2]).

start_link(SupPid, Port) ->
    {ok, proc_lib:spawn_link(?MODULE, init, [SupPid, Port])}.

init(SupPid, Port) ->
    %% IMPORTANT: active must be set to false.
    {ok, ListenSocket} = gen_tcp:listen(Port, [binary, {packet, 0}, {reuseaddr, true}, {active, false}]),
    accept(SupPid, ListenSocket).

accept(SupPid, ListenSocket) ->
    {ok, Socket} = gen_tcp:accept(ListenSocket),
    start_connection(SupPid, Socket),
    accept(SupPid, ListenSocket).

start_connection(SupPid, Socket) ->
    %% tcp_session_sup is the id of the simple_one_for_one session supervisor
    %% in the child specification of the port supervisor (SupPid here).
    Kids = supervisor:which_children(SupPid),
    {value, {tcp_session_sup, SessionSup, _, _}} = lists:keysearch(tcp_session_sup, 1, Kids),
    {ok, Pid} = supervisor:start_child(SessionSup, []),
    %% Force Socket to send future messages to Pid and not to me.
    ok = gen_tcp:controlling_process(Socket, Pid),
    %% Inform Pid about Socket (let it initialize the TCP session).
    %% As a bare minimum Pid should remember Socket in its internal state
    %% and set active flag to one of {active, true} or {active, once}.
    Pid ! {init_tcp_session, Socket}.
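
The receiving side of {init_tcp_session, Socket} could be sketched as follows. The module name tcp_connection is hypothetical, and echoing the data back is only a placeholder for the application-specific handling (comm_lib:handle_message/2 in this thread):

```erlang
-module(tcp_connection).
-export([start_link/0, init/0]).

start_link() ->
    {ok, proc_lib:spawn_link(?MODULE, init, [])}.

%% Wait until the acceptor hands over the socket, then enable delivery.
init() ->
    receive
        {init_tcp_session, Socket} ->
            %% A failed setopts crashes this process; the supervisor logs it.
            ok = inet:setopts(Socket, [{nodelay, true}, {active, once}]),
            loop(Socket)
    end.

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            ok = gen_tcp:send(Socket, Data),  % placeholder: echo the data
            ok = inet:setopts(Socket, [{active, once}]),
            loop(Socket);
        {tcp_closed, Socket} ->
            ok;
        {tcp_error, Socket, _Reason} ->
            gen_tcp:close(Socket)
    end.
```

Because the listening socket is passive, no data can reach the acceptor's mailbox between accept and the controlling_process handover; the connection process only starts receiving once it sets {active, once} itself.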


I hope the rest of the picture is clear now. Note also that this can be
turned into a generic TCP service / application fairly easily. You just
need to parametrize the above with:
* CallbackModule - name of the module that implements application specific
connection process on top of TCP. Session supervisor needs it to create
a desired child specification.
* CallbackOptions - list of initial arguments passed to connection process
via message {init_tcp_session, Socket, CallbackOptions}.
* PacketOptions - list of PacketOptions passed to tcp_acceptor.


If you have further questions, please do not hesitate to ask! :)


Ladislav Lenart

Guest
Posted: Wed Sep 07, 2011 2:43 pm
Hi. I cannot help but offer a suggestion. As one working example, the UBF framework contains a TCP/IP listener called proc_socket_server.

https://github.com/norton/ubf/blob/master/src/proc_socket_server.erl

The structure is simple and effective and doesn't use supervisors or even gen_servers. As previously suggested, supervisors add more complexity (and possibly a negative performance impact) than seems required for the task.

I suspect your worker process will be different from UBF's worker. Nevertheless, the UBF worker's loop for reading from and writing to the socket is simple and easy to follow. The worker is called contract_driver.

https://github.com/norton/ubf/blob/master/src/contract_driver.erl

regards,


Joseph Norton



On Sep 7, 2011, at 11:15 PM, Ladislav Lenart wrote:

> [snip]
Guest
Posted: Wed Sep 07, 2011 5:33 pm
>>> It makes very little sense to restart these processes (to me), because
>>> the TCP connection will die as well. The external client can reconnect
>>> and start anew. Or am I missing something?
>>
>> Indeed, the client will reconnect. But I think I found the problem. The
>> process that is listening for new connections in gen_tcp:accept/1 dies
>> at some point. All other processes with established connections are fine
>> but eventually crash (because of bad input), and after that no further
>> reconnections are possible.
>
> I see. Can you provide us with the exact cause of it (i.e. what
> gen_tcp:accept/1 returned)? It seems to me that under normal
> operation (if the accept was successful at least once before),
> there should be no problem of this kind...

Achh, found it (I think so).
I've hit the ERL_MAX_PORTS limit

Log Report:

=CRASH REPORT==== 7-Sep-2011::10:42:53 ===
crasher:
initial call: comm_tcp:acceptor/2
pid: <0.1833.0>
registered_name: []
exception error: no match of right hand side value {error,enfile}
in function comm_tcp:acceptor/2
ancestors: [comm_client_sup,comm_sup,<0.51.0>]
messages: []
links: [<0.61.0>]
dictionary: []
trap_exit: false
status: running
heap_size: 233
stack_size: 24
reductions: 96
neighbours:

=SUPERVISOR REPORT==== 7-Sep-2011::10:42:53 ===
Supervisor: {local,comm_client_sup}
Context: child_terminated
Reason: {badmatch,{error,enfile}}
Offender: [{pid,<0.1833.0>},
{name,comm_tcp},
{mfargs,{comm_tcp,start_link,undefined}},
{restart_type,temporary},
{shutdown,brutal_kill},
{child_type,worker}]

For further reference, I am running Erlang on Windows Server 2008.

Besides setting ERL_MAX_PORTS to a higher number, any other suggestions?
I have already found this thread
http://erlang.2086793.n4.nabble.com/error-emfile-on-windows-td3064840.html
in case anyone else finds it useful.




>> How can I isolate the listening process, or restructure things so that
>> it is restarted if it crashes?
>
> You need to restructure the processes then:
>
>
Guest
Posted: Wed Sep 07, 2011 5:38 pm Reply with quote
Guest
> Hi.
Guest
Posted: Thu Sep 08, 2011 8:04 am Reply with quote
Guest
Hello.


On 7.9.2011 19:33, Reynaldo Baquerizo wrote:
>>>> It makes very little sense to restart these processes (to me), because
>>>> the TCP connection will die as well. The external client can reconnect
>>>> and start anew. Or am I missing something?
>>>
>>> Indeed, the client will reconnect. But I think I found the problem. The
>>> process that is listening for new connections in gen_tcp:accept/1 dies
>>> at some point. All other processes with established connections are fine
>>> but eventually crash (because of bad input), and after that no further
>>> reconnections are possible.
>>
>> I see. Can you provide us with the exact cause of it (i.e. what
>> gen_tcp:accept/1 returned)? It seems to me that under normal
>> operation (if the accept was successful at least once before),
>> there should be no problem of this kind...
>
> Achh, found it (I think so).
> I've hit the ERL_MAX_PORTS limit

Ok.


> For further reference, I am running Erlang on Windows Server 2008.
>
> Besides setting ERL_MAX_PORTS to a higher number, any other suggestions?
> I have already found this thread
> http://erlang.2086793.n4.nabble.com/error-emfile-on-windows-td3064840.html
> in case anyone else finds it useful.

No, I am afraid. Maybe someone else can help...


>>> How can I isolate the listening process, or restructure things so that
>>> it is restarted if it crashes?
>>
>> You need to restructure the processes then:
>>
>> port (service) supervisor - one_for_one
>>   acceptor - worker
>>   session supervisor - simple_one_for_one
>>     connection - worker
>
> I don't want to add complexity, so I will tune ERL_MAX_PORTS and
> handle the exhaustion with a case clause to avoid terminating the
> process.

One question: how do you plan to handle the exhaustion of the ports,
i.e. what will you do in the newly added case clause?
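
For illustration, one shape such a case clause could take is sketched below. It treats resource-exhaustion errors from gen_tcp:accept/1 as temporary, logs them, backs off and retries, and exits on anything else so that a supervisor (if there is one) can decide what to do. The module name acceptor_sketch, the Handler fun, the classify/1 helper and the 200 ms back-off are all assumptions made for this example. Unlike the original code, it keeps a single acceptor process and runs each session in its own process.

```erlang
-module(acceptor_sketch).
-export([acceptor/2, classify/1]).

%% Assumed back-off between retries; tune for the real workload.
-define(BACKOFF_MS, 200).

%% Handler is a hypothetical fun(Socket) that runs one session.
acceptor(ListenSocket, Handler) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} ->
            %% The session waits for 'go' so that it only starts once it
            %% owns the socket.
            Pid = spawn(fun() -> receive go -> Handler(Socket) end end),
            case gen_tcp:controlling_process(Socket, Pid) of
                ok -> Pid ! go;
                {error, _} -> exit(Pid, kill), gen_tcp:close(Socket)
            end,
            acceptor(ListenSocket, Handler);
        {error, Reason} ->
            case classify(Reason) of
                retry ->
                    error_logger:warning_msg("accept failed (~p), retrying~n",
                                             [Reason]),
                    timer:sleep(?BACKOFF_MS),
                    acceptor(ListenSocket, Handler);
                stop ->
                    exit({accept_failed, Reason})
            end
    end.

%% Resource exhaustion may clear up once other connections close;
%% everything else (e.g. closed) is treated as fatal for this acceptor.
classify(enfile) -> retry;
classify(emfile) -> retry;
classify(system_limit) -> retry;
classify(_Other) -> stop.
```

Retrying only hides the symptom: if sockets are being leaked somewhere, the acceptor will keep looping in the retry branch until the leak is fixed or the limit is raised.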


> Thanks for the lengthy explanation

You're welcome.


Ladislav Lenart

Guest
Posted: Thu Sep 08, 2011 11:23 am Reply with quote
Guest
Reynaldo Baquerizo <reynaldomic@gmail.com> wrote:
>
>Achh, found it (I think so).
>I've hit the ERL_MAX_PORTS limit

> exception error: no match of right hand side value {error,enfile}

>For further reference, I am running Erlang on Windows Server 2008.

I don't know if things are radically different on Windows (I would hope
not), but on *nix 'enfile' means

1> file:format_error(enfile).
"file table overflow"

which is a system-wide (i.e. in your OS) limit on the number of open
file descriptors, as opposed to

2> file:format_error(emfile).
"too many open files"

which is a per-OS-process limit for the same thing, i.e. in this case
for the Erlang VM, whereas hitting ERL_MAX_PORTS results in

3> file:format_error(system_limit).
"a system limit was hit, probably not enough ports"

Btw, on *nix, ERL_MAX_PORTS defaults to the closest power of 2 at or
above the limit that gives 'emfile' - but you can of course hit
ERL_MAX_PORTS without getting 'emfile' when you use ports for things
that aren't file descriptors - notably drivers.
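
To tell these cases apart on a running node, something along the following lines may help. It is a rough sketch under assumptions: the module name limit_report and both function names are made up for illustration, and erlang:system_info(port_limit) is assumed to be available, which holds for current OTP releases but possibly not for older ones.

```erlang
-module(limit_report).
-export([describe/1, port_usage/0]).

%% Human-readable text for an error atom, using the same
%% file:format_error/1 call shown in the shell session above.
describe(Reason) ->
    lists:flatten(file:format_error(Reason)).

%% {PortsInUse, PortLimit} for the running VM.
%% erlang:ports/0 lists every open port; system_info(port_limit) is
%% assumed to exist on the release in use.
port_usage() ->
    {length(erlang:ports()), erlang:system_info(port_limit)}.
```

If port_usage/0 reports a count well below the limit while accept still fails with enfile or emfile, the limit being hit is most likely the operating system's rather than ERL_MAX_PORTS.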

--Per Hedeland