BlockingQueue z unikatowymi elementami

0

Podobny problem jak tu:
https://stackoverflow.com/questions/5237283/java-blocking-queue-containing-only-unique-elements

Potrzebuję tylko metody put i take.
Naklepałem tak o:

public class ConcurrencyBufferSetTest {

    @Test
    public void testDuplicates() throws InterruptedException {

        // given:
        int x = 10;
        int y = 20;

        // when:
        ConcurrencyBuffer<Integer> buffer = new ConcurrencyBufferSet<>();
        buffer.put(x);
        buffer.put(y);
        buffer.put(x);
        int sizeBeforeTake = buffer.size();
        int elem1 = buffer.take();
        int sizeAfterTake1 = buffer.size();
        int elem2 = buffer.take();
        int sizeAfterTake2 = buffer.size();

        // then:
        assertEquals(2, sizeBeforeTake);
        assertEquals(1, sizeAfterTake1);
        assertEquals(0, sizeAfterTake2);
        assertEquals(10, elem1);
        assertEquals(20, elem2);
    }

    @Test
    public void testEmpty() throws InterruptedException {
        ConcurrencyBuffer<Integer> buffer = new ConcurrencyBufferSet<>();

        Thread consumer = new Thread(() -> {
            try {
                int x = buffer.take();
                assertNotNull(x);
            } catch (InterruptedException e) {
                fail();
            }
        });

        Thread producer = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
                buffer.put(1);
            } catch (InterruptedException e) {
                fail();
            }
        });

        consumer.start();
        producer.start();
        consumer.join();
        producer.join();
    }
}
public class ConcurrencyBufferSet<T> implements ConcurrencyBuffer<T> {

    private Set<T> set = new LinkedHashSet<>();

    @Override
    public void put(T t) throws InterruptedException {
        synchronized (this) {
            set.add(t);
            notify();
        }
    }

    @Override
    public T take () throws InterruptedException {
        synchronized (this) {
            while (set.isEmpty()) {
                wait();
            }
            Iterator<T> it = set.iterator();
            T t = it.next();
            it.remove();
            notify();
            return t;
        }
    }

    @Override
    public int size() {
        return set.size();
    }

    @Override public Set<T> getBufferSnapshot () {
        return new LinkedHashSet<>(set);
    }
}
  1. Jak na proste rozwiązanie czy może być?
  2. Jak to porządnie przetestować?
  3. Jak w drugim teście zrobić by consumer leciał przed producerem? Wojciech Seliga zabronił używać locków.
1
  1. Wygląda spoko, ja bym użył ReentrantLock, powinno być ciut szybsze (nie mam liczb, nie robiłem benchmarków).
  2. Nie wiem, nie umiem. Ciężko dowieźć testem, że implementacja działa wielowątkowo (jest bezpieczna + nie głodzi wątków), ponieważ przebieg zależy od architektury, na której jest odpalany kod. Taki test byłby niedeterministyczny. Raczej przetestuj funkcjonalnie.
  3. To jest po prostu synchronizacja 2 watków. Producer może zawiesić się na CountDownLatch, którą podniesie Consumer.
2

Nie widzę błędu w tym kodu. Oczywiście nawet nie wiem ile razy to powiedziałem, a bład był.
Na tego typu zagadnienia można zapuszczać testy jcstress https://github.com/jwoschitz/jcstress-examples
https://shipilev.net/talks/jvmls-July2013-jcstress.pdf

EDIT: Niestety dość specyficznie i trudno sie te testy pisze. Narzedzie dosc kaprysne jesli chodzi o srodowisko, wersje jvm, build.

0

@Charles_Ray: działa :)

    @Test(timeout = 200)
    public void testEmptyBufferWhenConsumerTakesBeforeProducerPuts() throws InterruptedException {
        ConcurrencyBuffer<Integer> buffer = new ConcurrencyBufferSet<>();

        CountDownLatch signal = new CountDownLatch(1);

        Thread consumer = new Thread(() -> {
            try {
                signal.countDown();
                int x = buffer.take();
                assertNotNull(x);
            } catch (InterruptedException e) {
                fail();
            }
        });

        Thread producer = new Thread(() -> {
            try {
                signal.await();
                TimeUnit.MILLISECONDS.sleep(10);
                buffer.put(1);
            } catch (InterruptedException e) {
                fail();
            }
        });
        
        consumer.start();
        producer.start();
        consumer.join();
        producer.join();
    }
}
3

