Tarea no serializable: java. io. NotSerializableException al llamar a la función fuera del cierre solo en clases no objetos


Obtener un comportamiento extraño al llamar a una función fuera de un cierre:

  • cuando la función está en un objeto todo está funcionando
  • cuando la función está en una clase get:

Tarea no serializable: java.io.NotSerializableException: prueba

El problema es que necesito mi código en una clase y no en un objeto. ¿Alguna idea de por qué está pasando esto? Es un objeto Scala serializado (por defecto?)?

Este es un ejemplo de código de trabajo:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

Esto es el ejemplo que no funciona :

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,2,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}
Author: Derlin, 2014-03-23

6 answers

No creo que la otra respuesta sea totalmente correcta. Los RDD son de hecho serializables, así que esto no es lo que está causando que su tarea falle.

Spark es un motor de computación distribuida y su abstracción principal es un conjunto de datos distribuido resiliente (RDD ), que puede verse como una colección distribuida. Básicamente, los elementos de RDD se particionan a través de los nodos del clúster, pero Spark abstrae esto del usuario, permitiendo que el usuario interactúe con el RDD (colección) como si fuera una local.

No para entrar en demasiados detalles, pero cuando se ejecutan diferentes transformaciones en un RDD (map, flatMap, filter y otros), su código de transformación (cierre) es:

  1. serializado en el nodo driver,
  2. enviado a los nodos apropiados en el clúster,
  3. deserializado,
  4. y finalmente ejecutado en los nodos

Por supuesto, puede ejecutar esto localmente (como en su ejemplo), pero todas esas fases (aparte del envío a través de la red) todavía se producen. [Esto le permite detectar cualquier error incluso antes de implementarlo en producción]

Lo que sucede en su segundo caso es que está llamando a un método, definido en la clase testing desde dentro de la función map. Spark ve eso y dado que los métodos no pueden ser serializados por sí mismos, Spark intenta serializar el todo testing class, para que el código siga funcionando cuando se ejecute en otra JVM. Tienes dos posibilidades:

O haces testing de clase serializable, por lo que toda la clase puede ser serializada por Spark:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

O haces que someFunc funcione en lugar de un método (las funciones son objetos en Scala), para que Spark pueda serializarlo:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new Test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

Similar, pero no el mismo problema con la serialización de clases puede ser de su interés y puede leerlo en esta presentación de Spark Summit 2013.

Como nota al margen, puedes reescribir rddList.map(someFunc(_)) a rddList.map(someFunc), son exactamente iguales. Generalmente, el segundo es preferido ya que es menos detallado y más limpio de leer.

EDITAR (2015-03-15): SPARK-5307introducido SerializationDebugger y Spark 1.3.0 es la primera versión en usarlo. Añade la ruta de serialización a una NotSerializableException. Cuando se encuentra una excepción NotSerializableException, el depurador visita el gráfico de objetos para encontrar la ruta hacia el objeto que no se puede serializar y construye información para ayudar al usuario a objeto.

En el caso de OP, esto es lo que se imprime en stdout:

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)
 256
Author: Grega Kešpret,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-03-15 17:19:51

La respuesta de Grega es excelente para explicar por qué el código original no funciona y dos formas de solucionar el problema. Sin embargo, esta solución no es muy flexible; considere el caso en el que su cierre incluye una llamada a un método en una clase no-Serializable sobre la que no tiene control. No puede agregar la etiqueta Serializable a esta clase ni cambiar la implementación subyacente para cambiar el método en una función.

Nilesh presenta una gran solución para esto, pero la solución puede ser hecho más conciso y general:

def genMapper[A, B](f: A => B): A => B = {
  val locker = com.twitter.chill.MeatLocker(f)
  x => locker.get.apply(x)
}

Este serializador de funciones se puede usar para envolver automáticamente cierres y llamadas a métodos:

rdd map genMapper(someFunc)

Esta técnica también tiene el beneficio de no requerir las dependencias adicionales de Shark para acceder KryoSerializationWrapper, ya que el Chill de Twitter ya es atraído por core Spark

 29
Author: Ben Sidhom,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:26:36

