Wielowatkowośc - pytanie

0

Cześć
uczę się wielowątkowości gdzie i jak tylko moge :)

Bogaty Kot w wątku
https://4programmers.net/Forum/1342548

dal poniższy przykład

template<typename T> class Queue
{
public:
    T pop()
    {
        std::unique_lock<std::mutex> lk(m_mutex);
        bool found = false;
        T res;
        do
        {
            m_cv.wait(lk);
            if (!m_queue.empty())
            {
                found = true;
                res = m_queue.front();
                m_queue.pop_front();
                m_cv.notify_one();
            }
            lk.unlock();
        }
        while (!found);
        return res;
    }
 
    void push(const T& e)
    {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_queue.push_back(e);
        m_cv.notify_one();
    }
 
private:
    std::deque<T> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cv;
};

z komentarzem

Implementacja totalnie zwalona (mozliwy deadlock, race condition + 1 UB).
Os dependent tez zwalil ale nie chce mi sie juz tego pisac.

Proszę o wyjasnienie czy dobrze rozumiem

  1. mozliwy deadlock,- nie wiem o co chodzi? DL kojarzył mi się z sytuacją
fun1()
{
    lock_guard<mutex> g1(myMutex1);
    lock_guard<mutex> g2(myMutex2);
}

fun2()
{
    lock_guard<mutex> g1(myMutex2);
    lock_guard<mutex> g2(myMutex1);
}

i jeden sobie czeka na drugiego, drugi na pierwszego itd
nie widzę w powyższym kodzie deadlocka - może tu chodzi że jest coś nie tak z condition_variable?

  1. race condition - chodzi o to, że
    a) nie sprawdzamy empty specjalnie z mutexem (m_queue.empty()), chodzi mi o specjalną metodą dla ThreadSafe Queue
    nie potrafię niczego innego wymyśleć

3)1 UB ? co to w góle znaczy :)

z góry dziękuję za odpowiedź

2

Deadlock jest tutaj

m_cv.wait(lk);
if (!m_queue.empty())

W sytuacji gdy producent zawolal juz notfiy_one() a konsument jeszcze nie zawolal wait(), notify_one bedzie po prostu zgubione. Jezeli nie bedzie juz wiecej elementow do dodania wtedy watek nie wyjdzie z wait() - czyli deadlock.
Zeby to naprawic trzeba by zaimplementowac to tak:

std::unique_lock<std::mutex> lk(m_mutex);
while (m_queue.empty())
{
  m_cv.wait(lk);
}

Drugi blad to zwolnienie blockady na koniec do while'a:

lk.unlock();

W tym momencie w drugiej iteracji nie trzymamy juz locka. m_cv.wait(lk) bez zalozonego locka to Undefined Behavior (czyli UB). Z doukmentacji C++:
Calling this function if lock.mutex() is not locked by the current thread is undefined behavior.

Teraz jezeli program nawet sie nie wywali przez UB to kolejka bedzie modyfikowana bez zalozonego locka.

Kolejny blad to wolanie notify_one() w metodzie pop. Gosc prawdopodobnie zauwazyl ze notify_one z pusha sa gubione i probowal to naprawic w ten sposob. Tym samym tylko zminimalizowal prawdopodobienstwo deadlocka/crasha.

2

Tak ku potomności, wait ma przeładowanie, które upraszcza zapis:

while (m_queue.empty())
{
  m_cv.wait(lk);
}

do

m_cv.wait(lk, [this]{ return !m_queue.empty(); });
1

Raczej chodzilo ci o:

m_cv.wait(lk, [this]{ return !m_queue.empty(); });
0

Teraz kod tego samego goscia tyle ze windows dependent:

tamplate<typename T> class Queue
{
public:
	Queue()
	{
		InitializeCriticalSection(&m_CS);
		m_Event = CreateEvent(NULL, FALSE, FALSE, NULL);
		if (!m_Event)
		{
			throw std::exception();
		}
	}

	~Queue()
	{
		DeleteCriticalSection(&m_CS);
		CloseHandle(m_Event);
	}


	T pop()
	{
		T e;
		if (WaitForSingleObject(m_Event, INFINITE) == WAIT_OBJECT_0)
		{
			EnterCriticalSection(&m_CS);
			e = m_queue.front();
			m_queue.pop_front();

			if (!m_queue.empty())
			{
				SetEvent(m_Event);
			}
			LeaveCriticalSection(&m_CS);
		}
		return e;
	}

	void push(const T& e)
	{
		EnterCriticalSection(&m_CS);
		m_queue.push_back(e);
		SetEvent(m_Event);
		LeaveCriticalSection(&m_CS);
	}
	
private:
	CRITICAL_SECTION m_CS;
	HANDLE m_Event;
	std::deque<T> m_queue;
};

Błedów jeszcze więcej niż w poprzednim kodzie.
Miłej analizy.

0
Bogaty Kot napisał(a):

Teraz kod tego samego goscia tyle ze windows dependent:

dziękuję za odpowiedź
ale - może to głupie pytanie - co to znaczy windows dependent?

0

To znaczy, ze wykorzystuje WinApi bezposrednio.

0
Bogaty Kot napisał(a):

To znaczy, ze wykorzystuje WinApi bezposrednio.

Czytam sobie książkę Concurency in Action - na tej podtawie coś zrobiłem

mam pytanie - jak takie rzeczy testować - jak na razie chodzi ładnie (inaczej byc nie może skoro jest ten przykład opisany jako The full class definition for a thread-safe queue using condition variables :) )

a z tym wykorzystywaniem WinApi bezposrednio.- to po co, skoro od c++ thread jest w bibliotece standardowej

