Czy masz dobry akumulator? Stream.reduce, concurrency

1

Taki ciekawy wątek się rozwinął jako off-topic w komentarzach: https://4programmers.net/Forum/1391030

No więc czy akumulator używany w Collector albo w Stream.reduce() musi być niemutowalny? Czy musi być thread-safe?

Ostatecznie zgodziliśmy się, że wystarczy, że będzie thread-safe, a najprostsza (a może i najszybsza) droga, prowadzi przez niemutowalność. Tylko uwaga, żeby nie zrobić sobie po drodze kwadratowej złożoności, np. tworząc kopię tablicy wyników.

(edit: Te przydługie i podejrzane wywody można pominąć i przejść od razu do mojego 3 postu, po drodze czytając JarkaR)

A co na to źródła? Potwierdzają:

Tomasz Nurkiewicz, Introduction to writing custom collectors in Java 8:

Generic type A simply defines the type of intermediate mutable data structure that we are going to use to accumulate items of type T in the meantime.

Czyli jest zgoda na mutowalność.

Na współbieżność bardzo musi uważać twórca implementujący interfejs Collector. Od niego zależy, jakie wymagania będziemy stawiać akumulatorowi. Ogólne wymagania opisane są w javadocu do Collectora. Ale prościej jest użyć gotowego kolektora, który Java dostarcza nam w postaci funkcji Stream.reduce().

Czy korzystając ze świetnego kolektora Oracle'a, Stream.reduce(), musimy mieć thread-safe accumulator? Nie znalazłem nigdzie bezpośredniej odpowiedzi, ale dokumentacja do reduce odsyła poprzez termin reduction, do dokumentacji pakietu java.util.stream. Tam wszystko (no, czy na pewno...) możemy doczytać.

Using reduce() instead removes all of the burden of parallelizing the reduction operation, and the library can provide an efficient parallel implementation with no additional synchronization required.

Czyli akumulator używany w Stream.reduce() nie musi być thread-safe. Potwierdza to dokumentacja do reduce. Skoro nie wymagają tam tej cechy, to znaczy, że nie musimy jej wypełniać.

Intuicyjnie przeczuwałem, że tak będzie. Przecież nie ma powodu uruchamiać tego samego akumulatora jednocześnie. Na początek każdy wątek dostanie do swojej porcji danych nowy, pusty akumulator. Czyli jest bezpiecznie. W którymś momencie będziemy łaczyć te wyniki. Ale nadal nie widzę potrzeby, by jednocześnie używać tego samego akumulatora. Najpierw wątek "1" i "2" się połączy, gdzieś tam wątek "3" i "4", a potem połączy się to wszystko razem. No i gdzie miałby się tu akumulator wysypać?

0

Kopia złożoności to nie jest kwardrat, tylko 2x mógł się mały błąd wkraść.

Akumulator to dla mnie zawsze będzie coś, co chemicznie przechowuje energie.
Więc nie wiem jak wy takie abstrakcje używacie, może nie rozumiem czegoś.

1

@jarekczek: Dobrze, że rozwinąłeś i sprawdziłes u źródeł. Warto wiedzieć dokładnie. A teraz - dlaczego moje rozwiązania były immutable, bo nie dlatego, że stream tego wymaga.
U mnie wymaga tego zdrowy rozsądek. Robienie czegokolwiek mutable jeśli nie ma potrzeby (np. wydajnosciowej) to dla mnie błąd.

Czasem (w projekcie na Spring to prawie raz dziennie) niestety trudno zrobić coś immutable i czytelnie, bez tony dodatkowego kodu, wtedy odpuszczam - zwłaszcza w Javie się to zdarza (ale i czasem w scali). Ale to jest na zasadzie - ok trudno, nie da się czysto - to będzie lekko brudno. Staram się takie zwały minimalizować. Generalnie zasada przyjęta ze Scali jest taka:
w danym lokalnym konteście (metoda, klasa) maximum 1 mutująca rzecz. Najlepiej jedna zmienna. Ewentualnie jedna mutowalna kolekcja (ale to gorsze).

