Jak sprawdzić ile jeszcze porcji czeka na przetworzenie przez wątek?

0

Na podstawie inspiracji tą stroną https://doc.qt.io/qt-6/qthread.html popełniłem prosta klasę która można używać w taki sposób i powinna w wątku przetwarzać kolejne porcje Data:

Ale koncepcja ta ma jeden feler, nie można sprawdzić ile jest do końca pracy wątku, ile jeszcze porcji Data czeka na przetworzenie przez watek.

Ktoś może polecić jakiś przykład gdzie byłą by kolejka zadań aby można było policzyć ilość ?

Ewentualnie jak to przerobić ?

for (int i = 0; i < 100; ++i)
{
    Data d{i};
    emit Controler::Get().addWork(d);
}

for (int i = 0; i < 10; ++i)
{
    Data d{i};
    emit Controler::Get().addWork(d);
}
class Controler : public QObject
{
    Q_OBJECT

    Worker *worker{nullptr};
    QThread workerThread;
public:
    // Prevents any type of copy or new instance
    Controler(const Controler&) = delete;
    void operator=(const Controler&) = delete;

    // singleton Controler&
    static Controler& Get()
    {
        static Controler instance;
        return instance;
    }
private:
    Controler(QObject *parent = nullptr)
    {
        worker = new Worker;
        worker->moveToThread(&workerThread);
        connect(this, &Controler::addWork, worker, &Worker::doWork);
        workerThread.start();
    }

    ~Controler()
    {
        stopAndWait();
    }
private:
    void stopAndWait()
    {
        qDebug()<<"stopAndWait begin";
        workerThread.requestInterruption(); //   worker->stop=true;

        workerThread.quit();
        workerThread.wait();

        // workerThread.wait();
        qDebug()<<"stopAndWait end";
    }
signals:
    void addWork(const Data&);
};


class Worker : public QObject
{
    Q_OBJECT
public:
     Worker()= default;
    ~Worker() override = default;
public slots:
    void doWork(const Data &data){
        if(QThread::currentThread()->isInterruptionRequested()) return; // if(stop) return;

        qDebug()<< "doWork begin";
        for(int i = 0 ; i < 5; ++i)
        {
            if(QThread::currentThread()->isInterruptionRequested()) return; // if(stop) return;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            qDebug()<< i;
            if(QThread::currentThread()->isInterruptionRequested()) return; // if(stop) return;
        }
        qDebug()<< "doWork end";
    }

};

0

nie można sprawdzić ile jest do końca pracy wątku, ile jeszcze porcji Data czeka na przetworzenie przez watek.

sygnały i sloty? Dawno nic nie robiłem w qt ale jak pamiętam dwa różne wątki mogą użyć slotów i sygnałów i będzie to bezpieczne wątkowo.

czyli wątek workera by nadawał sam gdzie jest.

0

W takiej koncepcji sygnał/slot jest to bezpieczne wątkowo, tylko że w momencie gdy Controler wyśle dane to trafiają one do kolejki 'Workera`
Nie widzę jednak sposobu odczytania jak długa jest kolejka , ile sygnałów czeka na zrealizowanie.

Wiec albo trzeba to jakoś skomplikować:
dodać do Controler np. std::jakisKontener<Data> daneTODO ; wtedy jednak trzeba jakoś w dwóch watkach dodawać i usuwać elementy z daneTODO
i prosta jak drut koncepcja się komplikuje :)

Jaki kontener jest zalecany ? Chciałbym dodawać i usuwać elementy z dowolnego pocatku/końca , std::deque bedzie OK ?

Jak zabezpieczyć wielowątkową modyfikację kontenera std::deque ?

0

czekaj czekaj.

Zgadza się dane trafiają do workera. Pomyślmy o tym może. Najprostszy przypadek.
Controler dodaje coś do workera -> worker ma kolejkę z Data -> pętla bierze kolejny element z kolejki -> emituje sygnał ile elementów w kolejce pozostało(emisja z workera, prosty int).

To taki najbardziej naiwny mechanizm mi sie wydaje może być.

edit:

Jak zabezpieczyć wielowątkową modyfikację kontenera std::deque ?

w język c++ i przetwarzanie współbierzne w akcji jest cały rozdział na takie tematy.

3

Nie na temat, ale kuje mnie w oczy:

Marius.Maximus napisał(a):
    emit Controler::Get().addWork(d);

emit to pusta makro, które używa się jako odnotowanie w kodzie, że tu wywojujemy sygnał.
Wywoływanie sygnału, który nie należy do bieżącej klasy to dość poważne naruszenie enkapsulacji.
Zresztą jak patrzę an samą nazwę addWork to nie wygląd a mi to na sygnał ale na slot, który dodaje coś do kolejki.
Sygnał nazwałbym: onWordAdded(const Data&). Nawet gdzieś w coding convention dla Qt napisane jest, że sygnał powinien mieć czasownik czasu przeszłego.