W samym kodzie są dwa błędy - metody size i getBufferSnapshot również powinny być synchronizowane jeśli ma być zachowana konsystencja. Dodatkowo, jeśli zdarzy się, że jeden wątek będzie dodawał coś do bufora, a drugi w tym czasie wywoła getBufferSnapshot, to może polecieć ConcurrentModificationException - jako ćwiczenie zostawiam analizę dlaczego :)

Co do testowania takiego kodu, to tak jak już zostało wspomniane wcześniej, można sobie pomagać CountDownLatch i Semaphore żeby kontrolować przebieg testu. Co do zasady, przez testy nie udowodnisz, że kod wielowątkowy działa poprawnie, ale możesz zminimalizować ryzyko wystąpienia buga. Jeżeli np. Twoja kolekcja ma zachowywać semantykę kolejki, to możesz mieć test, gdzie odpalasz dwa wątki - w pierwszym writer zapisuje N elementów do bufora, w drugim wątku reader odczytuje elementy. Na końcu sprawdzasz czy reader odczytał elementy w takiej samej kolejności jak writer zapisał.

Ja zwykle definiuję w takim teście N aktorów gdzie każdy z nich wykonuje określoną ilość akcji (w kodzie to będzie N wątków które robią coś w pętli). Suma wszystkich akcji ma jakiś rezultat końcowy - zależy on od tego jak podzielisz sobie aktorów. W Twoim przypadku możesz mieć N = W + R i W == R gdzie W to liczba writerów a R to liczba readerów. Zakładając, że każdy aktor zrobi taką samą ilość akcji, powiedzmy M, to ilość elementów które przejdą przez bufor będzie wynosiła N * M i spodziewasz się, że po zakończeniu pracy aktorów rozmiar Twojego bufora będzie wynosił 0 (tyle samo operacji read i write) i zbiór elementów odczytanych przez readerów będzie równy zbiorowi elementów z którego brali writerzy.

Generalnie, im bardziej w takim teście przyciśniesz CPU, tym większa szansa na wyłapanie buga, ale nigdy nie jest ona równa 100%. Takie testy spowalniają build, ale dotyczą one zwykle krytycznych elementów systemu, więc warto poświęcić trochę czasu na nie.

0
damianem napisał(a):

Ja zwykle definiuję w takim teście N aktorów gdzie każdy z nich wykonuje określoną ilość akcji (w kodzie to będzie N wątków które robią coś w pętli). Suma wszystkich akcji ma jakiś rezultat końcowy - zależy on od tego jak podzielisz sobie aktorów. W Twoim przypadku możesz mieć N = W + R i W == R gdzie W to liczba writerów a R to liczba readerów. Zakładając, że każdy aktor zrobi taką samą ilość akcji, powiedzmy M, to ilość elementów które przejdą przez bufor będzie wynosiła N * M i spodziewasz się, że po zakończeniu pracy aktorów rozmiar Twojego bufora będzie wynosił 0 (tyle samo operacji read i write) i zbiór elementów odczytanych przez readerów będzie równy zbiorowi elementów z którego brali writerzy.

Coś takiego?

@Test(timeout = 2000)
    public void testManyThreadsOnSingleBuffer() throws InterruptedException {

        // given:
        AtomicInteger putCount = new AtomicInteger();
        AtomicInteger takenCount = new AtomicInteger();
        ConcurrencyBuffer<Integer> buffer = new ConcurrencyBufferSet<>();

        int[] producerIterations = {54, 93, 1, 44, 20};
        int[] consumerIterations = {90, 99, 10, 2, 11};
        int producerIterationsSum = Arrays.stream(producerIterations).sum();
        int consumerIterationsSum = Arrays.stream(consumerIterations).sum();

        // when:
        ExecutorService exec = Executors.newFixedThreadPool(10);
        for(int i = 0; i < producerIterations.length; i ++) {
            exec.execute(createProducerThread(buffer, putCount, producerIterations[i]));
            exec.execute(createConsumerThread(buffer, takenCount, consumerIterations[i]));
        }
        exec.shutdown();
        exec.awaitTermination(2000, TimeUnit.MILLISECONDS);

        // then:
        assertEquals(producerIterationsSum, consumerIterationsSum);
        assertEquals(putCount.get(), takenCount.get());
        assertEquals(producerIterationsSum, putCount.get());
        assertEquals(consumerIterationsSum, takenCount.get());
        assertEquals(producerIterationsSum - consumerIterationsSum, buffer.size());
    }

