Is Scala Future[T] overhead heavy?
I wrote a merge sort to test the performance of asynchronous computation with Scala's Future[T] type.
I have a 4-core CPU, so I expected the async computation to be approximately 4 times faster than the sync computation, because it can use the full CPU capability (stall time should be small, since the sub-tasks are all the same size). However, the result shows that the async merge sort is slower than the normal merge sort.
Is it because I wrote the concurrent code badly, or because of Future[T] overhead? Could anyone help me explain this?
package kai.concurrent

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

/** Sequential and `Future`-based merge sort over `Seq[T]`, used to compare their wall time. */
object MergeSort {

  /** Input size at or below which [[mergeSortAsync0]] stops splitting and sorts sequentially. */
  lazy val regressThreadhold = 10000

  /**
    * Merges two sequences into one, repeatedly taking the smaller of the two heads.
    *
    * Callers in this object always pass sequences that are already sorted by `ord`.
    * When the two heads compare equal, the element from `b` is emitted first.
    *
    * @param a   first sequence
    * @param b   second sequence
    * @param ord ordering used to compare the heads
    * @return all elements of `a` and `b` in merged order
    */
  def mergeSortedList[T](a: Seq[T], b: Seq[T])(implicit ord: Ordering[T]): Seq[T] = {
    // `acc` is built back-to-front by prepending, so the leftovers of the
    // non-empty side are reversed before being prepended and the whole
    // accumulator is reversed once at the end.
    @scala.annotation.tailrec
    def loop(a: Seq[T], b: Seq[T], acc: Seq[T]): Seq[T] = {
      if (a.isEmpty && b.isEmpty) acc
      else if (a.isEmpty) b.reverse ++: acc
      else if (b.isEmpty) a.reverse ++: acc
      else if (ord.lt(a.head, b.head)) loop(a.tail, b, a.head +: acc)
      else loop(a, b.tail, b.head +: acc)
    }

    loop(a, b, Seq()).reverse
  }

  /**
    * Asynchronous merge sort.
    *
    * Inputs of at most [[regressThreadhold]] elements are sorted by [[mergeSort]] inside a
    * single `Future`. Larger inputs are split in half; each half is sorted in its own
    * `Future` and the two results are merged with [[mergeSortedList]] once both complete.
    *
    * @param x   sequence to sort
    * @param ord ordering of the elements
    * @return a `Future` that completes with the sorted sequence
    */
  def mergeSortAsync0[T](x: Seq[T])(implicit ord: Ordering[T]): Future[Seq[T]] =
    if (x.size <= regressThreadhold) Future(mergeSort(x))
    else {
      val (left, right) = x.splitAt(x.size / 2)
      // Wrapping the recursive call in Future(...) schedules the split of each
      // half on the execution context as well; flatten removes the extra layer.
      val Seq(leftSorted, rightSorted) =
        Seq(left, right).map(seq => Future(mergeSortAsync0(seq)).flatten)
      leftSorted.zip(rightSorted).map(pair => mergeSortedList(pair._1, pair._2))
    }