(przepraszam, może za swoją niewiedze, ale w pracy z wielowatkowością mam tyle wspólnego co kot napłakał i próbuję ograrnąć temat na własną rękę)

#include <condition_variable>
#include <mutex>
#include <queue>
#include <chrono>
#include <exception>
#include <string>
#include <iostream>

template<typename T>
class threadsafe_queue
{
private:
	mutable std::mutex mut;
	std::queue<T> data_queue;
	std::condition_variable data_cond;
public:
	threadsafe_queue()
	{}

	void push(T new_value)
	{
		std::lock_guard<std::mutex> lk(mut);
		data_queue.push(std::move(new_value));
		data_cond.notify_one();
	}
	void wait_and_pop(T& value)
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty(); });
		value = std::move(data_queue.front());
		data_queue.pop();
	}
	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this] {return !data_queue.empty(); });
		
		std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
	
		data_queue.pop();
		return res;
	}
	bool try_pop(T& value)
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return false;
		value = std::move(data_queue.front());
		data_queue.pop();
		return true;
	}
	std::shared_ptr<T> try_pop()
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return std::shared_ptr<T>();
		std::shared_ptr<T> res(
			std::make_shared<T>(std::move(data_queue.front())));
		data_queue.pop();
		return res;
	}
	bool empty() const
	{
		std::lock_guard<std::mutex> lk(mut);
		return data_queue.empty();
	}
};



void fun_push(threadsafe_queue<int> &queue)
{
	int i = 0;
	while (i < 10)
	{
		queue.push(i * 2 + 2);
		
		std::string vS = "fun_push: ";
		vS.append(std::to_string(i * 2 + 2)).append("\n");

		std::cout << vS;
		std::this_thread::sleep_for(std::chrono::milliseconds(50));
		++i;
	}
}

void fun_wait_and_pop(threadsafe_queue<int> &queue)
{
	int i = 0;
	while (i < 10)
	{
		int popVal = 0;
		queue.wait_and_pop(popVal);

		std::string vS = "fun_wait_and_pop: ";
		vS.append(std::to_string(popVal)).append("\n");

		std::cout << vS;
		std::this_thread::sleep_for(std::chrono::milliseconds(150));
		++i;
	}
}

int main()
{

	threadsafe_queue<int> ts_q;

	std::thread th1(fun_push, std::ref(ts_q));
	std::thread th2(fun_wait_and_pop, std::ref(ts_q));


	th1.join();
	th2.join();

	return 0;
}


0

Pod katem wielowatkowosci wyglada dobrze. Pozbylbym sie inteligentnych wskaznikow. Na przyklad:

    T try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            throw queue_empty;
        T res(std::move(data_queue.front()));
        data_queue.pop();
        return res;
    }
0

@Bogaty Kot to nie mój stos technologiczny ale tak na oko:

        if (WaitForSingleObject(m_Event, INFINITE) == WAIT_OBJECT_0)
        {
            EnterCriticalSection(&m_CS);
            e = m_queue.front();

Race condition, bo zgaduje że wiele wątków moze przejść przez ten WaitForSingleObject (szczególnie że zarówno producent jak i konsument wysyła eventy...) i potem czekać na mutexa, a jeśli w kolejce był już tylko jeden element to kolejny wątek wywali sie na front() bo kolejka pusta ;] Dodatkowo nie wiem jak działa to API ale nie widzę powiazania tego wait z kolejką, więc co się stanie jeśli producenci już nie pracują bo wypełnili kolejkę i dopiero startują konsumenci? Czy czasem nie będą wisieć tutaj do końca świata bo nikt nie wyśle im tego eventa?

            if (!m_queue.empty())
            {
                SetEvent(m_Event);
            }

Czemu to konsumer sygnalizuje że w kolejce są jeszcze elementy? o_O

0

Bledy w drugim kodzie:

  1. Jak WaitFosrSignleObject zfailuje to zostanie zwrocona niezainicjalizowana wartosc
  2. Jak queue rzuci wyjatkiem to lock nie zostanie zwolniony (wiec nastepny call do push/pop zdeadlockuje)
  3. Dokladnie to co opisales - gosc nie obsluzyl spurious wake'upow (ktore z reszta sam bezsensownie generuje wolajac SetEvent z pop) tak wiec bedzie probowal zrobic pop'a z pustej kolejki.
  4. Znowu notyfikacje sa gubione. Event nie zlicza ilosci wywolan SetEvent (do tego jest semafor). Tak wiec jak najpierw zostanie wywolany 2 razy push a dopiero potem pop, event zostanie zresetowany (przez WaitForSignleObject) i zostanie przetworzony tylko jeden element (a w kolejce sa dwa). Kolejny pop zdeadlockuje.

Czemu to konsumer sygnalizuje że w kolejce są jeszcze elementy? o_O

Prawodopodobnie dlatego, ze gosc zczail sie ze gubi notyfikacje i sprobowal to "naprawic" (podobnie z reszta jak w poprzednim kodzie).

0
multi_lotek napisał(a):

a z tym wykorzystywaniem WinApi bezposrednio.- to po co, skoro od c++ thread jest w bibliotece standardowej

Pewnie po to samo, po co istnieje tyle różnych stringów, mimo istnienia std::string.

Być może standardowy thread nie robi dokładnie tego co chcemy, bo zależy nam na konkretnej fladze Windowsa, do której nie ma dostępu za pomocą std::thread. Albo piszemy „nowy, lepszy thread” (na wzór nowych, lepszych stringów...) i wtedy logicznym jest użycie bezpośrednio API systemu a nie opakowania z biblioteki standardowej.
Albo wymagane jest, by projekt się kompilował pod starszym kompilatorem w którym nie ma std::thread.

1 użytkowników online, w tym zalogowanych: 0, gości: 1