Erlang/OTP Forums

<  Erlang questions mailing list  ~  Distribution by another means

Sean.Hinde at one2one.co.
Posted: Tue Jun 13, 2000 3:53 pm
I need to connect up two separate Erlang/OTP based systems which are
separated amongst other things by a firewall.

I don't want to run the standard distribution mechanism between the two
systems for various reasons but need a simple rpc type call in both
directions.

Out of all the mechanisms available in OTP, what are people's views on which
would be the quickest and easiest mechanism, requiring the least overhead and
minimum complexity? Full-on CORBA would work but is probably overkill??

Views?

Sean


Post generated using Mail2Forum (http://m2f.sourceforge.net)
cesarini at terminus.eric
Posted: Tue Jun 13, 2000 3:59 pm
Might seem far-fetched, but when discussing this problem a few years
ago, one of the simplest ideas and solutions which came up was to run
the calls through the WWW proxy, masking the http requests as arguments
in the URL.

//fc

Sean Hinde wrote:
>
> I need to connect up two separate Erlang/OTP based systems which are
> separated amongst other things by a firewall.
>
> I don't want to run the standard distribution mechanism between the two
> systems for various reasons but need a simple rpc type call in both
> directions.
>
> Out of all the mechanisms available in OTP what are peoples views on which
> would be the quickest and easiest mechanism requiring least overhead and
> minimum complexity. Full on CORBA would work but is probably overkill??
>
> Views?
>
> Sean

--
Francesco Cesarini

Erlang/OTP consultant
Cellular: +44-(0)7776 250381
ECN: 832-707192


mbj at bluetail.com
Posted: Tue Jun 13, 2000 4:03 pm
Sean Hinde <Sean.Hinde_at_one2one.co.uk> wrote:

> Out of all the mechanisms available in OTP what are peoples views on which
> would be the quickest and easiest mechanism requiring least overhead and
> minimum complexity.

Set up a socket, possibly perform a handshake alg, maybe md5 digest
based, and send erlang binaries:

gen_tcp:send(Sock, term_to_binary({apply, M, F, A}))

[You might have to take care of which node is responsible for
connecting (simple alg: smallest node name), or handle simultaneous
connects.]
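
The md5-digest handshake mentioned above could look roughly like the following. This is only a sketch: the module name handshake_sketch and its functions are made up for illustration, and it assumes both sides already share a secret. MD5 is used because the post names it; a current system would likely pick a stronger hash.

```erlang
-module(handshake_sketch).
-export([challenge/0, digest/2, verify/3]).

%% Accepting side: a fresh random challenge for each connection attempt.
challenge() ->
    crypto:strong_rand_bytes(16).

%% Both sides: MD5 over the shared secret followed by the challenge.
digest(Secret, Challenge) ->
    erlang:md5([Secret, Challenge]).

%% Accepting side: trust the peer only if its digest matches our own.
verify(Secret, Challenge, Digest) ->
    digest(Secret, Challenge) =:= Digest.
```

The accepting side would send challenge() over the new socket, the connecting side would answer with digest(Secret, Challenge), and only after verify/3 returns true would {apply, M, F, A} terms be accepted.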

If each system consists of more than one node, you might run this
server in a distributed application, to make it fault tolerant.

> Full on CORBA would work but is probably overkill??

I would definitely think so ;-)


/martin




etxuwig at etxb.ericsson.
Posted: Tue Jun 13, 2000 4:09 pm
I'm just now building a network of cooperating web servers using
"HTTP-based RPC". That is, the web servers run Erlang, and when they
need to communicate, they use a set of predefined "internal requests"
via HTTP:

query_internal(Host, Port, What, Req) ->
    {ok, Sock} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),
    Auth = ccv_auth:internal_pack_user(Req#ccv_req.user),
    Query = ["/ccviewer/internal/", What],
    Cmd = ["GET ", Query, " HTTP/1.0\r\n"
           "Cookie: ", Auth, ";\r\n"
           "\r\n"],
    ok = io:format("connected~n", []),
    gen_tcp:send(Sock, list_to_binary(Cmd)),
    do_recv(Sock).

%% Read the 4-byte big-endian length prefix, then the encoded term.
do_recv(Sock) ->
    receive
        {tcp, Sock, Data} ->
            ok = io:format("received ~p~n", [Data]),
            {B1, B2} = split_binary(Data, 4),
            LengthBytes = binary_to_list(B1),
            Length = i32(LengthBytes),
            ok = io:format("Length = ~p~n", [Length]),
            case size(B2) of
                L when L == Length ->
                    ok = io:format("Got whole packet~n", []),
                    gen_tcp:close(Sock),
                    binary_to_term(B2);
                _ ->
                    ok = io:format("wait for more~n", []),
                    do_recv(Sock, Length, B2)
            end
    after 10000 ->
            exit(timeout)
    end.

%% Keep collecting data until Length bytes have arrived.
do_recv(Sock, Length, Bin) ->
    receive
        {tcp, Sock, Data} ->
            Bin2 = list_to_binary([Bin, Data]),
            case size(Bin2) of
                Length ->
                    ok = io:format("Got whole packet~n", []),
                    gen_tcp:close(Sock),
                    binary_to_term(Bin2);
                _ ->
                    ok = io:format("wait for more~n", []),
                    do_recv(Sock, Length, Bin2)
            end
    after 10000 ->
            exit(timeout)
    end.

%% Convert between a list of four bytes and a 32-bit integer.
i32([B1,B2,B3,B4]) ->
    ((B1 bsl 24) bor
     (B2 bsl 16) bor
     (B3 bsl 8) bor
     B4);
i32(I) when is_integer(I) ->
    B1 = (I band 16#ff000000) bsr 24,
    B2 = (I band 16#00ff0000) bsr 16,
    B3 = (I band 16#0000ff00) bsr 8,
    B4 = I band 16#000000ff,
    [B1, B2, B3, B4].


And the server side looks kinda like this:

ccv_handler({get, "/internal/" ++ Query, Args}, Socket, Env) ->
    X = {get, Query, Args},
    put(socket, Socket),
    Result = case ccv_auth:internal_request(X, Socket, Env) of
                 {'EXIT', Reason} ->
                     {error, Reason};
                 {ok, NewX, Req} ->
                     put(request, Req),
                     handle_internal_request(NewX, Req);
                 Other ->
                     Other
             end,
    Bin = term_to_binary(Result),
    gen_tcp:send(Socket, [ccv_lib:i32(size(Bin)), Bin]);

Of course, one also needs an HTTP server. Personally, I use a
home-grown one, based on Joe's/Luke's pico. A more natural choice perhaps
would be to use INETS, but what fun would that be?
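
For comparison, the four-byte length prefix used in the code above can also be written with the bit syntax. The following is only a sketch under that assumption; the module frame_sketch and its encode/1 and decode/1 functions are hypothetical and not part of the code in this post.

```erlang
-module(frame_sketch).
-export([encode/1, decode/1]).

%% Prefix the encoded term with its size as a 32-bit big-endian integer.
encode(Term) ->
    Payload = term_to_binary(Term),
    <<(byte_size(Payload)):32/big, Payload/binary>>.

%% Return the decoded term plus any bytes left over once a whole frame
%% is buffered; otherwise ask the caller to keep reading.
decode(<<Len:32/big, Payload:Len/binary, Rest/binary>>) ->
    {ok, binary_to_term(Payload), Rest};
decode(Buffer) when is_binary(Buffer) ->
    {more, Buffer}.
```

A receive loop would append each {tcp, Sock, Data} chunk to its buffer and call decode/1 until it returns {ok, Term, Rest}. This also covers the case where fewer than four bytes have arrived so far.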



At AXD301, we had a similar problem, and solved it there with CORBA.
It's also reasonably straightforward. I can dig up the code if you'd
like.

/Uffe

On Tue, 13 Jun 2000, Francesco Cesarini wrote:

>Might seem far fetched, but when discussing this problem a few years
>ago, one of the simplest ideas and solutions which came to up was to run
>the calls through the WWW proxy, masking the http requests as arguments
>in the URL.
>
>//fc
>
>Sean Hinde wrote:
>>
>> I need to connect up two separate Erlang/OTP based systems which are
>> separated amongst other things by a firewall.
>>
>> I don't want to run the standard distribution mechanism between the two
>> systems for various reasons but need a simple rpc type call in both
>> directions.
>>
>> Out of all the mechanisms available in OTP what are peoples views on which
>> would be the quickest and easiest mechanism requiring least overhead and
>> minimum complexity. Full on CORBA would work but is probably overkill??
>>
>> Views?
>>
>> Sean
>
>

--
Ulf Wiger tfn: +46 8 719 81 95
Network Architecture & Product Strategies mob: +46 70 519 81 95
Ericsson Telecom AB, Datacom Networks and IP Services
Varuv
Sean.Hinde at one2one.co.
Posted: Tue Jun 13, 2000 5:34 pm
Martin,

> gen_tcp:send(Sock, term_to_binary({apply, M, F, A}))
>
> [You might have to take care of which node is responsible for
> connecting (simple alg: smallest node name), or handle simultanoeus
> connects.]
>
I like this, I wish I'd thought of it ;-) I guess using this with the inet
{packet, 2} socket option (which as far as I can tell guarantees that a
single receive returns the whole packet) could well be the basis of the
simplest rpc ever.
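
A minimal loopback sketch of that idea follows, assuming both ends open their sockets with {packet, 2}. The module name packet2_sketch and the {reply, Result} tuple are invented for the illustration. The two-byte header limits each message to 65535 bytes.

```erlang
-module(packet2_sketch).
-export([demo/0]).

%% One process plays both roles over the loopback interface.
demo() ->
    Opts = [binary, {packet, 2}, {active, false}],
    {ok, L} = gen_tcp:listen(0, [{ip, {127,0,0,1}}, {reuseaddr, true} | Opts]),
    {ok, Port} = inet:port(L),
    {ok, Client} = gen_tcp:connect({127,0,0,1}, Port, Opts, 5000),
    {ok, Server} = gen_tcp:accept(L, 5000),
    %% Client sends the call as one length-prefixed packet.
    ok = gen_tcp:send(Client, term_to_binary({apply, lists, reverse, [[1,2,3]]})),
    %% A single recv hands the server the complete packet, header stripped.
    {ok, ReqBin} = gen_tcp:recv(Server, 0, 5000),
    {apply, M, F, A} = binary_to_term(ReqBin),
    ok = gen_tcp:send(Server, term_to_binary({reply, apply(M, F, A)})),
    {ok, ReplyBin} = gen_tcp:recv(Client, 0, 5000),
    ok = gen_tcp:close(Client),
    ok = gen_tcp:close(Server),
    ok = gen_tcp:close(L),
    binary_to_term(ReplyBin).
```

Calling packet2_sketch:demo() should return {reply, [3,2,1]}. Note that the sketch applies whatever the peer sends, so it is only suitable between trusted endpoints.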

On the other hand this still means that I have to write the tcp client and
server implementations myself which though not that hard is significant. The
advantage I see of CORBA is that once I have figured out how to use the
beast it takes care of maintaining the connections, mapping requests and
replies etc...

Much more fun to do your own though. I'll post if I get something together
(possibly based on pico - thanks for the tip Ulf. BTW I'd definitely be
interested in seeing a good real example of CORBA usage if you find the time
:-)

BR,
Sean


Sean.Hinde at one2one.co.
Posted: Wed Jun 14, 2000 12:17 pm
OK, here it is..

A non blocking persistent client with heartbeats for socket maintenance.
Implemented using a gen_fsm (rpc_client)

A multithreaded tcp/ip server which allows for multiple connections.
Implemented using two gen_servers (rpc_listen.erl, rpc_socket.erl)

Example usage:

On the server node (note: non-distributed, so no cheating!):

10>rpc_listen:start_link(3456).
{ok, <0.335.0>}

On the client node:

3>rpc_client:start_link("sean", "localhost", 3456).
{ok, <0.123.0>}
4>rpc_client:call("sean", io, format, ["How cool is Erlang?~n"], 3000).
ok


Please tear my code apart at will. I'm never very sure of the best way to
deal with socket errors on send, receive, and particularly in accept so this
needs some work (suggestions?)

Also it needs some security and more testing.

Sean

<<rpc_client.erl>> <<rpc_listen.erl>> <<rpc_socket.erl>>

============================================================

%%%----------------------------------------------------------------------
%%% File : rpc_client.erl
%%% Author : shinde_at_one2one.co.uk local account <otpuser_at_tiger>
%%% Purpose :
%%% Created : 11 Apr 2000 by shinde local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------

-module(rpc_client).
-author('shinde_at_one2one.co.uk').
-vsn(1).

-behaviour(gen_fsm).

%% External exports
-export([start_link/3]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([connecting/2]).
-export([connected/3, connecting/3, wait_first_heart/3]).

-export([call/5]).

-define(HEART_TIMEOUT, 8000).     % Heartbeat response timeout
-define(HEART_PERIOD, 15000).     % Inter heartbeat timeout
-define(MAX_CMD_TIMEOUT, 5000).   % Command response timeout
-define(HEART_FAIL_THRESHOLD, 3). % Max number of heartbeat failures before closing socket
-define(RETRY_PERIOD, 5000).      % Period to wait before retrying the connection

-record(state, {heart_fails = 0,          % counter of heartbeat failures
                heart_ref = none,         % Timer Ref of repeating heartbeat timer
                heart_timeout_ref = none, % Timer Ref for heartbeat send timeout
                cmds = 0,                 % ets table containing currently outstanding commands
                name = "",                % Given name for this connection
                socket = 0,               % connected socket
                hostname = "",            % hostname to maintain client connection to
                port = 0}).               % listening port on above host

%% cmds contains {{cmd, Timer_ref}, Reply_to}

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Name, Hostname, Port) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:start_link({local, Reg_name}, ?MODULE, {Name, Hostname, Port},
[]).

call(Name, M, F, A, Timeout) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:sync_send_event({global, Reg_name}, {apply, M, F, A, Timeout},
?MAX_CMD_TIMEOUT).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData} |
%% {ok, StateName, StateData, Timeout} |
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
init({Name, Hostname, Port}) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
global:re_register_name(Reg_name, self()),
{ok, connecting, #state{cmds = ets:new(cmds, []),
name = Name,
hostname = Hostname,
port = Port}, 0}. % send timeout immediately

%%----------------------------------------------------------------------
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
%% This is called immediately on startup as a timeout from init/1
connecting(timeout, StateData) ->
case gen_tcp:connect(StateData#state.hostname, StateData#state.port,
[binary, {active, true}, {packet, 2}], 5000) of
{ok, Socket} ->
Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
case gen_tcp:send(Socket, term_to_binary({heartbeat, Ref})) of
ok ->
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, wait_first_heart, StateData#state{socket = Socket,
                                               heart_timeout_ref = Ref,
                                               heart_fails = 0}};
{error, _} ->
cancel_timer(Ref),
gen_tcp:close(Socket),
timer:sleep(5000), % Wait for a while before retrying
{next_state, connecting, StateData, 0}
end;
{error, _} ->
timer:sleep(5000),
{next_state, connecting, StateData, 0}
end;
connecting(_, StateData) ->
{next_state, connecting, StateData}.

%%----------------------------------------------------------------------
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
%% Stay in wait_first_heart: the pending heart_timeout is only handled in that state
wait_first_heart({apply, _, _, _, _}, _From, StateData) ->
{reply, {error, not_yet_heartbeating}, wait_first_heart, StateData}.

connecting({apply, _, _, _, _}, From, StateData) ->
{reply, {error, not_connected}, connecting, StateData}.

connected({apply, M, F, A, Timeout}, From, StateData) ->
Ref = erlang:start_timer(Timeout, self(), cmd_timeout),
case gen_tcp:send(StateData#state.socket, term_to_binary({apply, M, F,
A, Ref})) of
ok ->
ets:insert(StateData#state.cmds, {{cmd, Ref}, From}),
{next_state, connected, StateData};
{error, Reason} ->
cancel_timer(Ref),
{reply, {error, Reason}, connected, StateData}
end.

%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
{next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
handle_sync_event(Event, From, StateName, StateData) ->
Reply = ok,
{reply, Reply, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% State: wait_first_heart
%%----------------------------------------------------------------------
%% Waiting for first heartbeat. Received timeout which is the current attempt
handle_info({timeout, Ref, heart_timeout},
wait_first_heart, #state{heart_timeout_ref = Ref} = StateData) ->
gen_tcp:close(StateData#state.socket),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};

%% Waiting for first heartbeat. Received timeout which could just
%% be a non-cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, _Ref, heart_timeout}, wait_first_heart, StateData) ->
{next_state, wait_first_heart, StateData};

%% Wow, received some data
handle_info({tcp, Socket, Data},
wait_first_heart, #state{socket = Socket} = StateData) ->
case catch binary_to_term(Data) of
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
Next_heart = erlang:start_timer(?HEART_PERIOD, self(),
send_heart),
% io:format("First Heart_timer_set: ~p~n", [Next_heart]),
{next_state, connected, StateData#state{heart_ref =
Next_heart}};
_ ->
gen_tcp:close(Socket),
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, connecting, StateData, 0}
end;

%%----------------------------------------------------------------------
%% State: connected
%%----------------------------------------------------------------------
%% Received Nth heartbeat timeout in connected phase which means we
%% should close and start again.
handle_info({timeout, Ref, heart_timeout}, connected, StateData)
when StateData#state.heart_fails >= ?HEART_FAIL_THRESHOLD->
gen_tcp:close(StateData#state.socket),
cancel_timer(StateData#state.heart_ref),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};

%% Received non-final heartbeat timeout in connected phase, increment counter.
handle_info({timeout, Ref, heart_timeout}, connected, StateData) ->
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails = Heart_fails + 1}};

%% Time to send a new heartbeat.
handle_info({timeout, Ref, send_heart}, connected, StateData) ->
cancel_timer(StateData#state.heart_timeout_ref), % Just in case
New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
% io:format("Heart_timer_set: ~p~n", [Next_heart]),
case gen_tcp:send(StateData#state.socket, term_to_binary({heartbeat,
New_ref})) of
ok ->
{next_state, connected, StateData#state{heart_timeout_ref = New_ref,
                                        heart_ref = Next_heart}};
{error, _} ->
cancel_timer(New_ref),
Heart_fails = StateData#state.heart_fails,
%% Use 'none' (not 'null') so that cancel_timer/1 accepts the value later
{next_state, connected, StateData#state{heart_fails = Heart_fails + 1,
                                        heart_timeout_ref = none,
                                        heart_ref = Next_heart}}
end;

%% Received timeout for a sent command.
handle_info({timeout, Ref, cmd_timeout}, connected, StateData) ->
Cmds = StateData#state.cmds,
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, {error, timed_out}),
ets:delete(Cmds, {cmd, Ref}),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;

%% The real stuff - received a reply or heartbeat while connected
handle_info({tcp, Socket, Data}, connected, #state{socket = Socket} =
StateData) ->
Cmds = StateData#state.cmds,
case catch binary_to_term(Data) of
{reply, Ref, Reply} ->
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, Reply),
ets:delete(Cmds, {cmd, Ref}),
cancel_timer(Ref),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
cancel_timer(Ref),
{next_state, connected, StateData#state{heart_fails = 0}};
_ ->
{next_state, connected, StateData}
end;

%%----------------------------------------------------------------------
%% State: all states
%%----------------------------------------------------------------------
%% Ignore packets received in other states
handle_info({tcp, _, _}, StateName, StateData) ->
{next_state, StateName, StateData};

handle_info({tcp_closed, Socket}, StateName, #state{socket = Socket} =
StateData) ->
cancel_timer(StateData#state.heart_ref),
cancel_timer(StateData#state.heart_timeout_ref),
reply_to_all(StateData#state.cmds),
{next_state, connecting, StateData#state{socket = 0},0};

handle_info({tcp_error, Socket, Reason}, StateName, #state{socket = Socket}
= StateData) ->
io:format("Tcp_Error: ~p~n", [Reason]),
{next_state, StateName, StateData};

handle_info(Info, StateName, StateData) ->
io:format("Unexpected Info: ~p~n", [Info]),
{next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, _StateName, _StateData) ->
ok.

%%----------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%%----------------------------------------------------------------------
code_change(OldVsn, StateName, StateData, Extra) ->
{ok, StateName, StateData}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

cancel_timer(none) ->
ok;
cancel_timer(Ref) ->
erlang:cancel_timer(Ref),
receive
{timeout, Ref, _} ->
ok
after 0 ->
ok
end.

reply_to_all(Ets) ->
ok.

=====================================================================

%%%----------------------------------------------------------------------
%%% File : rpc_listen.erl
%%% Author : shinde local account <otpuser_at_tiger>
%%% Purpose : Act as a tcp/ip server and spawn processes for socket connections
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
-module(rpc_listen).
-vsn(1).
-author('shinde_at_one2one.co.uk').

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([create/2]).

-behaviour(gen_server).

-record(state,{listen_socket = []}). % Listener socket reference
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Port) ->
Name =
list_to_atom(lists:flatten(io_lib:format("rpc_server_~w",[Port]))),
gen_server:start_link({local, Name}, ?MODULE, Port, []).


%% Access to this server from socket servers
create(ServerPid, Pid) ->
gen_server:cast(ServerPid, {create, Pid}).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init(Port) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port,[binary,{packet,2},{active, true},
{reuseaddr,true}]) of
{ok, ListenSocket} ->
{ok, Pid} = rpc_socket:start(self(), ListenSocket), % Start acceptor process
rpc_socket:get_connection(Pid), % Tell it to start accepting
{ok, #state{listen_socket = ListenSocket}};
{error, Reason} ->
{stop, Reason}
end.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast({create,Pid}, State) ->
{ok, NewPid} = rpc_socket:start(self(), State#state.listen_socket),
rpc_socket:get_connection(NewPid),
{noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
%
handle_info({'EXIT', Pid, {error, accept_failed}}, State) ->
create(self(), self()), % Start off new acceptor as listen socket is still open
{noreply,State};

% normal shutdown of socket process
handle_info({'EXIT', Pid, normal}, State) ->
{noreply,State};

handle_info(Info, State) ->
io:format("Unexpected info: ~p~n",[Info]),
{noreply,State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason,State) ->
gen_tcp:close(State#state.listen_socket),
ok.

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------


==============================================================


%%%----------------------------------------------------------------------
%%% File : rpc_socket.erl
%%% Author : ottuser local account <otpuser_at_tiger>
%%% Purpose : Accept a tcp/ip connection and then handle in and ivr protocols
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
%%% This gen_server exists for the life of a socket connection.
%%% It is spawned by rpc_listen, which sends it its own pid
%%% so we can ask it to set up a new listener if/when this one accepts
%%% a socket connection.
%%%----------------------------------------------------------------------
-module(rpc_socket).
-vsn(1).
-author('shinde_at_one2one.co.uk').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start/2, get_connection/1, worker/5]).

-behaviour(gen_server).


% Internal state for this socket process
-record(state,{listen_pid, % Pid of Listener
lis_socket, % Listener Socket
socket = undefined}). % Socket ref

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start(ListenPid, ListenSocket) ->
gen_server:start_link(rpc_socket, {ListenPid, ListenSocket},[]).

get_connection(Pid) ->
gen_server:cast(Pid, get_conn).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init({ListenPid, ListenSocket}) ->
{ok, #state{listen_pid = ListenPid,
lis_socket = ListenSocket}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(get_conn, State) ->
case catch gen_tcp:accept(State#state.lis_socket) of
{error, closed} ->
{stop, {error, accept_failed}, State};
{error, Reason} ->
{stop, {error, accept_failed}, State};
{'EXIT', Reason} ->
{stop, {error, accept_failed}, State};
{ok, Socket} ->
rpc_listen:create(State#state.listen_pid, self()),
{noreply, State#state{socket = Socket}}
end;

handle_cast(_Reply ,State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({tcp, Socket, Packet}, State) ->
case catch binary_to_term(Packet) of
{heartbeat, Ref} ->
% io:format("Heartbeat Received~n"),
gen_tcp:send(Socket, term_to_binary({heart_reply, Ref})),
{noreply, State};
{apply, M, F, A, Ref} ->
Pid = spawn_link(?MODULE, worker, [M, F, A, Ref, Socket]),
{noreply, State};
Else ->
io:format("Socket Received Else: ~p~n",[Else]),
{noreply, State}
end;

handle_info({tcp_closed, Socket}, State) ->
{stop, rpc_skt_closed, State};

handle_info({tcp_error, Socket, Reason}, State) ->
gen_tcp:close(State#state.socket),
{stop, rpc_skt_error, State};


handle_info(Anymessage, State) ->
{noreply, State}.


%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, #state{socket = undefined}) ->
ok;
terminate(Reason, #state{socket = Socket}) ->
gen_tcp:close(Socket),
ok.

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

worker(M, F, A, Ref, Socket) ->
Reply = (catch apply(M, F, A)),
% io:format("Reply: ~p~n", [Reply]),
gen_tcp:send(Socket, term_to_binary({reply, Ref, Reply})).




==================================================END



Sean.Hinde at one2one.co.
Posted: Wed Jun 14, 2000 2:09 pm
All,

The security didn't prove too hard so here is a new version. Based on the
RADIUS mechanism, an md5 hash of a shared secret concatenated with the Timer
reference is sent with each command. It is still open to spoofing of replies
but I guess that is not too dangerous.
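
A checksum of this kind might be computed as sketched here. This is a guess at the shape, not the code from the attached files: the module auth_sketch is hypothetical, the argument order simply follows the check(Ref, Secret, Checksum) call that appears in rpc_socket.erl below, and hashing term_to_binary(Ref) is an assumption about how the timer reference is turned into bytes.

```erlang
-module(auth_sketch).
-export([checksum/2, check/3]).

%% Sender: MD5 over the shared secret followed by the encoded reference.
checksum(Ref, Secret) ->
    erlang:md5([Secret, term_to_binary(Ref)]).

%% Receiver: recompute the checksum and compare it with the one received.
check(Ref, Secret, Checksum) ->
    checksum(Ref, Secret) =:= Checksum.
```

The client would then send {heartbeat, Ref, auth_sketch:checksum(Ref, Secret)} and the server would drop any packet for which check/3 returns false.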

It now also requires the crypto application to be started.

Feel free to use it as you wish, but please share any fixes and
improvements.

Sean


%%%----------------------------------------------------------------------
%%% File : rpc_socket.erl
%%% Author : ottuser local account <otpuser_at_tiger>
%%% Purpose : Accept a tcp/ip connection and then handle in and ivr protocols
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
%%% This gen_server exists for the life of a socket connection.
%%% It is spawned by rpc_listen, which sends it its own pid
%%% so we can ask it to set up a new listener if/when this one accepts
%%% a socket connection.
%%%----------------------------------------------------------------------
-module(rpc_socket).
-vsn('$Revision: 1.2 $ ').
-author('shinde_at_one2one.co.uk').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start/3, get_connection/1, worker/5]).

-behaviour(gen_server).


% Internal state for this socket process
-record(state,{listen_pid, % Pid of Listener
lis_socket, % Listener Socket
socket = undefined, % Socket ref
secret = null}). % Shared Secret

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start(ListenPid, ListenSocket, Secret) ->
gen_server:start_link(rpc_socket, {ListenPid, ListenSocket, Secret},[]).

get_connection(Pid) ->
gen_server:cast(Pid, get_conn).

%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init({ListenPid, ListenSocket, Secret}) ->
{ok, #state{listen_pid = ListenPid,
lis_socket = ListenSocket,
secret = Secret}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(get_conn, State) ->
case catch gen_tcp:accept(State#state.lis_socket) of
{error, closed} ->
{stop, {error, accept_failed}, State};
{error, Reason} ->
{stop, {error, accept_failed}, State};
{'EXIT', Reason} ->
{stop, {error, accept_failed}, State};
{ok, Socket} ->
rpc_listen:create(State#state.listen_pid, self()),
{noreply, State#state{socket = Socket}}
end;

handle_cast(_Reply ,State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({tcp, Socket, Packet}, State) ->
case catch binary_to_term(Packet) of
{heartbeat, Ref, Checksum} ->
io:format("Heartbeat Received~n"),
case check(Ref, State#state.secret, Checksum) of
true ->
io:format("check ok~n"),
gen_tcp:send(Socket, term_to_binary({heart_reply,
Ref})),
{noreply, State};
false ->
{noreply, State}
end;
{apply, M, F, A, Ref, Checksum} ->
case check(Ref, State#state.secret, Checksum) of
true ->
Pid = spawn_link(?MODULE, worker, [M, F, A, Ref,
Socket]),
{noreply, State};
false ->
{noreply, State}
end;
Else ->
io:format("Socket Received Else: ~p~n",[Else]),
{noreply, State}
end;

handle_info({tcp_closed, Socket}, State) ->
{stop, rpc_skt_closed, State};

handle_info({tcp_error, Socket, Reason}, State) ->
gen_tcp:close(State#state.socket),
{stop, rpc_skt_error, State};


handle_info(Anymessage, State) ->
{noreply, State}.


%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, #state{socket = undefined}) ->
ok;
terminate(Reason, #state{socket = Socket}) ->
gen_tcp:close(Socket),
ok.

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

worker(M, F, A, Ref, Socket) ->
Reply = (catch apply(M, F, A)),
% io:format("Reply: ~p~n", [Reply]),
gen_tcp:send(Socket, term_to_binary({reply, Ref, Reply})).



check(Ref, Secret, Checksum) ->
Checksum == crypto:md5(concat_binary([term_to_binary(Ref), Secret])).

-----------------------------cut-----------------------------
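
As far as can be told, `crypto:md5/1` and `concat_binary/1` are no longer available in recent OTP releases, so `check/3` above would need a small change to compile there. A minimal sketch of the same construction using `crypto:hash/2` follows. The module name `rpc_checksum` and its function names are hypothetical and not part of the original post; MD5 is kept only to mirror the scheme described here.

```erlang
-module(rpc_checksum).
-export([make/2, check/3]).

%% Hypothetical helper module, not part of the original post.
%% Builds the checksum the same way as md5/2 in rpc_client: an MD5 digest
%% of the external term format of Ref followed by the shared secret.
%% crypto:hash/2 accepts iodata, so no explicit concatenation is needed.
make(Ref, Secret) when is_binary(Secret) ->
    crypto:hash(md5, [term_to_binary(Ref), Secret]).

%% Returns true when Checksum matches the digest for Ref and Secret.
check(Ref, Secret, Checksum) ->
    Checksum =:= make(Ref, Secret).
```

Both ends must hold the secret as the same binary, which is why `rpc_listen` and `rpc_client` call `list_to_binary/1` on it at startup.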

%%%----------------------------------------------------------------------
%%% File : rpc_listen.erl
%%% Author : shinde local account <otpuser_at_tiger>
%%% Purpose : Act as a tcp/ip server and spawn processes for socket connections
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
-module(rpc_listen).
-vsn('$Revision: 1.2 $ ').
-author('shinde_at_one2one.co.uk').

-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([create/2]).

-behaviour(gen_server).

-record(state,{listen_socket = [], % Listener socket reference
secret = null}). % Shared Secret
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Port, Secret) ->
Name =
list_to_atom(lists:flatten(io_lib:format("rpc_server_~w",[Port]))),
gen_server:start_link({local, Name}, ?MODULE, {Port, Secret}, []).


%% Access to this server from socket servers
create(ServerPid, Pid) ->
gen_server:cast(ServerPid, {create, Pid}).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init({Port, Secret}) ->
process_flag(trap_exit, true),
Bin_secret = list_to_binary(Secret),
case gen_tcp:listen(Port,[binary,{packet,2},{active, true},
{reuseaddr,true}]) of
{ok, ListenSocket} ->
{ok, Pid} = rpc_socket:start(self(), ListenSocket, Bin_secret),
% Start acceptor process
rpc_socket:get_connection(Pid), % Tell it to start accepting
{ok, #state{listen_socket = ListenSocket,
secret = Bin_secret}};
{error, Reason} ->
{stop, Reason}
end.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.


%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast({create,Pid}, State) ->
{ok, NewPid} = rpc_socket:start(self(), State#state.listen_socket,
State#state.secret),
rpc_socket:get_connection(NewPid),
{noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
%
handle_info({'EXIT', Pid, {error, accept_failed}}, State) ->
create(self(), self()), % Start off new acceptor as listen socket is still open
{noreply,State};

% normal shutdown of socket process
handle_info({'EXIT', Pid, normal}, State) ->
{noreply,State};

handle_info(Info, State) ->
io:format("Unexpected info: ~p~n",[Info]),
{noreply,State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason,State) ->
gen_tcp:close(State#state.listen_socket),
ok.

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

--------------------------------cut--------------------------

%%%----------------------------------------------------------------------
%%% File : rpc_client.erl
%%% Author : shinde_at_one2one.co.uk local account <otpuser_at_tiger>
%%% Purpose :
%%% Created : 11 Apr 2000 by shinde local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------

-module(rpc_client).
-author('shinde_at_one2one.co.uk').
-vsn('$Revision: 1.2 $ ').

-behaviour(gen_fsm).

%% External exports
-export([start_link/4]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([connecting/2]).
-export([connected/3, connecting/3, wait_first_heart/3]).

-export([call/5]).

-define(HEART_TIMEOUT, 8000). % Heartbeat response timeout
-define(HEART_PERIOD, 15000). % Inter heartbeat timeout
-define(MAX_CMD_TIMEOUT, 5000). % Command response timeout
-define(HEART_FAIL_THRESHOLD, 3). % Max number of heartbeat failures before closing socket
-define(RETRY_PERIOD, 5000). % Period to wait before retrying the connection

-record(state, {heart_fails = 0, % counter of heartbeat failures
heart_ref = none, % Timer Ref of repeating heartbeat timer
heart_timeout_ref = none, % Timer Ref for heartbeat send timeout
cmds = 0, % ets table containing currently outstanding commands
name = "", % Given name for this connection
socket = 0, % connected socket
hostname = "", % hostname to maintain client connection to
port = 0, % listening port on above host
secret = null}). % shared secret for security

%% cmds contains {{cmd, Timer_ref}, Reply_to}

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Name, Hostname, Port, Secret) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:start_link({local, Reg_name}, ?MODULE, {Name, Hostname, Port,
Secret}, []).

call(Name, M, F, A, Timeout) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:sync_send_event({global, Reg_name}, {apply, M, F, A, Timeout},
?MAX_CMD_TIMEOUT).


%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData} |
%% {ok, StateName, StateData, Timeout} |
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
init({Name, Hostname, Port, Secret}) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
global:re_register_name(Reg_name, self()),
{ok, connecting, #state{cmds = ets:new(cmds, []),
name = Name,
hostname = Hostname,
secret = list_to_binary(Secret),
port = Port}, 0}. % send timeout immediately

%%----------------------------------------------------------------------
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
%% This is called immediately on startup as a timeout from init/1
connecting(timeout, StateData) ->
case gen_tcp:connect(StateData#state.hostname, StateData#state.port,
[binary, {active, true}, {packet, 2}], 5000) of
{ok, Socket} ->
Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
case gen_tcp:send(Socket, term_to_binary({heartbeat, Ref,
md5(Ref, StateData#state.secret)})) of
ok ->
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, wait_first_heart, StateData#state{socket = Socket,
heart_timeout_ref = Ref,
heart_fails = 0}};
{error, _} ->
cancel_timer(Ref),
gen_tcp:close(Socket),
timer:sleep(5000), % Wait for a while before retrying
{next_state, connecting, StateData, 0}
end;
{error, _} ->
timer:sleep(5000),
{next_state, connecting, StateData, 0}
end;
connecting(_, StateData) ->
%% Re-arm the zero timeout so the retry loop is not lost
{next_state, connecting, StateData, 0}.

%%----------------------------------------------------------------------
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
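%% Stay in wait_first_heart: the socket is open and the first heartbeat
%% reply is still pending, so the state must not change here.
wait_first_heart({apply, _, _, _, _}, _From, StateData) ->
{reply, {error, not_yet_heartbeating}, wait_first_heart, StateData}.

%% Re-arm the zero timeout, otherwise a call that arrives while the
%% connection is down would stop the retry loop.
connecting({apply, _, _, _, _}, _From, StateData) ->
{reply, {error, not_connected}, connecting, StateData, 0}.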

connected({apply, M, F, A, Timeout}, From, StateData) ->
Ref = erlang:start_timer(Timeout, self(), cmd_timeout),
case gen_tcp:send(StateData#state.socket, term_to_binary({apply, M, F,
A, Ref, md5(Ref, StateData#state.secret)})) of
ok ->
ets:insert(StateData#state.cmds, {{cmd, Ref}, From}),
{next_state, connected, StateData};
{error, Reason} ->
cancel_timer(Ref),
{reply, {error, Reason}, connected, StateData}
end.

%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
{next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
handle_sync_event(Event, From, StateName, StateData) ->
Reply = ok,
{reply, Reply, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------

%%----------------------------------------------------------------------
%% State: wait_first_heart
%%----------------------------------------------------------------------
%% Waiting for first heartbeat. Received timeout which is the current attempt
handle_info({timeout, Ref, heart_timeout},
wait_first_heart, #state{heart_timeout_ref = Ref} = StateData) ->
gen_tcp:close(StateData#state.socket),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};

%% Waiting for first heartbeat. Received timeout which could just
%% be a non-cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, _Ref, heart_timeout}, wait_first_heart, StateData) ->
{next_state, wait_first_heart, StateData};

%% Wow, received some data
handle_info({tcp, Socket, Data},
wait_first_heart, #state{socket = Socket} = StateData) ->
case catch binary_to_term(Data) of
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
Next_heart = erlang:start_timer(?HEART_PERIOD, self(),
send_heart),
io:format("First Heart_timer_set: ~p~n", [Next_heart]),
{next_state, connected, StateData#state{heart_ref =
Next_heart}};
_ ->
gen_tcp:close(Socket),
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, connecting, StateData, 0}
end;

%%----------------------------------------------------------------------
%% State: connected
%%----------------------------------------------------------------------
%% Received Nth heartbeat timeout in connected phase which means we
%% should close and start again.
handle_info({timeout, Ref, heart_timeout}, connected, StateData)
when StateData#state.heart_fails >= ?HEART_FAIL_THRESHOLD->
gen_tcp:close(StateData#state.socket),
cancel_timer(StateData#state.heart_ref),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};

%% Received non final heartbeat timeout in connected phase, increment counter.
handle_info({timeout, Ref, heart_timeout}, connected, StateData) ->
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails = Heart_fails + 1}};

%% Time to send a new heartbeat.
handle_info({timeout, Ref, send_heart}, connected, StateData) ->
cancel_timer(StateData#state.heart_timeout_ref), % Just in case
New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
% io:format("Heart_timer_set: ~p~n", [Next_heart]),
case gen_tcp:send(StateData#state.socket, term_to_binary({heartbeat,
New_ref, md5(New_ref, StateData#state.secret)})) of
ok ->
{next_state, connected, StateData#state{heart_timeout_ref =
New_ref,
heart_ref =
Next_heart}};
{error, _} ->
cancel_timer(New_ref),
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails =
Heart_fails + 1,
heart_timeout_ref =
none, % cancel_timer/1 only accepts none or a timer ref
heart_ref = Next_heart}}
end;

%% Received timeout for a sent command.
handle_info({timeout, Ref, cmd_timeout}, connected, StateData) ->
Cmds = StateData#state.cmds,
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, {error, timed_out}),
ets:delete(Cmds, {cmd, Ref}),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;

%% The real stuff - received a reply or heartbeat while connected
handle_info({tcp, Socket, Data}, connected, #state{socket = Socket} =
StateData) ->
Cmds = StateData#state.cmds,
case catch binary_to_term(Data) of
{reply, Ref, Reply} ->
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, Reply),
ets:delete(Cmds, {cmd, Ref}),
cancel_timer(Ref),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
cancel_timer(Ref),
{next_state, connected, StateData#state{heart_fails = 0}};
Else ->
io:format("unknown Packet received~p~n", [Else]),
{next_state, connected, StateData}
end;

%%----------------------------------------------------------------------
%% State: all states
%%----------------------------------------------------------------------
%% Ignore Packets received in other states
handle_info({tcp, _, _}, StateName, StateData) ->
{next_state, StateName, StateData};

handle_info({tcp_closed, Socket}, StateName, #state{socket = Socket} =
StateData) ->
cancel_timer(StateData#state.heart_ref),
cancel_timer(StateData#state.heart_timeout_ref),
reply_to_all(StateData#state.cmds),
{next_state, connecting, StateData#state{socket = 0},0};

handle_info({tcp_error, Socket, Reason}, StateName, #state{socket = Socket}
= StateData) ->
io:format("Tcp_Error: ~p~n", [Reason]),
{next_state, StateName, StateData};

handle_info(Info, StateName, StateData) ->
io:format("Unexpected Info: ~p~n", [Info]),
{next_state, StateName, StateData}.

%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, _StateName, _StateData) ->
ok.

%%----------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%%----------------------------------------------------------------------
code_change(OldVsn, StateName, StateData, Extra) ->
{ok, StateName, StateData}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

cancel_timer(none) ->
ok;
cancel_timer(Ref) ->
erlang:cancel_timer(Ref),
receive
{timeout, Ref, _} ->
ok
after 0 ->
ok
end.

reply_to_all(Ets) ->
ok.

md5(Ref, Secret) ->
crypto:md5(concat_binary([term_to_binary(Ref), Secret])).
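
The wire format used by the three modules above is small: each request or reply is a single Erlang term passed through `term_to_binary/1`, and the `{packet, 2}` socket option frames it so that it arrives as one message. The sketch below shows one authenticated `{apply, M, F, A, Ref, Checksum}` round trip over a loopback socket. It is only an illustration under stated assumptions: the module `rpc_wire_demo`, its functions and the example secret are hypothetical, it uses passive sockets rather than the `{active, true}` mode in the post, and it uses `crypto:hash/2` in place of `crypto:md5/1`.

```erlang
-module(rpc_wire_demo).
-export([run/0]).

%% Hypothetical loopback demo, not part of the original post.
%% Sends one {apply, M, F, A, Ref, Checksum} request over a local socket
%% and returns the result carried in the {reply, Ref, Reply} answer.
run() ->
    Secret = <<"example-secret">>,
    Opts = [binary, {packet, 2}, {active, false}],
    %% Port 0 asks the OS for any free port
    {ok, Listen} = gen_tcp:listen(0, [{reuseaddr, true} | Opts]),
    {ok, Port} = inet:port(Listen),
    spawn_link(fun() -> serve_one(Listen, Secret) end),
    {ok, Sock} = gen_tcp:connect({127, 0, 0, 1}, Port, Opts, 5000),
    Ref = make_ref(),
    Request = {apply, lists, reverse, [[1, 2, 3]], Ref, sum(Ref, Secret)},
    ok = gen_tcp:send(Sock, term_to_binary(Request)),
    {ok, Packet} = gen_tcp:recv(Sock, 0, 5000),
    ok = gen_tcp:close(Sock),
    ok = gen_tcp:close(Listen),
    %% Ref is already bound, so this only matches the reply to our request
    {reply, Ref, Reply} = binary_to_term(Packet),
    Reply.

%% Accept a single connection, verify the checksum, run the call, reply.
serve_one(Listen, Secret) ->
    {ok, Sock} = gen_tcp:accept(Listen),
    {ok, Packet} = gen_tcp:recv(Sock, 0, 5000),
    {apply, M, F, A, Ref, Checksum} = binary_to_term(Packet),
    true = (Checksum =:= sum(Ref, Secret)),
    Reply = (catch apply(M, F, A)),
    ok = gen_tcp:send(Sock, term_to_binary({reply, Ref, Reply})),
    gen_tcp:close(Sock).

%% Same construction as md5/2 above: digest of the encoded Ref plus secret.
sum(Ref, Secret) ->
    crypto:hash(md5, [term_to_binary(Ref), Secret]).
```

Calling `rpc_wire_demo:run()` should return the reversed list. As noted in the post, the reply itself carries no checksum, so a reply can still be spoofed by anyone able to write to the connection.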