Charla completa explicando completamente el problema, que propone un gran cambio de paradigma para evitar estos problemas de serialización: https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md

La respuesta más votada básicamente sugiere desechar una característica completa del lenguaje, que ya no es usar métodos y solo usar funciones. De hecho, en los métodos de programación funcional en las clases debe evitarse, pero convertirlos en funciones no está resolviendo el problema de diseño aquí (ver el enlace anterior).

Como solución rápida en esta situación en particular, podría usar la anotación @transient para decirle que no intente serializar el valor ofensivo (aquí, Spark.ctx es una clase personalizada, no la de Spark después del nombre de OP):

@transient
val rddList = Spark.ctx.parallelize(list)

También puede reestructurar el código para que rddList viva en otro lugar, pero eso también es desagradable.

El Futuro es Probablemente Esporas

En el futuro Scala incluirá estas cosas llamadas "esporas" que deberían permitirnos controlar el grano fino lo que hace y no exactamente es arrastrado por un cierre. Además, esto debería convertir todos los errores de extraer accidentalmente tipos no serializables (o cualquier valor no deseado) en errores de compilación en lugar de ahora, que es horrible excepciones de tiempo de ejecución / fugas de memoria.

Http://docs.scala-lang.org/sips/pending/spores.html

Un consejo sobre la serialización de Kryo

Cuando use kyro, hágalo de manera que el registro es necesario, esto significará que obtendrá errores en lugar de fugas de memoria:

"Finalmente, sé que kryo tiene kryo.setRegistrationOptional (verdadero) pero estoy teniendo un tiempo muy difícil tratando de averiguar cómo usarlo. Cuando esta opción está activada, kryo todavía parece lanzar excepciones si no he registrado clases."

Estrategia para registrar clases con kryo

Por supuesto, esto solo le da control a nivel de tipo, no a nivel de valor control.

... más ideas por venir.

 22
Author: samthebest,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:02:48

Resolví este problema usando un enfoque diferente. Simplemente necesita serializar los objetos antes de pasar por el cierre, y des-serializar después. Este enfoque simplemente funciona, incluso si sus clases no son serializables, porque utiliza Kryo detrás de las escenas. Todo lo que necesitas es curry. ;)

Aquí hay un ejemplo de cómo lo hice:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
               (foo: Foo) : Bar = {
    kryoWrapper.value.apply(foo)
}
val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
rdd.flatMap(mapper).collectAsMap()

object Blah(abc: ABC) extends (Foo => Bar) {
    def apply(foo: Foo) : Bar = { //This is the real function }
}

Siéntase libre de hacer Bla tan complicado como desee, clase, objeto compañero, clases anidadas, referencias a múltiples 3rd party libs.

KryoSerializationWrapper se refiere a: https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala

 6
Author: Nilesh,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-04-14 06:54:59

No estoy completamente seguro de que esto se aplique a Scala, pero, en Java, resolví el NotSerializableException refactorizando mi código para que el cierre no accediera a un campo final no serializable.

 6
Author: Trebor Rude,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-10-13 18:14:02

Me enfrenté a un problema similar, y lo que entiendo de La respuesta de Grega es

object NOTworking extends App {
 new testing().doIT
}
//adding extends Serializable wont help
class testing {

val list = List(1,2,3)

val rddList = Spark.ctx.parallelize(list)

def doIT =  {
  //again calling the fucntion someFunc 
  val after = rddList.map(someFunc(_))
  //this will crash (spark lazy)
  after.collect().map(println(_))
}

def someFunc(a:Int) = a+1

}

Su doIT método está tratando de serializar someFunc(_) método, sino como método no son serializables, intenta serializar clase prueba que no es serializable.

Así que haz que tu código funcione, debes definir someFunc dentro del método doIT. Por ejemplo:

def doIT =  {
 def someFunc(a:Int) = a+1
  //function definition
 }
 val after = rddList.map(someFunc(_))
 after.collect().map(println(_))
}

Y si hay múltiples funciones que entran en la imagen, entonces todos esas funciones deben estar disponibles para el contexto padre.

 4
Author: Tarang Bhalodia,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-05-23 12:10:44