To podejście wynika z doświadczenia - przez 24 lata pisałem imperatywny kod (z małymi przerwami w SML , Haskell, i Scalę - ale traktowałem to jako eksperymenty) (Btw. zacząłem od BASIC (i to z liniamii i GOTO ) i Assemblera i to jeszcze z samomodyfikujacym się kodem - gorszego startu do programowania funkcyjnego mieć nie można chyba).
Ale w końcu mi się znudziło:
Wlazłem kiedyś w Scale na bardziej (czyli nie "Scava") i odkryłem jedną ciekawostkę - mogę sobie do takiego funkcyjnego "niemutowalnego" kawałaka kodu, w jakimś zapomnianym projekcie wrócić po kilku miesiącach, zrobić totalna rozwałkę / refaktoring, i to tak, że nie ogarniam co się dzieje. A ten sku...syn jak się skompiluje to nadal działa. (btw. to słówko skompiluje jest ważne). Nawet testy nie są tak potrzebne! To jest jednak duża wygoda. Potem się okazało, że w Javie też się (do pewnego stopnia) tak da, zwłaszcza jak nie używasz zrypanych frameworków.
Krótko mówiąc:

immutability

0

Próbowałem zastosować teorię w praktyce, niestety wyszedł pewien problem. Do 3-argumentowego Stream.reduce pustego akumulatora nie podaje się w postaci funkcji Supplier, tylko w postaci obiektu identity. Czyli nie jest tak, że każdy wątek tworzy sobie akumulator. Każdy wątek dostaje na dzień dobry ten sam obiekt identity. Ten obiekt nie może być mutowalny, bo wtedy naprawdę się rozjedzie. Dobrze myślę?

Miałem trochę szczęścia pisząc mój kod na zadanie z najdłuższymi stringami, bo przypadkiem mój element identity nie podlegał mutacji. Ale dość łatwo się przed tym zabezpieczyć nadając mu klauzulę read-only (edit: nawet można użyć nulla). Wtedy kod szybko się wykrzaczy, nie będzie ukrytą bombą zegarową.

import java.util.*;

public class LongestString
{
  public static int strLen(List<String> lst)
  {
    if (lst.size() == 0)
      return 0;
    else
      return lst.get(0).length();
  }

  public static List<String> lacz(List<String> lst1, List<String> lst2)
  {
    if (strLen(lst1) < strLen(lst2))
      return lst2;
    else if (strLen(lst1) > strLen(lst2))
      return lst1;
    else {
      //Rowne dlugosci slow w obu listach, trzeba polaczyc.
      lst1.addAll(lst2);
      return lst1;
    }
  }

  public static List<String> dajNajdluzsze(List<String> we)
  {
    // identity musi być immutable, bo będzie dostarczane do wielu wątków.
    List<String> initialList = new ArrayList<>();
    List<String> identity = Collections.unmodifiableList(initialList);
    return we.stream().reduce(
      identity,
      (List<String>lstWe, String s) -> {
        List<String> lstSingle = new ArrayList<>();
        lstSingle.add(s);
        return lacz(lstWe, lstSingle);
      },
      LongestString::lacz
    );
  }

  public static void main(String args[])
  {
    String[] tab = new String[] { "a", "bc", "cd" };
    List<String> lista = Arrays.asList(tab);
    List<String> wynik = dajNajdluzsze(lista);
    for(String s: wynik)
      System.out.println("=" + s);
  }

}

Trochę więc mam zastrzeżeń do swojego pierwotnego postu, niezbyt dokładnie podszedłem do parametrów funkcji reduce. Może jednak lepiej collect? Może Collector, w którym Supplier zwraca zawsze nowy obiekt? Ale główna refleksja jest słuszna. Nie trzeba się martwić o thread-safety akumulatora, jeżeli dostarczy się każdemu wątkowi oddzielny obiekt startowy.

Czyli też poniekąd jest prawdą, że akumulator mutowalny nie thread-safe rozwali równoległe wykonanie. Jeżeli użyjemy jako identity mutowalnego obiektu i później będziemy go rozszerzać z wielu wątków.

@jarekr000000 - no tak. Zamiast rozważać i pamiętać można stosować prawie zawsze obiekty niemutowalne. Zgodnie z Twoimi zastrzeżeniami.

0

Dobra, przepraszam Was za długie wywody. Przyznaję, że dopiero zgłębiam te kolektory itp. Niemniej jednak czytając wypowiedzi, z którymi nie sposób się zgodzić, musiałem coś napisać. A mogłem krótko: zgodnie z dokumentacją reduce przeprowadza reduction, a collect - mutable reduction. Wszystko można przeczytać w klasie Stream.

Or we could use a parallelizable collect form:

ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
                                            (c, e) -> c.add(e.toString()),
                                            (c1, c2) -> c1.addAll(c2));

ArrayList jest mutowalny, nie jest thread-safe. Można używać w kolektorze, jest bezpiecznie i współbieżnie. cbdo.

1 użytkowników online, w tym zalogowanych: 0, gości: 1