Cóż on chce zapisywać?
Gdy do map()
wcisnę s => s.split(";")
to działa bez serializacji. Ale gdy podam do map()
argument jako zdefiniowaną wcześniej funkcję def parseLine(line: String) = line.split(";")
to woła o serializację:
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:371)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.map(RDD.scala:370)
at com.julian.scalastarter.Main$.main(Main.scala:23)
at com.julian.scalastarter.Main.main(Main.scala)
Caused by: java.io.NotSerializableException: com.julian.scalastarter.Main$
Cała klasa:
package com.julian.scalastarter
import org.apache.spark.{SparkConf, SparkContext}
object Main extends Serializable {
def parseLine(line: String): Array[String] = line.split(";")
def main(args: Array[String]): Unit = {
val pathIn: String = "src/main/resources/CitiesAndValues.txt"
val pathOut: String = "out-avg"
val conf = new SparkConf()
.setAppName("Mean Value")
.set("spark.master", "local")
.set("spark.hadoop.validateOutputSpecs", "false") // to overwrite output directory
val sc = new SparkContext(conf)
sc.textFile(pathIn)
.cache()
.map(parseLine)
.filter(x => x(1).toInt < 99)
.map(x => (x(0), (x(1).toDouble, 1)))
.reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
.mapValues(x => x._1 / x._2)
.saveAsTextFile(pathOut)
sc.stop()
}
}