Czemu Spark scalowy woła o serializację?

0

Cóż on chce zapisywać?
Gdy do map() wcisnę s => s.split(";") to działa bez serializacji. Ale gdy podam do map() argument jako zdefiniowaną wcześniej funkcję def parseLine(line: String) = line.split(";") to woła o serializację:

at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
at com.julian.scalastarter.Main$.main(Main.scala:23)
at com.julian.scalastarter.Main.main(Main.scala)
Caused by: java.io.NotSerializableException: com.julian.scalastarter.Main$

Cała klasa:

package com.julian.scalastarter

import org.apache.spark.{SparkConf, SparkContext}

object Main extends Serializable {

  def parseLine(line: String): Array[String] = line.split(";")

  def main(args: Array[String]): Unit = {

    val pathIn: String = "src/main/resources/CitiesAndValues.txt"
    val pathOut: String = "out-avg"

    val conf = new SparkConf()
      .setAppName("Mean Value")
      .set("spark.master", "local")
      .set("spark.hadoop.validateOutputSpecs", "false") // to overwrite output directory

    val sc = new SparkContext(conf)

    sc.textFile(pathIn)
      .cache()
      .map(parseLine)
      .filter(x => x(1).toInt < 99)
      .map(x => (x(0), (x(1).toDouble, 1)))
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
      .mapValues(x => x._1 / x._2)
      .saveAsTextFile(pathOut)

    sc.stop()
  }
}
1

Serializacja musi nastąpić jeśli przesyłasz obiekt (np funkcję) z jednej JVMki do innej. Czasem ta serializacja się sypnie, a czasem nie.

Tutaj to trochę dziwnie wygląda bo masz jednocześnie:
Caused by: java.io.NotSerializableException: com.julian.scalastarter.Main$
oraz
object Main extends Serializable {

Może coś się skisiło i projekt się nie przekompilował?

Co by się stało gdybyś przeniósł def parseLine do wewnątrz def main?

1 użytkowników online, w tym zalogowanych: 0, gości: 1