java - RxJava Sliding Window
I have an observable that emits data. I want to buffer it for 3 seconds, and after the initial buffer the window must slide by 1 second. This is more like buffer(timespan, unit, skip), where the skip is expressed as a timespan.
Sample:
Observable (data, timestamp): (5, 1), (10, 1.5), (30, 2.8), (40, 3.2), (60, 3.8), (90, 4.2)
Expected lists: {5, 10, 30}, {10, 30, 40, 60}, {30, 40, 60, 90}
I can achieve this by creating a custom operator. I want to know whether there is a way to do it without relying on a custom operator.
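I think this can be solved with built-in operators. The code below demonstrates one of the approaches, though things can get tricky when it is used with hot or non-lightweight cold sources, since every window subscribes to the source again. I encourage you to use it for educational / get-an-idea purposes, and not for production use.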
    // Imports assume RxJava 2.x and JUnit 4; adjust the packages for other versions.
    import io.reactivex.Flowable
    import io.reactivex.schedulers.Schedulers
    import org.junit.Assert
    import org.junit.Test

    @Test
    fun slidingWindow() {
        val events = Flowable.just(
                Data(5, 1.0),
                Data(10, 1.5),
                Data(30, 2.8),
                Data(40, 3.2),
                Data(60, 3.8),
                Data(90, 4.2))
                .observeOn(Schedulers.io())

        val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet()

        Assert.assertNotNull(windows)
        Assert.assertFalse(windows.isEmpty())
    }

    // Entry point: the first window covers the time range 0..windowSize.
    fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
            from = 0,
            to = windowSize,
            slideSize = slideSize,
            data = data)

    fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> {
        // Values whose timestamp falls inside the current window.
        val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data }
        // Everything after the start of the next window; the recursion continues on this.
        val tail = data.skipWhile { it.time <= from + slideSize }

        // An empty window ends the recursion.
        val nonEmptyWindow = window.toList().filter { !it.isEmpty() }

        val nextWindow = nonEmptyWindow.flatMapPublisher {
            window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io())
        }

        return nonEmptyWindow.toFlowable().concatWith(nextWindow)
    }

    data class Data(val data: Int, val time: Double)
The test above yields:
[[5, 10, 30], [10, 30, 40, 60], [30, 40, 60, 90], [40, 60, 90], [90]]
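To see where the window boundaries fall without any reactive machinery, the same windows can be computed over an in-memory list. The following is only a sketch in plain Java (no RxJava), not a replacement for the operator chain above. The class name SlidingWindowSketch and the method windows are hypothetical names chosen for illustration. It assumes each window covers the half-open time range (from, to] and, like the recursive version, stops at the first empty window.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {

    // Collects the values whose timestamp lies in (from, to] for
    // from = 0, slide, 2 * slide, ... and to = from + size.
    // Assumption: an event at exactly time 0 would be excluded here.
    // Stops at the first empty window, so a gap in the data longer
    // than the window size ends the output early.
    public static List<List<Integer>> windows(int[] values, double[] times, int size, int slide) {
        if (slide <= 0) {
            throw new IllegalArgumentException("slide must be positive");
        }
        List<List<Integer>> result = new ArrayList<>();
        for (int from = 0; ; from += slide) {
            int to = from + size;
            List<Integer> window = new ArrayList<>();
            for (int i = 0; i < values.length; i++) {
                if (times[i] > from && times[i] <= to) {
                    window.add(values[i]);
                }
            }
            if (window.isEmpty()) {
                break;
            }
            result.add(window);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] values = {5, 10, 30, 40, 60, 90};
        double[] times = {1.0, 1.5, 2.8, 3.2, 3.8, 4.2};
        // prints [[5, 10, 30], [10, 30, 40, 60], [30, 40, 60, 90], [40, 60, 90], [90]]
        System.out.println(windows(values, times, 3, 1));
    }
}
```

With the sample data this reproduces the five lists shown above, including the two trailing windows that the question's expected output does not list.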