| Sean.Hinde at one2one.co. |
Posted: Tue Jun 13, 2000 3:53 pm |
I need to connect up two separate Erlang/OTP based systems which are
separated amongst other things by a firewall.
I don't want to run the standard distribution mechanism between the two
systems for various reasons but need a simple rpc type call in both
directions.
Out of all the mechanisms available in OTP, what are people's views on which
would be the quickest and easiest mechanism, requiring the least overhead and
minimum complexity? Full-on CORBA would work but is probably overkill.
Views?
Sean
Post generated using Mail2Forum (http://m2f.sourceforge.net) |
| cesarini at terminus.eric |
Posted: Tue Jun 13, 2000 3:59 pm |
Might seem far-fetched, but when discussing this problem a few years
ago, one of the simplest ideas and solutions which came up was to run
the calls through the WWW proxy, masking them as http requests with the
arguments in the URL.
//fc
--
Francesco Cesarini
Erlang/OTP consultant
Cellular: +44-(0)7776 250381
ECN: 832-707192
| mbj at bluetail.com |
Posted: Tue Jun 13, 2000 4:03 pm |
Sean Hinde <Sean.Hinde_at_one2one.co.uk> wrote:
> Out of all the mechanisms available in OTP what are peoples views on which
> would be the quickest and easiest mechanism requiring least overhead and
> minimum complexity.
Set up a socket, possibly perform a handshake alg, maybe md5 digest
based, and send erlang binaries:
gen_tcp:send(Sock, term_to_binary({apply, M, F, A}))
[You might have to take care of which node is responsible for
connecting (simple alg: smallest node name), or handle simultaneous
connects.]
If each system consists of more than one node, you might run this
server in a distributed application, to make it fault tolerant.
> Full on CORBA would work but is probably overkill??
I would definitely think so ;-)
/martin
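Martin's one-liner can be fleshed out roughly as follows. This is only a sketch: the module name rpc_sketch and the functions encode_call/3, handle_packet/1 and should_connect/2 are made up for illustration and appear nowhere in the thread, and binary_to_term/2 with the [safe] option is a later addition to Erlang than this discussion. The handshake Martin mentions is left out.

```erlang
-module(rpc_sketch).
-export([encode_call/3, handle_packet/1, should_connect/2]).

%% Encode a call in the external term format, ready for gen_tcp:send/2.
encode_call(M, F, A) ->
    term_to_binary({apply, M, F, A}).

%% Decode one received packet and run the call it describes.
%% [safe] refuses terms that would create new atoms, but the call itself
%% is still arbitrary code execution, so authenticate the peer first.
handle_packet(Bin) when is_binary(Bin) ->
    case catch binary_to_term(Bin, [safe]) of
        {apply, M, F, A} when is_atom(M), is_atom(F), is_list(A) ->
            {reply, (catch apply(M, F, A))};
        _ ->
            {error, bad_request}
    end.

%% Tie-break for simultaneous connects: the side with the smaller
%% name is the one that dials out.
should_connect(MyName, PeerName) ->
    MyName < PeerName.
```

The receiving side would pass each complete packet to handle_packet/1 and send the result back with term_to_binary/1 in the same way.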
| etxuwig at etxb.ericsson. |
Posted: Tue Jun 13, 2000 4:09 pm |
I'm just now building a network of cooperating web servers using
"HTTP-based RPC". That is, the web servers run Erlang, and when they
need to communicate, they use a set of predefined "internal requests"
via HTTP:
query_internal(Host, Port, What, Req) ->
{ok, Sock} = gen_tcp:connect(Host, Port, [binary, {packet, 0}]),
Auth = ccv_auth:internal_pack_user(Req#ccv_req.user),
Query = ["/ccviewer/internal/", What],
Cmd = ["GET ", Query, " HTTP/1.0\r\n"
"Cookie: ", Auth, ";\r\n"
"\r\n"],
ok=io:format("connected~n", []),
gen_tcp:send(Sock, list_to_binary(Cmd)),
do_recv(Sock).
do_recv(Sock) ->
receive
{tcp, Sock, Data} ->
ok=io:format("received ~p~n", [Data]),
{B1,B2} = split_binary(Data, 4),
LengthBytes = binary_to_list(B1),
Length = i32(LengthBytes),
ok=io:format("Length = ~p~n", [Length]),
case size(B2) of
L when L == Length ->
ok=io:format("Got whole packet~n", []),
gen_tcp:close(Sock),
binary_to_term(B2);
_ ->
ok=io:format("wait for more~n", []),
do_recv(Sock, Length, B2)
end
after 10000 ->
exit(timeout)
end.
do_recv(Sock, Length, Bin) ->
receive
{tcp, Sock, Data} ->
Bin2 = list_to_binary([Bin, Data]),
case size(Bin2) of
Length ->
ok=io:format("Got whole packet~n", []),
gen_tcp:close(Sock),
binary_to_term(Bin2);
_ ->
ok=io:format("wait for more~n", []),
do_recv(Sock, Length, Bin2)
end
after 10000 ->
exit(timeout)
end.
i32([B1,B2,B3,B4]) ->
((B1 bsl 24) bor
(B2 bsl 16) bor
(B3 bsl 8) bor
B4);
i32(I) when is_integer(I) ->
B1 = (I band 16#ff000000) bsr 24,
B2 = (I band 16#00ff0000) bsr 16,
B3 = (I band 16#0000ff00) bsr 8,
B4 = I band 16#000000ff,
[B1, B2, B3, B4].
And the server side looks kinda like this:
ccv_handler({get, "/internal/" ++ Query, Args}, Socket, Env) ->
X = {get, Query, Args},
put(socket, Socket),
Result = case ccv_auth:internal_request(X, Socket, Env) of
{'EXIT', Reason} ->
{error, Reason};
{ok, NewX, Req} ->
put(request, Req),
handle_internal_request(NewX, Req);
Other ->
Other
end,
Bin = term_to_binary(Result),
gen_tcp:send(Socket, [ccv_lib:i32(size(Bin)), Bin]);
Of course, one also needs an HTTP server. Personally, I use a
home-grown, based on Joe's/Luke's pico. A more natural choice perhaps
would be to use INETS, but what fun would that be?
At AXD301, we had a similar problem, and solved it there with CORBA.
It's also reasonably straightforward. I can dig up the code if you'd
like.
/Uffe
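For comparison, the 4-byte length prefix that i32/1 and do_recv/1,3 handle by hand can be sketched with the bit syntax of later Erlang releases. This is an illustration only, not Ulf's code: the module name frame_sketch and the functions frame/1 and unframe/1 are hypothetical.

```erlang
-module(frame_sketch).
-export([frame/1, unframe/1]).

%% Prefix the encoded term with its size as a 4-byte big-endian integer,
%% the same layout that i32/1 produces as a list of bytes.
frame(Term) ->
    Payload = term_to_binary(Term),
    <<(byte_size(Payload)):32/big, Payload/binary>>.

%% Try to take one complete frame off the front of a receive buffer.
%% Returns the decoded term plus any bytes that follow it, or asks for
%% more data when the buffer does not yet hold a whole frame.
unframe(<<Len:32/big, Payload:Len/binary, Rest/binary>>) ->
    {ok, binary_to_term(Payload), Rest};
unframe(Buffer) when is_binary(Buffer) ->
    {more, Buffer}.
```

A receive loop would append each incoming {tcp, Sock, Data} chunk to its buffer and call unframe/1 until it stops returning {more, Buffer}.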
--
Ulf Wiger tfn: +46 8 719 81 95
Network Architecture & Product Strategies mob: +46 70 519 81 95
Ericsson Telecom AB, Datacom Networks and IP Services
Varuv |
| Sean.Hinde at one2one.co. |
Posted: Tue Jun 13, 2000 5:34 pm |
Martin,
> gen_tcp:send(Sock, term_to_binary({apply, M, F, A}))
>
> [You might have to take care of which node is responsible for
> connecting (simple alg: smallest node name), or handle simultanoeus
> connects.]
>
I like this, I wish I'd thought of it. I guess using this with the inet
{packet, 2} socket option (which as far as I can tell guarantees that a
single receive returns the whole packet) could well be the basis of the
simplest rpc ever.
On the other hand this still means that I have to write the tcp client and
server implementations myself which though not that hard is significant. The
advantage I see of CORBA is that once I have figured out how to use the
beast it takes care of maintaining the connections, mapping requests and
replies etc...
Much more fun to do your own though. I'll post if I get something together
(possibly based on pico - thanks for the tip, Ulf). BTW I'd definitely be
interested in seeing a good real example of CORBA usage if you find the time
:-)
BR,
Sean
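The {packet, 2} behaviour described above can be checked with a small loopback test. This is a sketch under assumptions: the module name packet2_demo and the function run/0 are invented for the example, and it uses passive sockets so that each gen_tcp:recv/3 call visibly returns exactly one message.

```erlang
-module(packet2_demo).
-export([run/0]).

%% Send two terms back to back over a loopback connection and read them
%% with two separate recv calls. With {packet, 2} the driver adds a
%% 2-byte length header on send and strips it on receive, so each recv
%% returns one whole message even though TCP itself is a byte stream.
run() ->
    Opts = [binary, {packet, 2}, {active, false}],
    {ok, Listen} = gen_tcp:listen(0, [{reuseaddr, true} | Opts]),
    {ok, Port} = inet:port(Listen),
    {ok, Client} = gen_tcp:connect({127,0,0,1}, Port, Opts),
    {ok, Server} = gen_tcp:accept(Listen, 5000),
    ok = gen_tcp:send(Client, term_to_binary({apply, erlang, node, []})),
    ok = gen_tcp:send(Client, term_to_binary({apply, erlang, date, []})),
    {ok, First} = gen_tcp:recv(Server, 0, 5000),
    {ok, Second} = gen_tcp:recv(Server, 0, 5000),
    lists:foreach(fun gen_tcp:close/1, [Client, Server, Listen]),
    [binary_to_term(First), binary_to_term(Second)].
```

One limit to keep in mind: a 2-byte header caps each message at 65535 bytes, so larger terms need {packet, 4}.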
| Sean.Hinde at one2one.co. |
Posted: Wed Jun 14, 2000 12:17 pm |
OK, here it is..
A non blocking persistent client with heartbeats for socket maintenance.
Implemented using a gen_fsm (rpc_client)
A multithreaded tcp/ip server which allows for multiple connections.
Implemented using two gen_servers (rpc_listen.erl, rpc_socket.erl)
Example usage:
On the server node (note: non-distributed, so no cheating!):
10>rpc_listen:start_link(3456).
{ok, <0.335.0>}
On the client node:
3>rpc_client:start_link("sean", "localhost", 3456).
{ok, <0.123.0>}
4>rpc_client:call("sean", io, format, ["How cool is Erlang?~n"], 3000).
ok
Please tear my code apart at will. I'm never very sure of the best way to
deal with socket errors on send, receive, and particularly in accept so this
needs some work (suggestions?)
Also it needs some security and more testing.
Sean
<<rpc_client.erl>> <<rpc_listen.erl>> <<rpc_socket.erl>>
============================================================
%%%----------------------------------------------------------------------
%%% File : rpc_client.erl
%%% Author : shinde_at_one2one.co.uk local account <otpuser_at_tiger>
%%% Purpose :
%%% Created : 11 Apr 2000 by shinde local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
-module(rpc_client).
-author('shinde_at_one2one.co.uk').
-vsn(1).
-behaviour(gen_fsm).
%% External exports
-export([start_link/3]).
%% gen_fsm callbacks
-export([init/1, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([connecting/2]).
-export([connected/3, connecting/3, wait_first_heart/3]).
-export([call/5]).
-define(HEART_TIMEOUT, 8000). % Heartbeat response timeout
-define(HEART_PERIOD, 15000). % Inter heartbeat timeout
-define(MAX_CMD_TIMEOUT, 5000). % Command response timeout
-define(HEART_FAIL_THRESHOLD, 3). % Max number of heartbeat failures before closing socket
-define(RETRY_PERIOD, 5000). % Period to wait before retrying the connection
-record(state, {heart_fails = 0, % counter of heartbeat failures
heart_ref = none, % Timer Ref of repeating heartbeat timer
heart_timeout_ref = none, % Timer Ref for heartbeat send timeout
cmds = 0, % ets table containing currently outstanding commands
name = "", % Given name for this connection
socket = 0, % connected socket
hostname = "", % hostname to maintain client connection to
port = 0}). % listening port on above host
%% cmds contains {{cmd, Timer_ref}, Reply_to}
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Name, Hostname, Port) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:start_link({local, Reg_name}, ?MODULE, {Name, Hostname, Port},
[]).
call(Name, M, F, A, Timeout) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:sync_send_event({global, Reg_name}, {apply, M, F, A, Timeout},
?MAX_CMD_TIMEOUT).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData} |
%% {ok, StateName, StateData, Timeout} |
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
init({Name, Hostname, Port}) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
global:re_register_name(Reg_name, self()),
{ok, connecting, #state{cmds = ets:new(cmds, []),
name = Name,
hostname = Hostname,
port = Port}, 0}. % send timeout immediately
%%----------------------------------------------------------------------
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
%% This is called immediately on startup as a timeout from init/1
connecting(timeout, StateData) ->
case gen_tcp:connect(StateData#state.hostname, StateData#state.port,
[binary, {active, true}, {packet, 2}], 5000) of
{ok, Socket} ->
Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
case gen_tcp:send(Socket, term_to_binary({heartbeat, Ref})) of
ok ->
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, wait_first_heart, StateData#state{socket =
Socket,
heart_timeout_ref = Ref,
heart_fails = 0}};
{error, _} ->
cancel_timer(Ref),
gen_tcp:close(Socket),
timer:sleep(5000), % Wait for a while before retrying
{next_state, connecting, StateData, 0}
end;
{error, _} ->
timer:sleep(5000),
{next_state, connecting, StateData, 0}
end;
connecting(_, StateData) ->
{next_state, connecting, StateData}.
%%----------------------------------------------------------------------
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
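%% Stay in wait_first_heart: moving to connecting here would strand the
%% pending heartbeat reply and its timeout.
wait_first_heart({apply, _, _, _, _}, _From, StateData) ->
{reply, {error, not_yet_heartbeating}, wait_first_heart, StateData}.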
connecting({apply, _, _, _, _}, From, StateData) ->
{reply, {error, not_connected}, connecting, StateData}.
connected({apply, M, F, A, Timeout}, From, StateData) ->
Ref = erlang:start_timer(Timeout, self(), cmd_timeout),
case gen_tcp:send(StateData#state.socket, term_to_binary({apply, M, F,
A, Ref})) of
ok ->
ets:insert(StateData#state.cmds, {{cmd, Ref}, From}),
{next_state, connected, StateData};
{error, Reason} ->
cancel_timer(Ref),
{reply, {error, Reason}, connected, StateData}
end.
%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
handle_sync_event(Event, From, StateName, StateData) ->
Reply = ok,
{reply, Reply, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% State: wait_first_heart
%%----------------------------------------------------------------------
%% Waiting for first heartbeat. Received timeout which is the current attempt
handle_info({timeout, Ref, heart_timeout},
wait_first_heart, #state{heart_timeout_ref = Ref} = StateData)
->
gen_tcp:close(StateData#state.socket),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};
%% Waiting for first heartbeat. Received timeout which could just
%% be a non-cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, _Ref, heart_timeout}, wait_first_heart, StateData) ->
{next_state, wait_first_heart, StateData};
%% Wow, received some data
handle_info({tcp, Socket, Data},
wait_first_heart, #state{socket = Socket} = StateData) ->
case catch binary_to_term(Data) of
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
Next_heart = erlang:start_timer(?HEART_PERIOD, self(),
send_heart),
% io:format("First Heart_timer_set: ~p~n", [Next_heart]),
{next_state, connected, StateData#state{heart_ref =
Next_heart}};
_ ->
gen_tcp:close(Socket),
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, connecting, StateData, 0}
end;
%%----------------------------------------------------------------------
%% State: connected
%%----------------------------------------------------------------------
%% Received Nth heartbeat timeout in connected phase which means we
%% should close and start again.
handle_info({timeout, Ref, heart_timeout}, connected, StateData)
when StateData#state.heart_fails >= ?HEART_FAIL_THRESHOLD->
gen_tcp:close(StateData#state.socket),
cancel_timer(StateData#state.heart_ref),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};
%% Received non final heartbeat timeout in connected phase, increment counter.
handle_info({timeout, Ref, heart_timeout}, connected, StateData) ->
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails = Heart_fails + 1}};
%% Time to send a new heartbeat.
handle_info({timeout, Ref, send_heart}, connected, StateData) ->
cancel_timer(StateData#state.heart_timeout_ref), % Just in case
New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
% io:format("Heart_timer_set: ~p~n", [Next_heart]),
case gen_tcp:send(StateData#state.socket, term_to_binary({heartbeat,
New_ref})) of
ok ->
{next_state, connected, StateData#state{heart_timeout_ref =
New_ref,
heart_ref =
Next_heart}};
{error, _} ->
cancel_timer(New_ref),
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails =
Heart_fails + 1,
heart_timeout_ref =
none, % cancel_timer/1 only special-cases 'none'
heart_ref = Next_heart}}
end;
%% Received timeout for a sent command.
handle_info({timeout, Ref, cmd_timeout}, connected, StateData) ->
Cmds = StateData#state.cmds,
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, {error, timed_out}),
ets:delete(Cmds, {cmd, Ref}),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;
%% The real stuff - received a reply or heartbeat while connected
handle_info({tcp, Socket, Data}, connected, #state{socket = Socket} =
StateData) ->
Cmds = StateData#state.cmds,
case catch binary_to_term(Data) of
{reply, Ref, Reply} ->
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, Reply),
ets:delete(Cmds, {cmd, Ref}),
cancel_timer(Ref),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
cancel_timer(Ref),
{next_state, connected, StateData#state{heart_fails = 0}};
_ ->
{next_state, connected, StateData}
end;
%%----------------------------------------------------------------------
%% State: all states
%%----------------------------------------------------------------------
handle_info({tcp, _, _}, StateName, StateData) -> % Ignore packets received in other states
{next_state, StateName, StateData};
handle_info({tcp_closed, Socket}, StateName, #state{socket = Socket} =
StateData) ->
cancel_timer(StateData#state.heart_ref),
cancel_timer(StateData#state.heart_timeout_ref),
reply_to_all(StateData#state.cmds),
{next_state, connecting, StateData#state{socket = 0},0};
handle_info({tcp_error, Socket, Reason}, StateName, #state{socket = Socket}
= StateData) ->
io:format("Tcp_Error: ~p~n", [Reason]),
{next_state, StateName, StateData};
handle_info(Info, StateName, StateData) ->
io:format("Unexpected Info: ~p~n", [Info]),
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
terminate(Reason, StateName, StatData) ->
ok.
%%----------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%%----------------------------------------------------------------------
code_change(OldVsn, StateName, StateData, Extra) ->
{ok, StateName, StateData}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
cancel_timer(none) ->
ok;
cancel_timer(Ref) ->
erlang:cancel_timer(Ref),
receive
{timeout, Ref, _} ->
ok
after 0 ->
ok
end.
reply_to_all(Ets) ->
ok.
=====================================================================
%%%----------------------------------------------------------------------
%%% File : rpc_listen.erl
%%% Author : shinde local account <otpuser_at_tiger>
%%% Purpose : Act as a tcp/ip server and spawn processes for socket connections
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
-module(rpc_listen).
-vsn(1).
-author('shinde_at_one2one.co.uk').
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([create/2]).
-behaviour(gen_server).
-record(state,{listen_socket = []}). % Listener socket reference
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Port) ->
Name =
list_to_atom(lists:flatten(io_lib:format("rpc_server_~w",[Port]))),
gen_server:start_link({local, Name}, ?MODULE, Port, []).
%% Access to this server from socket servers
create(ServerPid, Pid) ->
gen_server:cast(ServerPid, {create, Pid}).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init(Port) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port,[binary,{packet,2},{active, true},
{reuseaddr,true}]) of
{ok, ListenSocket} ->
{ok, Pid} = rpc_socket:start(self(), ListenSocket), % Start acceptor process
rpc_socket:get_connection(Pid), % Tell it to start accepting
{ok, #state{listen_socket = ListenSocket}};
{error, Reason} ->
{stop, Reason}
end.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast({create,Pid}, State) ->
{ok, NewPid} = rpc_socket:start(self(), State#state.listen_socket),
rpc_socket:get_connection(NewPid),
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
%
handle_info({'EXIT', Pid, {error, accept_failed}}, State) ->
create(self(), self()), % Start off new acceptor as listen socket is still open
{noreply,State};
% normal shutdown of socket process
handle_info({'EXIT', Pid, normal}, State) ->
{noreply,State};
handle_info(Info, State) ->
io:format("Unexpected info: ~p~n",[Info]),
{noreply,State}.
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason,State) ->
gen_tcp:close(State#state.listen_socket),
ok.
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
==============================================================
%%%----------------------------------------------------------------------
%%% File : rpc_socket.erl
%%% Author : ottuser local account <otpuser_at_tiger>
%%% Purpose : Accept a tcp/ip connection and then handle in and ivr protocols
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
%%% This gen_server exists for the life of a socket connection
%%% It is spawned by rpc_listen, which sends it its own PID
%%% so we can ask it to set up a new listener if/when this one accepts
%%% a socket connection.
%%%----------------------------------------------------------------------
-module(rpc_socket).
-vsn(1).
-author('shinde_at_one2one.co.uk').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start/2, get_connection/1, worker/5]).
-behaviour(gen_server).
% Internal state for this socket process
-record(state,{listen_pid, % Pid of Listener
lis_socket, % Listener Socket
socket = undefined}). % Socket ref
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start(ListenPid, ListenSocket) ->
gen_server:start_link(rpc_socket, {ListenPid, ListenSocket},[]).
get_connection(Pid) ->
gen_server:cast(Pid, get_conn).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init({ListenPid, ListenSocket}) ->
{ok, #state{listen_pid = ListenPid,
lis_socket = ListenSocket}}.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(get_conn, State) ->
case catch gen_tcp:accept(State#state.lis_socket) of
{error, closed} ->
{stop, {error, accept_failed}, State};
{error, Reason} ->
{stop, {error, accept_failed}, State};
{'EXIT', Reason} ->
{stop, {error, accept_failed}, State};
{ok, Socket} ->
rpc_listen:create(State#state.listen_pid, self()),
{noreply, State#state{socket = Socket}}
end;
handle_cast(_Reply ,State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({tcp, Socket, Packet}, State) ->
case catch binary_to_term(Packet) of
{heartbeat, Ref} ->
% io:format("Heartbeat Received~n"),
gen_tcp:send(Socket, term_to_binary({heart_reply, Ref})),
{noreply, State};
{apply, M, F, A, Ref} ->
Pid = spawn_link(?MODULE, worker, [M, F, A, Ref, Socket]),
{noreply, State};
Else ->
io:format("Socket Received Else: ~p~n",[Else]),
{noreply, State}
end;
handle_info({tcp_closed, Socket}, State) ->
{stop, rpc_skt_closed, State};
handle_info({tcp_error, Socket, Reason}, State) ->
gen_tcp:close(State#state.socket),
{stop, rpc_skt_error, State};
handle_info(Anymessage, State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, #state{socket = undefined}) ->
ok;
terminate(Reason, #state{socket = Socket}) ->
gen_tcp:close(Socket),
ok.
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
worker(M, F, A, Ref, Socket) ->
Reply = (catch apply(M, F, A)),
% io:format("Reply: ~p~n", [Reply]),
gen_tcp:send(Socket, term_to_binary({reply, Ref, Reply})).
==================================================END
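The reply_to_all/1 stub in rpc_client.erl is called when the socket closes but does nothing, so callers with outstanding commands are left waiting for their timers. One possible way to fill it in is sketched below. The module name reply_all_sketch, the {error, closed} reply and the two-argument variant taking a reply fun are assumptions made for the example; the table layout {{cmd, Timer_ref}, Reply_to} is the one documented in the listing.

```erlang
-module(reply_all_sketch).
-export([reply_to_all/1, reply_to_all/2]).

%% Fail every outstanding command and empty the table.
reply_to_all(Cmds) ->
    reply_to_all(Cmds, fun gen_fsm:reply/2).

%% ReplyFun is a parameter only so the logic can be tried without a
%% running gen_fsm (an assumption for this sketch).
reply_to_all(Cmds, ReplyFun) ->
    lists:foreach(
      fun({{cmd, Ref}, ReplyTo}) ->
              cancel_and_flush(Ref),
              ReplyFun(ReplyTo, {error, closed})
      end,
      ets:tab2list(Cmds)),
    ets:delete_all_objects(Cmds),
    ok.

%% Same idea as cancel_timer/1 in the listing: stop the timer and drop
%% a timeout message that may already be in the mailbox.
cancel_and_flush(Ref) ->
    erlang:cancel_timer(Ref),
    receive
        {timeout, Ref, _} -> ok
    after 0 ->
        ok
    end.
```

Inside rpc_client itself the one-argument form would be enough, since the process calling it is the gen_fsm that owns both the timers and the table.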
| Sean.Hinde at one2one.co. |
Posted: Wed Jun 14, 2000 2:09 pm |
All,
The security didn't prove too hard so here is a new version. Based on the
RADIUS mechanism, an md5 hash of a shared secret concatenated with the Timer
reference is sent with each command. It is still open to spoofing of replies
but I guess that is not too dangerous.
It now also requires the crypto app to be started.
Feel free to make use of it as you wish, but please share any fixes and
improvements.
Sean
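The checksum scheme described above might look something like the sketch below. The listing that follows calls check(Ref, Secret, Checksum) but its definition is not part of this excerpt, so this is a guess at the shape, not Sean's implementation. The module name auth_sketch and the function checksum/2 are hypothetical, and crypto:hash/2 is the call available in current OTP releases.

```erlang
-module(auth_sketch).
-export([checksum/2, check/3]).

%% MD5 over the shared secret concatenated with the encoded timer
%% reference. Secret may be a string or a binary (any iodata).
checksum(Ref, Secret) ->
    crypto:hash(md5, [Secret, term_to_binary(Ref)]).

%% Server side: recompute the digest from the received Ref and the
%% locally stored secret, then compare it with what the client sent.
check(Ref, Secret, Checksum) ->
    checksum(Ref, Secret) =:= Checksum.
```

The client would send {apply, M, F, A, Ref, checksum(Ref, Secret)}. MD5 is considered weak today; on a current OTP release a keyed MAC such as crypto:mac(hmac, sha256, Secret, Data) would be the more usual choice.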
%%%----------------------------------------------------------------------
%%% File : rpc_socket.erl
%%% Author : ottuser local account <otpuser_at_tiger>
%%% Purpose : Accept a tcp/ip connection and then handle in and ivr protocols
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
%%% This gen_server exists for the life of a socket connection
%%% It is spawned by rpc_listen, which sends it its own PID
%%% so we can ask it to set up a new listener if/when this one accepts
%%% a socket connection.
%%%----------------------------------------------------------------------
-module(rpc_socket).
-vsn('$Revision: 1.2 $ ').
-author('shinde_at_one2one.co.uk').
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([start/3, get_connection/1, worker/5]).
-behaviour(gen_server).
% Internal state for this socket process
-record(state,{listen_pid, % Pid of Listener
lis_socket, % Listener Socket
socket = undefined, % Socket ref
secret = null}). % Shared Secret
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start(ListenPid, ListenSocket, Secret) ->
gen_server:start_link(rpc_socket, {ListenPid, ListenSocket, Secret},[]).
get_connection(Pid) ->
gen_server:cast(Pid, get_conn).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init({ListenPid, ListenSocket, Secret}) ->
{ok, #state{listen_pid = ListenPid,
lis_socket = ListenSocket,
secret = Secret}}.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(get_conn, State) ->
case catch gen_tcp:accept(State#state.lis_socket) of
{error, closed} ->
{stop, {error, accept_failed}, State};
{error, Reason} ->
{stop, {error, accept_failed}, State};
{'EXIT', Reason} ->
{stop, {error, accept_failed}, State};
{ok, Socket} ->
rpc_listen:create(State#state.listen_pid, self()),
{noreply, State#state{socket = Socket}}
end;
handle_cast(_Reply ,State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({tcp, Socket, Packet}, State) ->
case catch binary_to_term(Packet) of
{heartbeat, Ref, Checksum} ->
io:format("Heartbeat Received~n"),
case check(Ref, State#state.secret, Checksum) of
true ->
io:format("check ok~n"),
gen_tcp:send(Socket, term_to_binary({heart_reply,
Ref})),
{noreply, State};
false ->
{noreply, State}
end;
{apply, M, F, A, Ref, Checksum} ->
case check(Ref, State#state.secret, Checksum) of
true ->
Pid = spawn_link(?MODULE, worker, [M, F, A, Ref,
Socket]),
{noreply, State};
false ->
{noreply, State}
end;
Else ->
io:format("Socket Received Else: ~p~n",[Else]),
{noreply, State}
end;
handle_info({tcp_closed, Socket}, State) ->
{stop, rpc_skt_closed, State};
handle_info({tcp_error, Socket, Reason}, State) ->
gen_tcp:close(State#state.socket),
{stop, rpc_skt_error, State};
handle_info(Anymessage, State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, #state{socket = undefined}) ->
ok;
terminate(Reason, #state{socket = Socket}) ->
gen_tcp:close(Socket),
ok.
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
worker(M, F, A, Ref, Socket) ->
Reply = (catch apply(M, F, A)),
% io:format("Reply: ~p~n", [Reply]),
gen_tcp:send(Socket, term_to_binary({reply, Ref, Reply})).
check(Ref, Secret, Checksum) ->
Checksum == crypto:md5(concat_binary([term_to_binary(Ref), Secret])).
-----------------------------cut------------------------------
%%%----------------------------------------------------------------------
%%% File : rpc_listen.erl
%%% Author : shinde local account <otpuser_at_tiger>
%%% Purpose : Act as a tcp/ip server and spawn processes for socket connections
%%% Created : 26 May 1999 by ottuser local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
-module(rpc_listen).
-vsn('$Revision: 1.2 $ ').
-author('shinde_at_one2one.co.uk').
-export([start_link/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-export([create/2]).
-behaviour(gen_server).
-record(state,{listen_socket = [], % Listener socket reference
secret = null}). % Shared Secret
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Port, Secret) ->
Name =
list_to_atom(lists:flatten(io_lib:format("rpc_server_~w",[Port]))),
gen_server:start_link({local, Name}, ?MODULE, {Port, Secret}, []).
%% Access to this server from socket servers
create(ServerPid, Pid) ->
gen_server:cast(ServerPid, {create, Pid}).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init({Port, Secret}) ->
process_flag(trap_exit, true),
Bin_secret = list_to_binary(Secret),
case gen_tcp:listen(Port,[binary,{packet,2},{active, true},
{reuseaddr,true}]) of
{ok, ListenSocket} ->
{ok, Pid} = rpc_socket:start(self(), ListenSocket, Bin_secret),
% Start acceptor process
rpc_socket:get_connection(Pid), % Tell it to start accepting
{ok, #state{listen_socket = ListenSocket,
secret = Bin_secret}};
{error, Reason} ->
{stop, Reason}
end.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(Request,From,State) ->
{reply,ok,State}.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast({create,Pid}, State) ->
{ok, NewPid} = rpc_socket:start(self(), State#state.listen_socket,
State#state.secret),
rpc_socket:get_connection(NewPid),
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
%
handle_info({'EXIT', Pid, {error, accept_failed}}, State) ->
create(self(), self()), % Start off new acceptor as listen socket is still open
{noreply,State};
% normal shutdown of socket process
handle_info({'EXIT', Pid, normal}, State) ->
{noreply,State};
handle_info(Info, State) ->
io:format("Unexpected info: ~p~n",[Info]),
{noreply,State}.
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason,State) ->
gen_tcp:close(State#state.listen_socket),
ok.
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
--------------------------------cut--------------------------
%%%----------------------------------------------------------------------
%%% File : rpc_client.erl
%%% Author : shinde_at_one2one.co.uk local account <otpuser_at_tiger>
%%% Purpose :
%%% Created : 11 Apr 2000 by shinde local account <otpuser_at_tiger>
%%%----------------------------------------------------------------------
-module(rpc_client).
-author('shinde_at_one2one.co.uk').
-vsn('$Revision: 1.2 $ ').
-behaviour(gen_fsm).
%% External exports
-export([start_link/4]).
%% gen_fsm callbacks
-export([init/1, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-export([connecting/2]).
-export([connected/3, connecting/3, wait_first_heart/3]).
-export([call/5]).
-define(HEART_TIMEOUT, 8000). % Heartbeat response timeout
-define(HEART_PERIOD, 15000). % Inter heartbeat timeout
-define(MAX_CMD_TIMEOUT, 5000). % Command response timeout
-define(HEART_FAIL_THRESHOLD, 3). % Max number of heartbeat failures before closing socket
-define(RETRY_PERIOD, 5000). % Period to wait before retrying the connection
-record(state, {heart_fails = 0, % counter of heartbeat failures
heart_ref = none, % Timer Ref of repeating heartbeat timer
heart_timeout_ref = none, % Timer Ref for heartbeat send timeout
cmds = 0, % ets table containing currently outstanding commands
name = "", % Given name for this connection
socket = 0, % connected socket
hostname = "", % hostname to maintain client connection to
port = 0, % listening port on above host
secret = null}). % shared secret for security
%% cmds contains {{cmd, Timer_ref}, Reply_to}
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link(Name, Hostname, Port, Secret) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:start_link({local, Reg_name}, ?MODULE, {Name, Hostname, Port,
Secret}, []).
call(Name, M, F, A, Timeout) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
gen_fsm:sync_send_event({global, Reg_name}, {apply, M, F, A, Timeout},
?MAX_CMD_TIMEOUT).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData} |
%% {ok, StateName, StateData, Timeout} |
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
init({Name, Hostname, Port, Secret}) ->
Reg_name = list_to_atom("rpc_client_" ++ Name),
global:re_register_name(Reg_name, self()),
{ok, connecting, #state{cmds = ets:new(cmds, []),
name = Name,
hostname = Hostname,
secret = list_to_binary(Secret),
port = Port}, 0}. % send timeout immediately
%%----------------------------------------------------------------------
%% Func: StateName/2
%% Called when gen_fsm:send_event/2,3 is invoked (async)
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
%% This is called immediately on startup as a timeout from init/1
connecting(timeout, StateData) ->
case gen_tcp:connect(StateData#state.hostname, StateData#state.port,
[binary, {active, true}, {packet, 2}], 5000) of
{ok, Socket} ->
Ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
case gen_tcp:send(Socket, term_to_binary({heartbeat, Ref,
md5(Ref, StateData#state.secret)})) of
ok ->
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, wait_first_heart, StateData#state{socket =
Socket,
heart_timeout_ref = Ref,
heart_fails = 0}};
{error, _} ->
cancel_timer(Ref),
gen_tcp:close(Socket),
timer:sleep(5000), % Wait for a while before retrying
{next_state, connecting, StateData, 0}
end;
{error, _} ->
timer:sleep(5000),
{next_state, connecting, StateData, 0}
end;
connecting(_, StateData) ->
%% Re-arm the zero timeout so the connection attempt is still retried
{next_state, connecting, StateData, 0}.
%%----------------------------------------------------------------------
%% Func: StateName/3
%% Called when gen_fsm:sync_send_event/2,3 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
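wait_first_heart({apply, _, _, _, _}, _From, StateData) ->
%% Stay in wait_first_heart so the pending heartbeat reply is still handled
{reply, {error, not_yet_heartbeating}, wait_first_heart, StateData}.
connecting({apply, _, _, _, _}, _From, StateData) ->
%% Re-arm the zero timeout so the connection attempt is still retried
{reply, {error, not_connected}, connecting, StateData, 0}.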
connected({apply, M, F, A, Timeout}, From, StateData) ->
Ref = erlang:start_timer(Timeout, self(), cmd_timeout),
case gen_tcp:send(StateData#state.socket, term_to_binary({apply, M, F,
A, Ref, md5(Ref, StateData#state.secret)})) of
ok ->
ets:insert(StateData#state.cmds, {{cmd, Ref}, From}),
{next_state, connected, StateData};
{error, Reason} ->
cancel_timer(Ref),
{reply, {error, Reason}, connected, StateData}
end.
%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Called when gen_fsm:send_all_state_event/2 is invoked.
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Called when gen_fsm:sync_send_all_state_event/2,3 is invoked
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
handle_sync_event(Event, From, StateName, StateData) ->
Reply = ok,
{reply, Reply, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% State: wait_first_heart
%%----------------------------------------------------------------------
%% Waiting for first heartbeat. Received timeout which is the current attempt
handle_info({timeout, Ref, heart_timeout},
wait_first_heart, #state{heart_timeout_ref = Ref} = StateData)
->
gen_tcp:close(StateData#state.socket),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};
%% Waiting for first heartbeat. Received a timeout which could just
%% be a non-cancelled timer from a previous attempt.
%% Continue to wait for the real one.
handle_info({timeout, _Ref, heart_timeout}, wait_first_heart, StateData) ->
{next_state, wait_first_heart, StateData};
%% Wow, received some data
handle_info({tcp, Socket, Data},
wait_first_heart, #state{socket = Socket} = StateData) ->
case catch binary_to_term(Data) of
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
Next_heart = erlang:start_timer(?HEART_PERIOD, self(),
send_heart),
io:format("First Heart_timer_set: ~p~n", [Next_heart]),
{next_state, connected, StateData#state{heart_ref =
Next_heart}};
_ ->
gen_tcp:close(Socket),
cancel_timer(StateData#state.heart_timeout_ref),
{next_state, connecting, StateData, 0}
end;
%%----------------------------------------------------------------------
%% State: connected
%%----------------------------------------------------------------------
%% Received Nth heartbeat timeout in connected phase which means we
%% should close and start again.
handle_info({timeout, Ref, heart_timeout}, connected, StateData)
when StateData#state.heart_fails >= ?HEART_FAIL_THRESHOLD->
gen_tcp:close(StateData#state.socket),
cancel_timer(StateData#state.heart_ref),
{next_state, connecting, StateData#state{socket = 0,
heart_timeout_ref = none}, 0};
%% Received non final heartbeat timeout in connected phase, increment counter.
handle_info({timeout, Ref, heart_timeout}, connected, StateData) ->
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails = Heart_fails + 1}};
%% Time to send a new heartbeat.
handle_info({timeout, Ref, send_heart}, connected, StateData) ->
cancel_timer(StateData#state.heart_timeout_ref), % Just in case
New_ref = erlang:start_timer(?HEART_TIMEOUT, self(), heart_timeout),
Next_heart = erlang:start_timer(?HEART_PERIOD, self(), send_heart),
% io:format("Heart_timer_set: ~p~n", [Next_heart]),
case gen_tcp:send(StateData#state.socket, term_to_binary({heartbeat,
New_ref, md5(New_ref, StateData#state.secret)})) of
ok ->
{next_state, connected, StateData#state{heart_timeout_ref =
New_ref,
heart_ref =
Next_heart}};
{error, _} ->
cancel_timer(New_ref),
Heart_fails = StateData#state.heart_fails,
{next_state, connected, StateData#state{heart_fails = Heart_fails + 1,
heart_timeout_ref = none, % cancel_timer/1 only accepts none or a timer ref
heart_ref = Next_heart}}
end;
%% Received timeout for a sent command.
handle_info({timeout, Ref, cmd_timeout}, connected, StateData) ->
Cmds = StateData#state.cmds,
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, {error, timed_out}),
ets:delete(Cmds, {cmd, Ref}),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;
%% The real stuff - received a reply or heartbeat while connected
handle_info({tcp, Socket, Data}, connected, #state{socket = Socket} =
StateData) ->
Cmds = StateData#state.cmds,
case catch binary_to_term(Data) of
{reply, Ref, Reply} ->
case ets:lookup(Cmds, {cmd, Ref}) of
[{{cmd, Ref}, Reply_to}] ->
gen_fsm:reply(Reply_to, Reply),
ets:delete(Cmds, {cmd, Ref}),
cancel_timer(Ref),
{next_state, connected, StateData};
[] ->
{next_state, connected, StateData}
end;
{heart_reply, Ref} when Ref == StateData#state.heart_timeout_ref ->
cancel_timer(Ref),
{next_state, connected, StateData#state{heart_fails = 0}};
Else ->
io:format("unknown Packet received~p~n", [Else]),
{next_state, connected, StateData}
end;
%%----------------------------------------------------------------------
%% State: all states
%%----------------------------------------------------------------------
handle_info({tcp, _, _}, StateName, StateData) -> % Ignore packets received in other states
{next_state, StateName, StateData};
handle_info({tcp_closed, Socket}, StateName, #state{socket = Socket} =
StateData) ->
cancel_timer(StateData#state.heart_ref),
cancel_timer(StateData#state.heart_timeout_ref),
reply_to_all(StateData#state.cmds),
{next_state, connecting, StateData#state{socket = 0},0};
handle_info({tcp_error, Socket, Reason}, StateName, #state{socket = Socket}
= StateData) ->
io:format("Tcp_Error: ~p~n", [Reason]),
{next_state, StateName, StateData};
handle_info(Info, StateName, StateData) ->
io:format("Unexpected Info: ~p~n", [Info]),
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
terminate(Reason, StateName, StateData) ->
ok.
%%----------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%%----------------------------------------------------------------------
code_change(OldVsn, StateName, StateData, Extra) ->
{ok, StateName, StateData}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
cancel_timer(none) ->
ok;
cancel_timer(Ref) ->
erlang:cancel_timer(Ref),
receive
{timeout, Ref, _} ->
ok
after 0 ->
ok
end.
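%% Placeholder: outstanding callers in the ets table are not notified here;
%% they are left to the sync_send_event timeout in call/5.
reply_to_all(_Ets) ->
ok.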
md5(Ref, Secret) ->
crypto:md5(concat_binary([term_to_binary(Ref), Secret])).
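The checksum scheme used by check/3 in rpc_socket and md5/2 in rpc_client can be sketched in isolation: the sender hashes the external-format reference together with the shared secret, and the receiver recomputes the hash and compares. The sketch below assumes a later OTP release where crypto:hash/2 is available in place of crypto:md5/1 and concat_binary/1. The module name rpc_checksum_sketch, its function names and the secret strings are hypothetical placeholders.

```erlang
-module(rpc_checksum_sketch).
-export([sign/2, check/3, encode_apply/5, demo/0]).

%% Hash the external term format of Ref followed by the shared secret.
%% crypto:hash/2 accepts iodata, so no explicit concatenation is needed.
sign(Ref, Secret) when is_binary(Secret) ->
    crypto:hash(md5, [term_to_binary(Ref), Secret]).

%% Receiver side: recompute the checksum and compare it with the one sent.
check(Ref, Secret, Checksum) ->
    Checksum =:= sign(Ref, Secret).

%% Build the request binary in the same shape the client sends.
encode_apply(M, F, A, Ref, Secret) ->
    term_to_binary({apply, M, F, A, Ref, sign(Ref, Secret)}).

%% Round trip without a socket: encode, decode, verify, then apply.
demo() ->
    Secret = <<"placeholder secret">>,
    Ref = make_ref(),
    Packet = encode_apply(lists, reverse, [[1, 2, 3]], Ref, Secret),
    {apply, M, F, A, Ref, Sum} = binary_to_term(Packet),
    true = check(Ref, Secret, Sum),
    false = check(Ref, <<"wrong secret">>, Sum),
    apply(M, F, A).
```

In the posted code the reference is the timer reference returned by erlang:start_timer/3, so the same value doubles as the request identifier and the timeout handle.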