3

W oficjalnej dokumentacji Qt masz dobry przykład jak to zrobić poprawnie:
https://doc.qt.io/qt-5/qthread.html#details

class Worker : public QObject
{
    Q_OBJECT

public slots:
    void doWork(const QString &parameter) {
        QString result;
        /* ... here is the expensive or blocking operation ... */
        emit resultReady(result);
    }

signals:
    void resultReady(const QString &result);
};

class Controller : public QObject
{
    Q_OBJECT
    QThread workerThread;
public:
    Controller() {
        Worker *worker = new Worker;
        worker->moveToThread(&workerThread);
        connect(&workerThread, &QThread::finished, worker, &QObject::deleteLater);
        connect(this, &Controller::operate, worker, &Worker::doWork);
        connect(worker, &Worker::resultReady, this, &Controller::handleResults);
        workerThread.start();
    }
    ~Controller() {
        workerThread.quit();
        workerThread.wait();
    }
public slots:
    void handleResults(const QString &);
signals:
    void operate(const QString &);
};

Więc w twoim przypadku to powinno wyglądać tak:

// .h
class Data
{
public:
	Data() : m_i{ 0 } {}

	explicit Data( int i ) : m_i( i ) {}

public:
	int m_i;
};

Q_DECLARE_METATYPE( Data ) // ważne, bez tego Data jako argument w signal/slot nie zadziała

class Worker : public QObject
{
    Q_OBJECT
public:
    Worker() = default;
    ~Worker() override = default;

public slots:
	void doWork( const Data &d );

signals:
	void workFinished( const Data &d );
};

class Controller: public QObject
{
    Q_OBJECT

    QThread workerThread;

public:
    // Prevents any type of copy or new instance
	Controller( const Controller & ) = delete;
    void operator=( const Controller & ) = delete;

    // singleton Controler&
    static Controller &Get()
    {
        static Controller instance;
        return instance;
    }

	int getRemainingJobsCount() const { return m_activeJobs; }

private:
	Controller( QObject *parent = nullptr )
    {
        Worker *worker = new Worker;
        worker->moveToThread( &workerThread );
        connect( &workerThread, &QThread::finished, worker, &QObject::deleteLater );
        connect( this, &Controller::addedWork, worker, &Worker::doWork );
        connect( worker, &Worker::workFinished, this, &Controller::handleFinishedWork );
        workerThread.start();
    }

	~Controller()
    {
        stopAndWait();
    }
private:
    void stopAndWait()
    {
        qDebug() << "stopAndWait begin";
        workerThread.requestInterruption(); //   worker->stop=true;
        workerThread.quit();
        workerThread.wait();
        qDebug() << "stopAndWait end";
    }

public slots:
    void addWork( const Data &d );

signals:
	void addedWork( const Data &d, QPrivateSignal );

private slots:
	void handleFinishedWork( const Data &d );

private:
	int m_activeJobs = 0;
};
// .cpp
	qRegisterMetaType< Data >(); // ważne, bez tego Data jako argument w signal/slot nie zadziała

	QTimer::singleShot( 200, this, [ this ]()
	{
		qDebug() << "adding jobs";

        for( int i = 0; i < 100; ++i )
        {
            Data d{ i };
            Controller::Get().addWork( d );
        }
	});

	QTimer *timer = new QTimer( this );
	connect( timer, &QTimer::timeout, this, [ this ]
	{
		printf( "Remaining jobs: %d\n", Controller::Get().getRemainingJobsCount() );
	} );
	timer->setInterval( 1000 );
	timer->start();
// .cpp
void Worker::doWork( const Data &data ) {
    if( QThread::currentThread()->isInterruptionRequested() ) return; // if(stop) return;

    qDebug() << "doWork begin" << data.m_i;
    for( int i = 0; i < 1; ++i )
    {
        if( QThread::currentThread()->isInterruptionRequested() ) return; // if(stop) return;
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
        qDebug() << i;
        if( QThread::currentThread()->isInterruptionRequested() ) return; // if(stop) return;
    }
    qDebug() << "doWork end";

	emit workFinished( data );
}

//////////////////////////////////////////////////////////////////////////

void Controller::addWork( const Data &d )
{
	++m_activeJobs;

	emit addedWork( d, QPrivateSignal() );
}

void Controller::handleFinishedWork( const Data &d )
{
	--m_activeJobs;
}

Sprawdziłem i u mnie działa.

0

@mwl4 dziękuje , później to zweryfikuje i sprawdza jak bardzo sie myle

na razie uwzględniłem uwagi @MarekR22 @revcorey

