Cómo definir la partición de DataFrame?
He empezado a usar Spark SQL y DataFrames en Spark 1.4.0. Quiero definir un particionador personalizado en DataFrames, en Scala, pero no veo cómo hacer esto.
Una de las tablas de datos con las que estoy trabajando contiene una lista de transacciones, por cuenta, silimar al siguiente ejemplo.
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
Al menos inicialmente, la mayoría de los cálculos se realizarán entre las transacciones dentro de una cuenta. Así que me gustaría tener los datos particionados para que todas las transacciones para una cuenta está en la misma partición de Spark.
Pero no veo una manera de definir esto. La clase DataFrame tiene un método llamado ' repartition (Int)', donde puede especificar el número de particiones a crear. Pero no veo ningún método disponible para definir un particionador personalizado para un DataFrame, como se puede especificar para un RDD.
Los datos de origen se almacenan en Parquet. Vi que al escribir un DataFrame a Parquet, puede especificar una columna para particionar, por lo que presumiblemente Podría decirle a Parquet que particione sus datos por la columna "Cuenta". Pero podría haber millones de cuentas, y si estoy entendiendo Parquet correctamente, se crearía un directorio distinto para cada Cuenta, por lo que no suena como una solución razonable.
¿Hay alguna manera de hacer que Spark particione este DataFrame para que todos los datos de una Cuenta estén en la misma partición?
5 answers
Spark > = 2.3.0
SPARK-22614 expone particiones de rango.
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
SPARK-22389 expone particiones de formato externo en la API de origen de datos v2.
Spark > = 1.6.0
En Spark >= 1.6 es posible usar particiones por columna para consultas y almacenamiento en caché. Véase: SPARK-11410 y SPARK-4849 utilizando el método repartition
:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
A diferencia de RDDs
Spark Dataset
(incluyendo Dataset[Row]
a.k.a DataFrame
) no puede utilice particionador personalizado como por ahora. Por lo general, puede abordar eso creando una columna de partición artificial, pero no le dará la misma flexibilidad.
Spark
Una cosa que puede hacer es pre-particionar los datos de entrada antes de crear un DataFrame
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
Dado que DataFrame
la creación a partir de un RDD
requiere solo una fase de mapa simple, se debe preservar el diseño de la partición existente*:
assert(df.rdd.partitions == partitioned.partitions)
De la misma manera que puede reparticionar existente DataFrame
:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
Así que parece que no es imposible. La pregunta sigue siendo si tiene sentido en absoluto. Voy a argumentar que la mayoría de las veces no lo hace:
-
Reparticionar es un proceso costoso. En un escenario típico, la mayoría de los datos deben ser serializados, mezclados y deserializados. Por otro lado, el número de operaciones que pueden beneficiarse de datos pre-particionados es relativamente pequeño y se limita aún más si la API interna no está diseñada para aprovechar esto propiedad.
- se une en algunos escenarios, pero requeriría un soporte interno, {[76]]}
- window functions llama con el particionador correspondiente. Igual que el anterior, limitado a una definición de ventana única. Sin embargo, ya está particionado internamente, por lo que el pre-particionado puede ser redundante,
- agregaciones simples con
GROUP BY
- es posible reducir la huella de memoria de los búferes temporales**, pero el costo general es mucho mayor. Más o menos equivalente agroupByKey.mapValues(_.reduce)
(comportamiento actual) vsreduceByKey
(pre-partición). Poco probable que sea útil en la práctica. - compresión de datos con
SqlContext.cacheTable
. Dado que parece que está utilizando la codificación de longitud de ejecución, la aplicación deOrderedRDDFunctions.repartitionAndSortWithinPartitions
podría mejorar la relación de compresión.
El rendimiento depende en gran medida de la distribución de las teclas. Si está sesgado, resultará en una utilización de recursos subóptima. En el peor de los casos será imposible terminar el trabajo en absoluto.
- Todo un punto de usar una alta la API declarativa de nivel es aislarse de los detalles de implementación de bajo nivel. Como ya se mencionó por @dwysakowicz y @RomiKuntsman una optimización es un trabajo del Catalyst Optimizer . Es una bestia bastante sofisticada y realmente dudo que pueda mejorar fácilmente sin sumergirse mucho más en sus componentes internos.
Conceptos relacionados
Particionamiento con fuentes JDBC :
Soporte de fuentes de datos JDBC predicates
argumento. Se puede utilizar de la siguiente manera:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
Crea una sola partición JDBC por predicado. Tenga en cuenta que si los conjuntos creados utilizando predicados individuales no son disjuntos, verá duplicados en la tabla resultante.
partitionBy
método en DataFrameWriter
:
Spark DataFrameWriter
proporciona el método partitionBy
que se puede usar para "particionar" los datos al escribir. Separa los datos al escribir utilizando el conjunto proporcionado de columnas
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
Esto habilita predicado empuje hacia abajo en leer para consultas basadas en clave:
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
, Pero no es equivalente a DataFrame.repartition
. En particular, agregaciones como:
val cnts = df1.groupBy($"k").sum()
Todavía requerirá TungstenExchange
:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
método en DataFrameWriter
(Spark >= 2.0):
bucketBy
tiene aplicaciones similares a partitionBy
pero solo está disponible para tablas (saveAsTable
). La información de bucketing se puede usar para optimizar joins:
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
* By partition layout Me refiero solo a un dato distribución. partitioned
RDD ya no tiene un particionador.
** Asumiendo que no hay proyección temprana. Si la agregación cubre solo un pequeño subconjunto de columnas, probablemente no haya ganancia alguna.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2018-04-12 22:42:14
En Spark HiveContext, no el simple SqlContext
, puede usar el HiveQL DISTRIBUTE BY colX...
(asegura que cada uno de los reductores N obtiene rangos no superpuestos de x) & CLUSTER BY colX...
(acceso directo para Distribuir Por y Ordenar Por) por ejemplo;
df.registerTempTable("partitionMe")
hiveCtx.sql("select * from partitionMe DISTRIBUTE BY accountId SORT BY accountId, date")
No estoy seguro de cómo encaja esto con la api DF de Spark. Estas palabras clave no son compatibles con el SQLContext normal (tenga en cuenta que no necesita tener una tienda de meta hive para usar el HiveContext)
EDITAR: Spark 1.6 + ahora tiene esto en el nativo DataFrame API
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-01-21 04:11:43
Utilice el DataFrame devuelto por:
yourDF.orderBy(account)
No hay una forma explícita de usar partitionBy en un DataFrame, solo en un PairRDD, pero cuando ordena un DataFrame, lo usará en su plan lógico y eso le ayudará cuando necesite hacer cálculos en cada Cuenta.
Me topé con el mismo problema exacto, con un dataframe que quiero particionar por cuenta. Asumo que cuando dices " quieres tener los datos particionados para que todas las transacciones de una cuenta estén en la misma partición Spark", la quieres para la escala y el rendimiento, pero tu código no depende de ella (como usar mapPartitions (), etc.), ¿verdad?
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-08-06 08:42:51
Pude hacer esto usando RDD. Pero no se si esta es una solución aceptable para ti.
Una vez que tenga el DF disponible como RDD, puede solicitar repartitionAndSortWithinPartitions
para realizar particiones personalizadas de datos.
He aquí una muestra que utilicé:
class DatePartitioner(partitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
val start_time: Long = key.asInstanceOf[Long]
Objects.hash(Array(start_time)) % partitions
}
override def numPartitions: Int = partitions
}
myRDD
.repartitionAndSortWithinPartitions(new DatePartitioner(24))
.map { v => v._2 }
.toDF()
.write.mode(SaveMode.Overwrite)
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-10-03 22:53:02
Así que para empezar con algún tipo de respuesta:) - No puedes
No soy un experto, pero por lo que entiendo los DataFrames, no son iguales a rdd y DataFrame no tiene tal cosa como Particionador.
Generalmente la idea de DataFrame es proporcionar otro nivel de abstracción que maneje tales problemas por sí mismo. Las consultas en DataFrame se traducen en plan lógico que se traduce a operaciones en RDDs. La partición que sugirió probablemente se aplicará automáticamente o al menos debería serlo.
Si no confía en SparkSQL que proporcionará algún tipo de trabajo óptimo, siempre puede transformar DataFrame a RDD[Row] como se sugiere en los comentarios.
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-09-29 20:26:49