Wypieprza się timeoutem co 2 próbę. :(
a LinkedBlockinQueue działa dobrze.


EDYTOWANE:

    private Thread createProducerRunnable(BlockingQueue<Integer> buffer, AtomicInteger putCount, int n) {
        return new Thread(() -> {
            for (int i = 0; i < n; i++) {
                try {
                    buffer.put(i);
                    putCount.incrementAndGet();
                } catch (InterruptedException e) {
                    fail();
                }
            }
        });
    }

    private Thread createConsumerRunnable(BlockingQueue<Integer> buffer, AtomicInteger takenCount, int n) {
        return new Thread(() -> {
            for (int i = 0; i < n; i++) {
                try {
                    buffer.take();
                    takenCount.incrementAndGet();
                } catch (InterruptedException e) {
                    fail();
                }
            }
        });
    }
0

@Julian_: możesz opisać czego spodziewasz się po wait/notify?

0
yarel napisał(a):

@Julian_: możesz opisać czego spodziewasz się po wait/notify?

wait wstrzymuje wątek aż ktoś go podniesie notifyem.
Tutaj consumer ma się wstrzymać z jedzeniem, gdy na talerezu nic nie ma, a producer jeszcze nie dostarczył żarcia.

3

Chyba zapomniałeś o pewnej właściwości swojej kolekcji - jest oparta na Set więc nie przyjmuje duplikatów. Część elementów jest wysyłana przez wielu producentów, powoduje to, że w buforze nie ma tyle elementów ile spodziewają się konsumerzy, którzy wiszą i powodują timeout. Zbiór elementów do wysłania powinieneś podzielić na rozłączne podzbiory względem producerów. Teraz chyba jasne dlaczego LinkedBlockinQueue działa :)

0

Trochę pokomplikowałem, ale teraz działa pięknie w 25ms.

@Test(timeout = 300000)
    public void testManyThreadsOnSingleBuffer() throws InterruptedException {

        // given:
        AtomicInteger putCount = new AtomicInteger();
        AtomicInteger takenCount = new AtomicInteger();
        ConcurrentBuffer<Integer> buffer = new ConcurrentBufferSet<>();

        int[] producerIterations = {54, 93, 1, 44, 20};
        int[] consumerIterations = {90, 99, 10, 2, 11};
        int producerIterationsSum = Arrays.stream(producerIterations).sum();
        int consumerIterationsSum = Arrays.stream(consumerIterations).sum();
        List<Integer> elems = IntStream.iterate(1, n -> n + 1)
                .limit(producerIterationsSum)
                .boxed()
                .collect(Collectors.toList());

        // when:
        ExecutorService exec = Executors.newFixedThreadPool(10);
        Boolean x=true;
        int cursor = 0;
        for(int i = 0; i < producerIterations.length; i ++) {
            exec.execute(createConsumerRunnable(buffer, takenCount, consumerIterations[i]));
            exec.execute(createProducerRunnable(buffer, elems, cursor, putCount, producerIterations[i]));
            cursor += producerIterations[i];
        }
        exec.shutdown();
        exec.awaitTermination(200000, TimeUnit.MILLISECONDS);

        // then:
        assertEquals(producerIterationsSum, consumerIterationsSum);
        assertEquals(putCount.get(), takenCount.get());
        assertEquals(producerIterationsSum, putCount.get());
        assertEquals(consumerIterationsSum, takenCount.get());
        assertEquals(producerIterationsSum - consumerIterationsSum, buffer.size());
    }

    private Thread createProducerRunnable(ConcurrentBuffer<Integer> buffer, List<Integer> elems, int cursor, AtomicInteger putCount, int n) {
        return new Thread(() -> {
            for (int i = 0; i < n; i++) {
                try {
                    buffer.put(elems.get(cursor + i));
                    putCount.incrementAndGet();
                } catch (InterruptedException e) {
                    fail();
                }
            }
        });
    }

    private Thread createConsumerRunnable(ConcurrentBuffer<Integer> buffer, AtomicInteger takenCount, int n) {
        return new Thread(() -> {
            for (int i = 0; i < n; i++) {
                try {
                    buffer.take();
                    takenCount.incrementAndGet();
                } catch (InterruptedException e) {
                    fail();
                }
            }
        });
    }
}

Dzięki @damianem

1

No dobra a teraz przepisz ten test na wersje czytelną dla ludzi ;) Po pierwsze nazwa testu niczego nie mówi. Zalecam nazewnictwo should X when Y and Z. Po drugie setup testu jest zupełnie nieczytelny. Zrób jakiegoś ładnego buildera i DSL do konfiguracji testu, żeby dało się to czytać (i nawet pisać!) bez wiedzy jak to wszystko działa pod spodem. Plus przepisałbym też logikę samej aplikacji bo jeśli // when wymaga tylu linijek, to cos jest nie tak. Czemu nie ma serwisu który to opakowuje?

1 użytkowników online, w tym zalogowanych: 0, gości: 1