Apache Spark - How to ensure that each partition of an RDD has some data


I have an RDD of 36 elements and a cluster of 3 nodes with 4 cores each. I repartitioned the RDD into 36 parts so that each partition might have one element to process. Instead, the 36 elements were partitioned such that only 4 parts have 9 elements each and the rest of the parts are empty. Those empty parts have nothing to process, so the server resources are underutilized.

How can I repartition the data to ensure that every part has some data to process? How can I ensure that every part has exactly 3 elements to process?

By definition, repartition(numPartitions) reshuffles the data in the RDD randomly to create either more or fewer partitions and balances it across them. This always shuffles data over the network.

The guarantee Apache Spark gives is that the data is distributed evenly, but that won't necessarily yield the same number of elements per partition. (Also, your dataset is very small!)

You might consider using a HashPartitioner:

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 8)
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[31] at parallelize at <console>:27

scala> import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.RDD

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> def countByPartition(rdd: RDD[(Int, None.type)]) = rdd.mapPartitions(iter => Iterator(iter.length))
countByPartition: (rdd: org.apache.spark.rdd.RDD[(Int, None.type)])org.apache.spark.rdd.RDD[Int]

scala> countByPartition(rdd).collect
res25: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)

scala> countByPartition(rdd.partitionBy(new HashPartitioner(12))).collect
res26: Array[Int] = Array(3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3)

I have borrowed the example and the helper from zero323's answer to "How does HashPartitioner work?"
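To see why the keys 1 to 36 land three per partition, here is a minimal pure-Scala sketch of the rule HashPartitioner is understood to apply: the key's hashCode modulo the number of partitions, adjusted to be non-negative. It does not use Spark. The names `HashPartitionSketch`, `partitionFor`, and this local `countByPartition` are hypothetical stand-ins for illustration, not Spark API.

```scala
object HashPartitionSketch {
  // Modulo that never returns a negative index, even for negative hash codes.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Assumed partitioning rule: hashCode of the key modulo the partition count.
  // For an Int key, hashCode is the value itself.
  def partitionFor(key: Int, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)

  // Number of keys that land in each partition, empty partitions included.
  def countByPartition(keys: Seq[Int], numPartitions: Int): Vector[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionFor(k, numPartitions)) += 1)
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // Keys 1..36 cover every residue modulo 12 exactly three times.
    println(countByPartition(1 to 36, 12))
    // prints Vector(3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3)

    // Keys that are all multiples of 12 collide in partition 0.
    println(countByPartition((1 to 36).map(_ * 12), 12))
    // prints Vector(36, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
  }
}
```

The second call suggests that the even split depends on the keys: hash partitioning balances well only when the key hashes spread across all residues, so it is worth checking the per-partition counts on real keys rather than assuming them.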

I hope this helps!

Edit:

If you had just done the following:

scala> val rdd = sc.parallelize(for { x <- 1 to 36 } yield (x, None), 12)
rdd: org.apache.spark.rdd.RDD[(Int, None.type)] = ParallelCollectionRDD[36] at parallelize at <console>:29

scala> countByPartition(rdd).collect
res28: Array[Int] = Array(4, 5, 4, 5, 4, 5, 4, 5)

The results wouldn't be the same.

