[Spark Streams, Scala] - działanie stream i foreachRdd

0

Chciałbym się dowiedzieć kilku rzeczy związanych ze streamami w Sparku :

  1. Jak dokładnie pracują streamy w Sparku ? Czy kolejne batche są dołączane do tego samego strumienia, czy to działa w ten sposób że w strumieniu jest tylko ostatni batch (chyba że zastosujemy funkcje okienkowe) ?
  2. Jak pracuje foreachRdd w takim razie ? Wczytuje cały batch (jeśli w strumieniu mamy tylko ostatni batch) czy ze strumienia wczytuje to co aktualnie doszło (jeśli batche są kolejno dołączane w strumieniu) ?
0

Ja rozumiem tak:

W StreamingContext określasz "batchDuration", tj. przedział czasu, który będzie dzielił dane wejściowe na "batche", taki batch to RDD (podstawowy koncept sparka), zaś sekwencja RDD to DStream (czyli strumień). Jak źródło coś wyprodukuje, to Reciever (skojarzony ze strumieniem) zapisuje to "coś" w pamięci sparka. Grupa "cosi" wyprodukowanych w okresie "batchDuration" składa się na RDD/batch.

Teraz jak przetwarzasz te dane, to masz 2 opcje przetwarzania:

  1. Bezstanowe - przetwarzasz pojedynczy batch/RDD i nie masz żadnych zależności do innych RDD
  2. "Stanowe" - dla przetwarzanego RDD masz możliwość zajrzenia do wcześniejszych RDD (dla tego przypadku definiujesz okienko, które pozwala zaglądać w przeszłość, "sliding window", które zwraca Ci DStream obejmujący okres ["teraz"-"długość okna"; "teraz"])

Nie wiem jak spark wewnętrznie ogarnia RDD, które przestały być potrzebne, ale zakładam, że jeśli nie ma np. okienek które zaglądają dalej niż 1h, to spark postawiony przed faktem rychłego braku pamięci usuwa RDD "starsze niż" 1h.

Uwaga, mogę to źle rozumieć :-)

0

No właśnie zastanawiam się jak to dokładnie działa, a czy konkretniej w ten sposób (zakładając że nie używam funkcji okienkowej):
a) załóżmy że zbieram dane na batch-a przez 1 s i dodaję do strumienia
b) potem na tym strumieniu wykonuję foreachRdd i w nim operacje które zajmują 2.5 s
c) w czasie przetwarzania pierwszego batcha do strumienia zostały dodane dwa kolejne
d) po przetworzeniu pierwszego batch-a z pierwotnego strumienia jest on usuwany i przetwarzane są następne w kolejce

EDIT: d) ewentualnie kolejne batche nie siedzą w kolejce tylko są wykonywane operacje na nich równolegle, ale potem są i tak usuwane ze strumienia

1

Jak spojrzysz na imaplementację DStreama:

  1. RDD trzymane są w mapie [Time, RDD[T]]
    https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L87

  2. foreachRDD tworzy ForEachDStream:
    https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L651

  3. ForEachDStream generuje joba na określony punkt w czasie w określony sposób:
    https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala#L47

 override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}

...jak dla mnie foreachRDD wykona się dla RDD z określonego batch intervalu (czyli "aktualnie" przetwarzanego)
I będzie się wykonywać co batch interval, tzn. job będzie tworzony.

W opisanym przypadku nowy RDD co sekundę, przetwarzanie 2.5 sekundy, pewnie ilość jobów będzie rosła...

1

Stare RDD usuwane są via DStream.clearMetadata, które dla określenia "stare" uwzględnia coś co się nazywa "rememberDuration" (zależne od slideDuration/checkpointDuration). Jak określisz okno, to strumień będzie pamiętał to co potrzebne do obsługi okna, może nawet więcej, jeśli chceckpointDuration jest większe niż to wynikające z okna.

Nie doszukiwałem się, gdzie wywoływane jest to clearMetadata, podejrzewam, że po zakończeniu przetwarzania joba.

0

A jakby działało coś takiego ?

val x = dstream.foreach{ x => ...}
val y = dstream.reduceByWindow(...)

slideDuration musiałoby być nie większe niż rememberDuration, ale w takim razie foreach nie mógłby wywalać tych RDD. Czyli Spark sam by musiał ustalić ten warunek zanim wykonał się jeszcze foreach ?

1

Ten dstream, którego używasz w przykładzie nie wziął się ot tak z nicości, wcześniej musiał zostać zainicjalizowany i mieć ustawione to rememberDuration.
Jak robisz coś na streamie, to najczęściej dostajesz nowy stream, dla którego "parentem" jest ten wyjściowy, i ten potomny mówi parentowi "hej, potrzebuję RDD za okres X", parent stream sprawdza sobie czy aktualnie obejmuje pamięcią dłuższy okres, czy krótszy i w razie potrzeby "zwiększa" okres zapamiętywania.

1 użytkowników online, w tym zalogowanych: 0, gości: 1