Przekazywanie danych z wątku do wątku

Odpowiedz Nowy wątek
2015-12-30 17:30
Otwornica
0

Witam!

Wątek A przygotowywuje dane, po przygotowaniu wykonuje funkcję przekazując dane do wątku B.
Dane są kopiowane i wątek B jest wybudzany w celu dalszej obróbki danych.
Dane nie mogą być kolejkowane, jeśli wątek B nie dokończył przetwarzania i nie jest gotowy na przyjęcie nowych danych, wątek A czeka.

Czemu wątki? Ponieważ podczas obróbki danych przez wątek B, wątek A może przygotowywać kolejne dane.

Czyli kolejność działań wątków(zachodzą na siebie) powinien być taki:

Wątek A
Wątek B
Wątek A
Wątek B
....

Przygotowałem prosty kod, aby upewnić się że kolejność działań jest prawidłowa:

#include <stdio.h>
#include <pthread.h>

struct worker_t{
    int terminate;

    pthread_t thread;
    pthread_cond_t cond;
    pthread_mutex_t mutex;
    int tg;
};

// thread B
static void *worker_run(void *arg){
    int cnt = 0;
    struct worker_t * worker = (struct worker_t*)arg;

    pthread_mutex_lock(&worker->mutex);
    while(!worker->terminate){
        pthread_cond_wait(&worker->cond, &worker->mutex);
        // tutaj przyszłościowo: obrobienie danych przekazanych z wątku A
        printf("Thread B: %d\n", cnt);
        ++cnt;
    }
    pthread_mutex_unlock(&worker->mutex);
}

void worker_create(struct worker_t * worker){
    worker->terminate = 0;
    pthread_cond_init(&worker->cond, NULL);
    pthread_mutex_init(&worker->mutex, NULL);
    pthread_create(&worker->thread, NULL, worker_run, (void *)(worker));
}

void worker_destroy(struct worker_t * worker){
    printf("%s\n",__FUNCTION__);
    worker->terminate = 1;
    worker->tg = 0;

    pthread_join(worker->thread, NULL);
    pthread_cond_destroy(&worker->cond);
    pthread_mutex_destroy(&worker->mutex);
}

void worker_wakeup(struct worker_t * worker){
    pthread_mutex_lock(&worker->mutex);
    // tutaj przyszłościowo: przekopiowanie danych do wątku B z wątku A
    pthread_cond_signal(&worker->cond);
    pthread_mutex_unlock(&worker->mutex);
}

// thread A
int main(){
    struct worker_t worker;
    int i;
    worker_create(&worker);
    usleep(10 * 1000);

    for(i=0; i<3; ++i){
        printf("Thread B: %d\n", i); fflush(stdout);
        worker_wakeup(&worker);
    }

    worker_destroy(&worker);
    return 0;
}

Moje rozumowanie podczas tworzenia kodu było następujące:

  1. W wątku B jest blokowany mutex, który jest zwalniany tylko podczas oczekiwania na sygnał.
  2. Po otrzymaniu sygnału, atomicznie mutex jest blokowany.
  3. Nie ma możliwości, aby kolejny raz funkcja pthread_cond_signal została wywołana, ponieważ blokuje go mutex

Wynik jednak jest inny:

Mainer: 0
Mainer: 1
Mainer: 2
worker_destroy
Worker: 0  // nieistotne

Wygląda na to, że po wysłaniu sygnału, wątek B nie wybudza się i nie blokuje atomicznie mutex'a, stąd "signal" wywołuje się kilkukrotnie.
Jest to wg. mnie nie logiczne, po co w takim razie "wait" odblokowuje mutexy i je blokuje? (może ma to jakiś sens kiedy jest kilka wątków czekających)

Powyższy kod w QT:

#include <QDebug>
#include <QThread>
#include <QMutex>
#include <QWaitCondition>

class Worker: public QThread
{
public:
    Worker() : QThread(){
        mTerminate = false;
        start();
        msleep(100);
    }
    ~Worker(){
        mTerminate = true;
        this->wait();
    }

    void wakeup();

private:
    QMutex mMutex;
    QWaitCondition mWaitCondition;
    bool mTerminate;

protected:
    void run();
};

void Worker::run(){
    int cnt = 0;
    QMutexLocker ml(&mMutex);

    while(!mTerminate){
        // here: mutex is lock
        mWaitCondition.wait(&mMutex);
        /* wait - how it's work:
         *
         * unlock + wait (atomic)
         * awake  + lock (atomic)
         *
         */

        // here: mutex is lock
        qDebug() << "Thread B:" << cnt;
        cnt++;
    }
}

void Worker::wakeup(){
    QMutexLocker ml(&mMutex);
    mWaitCondition.wakeAll();
}

int main(){
    Worker * worker = new Worker();

    for(int i=0; i<3; ++i){
        qDebug() << "Thread A:"<<i;
        worker->wakeup();
    }

    delete worker;
    return 0;
}

Oczywiście można to zrobić za pomocą 2 pthread_cond_t/QWaitCondition czy też za pomocą semaforów(2x), ale interesuje mnie dobre rozwiązanie.
Jak to najlepiej rozwiązać?

Pozostało 580 znaków

2015-12-30 17:37
Otwornica
0

Mała poprawka, wcześniej wątki były nazwane Mainer/Worker, dla powyższych kodów powinno być:

Thread A: 0
Thread A: 1
Thread A: 2
worker_destroy
Thread B: 0  // nieistotne

Pozostało 580 znaków

Odpowiedz
Liczba odpowiedzi na stronę

1 użytkowników online, w tym zalogowanych: 0, gości: 1, botów: 0