Cómo definir la partición de DataFrame?


He empezado a usar Spark SQL y DataFrames en Spark 1.4.0. Quiero definir un particionador personalizado en DataFrames, en Scala, pero no veo cómo hacer esto.

Una de las tablas de datos con las que estoy trabajando contiene una lista de transacciones, por cuenta, silimar al siguiente ejemplo.

Account   Date       Type       Amount
1001    2014-04-01  Purchase    100.00
1001    2014-04-01  Purchase     50.00
1001    2014-04-05  Purchase     70.00
1001    2014-04-01  Payment    -150.00
1002    2014-04-01  Purchase     80.00
1002    2014-04-02  Purchase     22.00
1002    2014-04-04  Payment    -120.00
1002    2014-04-04  Purchase     60.00
1003    2014-04-02  Purchase    210.00
1003    2014-04-03  Purchase     15.00

Al menos inicialmente, la mayoría de los cálculos se realizarán entre las transacciones dentro de una cuenta. Así que me gustaría tener los datos particionados para que todas las transacciones para una cuenta está en la misma partición de Spark.

Pero no veo una manera de definir esto. La clase DataFrame tiene un método llamado ' repartition (Int)', donde puede especificar el número de particiones a crear. Pero no veo ningún método disponible para definir un particionador personalizado para un DataFrame, como se puede especificar para un RDD.

Los datos de origen se almacenan en Parquet. Vi que al escribir un DataFrame a Parquet, puede especificar una columna para particionar, por lo que presumiblemente Podría decirle a Parquet que particione sus datos por la columna "Cuenta". Pero podría haber millones de cuentas, y si estoy entendiendo Parquet correctamente, se crearía un directorio distinto para cada Cuenta, por lo que no suena como una solución razonable.

¿Hay alguna manera de hacer que Spark particione este DataFrame para que todos los datos de una Cuenta estén en la misma partición?

Author: Community, 2015-06-23

5 answers

Spark > = 2.3.0

SPARK-22614 expone particiones de rango.

val partitionedByRange = df.repartitionByRange(42, $"k")

partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
//    +- LocalRelation [_1#2, _2#3]
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
// 
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]

SPARK-22389 expone particiones de formato externo en la API de origen de datos v2.

Spark > = 1.6.0

En Spark >= 1.6 es posible usar particiones por columna para consultas y almacenamiento en caché. Véase: SPARK-11410 y SPARK-4849 utilizando el método repartition:

val df = Seq(
  ("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")

val partitioned = df.repartition($"k")
partitioned.explain

// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
// 
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
//    +- Scan PhysicalRDD[_1#5,_2#6]

A diferencia de RDDs Spark Dataset (incluyendo Dataset[Row] a.k.a DataFrame) no puede utilice particionador personalizado como por ahora. Por lo general, puede abordar eso creando una columna de partición artificial, pero no le dará la misma flexibilidad.

Spark

Una cosa que puede hacer es pre-particionar los datos de entrada antes de crear un DataFrame

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner

val schema = StructType(Seq(
  StructField("x", StringType, false),
  StructField("y", LongType, false),
  StructField("z", DoubleType, false)
))

val rdd = sc.parallelize(Seq(
  Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
  Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))

val partitioner = new HashPartitioner(5) 

val partitioned = rdd.map(r => (r.getString(0), r))
  .partitionBy(partitioner)
  .values

val df = sqlContext.createDataFrame(partitioned, schema)

Dado que DataFrame la creación a partir de un RDD requiere solo una fase de mapa simple, se debe preservar el diseño de la partición existente*:

assert(df.rdd.partitions == partitioned.partitions)

De la misma manera que puede reparticionar existente DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Así que parece que no es imposible. La pregunta sigue siendo si tiene sentido en absoluto. Voy a argumentar que la mayoría de las veces no lo hace:

  1. Reparticionar es un proceso costoso. En un escenario típico, la mayoría de los datos deben ser serializados, mezclados y deserializados. Por otro lado, el número de operaciones que pueden beneficiarse de datos pre-particionados es relativamente pequeño y se limita aún más si la API interna no está diseñada para aprovechar esto propiedad.

    • se une en algunos escenarios, pero requeriría un soporte interno, {[76]]}
    • window functions llama con el particionador correspondiente. Igual que el anterior, limitado a una definición de ventana única. Sin embargo, ya está particionado internamente, por lo que el pre-particionado puede ser redundante,
    • agregaciones simples con GROUP BY - es posible reducir la huella de memoria de los búferes temporales**, pero el costo general es mucho mayor. Más o menos equivalente a groupByKey.mapValues(_.reduce) (comportamiento actual) vs reduceByKey (pre-partición). Poco probable que sea útil en la práctica.
    • compresión de datos con SqlContext.cacheTable. Dado que parece que está utilizando la codificación de longitud de ejecución, la aplicación de OrderedRDDFunctions.repartitionAndSortWithinPartitions podría mejorar la relación de compresión.
  2. El rendimiento depende en gran medida de la distribución de las teclas. Si está sesgado, resultará en una utilización de recursos subóptima. En el peor de los casos será imposible terminar el trabajo en absoluto.

  3. Todo un punto de usar una alta la API declarativa de nivel es aislarse de los detalles de implementación de bajo nivel. Como ya se mencionó por @dwysakowicz y @RomiKuntsman una optimización es un trabajo del Catalyst Optimizer . Es una bestia bastante sofisticada y realmente dudo que pueda mejorar fácilmente sin sumergirse mucho más en sus componentes internos.