  /**
    * Blocking wrapper around [[mergeSortAsync0]]: waits without a timeout for the result.
    *
    * @param x   sequence to sort
    * @param ord ordering of the elements
    * @return the sorted sequence
    */
  def mergeSortAsync[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    Await.result(mergeSortAsync0(x), Duration.Inf)

  /**
    * Sequential top-down merge sort.
    *
    * @param x   sequence to sort
    * @param ord ordering of the elements
    * @return the sorted sequence; inputs of size 0 or 1 are returned as-is
    */
  def mergeSort[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    if (x.size <= 1) x
    else {
      val (left, right) = x.splitAt(x.size / 2)
      val (leftSorted, rightSorted) = (mergeSort(left), mergeSort(right))
      mergeSortedList(leftSorted, rightSorted)
    }
}

/** Times the async merge sort, the sequential merge sort and `sorted` on the same random input. */
object MergeSortTest extends App {
  // Project-local helper; presumably supplies `withWallTimePrinted` — confirm against kai.util.
  import kai.util.ProfileUtil.TimeResult

  val seq: Vector[Double] = (1 to 1000000).map(i => Random.nextDouble()).toVector
  val seqMergeSortAsync = MergeSort.mergeSortAsync(seq) withWallTimePrinted "mergesortasync"
  val seqMergeSort = MergeSort.mergeSort(seq) withWallTimePrinted "mergesort"
  val seqSort = seq.sorted withWallTimePrinted "sorted"
  // Sanity check: all three approaches must produce the same ordering.
  println(seqSort == seqMergeSort && seqMergeSort == seqMergeSortAsync)
}
output:
mergesortasync elapsed time: 3186 ms mergesort elapsed time: 3300 ms sorted elapsed time: 581 ms true
I've copied your test and ran it via JMH (using sbt-jmh). I used the predefined scala.concurrent.ExecutionContext.Implicits.global as the
underlying execution context in the test.
results:
[info] benchmark                          mode  cnt  score   error  units
[info] mergesorttest.benchmergesortasync  avgt   25  1.534 ± 0.212   s/op
[info] mergesorttest.benchmergesortsync   avgt   25  2.325 ± 0.437   s/op
[info] mergesorttest.benchscalasort       avgt   25  0.382 ± 0.006   s/op
As you can see here, the parallel version runs about 1.5x faster than the sequential version, while the built-in Scala sort is about 6x faster than the sequential merge sort.
One needs to remember that when doing micro-benchmarks such as these, there are many factors to take into account. It is best to let JMH handle the subtleties of the JVM runtime for you.
plugins.sbt:
// Adds the sbt-jmh plugin to the build (goes in project/plugins.sbt).
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.27")
build.sbt:
// Enables the JMH plugin for this project (goes in build.sbt).
enablePlugins(JmhPlugin)
test code:
import java.util.concurrent.TimeUnit

import org.openjdk.jmh.annotations._

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.Random
import scala.concurrent.ExecutionContext.Implicits.global

/**
  * Created by yuval.itzchakov on 21/08/2017.
  *
  * JMH benchmark comparing the `Future`-based merge sort, the sequential merge sort and
  * the standard library `sorted` on the same one-million-element random input.
  *
  * NOTE(review): the benchmark methods return `Unit` and discard the sorted result. JMH
  * guidance is to return the value (or sink it into a `Blackhole`) so the JIT cannot treat
  * the work as dead code; left unchanged here so the published numbers match this code.
  */
@State(Scope.Thread)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@Fork(5)
class MergeSortTest {

  /** Input shared by all benchmarks; populated by [[setup]]. */
  var seq: Seq[Double] = _

  /** Fills [[seq]] with one million random doubles. */
  @Setup
  def setup(): Unit = {
    seq = (1 to 1000000).map(i => Random.nextDouble()).toVector
  }

  /** Input size at or below which [[mergeSortAsync0]] stops splitting and sorts sequentially. */
  lazy val regressThreadhold = 10000

  /**
    * Merges two sequences into one, repeatedly taking the smaller of the two heads.
    *
    * Callers in this class always pass sequences that are already sorted by `ord`.
    * When the two heads compare equal, the element from `b` is emitted first.
    *
    * @param a   first sequence
    * @param b   second sequence
    * @param ord ordering used to compare the heads
    * @return all elements of `a` and `b` in merged order
    */
  def mergeSortedList[T](a: Seq[T], b: Seq[T])(implicit ord: Ordering[T]): Seq[T] = {
    // `acc` is built back-to-front by prepending, so the leftovers of the
    // non-empty side are reversed before being prepended and the whole
    // accumulator is reversed once at the end.
    @scala.annotation.tailrec
    def loop(a: Seq[T], b: Seq[T], acc: Seq[T]): Seq[T] = {
      if (a.isEmpty && b.isEmpty) acc
      else if (a.isEmpty) b.reverse ++: acc
      else if (b.isEmpty) a.reverse ++: acc
      else if (ord.lt(a.head, b.head)) loop(a.tail, b, a.head +: acc)
      else loop(a, b.tail, b.head +: acc)
    }

    loop(a, b, Seq()).reverse
  }

  /**
    * Asynchronous merge sort.
    *
    * Inputs of at most [[regressThreadhold]] elements are sorted by [[mergeSort]] inside a
    * single `Future`. Larger inputs are split in half; each half is sorted in its own
    * `Future` and the two results are merged with [[mergeSortedList]] once both complete.
    *
    * @param x   sequence to sort
    * @param ord ordering of the elements
    * @return a `Future` that completes with the sorted sequence
    */
  def mergeSortAsync0[T](x: Seq[T])(implicit ord: Ordering[T]): Future[Seq[T]] =
    if (x.size <= regressThreadhold) Future(mergeSort(x))
    else {
      val (left, right) = x.splitAt(x.size / 2)
      // Wrapping the recursive call in Future(...) schedules the split of each
      // half on the execution context as well; flatten removes the extra layer.
      val Seq(leftSorted, rightSorted) =
        Seq(left, right).map(seq => Future(mergeSortAsync0(seq)).flatten)
      leftSorted.zip(rightSorted).map(pair => mergeSortedList(pair._1, pair._2))
    }

  /**
    * Blocking wrapper around [[mergeSortAsync0]]: waits without a timeout for the result.
    *
    * @param x   sequence to sort
    * @param ord ordering of the elements
    * @return the sorted sequence
    */
  def mergeSortAsync[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    Await.result(mergeSortAsync0(x), Duration.Inf)

  /**
    * Sequential top-down merge sort.
    *
    * @param x   sequence to sort
    * @param ord ordering of the elements
    * @return the sorted sequence; inputs of size 0 or 1 are returned as-is
    */
  def mergeSort[T](x: Seq[T])(implicit ord: Ordering[T]): Seq[T] =
    if (x.size <= 1) x
    else {
      val (left, right) = x.splitAt(x.size / 2)
      val (leftSorted, rightSorted) = (mergeSort(left), mergeSort(right))
      mergeSortedList(leftSorted, rightSorted)
    }

  /** Benchmarks the sequential merge sort. */
  @Benchmark
  def benchMergeSortSync(): Unit = {
    mergeSort(seq)
  }

  /** Benchmarks the `Future`-based merge sort, including the blocking wait for its result. */
  @Benchmark
  def benchMergeSortAsync(): Unit = {
    mergeSortAsync(seq)
  }

  /** Benchmarks the standard library `sorted` as a baseline. */
  @Benchmark
  def benchScalaSort(): Unit = {
    seq.sorted
  }
}
Comments
Post a Comment