Scala - Spark takes 0.5 seconds to average 100 numbers
I have a dataset of 70 million rows in CSV format with user locations and date-times, and I wrote the following code to get the average number of points of the top 100 users:
```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Stopwatch appears to be Guava's (older versions expose a public constructor
// and an elapsedTime(TimeUnit) method).
import com.google.common.base.Stopwatch

val spark = org.apache.spark.sql.SparkSession.builder
  .appName("test")
  .getOrCreate()
import spark.implicits._

val watch = new Stopwatch()
watch.start()

// user_id is a string, datetime is stored as an epoch value
val schema = new StructType()
  .add("user_id", StringType)
  .add("datetime", LongType)

// inputFile: path to the CSV, defined elsewhere
val df = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .csv(inputFile)
df.createOrReplaceTempView("paths")

// Top 100 users by number of points, with first/last timestamps and the time span
val pathDs = spark.sql(
  "SELECT user_id, min(datetime) started, max(datetime) finished, " +
  "count(*) total, max(datetime) - min(datetime) timeDelta " +
  "FROM paths GROUP BY user_id ORDER BY total DESC LIMIT 100")

pathDs.cache()
pathDs.collect.foreach(println)
println(watch.elapsedTime(TimeUnit.MILLISECONDS))

val avgPoints = pathDs.select(avg("total")).as[Double].head()
println(avgPoints)
println(watch.stop())
```
What happens here is that I take millions/billions of records (which might take up terabytes) and aggregate them down to 100 records of 5 columns. My question is not how long that part takes or how I can speed it up, but rather what happens when I work with the resulting 100 records.
There's a simpler way to do this straight via SQL, but I need pathDs for more processing later on. The code works fine, but I noticed that pathDs.select(avg("total")).as[Double].head() starts doing a lot of work and ends up taking around half a second, even though pathDs contains only 100 rows.
Do you know why it's taking so long, and how I can speed it up while operating on such a small data set of 100 rows? I do .cache and .collect on it to bring the 100 records locally before the further aggregation (and I'm running locally right now anyway).
I'm using Spark 2.2 on Scala 2.11, locally.
Spark is optimized for large data sets. This means there are overheads which are negligible for large data sets but not negligible for small ones.
Consider what happens when you run the calculation of avgPoints:
- Spark calculates the "transformation", i.e. it defines what calculation needs to be done (this is the select, avg etc. portion).
- You call the "head" action, which causes Spark to take the expression tree it built and turn it into a physical plan. This includes optimizations such as comparing several possible solutions. Note that the expression also includes the expression for calculating the cached portion. In practice those steps are skipped (you can see this in the Spark UI), but they are still considered, because Spark might decide in some cases to recalculate some of the cached data (almost certainly not in this case).
- Spark compiles the physical plan to code using whole-stage code generation, serializes that code and sends it to the relevant executors.
- When Spark created the plan, it partitioned the data (probably into 200 partitions, the default for a groupBy; see the sketch after this list for how to check or lower this setting). This means there are 200 tasks divided between the executors. Most partitions have 0 or 1 elements, so their tasks are almost immediate, but Spark still has to start 200 tasks.
- Spark sends the result of each of the 200 tasks to a buffer, and all of them are sent to a single executor for the final aggregation. The final aggregation task doesn't begin until all the tasks have finished and sent their data.
- Once the final aggregation finishes, the result is sent to the driver.
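If you want to see these stages for yourself, you can print the physical plan and check the shuffle-partition setting. A minimal sketch, assuming the spark session and pathDs from the question are in scope:

```scala
import org.apache.spark.sql.functions.avg

// Print the physical plan that the "head" action will execute
// (the same stages are visible in the Spark UI).
pathDs.select(avg("total")).explain()

// The shuffle introduced by the groupBy uses spark.sql.shuffle.partitions tasks (200 by default).
println(spark.conf.get("spark.sql.shuffle.partitions"))

// For tiny aggregations you can lower it before running the query, e.g.:
spark.conf.set("spark.sql.shuffle.partitions", "8")
```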
As you can see, there are many stages here, including network transmission and starting/ending tasks (which need to be managed). The overall overhead can easily be half a second even with almost no real data.
If you changed the limit to 1000 instead, you would probably see very little change in the overall time even though you would be processing 10 times the data.
It is a common use case to use Spark to reduce the size of the problem: you have lots of data, you aggregate it down to a smaller number of elements (100 in your case), then collect those to the driver and act on them directly instead of using Spark, in order to avoid the overheads (e.g. in your case, save the result of the collect and, instead of just doing a foreach println, sum the totals up yourself).
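For example, a minimal sketch of that approach, assuming the pathDs from the question (count(*) comes back as a Long, and avgPointsLocal is just an illustrative name):

```scala
// Bring the (at most) 100 aggregated rows to the driver once...
val rows = pathDs.collect()
rows.foreach(println)

// ...then compute the average locally, without launching another Spark job.
val avgPointsLocal = rows.map(_.getAs[Long]("total")).sum.toDouble / rows.length
println(avgPointsLocal)
```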
One thing you can do is use coalesce(1) when you calculate pathDs. That would mean you have just 1 partition (the joining would become part of the first stage). This is not that different from using the collect result, except that if you want to change the limit to a larger size, coalescing to a small value other than 1 can be useful (e.g. you might do a limit of 10000 and coalesce to 4 to still get some parallelism).
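A sketch of what that could look like, again assuming the pathDs from the question (smallDs is an illustrative name):

```scala
import org.apache.spark.sql.functions.avg

// Collapse the aggregated result to a single partition (or a handful for larger limits)
// so the follow-up aggregation runs as one task instead of many.
val smallDs = pathDs.coalesce(1).cache()
smallDs.collect.foreach(println)

// spark.implicits._ (imported in the question's code) provides the Double encoder for .as[Double].
println(smallDs.select(avg("total")).as[Double].head())
```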
Update
Based on the comments, the result of the limit is already a single partition, so the coalesce won't do anything (which means there is no real reason not to just collect, unless you want to use dataframe functions on the result). The process described above is still correct, except that 1 partition is used instead of multiple ones.
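You can verify this on your side with a quick check, assuming the pathDs from the question:

```scala
// Number of partitions backing the limited result; 1 means coalesce(1) would change nothing.
println(pathDs.rdd.getNumPartitions)
```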