Conceptos relacionados

Particionamiento con fuentes JDBC :

Soporte de fuentes de datos JDBC predicates argumento. Se puede utilizar de la siguiente manera:

sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)

Crea una sola partición JDBC por predicado. Tenga en cuenta que si los conjuntos creados utilizando predicados individuales no son disjuntos, verá duplicados en la tabla resultante.

partitionBy método en DataFrameWriter:

Spark DataFrameWriter proporciona el método partitionBy que se puede usar para "particionar" los datos al escribir. Separa los datos al escribir utilizando el conjunto proporcionado de columnas

val df = Seq(
  ("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")

df.write.partitionBy("k").json("/tmp/foo.json")

Esto habilita predicado empuje hacia abajo en leer para consultas basadas en clave:

val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")

, Pero no es equivalente a DataFrame.repartition. En particular, agregaciones como:

val cnts = df1.groupBy($"k").sum()

Todavía requerirá TungstenExchange:

cnts.explain

// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
//    +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
//       +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json

bucketBy método en DataFrameWriter (Spark >= 2.0):

bucketBy tiene aplicaciones similares a partitionBy pero solo está disponible para tablas (saveAsTable). La información de bucketing se puede usar para optimizar joins:

// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")

// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
//    :- *Sort [k#41 ASC NULLS FIRST], false, 0
//    :  +- *Project [k#41, v#42]
//    :     +- *Filter isnotnull(k#41)
//    :        +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
//    +- *Sort [k#46 ASC NULLS FIRST], false, 0
//       +- *Project [k#46, v2#47]
//          +- *Filter isnotnull(k#46)
//             +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>

* By partition layout Me refiero solo a un dato distribución. partitioned RDD ya no tiene un particionador. ** Asumiendo que no hay proyección temprana. Si la agregación cubre solo un pequeño subconjunto de columnas, probablemente no haya ganancia alguna.

 144
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-04-12 22:42:14

En Spark HiveContext, no el simple SqlContext, puede usar el HiveQL DISTRIBUTE BY colX... (asegura que cada uno de los reductores N obtiene rangos no superpuestos de x) & CLUSTER BY colX... (acceso directo para Distribuir Por y Ordenar Por) por ejemplo;

df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")

No estoy seguro de cómo encaja esto con la api DF de Spark. Estas palabras clave no son compatibles con el SQLContext normal (tenga en cuenta que no necesita tener una tienda de meta hive para usar el HiveContext)

EDITAR: Spark 1.6 + ahora tiene esto en el nativo DataFrame API

 8
Author: NightWolf,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-01-21 04:11:43

Utilice el DataFrame devuelto por:

yourDF.orderBy(account)

No hay una forma explícita de usar partitionBy en un DataFrame, solo en un PairRDD, pero cuando ordena un DataFrame, lo usará en su plan lógico y eso le ayudará cuando necesite hacer cálculos en cada Cuenta.

Me topé con el mismo problema exacto, con un dataframe que quiero particionar por cuenta. Asumo que cuando dices " quieres tener los datos particionados para que todas las transacciones de una cuenta estén en la misma partición Spark", la quieres para la escala y el rendimiento, pero tu código no depende de ella (como usar mapPartitions (), etc.), ¿verdad?

 6
Author: Romi Kuntsman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-08-06 08:42:51

Pude hacer esto usando RDD. Pero no se si esta es una solución aceptable para ti. Una vez que tenga el DF disponible como RDD, puede solicitar repartitionAndSortWithinPartitions para realizar particiones personalizadas de datos.

He aquí una muestra que utilicé:

class DatePartitioner(partitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = {
    val start_time: Long = key.asInstanceOf[Long]
    Objects.hash(Array(start_time)) % partitions
  }

  override def numPartitions: Int = partitions
}

myRDD
  .repartitionAndSortWithinPartitions(new DatePartitioner(24))
  .map { v => v._2 }
  .toDF()
  .write.mode(SaveMode.Overwrite)
 4
Author: Developer,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-03 22:53:02

Así que para empezar con algún tipo de respuesta:) - No puedes

No soy un experto, pero por lo que entiendo los DataFrames, no son iguales a rdd y DataFrame no tiene tal cosa como Particionador.

Generalmente la idea de DataFrame es proporcionar otro nivel de abstracción que maneje tales problemas por sí mismo. Las consultas en DataFrame se traducen en plan lógico que se traduce a operaciones en RDDs. La partición que sugirió probablemente se aplicará automáticamente o al menos debería serlo.

Si no confía en SparkSQL que proporcionará algún tipo de trabajo óptimo, siempre puede transformar DataFrame a RDD[Row] como se sugiere en los comentarios.

 2
Author: Dawid Wysakowicz,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-29 20:26:49