Grouping of elements in Scala / Akka Streams


Suppose I have a source of different fruits, and I want to insert their count into a database.

I can do this:

    Flow[Fruits]
      .map { item =>
        insertItemToDatabase(item)
      }

But this is slow: why insert into the database for every item when I can group them? I came up with a better solution:

    Flow[Fruits]
      .grouped(10000)
      .map { items =>
        insertItemsToDatabase(items)
      }
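To make the memory trade-off concrete: Akka's `grouped(n)` emits batches of `n` elements (the last batch may be smaller when the stream completes), and every element of a batch is held until that batch is emitted. Scala's own `Iterator.grouped` chunks the same way, so this stdlib-only sketch shows the batches a database call would receive. The `Fruit` type and the batch size of 2 are assumptions for illustration.

```scala
object GroupedSketch {
  // Hypothetical fruit type; the question only names it `Fruits`.
  sealed trait Fruit
  case object Banana extends Fruit
  case object Orange extends Fruit

  val fruits: List[Fruit] = List(Banana, Orange, Orange, Orange, Banana)

  // Batches of 2, mirroring Flow[Fruit].grouped(2): each batch keeps every
  // element in memory, and the final batch is partial.
  val batches: List[Seq[Fruit]] = fruits.iterator.grouped(2).toList

  def main(args: Array[String]): Unit =
    batches.foreach(batch => println(s"would insert: $batch"))
}
```

With 10 000 as the batch size, the same structure means up to 10 000 `Fruit` references are held per batch, regardless of how many distinct fruits there are.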

But that means I have to hold 10 000 elements [banana, orange, orange, orange, banana, ...] in memory until they are flushed to the database. Isn't that inefficient? Perhaps I can do this:

    Flow[Fruits]
      .grouped(100)
      .map { items =>
        consolidate(items) // returns Map[String, Int]
      }
      .grouped(100) // here I have Seq[Map[String, Int]]
      .map { mapOfItems =>
        insertMapToDatabase(mapOfItems)
      }
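`consolidate` is never defined in the question. One possible implementation, assuming fruits are plain `String` names, is sketched below together with a hypothetical `mergeCounts` that collapses the `Seq[Map[String, Int]]` produced by the second `grouped` into a single map before it is written.

```scala
object ConsolidateSketch {
  // One possible version of the question's `consolidate`:
  // collapse a batch of fruit names into fruit -> count.
  def consolidate(items: Seq[String]): Map[String, Int] =
    items.groupBy(identity).map { case (fruit, group) => fruit -> group.size }

  // Hypothetical helper: sum several partial count maps key by key,
  // so each fruit appears only once in the result.
  def mergeCounts(maps: Seq[Map[String, Int]]): Map[String, Int] =
    maps.foldLeft(Map.empty[String, Int]) { (acc, partial) =>
      partial.foldLeft(acc) { case (merged, (fruit, n)) =>
        merged.updated(fruit, merged.getOrElse(fruit, 0) + n)
      }
    }
}
```

Merging before the insert is what removes the up-to-100 copies of each key that the second `grouped(100)` would otherwise hold.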

From my understanding, this should also process 10 000 elements at once, but without taking up as much memory (provided the elements repeat often). However, each key is still repeated up to 100 times in memory, once per map in the second batch. Sure, I could do .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map()..., but isn't there a better way? Perhaps something like this:

    Flow[Fruits]
      .map { item =>
        addToMap(item)
        if (myMap.size == 10000) {
          insertToDatabase(myMap)
          clearMyMap()
        }
      }

But doesn't that break the concept of Akka Streams, namely the independence (and therefore concurrency) of the processing stages?

If the cardinality of the fruit set is low, you can keep a single Map of all counts and flush it to the database after streaming through all of the fruit values.

First, construct a Flow that keeps a running count:

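    import akka.NotUsed
    import akka.stream.scaladsl.Flow

    type Count = Int
    type FruitCount = Map[Fruit, Count]

    val zeroCount : FruitCount =
      Map.empty[Fruit, Count] withDefaultValue 0

    // Parenthesize the sum: `->` and `+` have the same precedence,
    // so without the parentheses the tuple would be built before adding 1.
    val appendFruitToCount : (FruitCount, Fruit) => FruitCount =
      (fruitCount, fruit) => fruitCount + (fruit -> (fruitCount(fruit) + 1))

    val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] =
      Flow[Fruit].scan(zeroCount)(appendFruitToCount)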
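Outside of Akka, `scanLeft` on a standard collection behaves like `Flow.scan`: it emits the seed first and then one running total per element, so the last value is the complete count. A stdlib-only sketch, with a hypothetical two-valued `Fruit` type:

```scala
object ScanSketch {
  // Hypothetical fruit type for illustration only.
  sealed trait Fruit
  case object Apple extends Fruit
  case object Banana extends Fruit

  type Count = Int
  type FruitCount = Map[Fruit, Count]

  // Missing keys read as 0, so the first occurrence of a fruit becomes 1.
  val zeroCount: FruitCount = Map.empty[Fruit, Count].withDefaultValue(0)

  val appendFruitToCount: (FruitCount, Fruit) => FruitCount =
    (fruitCount, fruit) => fruitCount + (fruit -> (fruitCount(fruit) + 1))

  // Seed first, then one running map per element; only the last one matters.
  val runningCounts: List[FruitCount] =
    List[Fruit](Apple, Banana, Apple).scanLeft(zeroCount)(appendFruitToCount)
}
```

Because the maps are immutable, each step returns a new map that shares structure with the previous one, and only the most recent one needs to be kept.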

Now create a Sink that receives the last FruitCount, and materialize the stream:

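    import akka.NotUsed
    import akka.stream.scaladsl.{Sink, Source}
    import scala.concurrent.Future

    val lastFruitCountSink : Sink[FruitCount, Future[Option[FruitCount]]] =
      Sink.lastOption[FruitCount]

    val fruitSource : Source[Fruit, NotUsed] = ???

    // runWith keeps the Sink's materialized value (the Future);
    // .to(sink).run() would return the Source's NotUsed instead.
    // Requires an implicit Materializer in scope.
    val lastFruitCountFut : Future[Option[FruitCount]] =
      fruitSource
        .via(fruitCountFlow)
        .runWith(lastFruitCountSink)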
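Putting the pieces together, here is a minimal end-to-end sketch. It assumes akka-stream 2.6.x on the classpath, where an implicit `ActorSystem` is enough to supply the `Materializer`; the `Fruit` values and the `countAll` helper are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object FruitCountStream {
  // Hypothetical fruit type for illustration only.
  sealed trait Fruit
  case object Apple extends Fruit
  case object Banana extends Fruit

  type FruitCount = Map[Fruit, Int]
  val zeroCount: FruitCount = Map.empty[Fruit, Int].withDefaultValue(0)

  val fruitCountFlow: Flow[Fruit, FruitCount, NotUsed] =
    Flow[Fruit].scan(zeroCount)((acc, fruit) => acc + (fruit -> (acc(fruit) + 1)))

  // Runs the stream over a fixed list and blocks for the final count
  // (blocking is only acceptable here because this is a demo).
  def countAll(fruits: List[Fruit]): Option[FruitCount] = {
    // Akka 2.6 derives the implicit Materializer from this ActorSystem.
    implicit val system: ActorSystem = ActorSystem("fruit-count")
    try {
      val result: Future[Option[FruitCount]] =
        Source(fruits).via(fruitCountFlow).runWith(Sink.lastOption[FruitCount])
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

A design note: since only the final map is used, `Sink.fold(zeroCount)(...)` would produce the same total without emitting the intermediate maps; keeping `scan` in a separate Flow is mainly useful if the running counts are also needed elsewhere.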

The lastFruitCountFut can then be used to send the values to the database:

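    // foreach on a Future needs an implicit ExecutionContext in scope;
    // `case` destructures each (fruit, count) entry of the Map.
    lastFruitCountFut foreach (_ foreach (_ foreach { case (fruit, count) =>
      insertItemsToDatabase(Iterator.fill(count)(fruit))
    }))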

An Iterator is used because it is a memory-efficient way of constructing a TraversableOnce of Fruit items: the items are produced on demand instead of being stored.
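`Iterator.fill(n)(elem)` evaluates `elem` only when the next element is requested, so no `n`-element collection is built up front. A small sketch showing this, where `insertItemsToDatabase` is a hypothetical stand-in that only counts the rows it would write:

```scala
object IteratorFillSketch {
  // Hypothetical stand-in for the database call: it consumes the iterator
  // one element at a time and returns how many rows it would have written.
  def insertItemsToDatabase[A](items: Iterator[A]): Int =
    items.foldLeft(0)((rows, _) => rows + 1)

  val finalCount: Map[String, Int] = Map("apple" -> 3, "banana" -> 2)

  val rowsWritten: Map[String, Int] =
    finalCount.map { case (fruit, count) =>
      fruit -> insertItemsToDatabase(Iterator.fill(count)(fruit))
    }

  // Counts how often the element expression below has been evaluated.
  var evaluations = 0
  // Nothing is evaluated here; each next() call evaluates the block once.
  val lazyApples: Iterator[String] = Iterator.fill(3) { evaluations += 1; "apple" }
}
```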

This solution keeps only one Map in memory, with one key for each distinct fruit type and one Int count per key.
