Producer-Consumer programowanie wielowątkowe - program nie może się zakończyć.

Odpowiedz Nowy wątek
2019-01-19 15:55
0

Cześć

Próbuje zaimplementować prostą relacje gdzie Producer tworzy wartość int a Consumer je pobiera i wszystko ma działać współbieżnie.
Program niby wykonuje się poprawnie natomiast nie wiem jak go zakończyć. Gdy producer wyprodukuje już wszystkie wartość i kończy swoją metode run() consumer nadal oczekuje na nowe wartości. Ktoś byłby miły pomóc to poprawnie zaimplementować i ew. wskazać jakie błędy popęłniłem?

class Buffor {

    private int[] buff;
    private int readIdx = 0;
    private int writeIdx = 0;
    private int totalCap = 0;

    { 
      buff = new int[5];
      Arrays.fill(buff,-1);
    }

    public void push(int val) {
        try {
            synchronized(this) {
                while(totalCap == buff.length) {
                    System.out.println("przepelnienie - czekanie");
                    this.wait();
                }

                buff[writeIdx % buff.length] = val;
                writeIdx++;
                totalCap++;
                this.notify();
            }
        }
        catch(InterruptedException e) {
            System.out.println("przerwanie [push]: " + e);
        }
    }

     public int pop() {
        try {
            synchronized(this) {        //proba pozyskania blokady monitora obiektu
                while(totalCap == 0) {
                    System.out.println("pusto - czekanie");
                    this.wait();
                }

                int val = buff[readIdx % buff.length];
                buff[readIdx % buff.length] = -1;
                readIdx++;
                totalCap--;
                this.notify();
                return val;
            }
        }
        catch(InterruptedException e) {
            System.out.println("przerwanie [push]: " + e);
        }
        return -666;
    }

    public synchronized void show() {
        System.out.println(Arrays.toString(buff));
    }
}

class Producer implements Runnable {
    int[] products;
    private Buffor buff;
    private final int numOfProd = 10;
    private int produceCounter = 0;
    public Producer(Buffor buff) {
        this.buff = buff;
        SecureRandom rand = new SecureRandom();
        products = (int[])IntStream.iterate(0, x->x+1).limit(numOfProd).toArray();

        System.out.println("products: " + Arrays.toString(products));
    }

    @Override public void run() {
        for(int i=0; i<numOfProd; ++i) {
            buff.push(products[i]);
            produceCounter++;
            buff.show();
        }
        System.out.println("Koniec producer!!!!!!!");
    }

}

class Consumer implements Runnable {
     private Buffor buff;
     public Consumer(Buffor buff) {
         this.buff = buff;
     }

     @Override public void run() {
         while(!Thread.interrupted()) {
            int val = buff.pop();
            System.out.println("consume val:" + val);
            buff.show();
         }
     }
}

public class ProducentConsumentNotifyWait {
    public static void main(String[] args) throws Exception {

       Buffor buff = new Buffor();
       Producer prod = new Producer(buff);
       Consumer con = new Consumer(buff);

       ExecutorService exec = Executors.newFixedThreadPool(2);

       Future<?> f1 = exec.submit(prod);
       Future<?> f2 = exec.submit(con);

       if(f1.isDone()) {
          System.out.println("Robota zrobiona");
          f2.cancel(true);
       }

    }

}
edytowany 2x, ostatnio: WojtekPoczatkujacy, 2019-01-19 15:57

Pozostało 580 znaków

2019-01-19 18:42
0

Tak na szybko to myślę, że

f1.isDone()

w twoim ifie będzie false, bo to nie czeka, aż task się zrobi, tylko w danym momencie zwraca false/true w zależności czy task się już wykonał czy nie.

Może jakbyś zamiast

if(f1.isDone()) {
          System.out.println("Robota zrobiona");
          f2.cancel(true);
       }

dał

f1.get();
System.out.println("Robota zrobiona");
f2.cancel(true);

?

Pozostało 580 znaków

2019-01-19 19:12
cs
1

Zrób pop i push synchronizowane, będzie przejrzyściej., lepiej też nie przechwytywać wyjątków w tych metodach tylko zgłaszać wątkom wywołującym obie metody. Zakończenie działania konsumenta robi się różnie, możesz wstawiać jakąś specjalną wartość lub ciąg wartości np. 0 jako sygnał zakończenia działania. Możesz też dodać metody do bufora, które służą do przekazywania informacji między producentem i konsumentem np.:

class Buffor {

  private int[] buff;
  private int readIdx = 0;
  private int writeIdx = 0;
  private int totalCap = 0;
  private boolean stop= false;

  {
    buff = new int[5];
    Arrays.fill(buff,-1);
  }

  synchronized public void push(int val) throws InterruptedException {
        while(totalCap == buff.length) {
          System.out.println("przepelnienie - czekanie");
          this.wait();
        }

        buff[writeIdx % buff.length] = val;
        writeIdx++;
        totalCap++;
        this.notify();
    }

  synchronized public int pop() throws InterruptedException {
        while(totalCap == 0) {
          System.out.println("pusto - czekanie");
          this.wait();
        }

        int val = buff[readIdx % buff.length];
        buff[readIdx % buff.length] = -1;
        readIdx++;
        totalCap--;
        this.notify();
        return val;
  }

  synchronized public void stop(){
    stop = true;
  }

  synchronized public boolean shouldStop(){
    return stop && totalCap == 1;
  }

  public synchronized void show() {
    System.out.println(Arrays.toString(buff));
  }
}

Producent po zakończeniu wywołuje na buforze stop() a konsument przed pobranie testuje :

if (buff.shouldStop()) {
    Thread.currentThread().interrupt();
}

Wtedy w main wystarczy:

Buffor buff = new Buffor();
    Producer prod = new Producer(buff);
    Consumer con = new Consumer(buff);

    ExecutorService exec = Executors.newFixedThreadPool(2);

    exec.submit(prod);
    exec.submit(con);
    exec.shutdown();

Pozostało 580 znaków

Odpowiedz
Liczba odpowiedzi na stronę

1 użytkowników online, w tym zalogowanych: 0, gości: 1, botów: 0