Przekazywanie danych z wątku do wątku

0

Witam!

Wątek A przygotowywuje dane, po przygotowaniu wykonuje funkcję przekazując dane do wątku B.
Dane są kopiowane i wątek B jest wybudzany w celu dalszej obróbki danych.
Dane nie mogą być kolejkowane, jeśli wątek B nie dokończył przetwarzania i nie jest gotowy na przyjęcie nowych danych, wątek A czeka.

Czemu wątki? Ponieważ podczas obróbki danych przez wątek B, wątek A może przygotowywać kolejne dane.

Czyli kolejność działań wątków(zachodzą na siebie) powinien być taki:

Wątek A
Wątek B
Wątek A
Wątek B
....

Przygotowałem prosty kod, aby upewnić się że kolejność działań jest prawidłowa:

#include <stdio.h>
#include <pthread.h>

struct worker_t{
	int terminate;
	
	pthread_t thread;
	pthread_cond_t cond;
	pthread_mutex_t mutex;
	int tg;
};

// thread B
static void *worker_run(void *arg){
	int cnt = 0;
	struct worker_t * worker = (struct worker_t*)arg;
	
	pthread_mutex_lock(&worker->mutex);
	while(!worker->terminate){
		pthread_cond_wait(&worker->cond, &worker->mutex);
		// tutaj przyszłościowo: obrobienie danych przekazanych z wątku A
		printf("Thread B: %d\n", cnt);
		++cnt;
	}
	pthread_mutex_unlock(&worker->mutex);
}

void worker_create(struct worker_t * worker){
	worker->terminate = 0;
	pthread_cond_init(&worker->cond, NULL);
	pthread_mutex_init(&worker->mutex, NULL);
	pthread_create(&worker->thread, NULL, worker_run, (void *)(worker));
}

void worker_destroy(struct worker_t * worker){
	printf("%s\n",__FUNCTION__);
	worker->terminate = 1;
	worker->tg = 0;
	
	pthread_join(worker->thread, NULL);
	pthread_cond_destroy(&worker->cond);
	pthread_mutex_destroy(&worker->mutex);
}

void worker_wakeup(struct worker_t * worker){
	pthread_mutex_lock(&worker->mutex);
	// tutaj przyszłościowo: przekopiowanie danych do wątku B z wątku A
	pthread_cond_signal(&worker->cond);
	pthread_mutex_unlock(&worker->mutex);
}

// thread A
int main(){
	struct worker_t worker;
	int i;
	worker_create(&worker);
	usleep(10 * 1000);

	for(i=0; i<3; ++i){
		printf("Thread B: %d\n", i); fflush(stdout);
		worker_wakeup(&worker);
	}
	
	worker_destroy(&worker);
	return 0;
}

Moje rozumowanie podczas tworzenia kodu było następujące:

  1. W wątku B jest blokowany mutex, który jest zwalniany tylko podczas oczekiwania na sygnał.
  2. Po otrzymaniu sygnału, atomicznie mutex jest blokowany.
  3. Nie ma możliwości, aby kolejny raz funkcja pthread_cond_signal została wywołana, ponieważ blokuje go mutex

Wynik jednak jest inny:

Mainer: 0
Mainer: 1
Mainer: 2
worker_destroy
Worker: 0  // nieistotne

Wygląda na to, że po wysłaniu sygnału, wątek B nie wybudza się i nie blokuje atomicznie mutex'a, stąd "signal" wywołuje się kilkukrotnie.
Jest to wg. mnie nie logiczne, po co w takim razie "wait" odblokowuje mutexy i je blokuje? (może ma to jakiś sens kiedy jest kilka wątków czekających)

Powyższy kod w QT:

#include <QDebug>
#include <QThread>
#include <QMutex>
#include <QWaitCondition>

class Worker: public QThread
{
public:
    Worker() : QThread(){
        mTerminate = false;
        start();
        msleep(100);
    }
    ~Worker(){
        mTerminate = true;
        this->wait();
    }


    void wakeup();

private:
    QMutex mMutex;
    QWaitCondition mWaitCondition;
    bool mTerminate;

protected:
    void run();
};


void Worker::run(){
    int cnt = 0;
    QMutexLocker ml(&mMutex);

    while(!mTerminate){
        // here: mutex is lock
        mWaitCondition.wait(&mMutex);
        /* wait - how it's work:
         *
         * unlock + wait (atomic)
         * awake  + lock (atomic)
         *
         */

        // here: mutex is lock
        qDebug() << "Thread B:" << cnt;
        cnt++;
    }
}

void Worker::wakeup(){
    QMutexLocker ml(&mMutex);
    mWaitCondition.wakeAll();
}

int main(){
    Worker * worker = new Worker();

    for(int i=0; i<3; ++i){
        qDebug() << "Thread A:"<<i;
        worker->wakeup();
    }

    delete worker;
    return 0;
}

Oczywiście można to zrobić za pomocą 2 pthread_cond_t/QWaitCondition czy też za pomocą semaforów(2x), ale interesuje mnie dobre rozwiązanie.
Jak to najlepiej rozwiązać?

0

Mała poprawka, wcześniej wątki były nazwane Mainer/Worker, dla powyższych kodów powinno być:

Thread A: 0
Thread A: 1
Thread A: 2
worker_destroy
Thread B: 0  // nieistotne

1 użytkowników online, w tym zalogowanych: 0, gości: 1