Worker dostał listę obiektow w ktorej przechowuje dane do zrobienia ,
oraz musiałem dodać jakis obiekt do synchronizacji np. mutex

class Worker : public QObject
{
    Q_OBJECT
    friend class Controler;
    std::mutex mutex;    
public:
     Worker()= default;
    ~Worker() override = default;
    std::deque<Data> queueData = {};

    int workCount(){
        std::lock_guard<std::mutex> l{mutex};
        return queueData.size();
    }
public slots:
    void addWork(const Data &data){
        std::lock_guard<std::mutex> l{mutex};
        queueData.emplace_back(data); 
        qDebug() << queueData.size();
    }

    void doWork(){
        if(QThread::currentThread()->isInterruptionRequested()) return; 

        while(!queueData.empty())
        {
            auto const &_data=queueData.begin();
            qDebug()<< "doWork begin" << " value:"<< _data->Value;
            for(int i = 0 ; i < 5; ++i)
            {
                if(QThread::currentThread()->isInterruptionRequested()) return; 
                std::this_thread::sleep_for(std::chrono::milliseconds(500));
                qDebug()<< i;
                if(QThread::currentThread()->isInterruptionRequested()) return; 
            }
            qDebug()<< "doWork end";
            std::lock_guard<std::mutex> l{mutex};
            queueData.erase(queueData.begin());
        }
    }
};

class Controler : public QObject
{
    Q_OBJECT

    Worker *worker{nullptr};
    QThread workerThread;
public:
    // Prevents any type of copy or new instance
    Controler(const Controler&) = delete;
    void operator=(const Controler&) = delete;

    // singleton Controler&
    static Controler& Get()
    {
        static Controler instance;
        return instance;
    }

    void addWork(const Data&d) { 
        worker->addWork(d); // dodaje do listy porcje danych do synchronizacji uzyty std::lock_guard<std::mutex> 
        emit onWorkAdded(); // wysylam sygnal aby robota ruszyła  
    };

    int workCount(){
        return worker->workCount();
    }
private:
    Controler(QObject *parent = nullptr):worker(new Worker) 
    {
        worker->moveToThread(&workerThread);
        connect(this, &Controler::onWorkAdded, worker, &Worker::doWork);
        workerThread.start();
    }

    ~Controler()
    {
        stopAndWait();
    }
private:
    void stopAndWait()
    {
        qDebug()<<"stopAndWait begin";
        workerThread.requestInterruption(); 
        workerThread.quit();
        workerThread.wait();
        qDebug()<<"stopAndWait end";
    }
signals:
    void onWorkAdded();
};

3
revcorey napisał(a):

nie można sprawdzić ile jest do końca pracy wątku, ile jeszcze porcji Data czeka na przetworzenie przez watek.

sygnały i sloty? Dawno nic nie robiłem w qt ale jak pamiętam dwa różne wątki mogą użyć slotów i sygnałów i będzie to bezpieczne wątkowo.

czyli wątek workera by nadawał sam gdzie jest.

To jest udokumentowane w dziwnym miejscu więc łatwo przeoczyć.
Po piersze trzeba zauważyć, że QObject::connect na jeszcze argument Qt::ConnectionType type = Qt::AutoConnection.
A tam jest wyjaśnione:

Qt Namespace | Qt Core 6.6.1

enum Qt::ConnectionType

This enum describes the types of connection that can be used between signals and slots. In particular, it determines whether a particular signal is delivered to a slot immediately or queued for delivery at a later time.

Constant Value Description
Qt::AutoConnection 0 (Default) If the receiver lives in the thread that emits the signal, Qt::DirectConnection is used. Otherwise, Qt::QueuedConnection is used. The connection type is determined when the signal is emitted.
Qt::DirectConnection 1 The slot is invoked immediately when the signal is emitted. The slot is executed in the signalling thread.
Qt::QueuedConnection 2 The slot is invoked when control returns to the event loop of the receiver's thread. The slot is executed in the receiver's thread.
Qt::BlockingQueuedConnection 3 Same as Qt::QueuedConnection, except that the signalling thread blocks until the slot returns. This connection must not be used if the receiver lives in the signalling thread, or else the application will deadlock.
Qt::UniqueConnection 0x80 This is a flag that can be combined with any one of the above connection types, using a bitwise OR. When Qt::UniqueConnection is set, QObject::connect() will fail if the connection already exists (i.e. if the same signal is already connected to the same slot for the same pair of objects). This flag was introduced in Qt 4.6.
Qt::SingleShotConnection 0x100 This is a flag that can be combined with any one of the above connection types, using a bitwise OR. When Qt::SingleShotConnection is set, the slot is going to be called only once; the connection will be automatically broken when the signal is emitted. This flag was introduced in Qt 6.0.

