gen_server obsługujący DB

0

Napisałem sobie moduł obsługujący bazę danych.

%%% -------------------------------------------------------------------
%%% Author  : Johny
%%% Description : obsługa DB postgres
%%%
%%% Created : 11-04-2012
%%% -------------------------------------------------------------------
-module(cs_DB).

-behaviour(gen_server).
%% --------------------------------------------------------------------
%% Include files
%% --------------------------------------------------------------------

%% --------------------------------------------------------------------
%% External exports
-export([start_link/0,insert_timestamp/4]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {db}).

%% ====================================================================
%% External functions
%% ====================================================================
-define(HOST, "127.0.0.1").
-define(DB, "db").
-define(USER, "XXX").
-define(PASS, "YYY").
-define(DATABASE, "ZZZ").

start_link() ->
	State = #state{},
	
	gen_server:start_link({local, ?MODULE}, ?MODULE, State, []).

  
%% ====================================================================
%% Server functions
%% ====================================================================

%%
%% Zapis czasu obslugi
insert_timestamp(ID,Timestamp1,Timestamp2,Typ) ->
	gen_server:call(?MODULE, {insert_timestamp,ID,Timestamp1,Timestamp2,Typ} ).



%% --------------------------------------------------------------------
%% Function: init/1
%% Description: Initiates the server
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%% --------------------------------------------------------------------
init(State) ->
	case pgsql:connect(?HOST, ?USER, ?PASS, [{database, ?DATABASE}]) of
		{ok, DB} ->
			{ok, #state{db = DB}};
		Unkn ->
			error_logger:error_msg("Connectiont: ~p",[Unkn]),
			{false}
	end.

%% --------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%% --------------------------------------------------------------------
handle_call({insert_timestamp,ID,T1 ,T2 ,Typ}, State) ->
	%%przeliczam na timestamp z ms
	T3 = T2 - T1,
	Query = io_lib:format("INSERT INTO \"statystyki\" (serialid, timestamp1, timestamp2, timestamp, typ) VALUES (~p,~p , ~p,~p,~p)",[ID,T1,T2,T3,Typ]),
	Reply = case pgsql:squery(State#state.db,Query) of
		{ok, X} ->
			ok;
		_->
			error
	end,
	{reply, Reply, State};
handle_call(Request, From, State) ->
    Reply = ok,
    {reply, Reply, State}.

%% --------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% --------------------------------------------------------------------

handle_cast(Msg, State) ->
    {noreply, State}.

%% --------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% --------------------------------------------------------------------
handle_info(Info, State) ->
    {noreply, State}.

%% --------------------------------------------------------------------
%% Function: terminate/2
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%% --------------------------------------------------------------------
terminate(Reason, State) ->
    ok.

%% --------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%% --------------------------------------------------------------------
code_change(OldVsn, State, Extra) ->
    {ok, State}.

%% --------------------------------------------------------------------
%%% Internal functions
%% --------------------------------------------------------------------

Kod działa prawidłowo i nie ma z nim raczej problemów od strony funkcjonalnej. Schody zaczynają się jednak w momencie napływu dużej ilości informacji. Do obsługi DB wykorzystany został pgsql. Moduł ten działa i tutaj z nim raczej nie ma problemu. Puszczając w pętli wpisywanie danych do bazy spokojnie osiąga 1000 wpisów na sekundę. Ale jak tylko wykorzystałem gen_server to maksymalną wartość jaką udało mi się wyciągnąć to 150 wpisów/s. Dlaczego to tak wolno działa? Spotkał się ktoś z was z takim problemem?

Zastanawiam się również czy problemem nie jest sam sposób testowania. Wszystkie operacje wykonywane są na dość leciwym PC z tylko jednym rdzeniem.

0

Mógłbyś pokazać w jaki sposób testujesz oba rozwiązania, bo wątpię żeby samo zastosowanie OTP dawało taki drastyczny spadek wydajności

0

Ponownie moim problemem okazała się metoda testowania oraz sprzęt.
Wszystkie testy tak jak już wspomniałem wykonuje na słabym sprzęcie a na dodatek jeszcze proces "realplayer.exe" obciążał mi znacznie procek. Nie ma tego złego co by na dobre nie wyszło. Widzę teraz, że konieczne i tak jest dodanie kolejki połączeń do DB w celu delikatnego przyspieszenia wpisywania danych do bazy.

kod na którym początkowo testowałem umożliwiał mi wprowadzenie jedynie 500 rekordów do bazy. Nie najgorszy wynik ale lekko odbiega od oczekiwań i stawianych przeze mnie wymagań.
Zauważyłem tutaj jeszcze problem. Odpalając 1000 procesów po kilku minutach mój PC przestaje się wyrabiać nawet z 500 rekordami i całkowicie zatyka mi się proces obsługujący bazę danych wywalając timeouty.
Poniżej kod testujący 1:


-module(test_sk_device).

-behaviour(gen_server).
-export([start/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-record(state, {now, now2,socket,id}).

%%%%%%%%%%%%%%%%%%%%%%%%
start(0) ->
	ok;
start(ID) ->
	 gen_server:start_link(?MODULE, [ID], []),
	 start(ID-1).
%%%%%%%%%%%%%%%%%%%%%%%%
	 

init([ID]) ->
 	erlang:send_after(1000, self(), {check_update1}),
    {ok, #state{now = now(), now2 = now(), socket = Socket,id = ID}}.

handle_call(Request, From, State) ->
    Reply = ok,
    {reply, Reply, State}.

handle_cast(Msg, State) ->
    {noreply, State}.

handle_info({check_update1}, State) ->
{Mega, Secs, MS} =State#state.now2,
	T1 = Mega*1000000*1000000 + Secs*1000000 +MS,	
	timer:sleep(1000),
	{Mega2, Secs2, MS2} = now(),
	T2 = Mega2*1000000*1000000 + Secs2*1000000 +MS2,
	cs_DB:insert_timestamp(State#state.id,T1, T2, 1),
 	erlang:send_after(1000, self(), {check_update1}),
	{noreply, State#state{now2 = now()}};
handle_info(Info, State) ->
 	io:format("Info ~p\n",[Info]),
	{noreply, State}.

terminate(Reason, State) ->
    ok.

code_change(OldVsn, State, Extra) ->
    {ok, State}.
 

Wykorzystując następny kod udało mi się przyspieszyć do 600 rekordów/s trochę niewiele ale przynajmniej jakiś zysk.
Poniżej kod testujący 2:

 
start2(0) ->
	ok;
start2(ID) ->
	 spawn(?MODULE,insert,[]),
	 start2(ID-1).

insert() ->
	{Mega, Secs, MS} =now(),
	T1 = Mega*1000000*1000000 + Secs*1000000 +MS,	
	timer:sleep(1000),
	{Mega2, Secs2, MS2} = now(),
	T2 = Mega2*1000000*1000000 + Secs2*1000000 +MS2,
	cs_DB:insert_timestamp(1,T1, T2, 1),
	insert().

Najlepiej wyszedł test, który kompletnie nie używał gen_servera. Jego osiąg to około 700 przy jednym procesie oraz 750 przy zapuszczonych 3 testach jednocześnie. Czyżby jednak rezygnacja z gen_server jednak była dobrym rozwiązaniem?

Poniżej kod testujący 3:

test_3() ->
	case pgsql:connect(?HOST, ?USER, ?PASS, [{database, ?DATABASE}]) of
		{ok, DB} ->
			test(DB),
			{ok};
		Unkn ->
			error_logger:error_msg("Connectiont: ~p",[Unkn]),
			{false}
	end.

	
test(DB) ->
	{Mega, Secs, MS} = now(),
	T1 = Mega*1000000*1000000 + Secs*1000000 +MS,	
%% 	timer:sleep(1000),
	{Mega2, Secs2, MS2} = now(),
	T2 = Mega2*1000000*1000000 + Secs2*1000000 +MS2,
	T3 = (T2 - T1)/1000000,
	Query = io_lib:format("INSERT INTO \"statystyki\" (serialid, \"timestamp1\", \"timestamp2\", \"timestamp\", \"typ\") VALUES (~p,~p , ~p,~p,~p)",
						  [1,T1,T2,T3,1]),
	Reply = case pgsql:squery(DB,Query) of
		{ok, X} ->
			ok;
		Unknown ->
			io:format("ERROR\n"),
			error
	end,
	test(DB). 

Sprawdziłem jeszcze testy 2 i 3 na sprzęcie 4 rdzeniowym i otrzymałem następujące wyniki
Test 2: 650 rekordów/s - czyli tyle samo co wcześniej. Odpalając kilka procesów wyciągnąłem z tego 1500 rekordów
Test 3: około 3000 rekordów/s przy 5 wątkach czyli na tym można troszkę zyskać.

Jak wy obsługujecie bazę danych? Czy moje podejście jest w miarę słuszne czy jednak, źle podchodzę do tematu?

1 użytkowników online, w tym zalogowanych: 0, gości: 1