Grouping of elements in Scala / Akka Streams
Suppose I have a source of different fruits, and I want to insert their counts into a database.
I can do this:
```scala
Flow[Fruit]
  .map { item => insertItemToDatabase(item) }
```

But this is slow: why write to the database on every item when I can group the items up? I came up with a better solution:
```scala
Flow[Fruit]
  .grouped(10000)
  .map { items => insertItemsToDatabase(items) }
```

But this means I have to hold 10 000 elements `[banana, orange, orange, orange, banana, ...]` in memory until they are flushed to the database. Isn't that inefficient? Perhaps I can do this:
```scala
Flow[Fruit]
  .grouped(100)
  .map { items => consolidate(items) } // returns Map[String, Int]
  .grouped(100)                        // here we have Seq[Map[String, Int]]
  .map { mapOfItems => insertMapToDatabase(mapOfItems) }
```

From my understanding, this still processes 10 000 elements per database write, but it shouldn't take as much memory (provided the elements repeat often). Still, each key is repeated up to 100 times in memory. Sure, I could keep nesting `.grouped(10).map().grouped(10).map().grouped(10).map()...`, but isn't there a better way? Perhaps something like this:
```scala
Flow[Fruit]
  .map { item =>
    addToMap(item)
    if (myMap.size == 10000) {
      insertToDatabase(myMap)
      clearMyMap()
    }
  }
```

But doesn't this break the concept of Akka Streams, namely the independence (and therefore concurrency) of processing stages?
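The batching ideas in the question can be compared without Akka, because the standard library's `Iterator.grouped` chunks elements the same way `Flow.grouped` does (every group has `n` elements except possibly the last). A minimal sketch, assuming fruits are plain names and using a hypothetical `FakeDatabase` that only records what it is sent. `consolidate` and `mergeCounts` are illustrative helpers, since the question leaves `consolidate` undefined; merging the maps before the write is a small variation on the question's `insertMapToDatabase`, which receives the whole `Seq[Map[String, Int]]`.

```scala
object BatchingSketch {
  type Counts = Map[String, Int]

  // Hypothetical stand-in for the database: it only records how many
  // write calls it received and how many fruits those writes covered.
  final class FakeDatabase {
    var calls = 0
    var fruitsWritten = 0
    def insertItems(items: Seq[String]): Unit = { calls += 1; fruitsWritten += items.size }
    def insertCounts(counts: Counts): Unit = { calls += 1; fruitsWritten += counts.values.sum }
  }

  // Counts how often each fruit occurs in one batch.
  def consolidate(items: Seq[String]): Counts =
    items.foldLeft(Map.empty[String, Int]) { (acc, fruit) =>
      acc.updated(fruit, acc.getOrElse(fruit, 0) + 1)
    }

  // Sums per-batch maps so each fruit appears only once in the result.
  def mergeCounts(maps: Seq[Counts]): Counts =
    maps.foldLeft(Map.empty[String, Int]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (fruit, n)) => a.updated(fruit, a.getOrElse(fruit, 0) + n) }
    }

  // Mirrors Flow[Fruit].grouped(batchSize).map(insertItemsToDatabase).
  def insertInBatches(fruits: Iterator[String], batchSize: Int, db: FakeDatabase): Unit =
    fruits.grouped(batchSize).foreach(db.insertItems)

  // Mirrors .grouped(inner).map(consolidate).grouped(outer), merging before each write.
  def insertConsolidated(fruits: Iterator[String], inner: Int, outer: Int, db: FakeDatabase): Unit =
    fruits
      .grouped(inner)
      .map(consolidate)
      .grouped(outer)
      .foreach(maps => db.insertCounts(mergeCounts(maps)))
}
```

With `batchSize = 1`, `insertInBatches` degenerates to the per-item version (one write per fruit). With 25 000 fruits, both `insertInBatches(..., 10000, ...)` and `insertConsolidated(..., 100, 100, ...)` make 3 writes, but the consolidated variant never holds more than 100 raw fruits plus up to 100 small maps at a time.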
If the cardinality of your fruit set is low, you can keep a single Map of counts and flush it to the database after streaming through all of the fruit values.
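The single-map idea can be tried on an ordinary collection first. `scanLeft` on a Scala `Iterator` yields the same sequence of running counts that `Flow.scan` emits: the zero value first, then one updated map per element, so the last map is the final count. A sketch assuming a hypothetical `Fruit` type with two members (the original never defines `Fruit`):

```scala
object RunningCountSketch {
  // Hypothetical fruit type, for illustration only.
  sealed trait Fruit
  case object Banana extends Fruit
  case object Orange extends Fruit

  type Count      = Int
  type FruitCount = Map[Fruit, Count]

  // withDefaultValue(0) lets fruitCount(fruit) return 0 for unseen fruits.
  val zeroCount: FruitCount = Map.empty[Fruit, Count].withDefaultValue(0)

  // The inner parentheses are required: `->` and `+` have the same precedence.
  val appendFruitToCount: (FruitCount, Fruit) => FruitCount =
    (fruitCount, fruit) => fruitCount + (fruit -> (fruitCount(fruit) + 1))

  // Yields zeroCount, then the running count after each fruit.
  def runningCounts(fruits: Iterator[Fruit]): Iterator[FruitCount] =
    fruits.scanLeft(zeroCount)(appendFruitToCount)
}
```

Counting 10 000 oranges this way still ends with a single-entry map.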
First, construct a Flow that keeps a running count:
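```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

type Count = Int
type FruitCount = Map[Fruit, Count]

// withDefaultValue(0) makes fruitCount(fruit) return 0 for fruits not seen yet.
val zeroCount: FruitCount = Map.empty[Fruit, Count].withDefaultValue(0)

// The inner parentheses are needed: `->` and `+` share the same precedence.
val appendFruitToCount: (FruitCount, Fruit) => FruitCount =
  (fruitCount, fruit) => fruitCount + (fruit -> (fruitCount(fruit) + 1))

// scan emits zeroCount first, then the updated count after every fruit.
val fruitCountFlow: Flow[Fruit, FruitCount, NotUsed] =
  Flow[Fruit].scan(zeroCount)(appendFruitToCount)
```

Now create a Sink that receives the last FruitCount, and materialize the stream: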
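```scala
import scala.concurrent.Future
import akka.stream.scaladsl.{Sink, Source}

val lastFruitCountSink: Sink[FruitCount, Future[Option[FruitCount]]] =
  Sink.lastOption[FruitCount]

val fruitSource: Source[Fruit, NotUsed] = ???

// runWith keeps the sink's materialized value; `.to(sink).run()` would return
// the source's NotUsed instead. Requires an implicit Materializer in scope.
val lastFruitCountFut: Future[Option[FruitCount]] =
  fruitSource
    .via(fruitCountFlow)
    .runWith(lastFruitCountSink)
```

The `lastFruitCountFut` can then be used to send the values to the database: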
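```scala
// Future.foreach needs an implicit ExecutionContext in scope.
lastFruitCountFut.foreach { maybeCount =>
  maybeCount.foreach { fruitCount =>
    fruitCount.foreach { case (fruit, count) =>
      insertItemsToDatabase(Iterator.fill(count)(fruit))
    }
  }
}
```

An Iterator is used because it is a memory-efficient way of constructing a `TraversableOnce` (`IterableOnce` in Scala 2.13) of fruit items.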
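`Iterator.fill(count)(fruit)` allocates no collection of repeated items; it yields `count` copies on demand. A small sketch of the same expansion over a whole count map, with a hypothetical `insertItemsToDatabase` that simply consumes its input one element at a time:

```scala
object ExpandSketch {
  // Turns a count map back into individual items lazily: copies are produced
  // only as the consumer pulls them.
  def expand(counts: Map[String, Int]): Iterator[String] =
    counts.iterator.flatMap { case (fruit, count) => Iterator.fill(count)(fruit) }

  // Hypothetical consumer standing in for the database insert; it traverses
  // the items once and returns how many it saw.
  def insertItemsToDatabase(items: Iterator[String]): Int =
    items.foldLeft(0)((seen, _) => seen + 1)
}
```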
This solution keeps only one Map in memory, with one key per distinct fruit type and one integer per key.
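For an end-to-end check, here is a minimal runnable sketch of the answer's pipeline. It assumes Akka 2.6 or later (where an implicit `ActorSystem` supplies the `Materializer`), uses `String` in place of `Fruit`, and feeds a hypothetical in-memory source. It keeps the sink's materialized value with `toMat(...)(Keep.right)`, collects every emitted map with `Sink.seq` to show what `scan` produces, and also shows `Sink.fold`, a swapped-in alternative that folds straight into the final map without emitting intermediate ones.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object StreamCountSketch {
  type FruitCount = Map[String, Int]

  val zeroCount: FruitCount = Map.empty[String, Int].withDefaultValue(0)

  def append(counts: FruitCount, fruit: String): FruitCount =
    counts + (fruit -> (counts(fruit) + 1))

  val fruitCountFlow: Flow[String, FruitCount, NotUsed] =
    Flow[String].scan(zeroCount)(append)

  // Returns (every map emitted by scan, last map via lastOption, result of Sink.fold).
  def run(fruits: List[String]): (immutable.Seq[FruitCount], Option[FruitCount], FruitCount) = {
    implicit val system: ActorSystem = ActorSystem("fruit-count-sketch")
    try {
      val source: Source[String, NotUsed] = Source(fruits)

      val allCounts: Future[immutable.Seq[FruitCount]] =
        source.via(fruitCountFlow).runWith(Sink.seq[FruitCount])

      // toMat + Keep.right keeps the sink's Future; plain .to(...).run() would yield NotUsed.
      val lastCount: Future[Option[FruitCount]] =
        source.via(fruitCountFlow).toMat(Sink.lastOption[FruitCount])(Keep.right).run()

      // Alternative: fold in the sink, so only the final map is materialized.
      val folded: Future[FruitCount] =
        source.runWith(Sink.fold[FruitCount, String](zeroCount)(append))

      (Await.result(allCounts, 5.seconds), Await.result(lastCount, 5.seconds), Await.result(folded, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

With three input fruits, `scan` emits four maps (the empty zero map first), while `lastOption` and `fold` both end with the same two-entry map.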