Czyli domyślnie Qt::AutoConnection jeśli połączone QObject przypisane są do tego samego wątku to następuje bezpośrednie wywołanie slotu.
Jak należą do różnych wątków, to wtedy argumenty slotu są pakowane i przesyłane do QEventLoop docelowego wątku, który następnie w trakcie przetwarzania eventów, odpakuje argumenty i odpali docelowy slot.
Ten mechanizm "thread hoping" wymaga meta danych dla typu, by móc skonwertować argumenty slotu do QVariant. Jeśli nie da się dokonać konwersji, to pojawia się ostrzeżenie w logach. Efekt może być taki, że połączenie sygnał/slot może przez to nie działać.

0

Dziękuje wszystkim za parę cegieł z wiedzą !
Koncept zaczerpnięty z postu @mwl4 spisuje się znakomicie

Czy można zmienić kolejność realizacji zadań przez Worker-a ? Jak wysyłam zadanie jako sygnał/slot to jest realizowane w kolejnosci w jakiej zostało wysłane
A jak bym chciał odwrócić kolejność ?
To co zostało dodane jako ostatnie będzie zrealizowane jako pierwsze ? (sterta zamiast fifo)
Czy da się to zrobić bez używania "niskopoziomowych narzędzi (mutex i locki)" ?

1

w sensie chciałbyś zmienić kolejkę eventów qt? Tego chyba się nie da zrobić. Można by zobaczyć czy w kolejce coś jest ale nie odwracać.
Ewentualnie worker potraktować jako jedno zadanie tzn. masz w contorlerze kolejkę zadań i możesz je odwracać jak chcesz, tu znowu wracasz do jakiegoś kontenera ale nie trzeba tu mutexów bo:
w wątku controlera dodajesz/startujesz zadania -> worker dostaje zadanie i tylko zwraca sygnał o zakończeniu i czeka(coś jak klasyczny producent i konsument, chcoiaż ostatecznie thread safe kolejka była by lepsza). W skrócie kontroler szereguje zadania i wrzuca pojedyńczo do workera. Dodam że to znowu pomysł po 10 sekundowym namyśle ale może coś z tego użyjesz.
edit:
There is no risk of deadlocks when using the event system for thread synchronization, unlike using low-level primitives. However, the event system does not enforce mutual exclusion. If invokable methods access shared data, they must still be protected with low-level primitives.
https://doc.qt.io/qt-6/threads-synchronizing.html

0

Chciałbym zmienić kolejność wykonywania zadań , jeżeli można by było zmienić kolejność w kolejce sygnałów to tez była by jakaś metoda , ale ja tez nie widzę możliwości grzebania w tej kolejce

ja to trawiłem chwile dłużej niz 10s ;) i zrozumiałem to tak:

  1. kontroler ma listę zadań i może wybrać te które mu pasuje (pierwsze, ostatnie czy ze środka)
  2. kontroler wysyła jedno zadanie do roboty ( o ile worker nic nie robi i czeka)
  3. worker sygnalizuje że skończył i czeka (kontroler kasuje jedno zadanie z listy)
  4. wracamy do punktu 1 jeżeli lista nie jest pusta

co rozumiemy przez "thread safe kolejka" ?
Jakis wspólny obiekt dostępny dla kontroera i workera ? np. std::deque<Data> queueData = {}; tylko poprawnie zabezpieczony na wielowątkowy dostęp ?

worker by pracował we spolpracy z jakims mutex np. tak:


std::deque<Data> queueData = {};

void worker::doWork(){
  while (true){
      { 
        std::lock_guard<std::mutex> l{mutex};
        auto const &_data=queueData.begin();  // wybieram dowolny element 
        ... praca na danych 
        queueData.erase(_data); 
      }
      // tutaj sie konczy zasieg dla mutex-a

      if(queueData.empty) return 

  }
}
2

moim zdaniem to worker nie powinien o tym wiedzieć że jest kolejka napisze psuedo kod i najprościej jak się da:

class Controller {
slot:
void finished();

private:
queue;
worker;
}
void controller::finished() {
   auto data = queue.pop(); // Jak już będziesz te dane ściągał twoja brocha
   worker.doWork(data) // w skrócie osobny wątek ma zacząć przetważać dane a czy powiadomisz go sygnałem czy jakimś wywołaniem też to szczegół.
}

void doWork(data) {
// rób pracę
emit finished;
}

Generalnie nawet nie trzeba tego robić na sygnałch i slotah bo jest notify w std i qt.

Worker nie ma żadnej listy on pracuje per jedno zadanie. a kontroler wrzuca mu po kolei kolejne i wtedy możesz sterować tym co ma wpadać do wątku. Wiadomo to ma sens dla czasochłonnych zadań.

1 użytkowników online, w tym zalogowanych: 0, gości: 1