java - How to reorder Stream of CompletableFutures?


I deal with Streams of CompletableFutures. These take different amounts of time to complete. The ones taking longer block stream processing while others might already have completed (and I know about parallel streams).

Therefore I would like to reorder the items in a Stream (e.g. with a buffer) to move completed futures ahead.

For example, this code blocks stream processing if one getUser call takes long:

    public static boolean isValid(User user) { ... }

    emails.stream()
       // not using ::
       // getUser() returns CompletableFuture<User>
       .map( e -> getUser(e))
       // this line blocks stream processing
       .filter( userF -> isValid( userF.get()) )
       .map( f -> f.thenApply(User::getName))

and I would like to have something like:

    emails.stream()
       .map( e -> getUser(e))
       // moves futures into a bounded buffer
       // and puts finished ones first,
       // like CompletionService [1],
       // and returns a stream again
       .collect(FutureReorderer.collector())
       // not the same stream, but the
       // one created by FutureReorderer.collector()
       .filter( userF -> isValid( userF.get()) )
       .map( f -> f.thenApply(User::getName))

[1] For example, CompletionService https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/executorcompletionservice.html returns completed tasks when calling take(), and blocks otherwise. But CompletionService does not take futures, so one would need to do cs.submit( () -> f.get() )?

How would I do that?
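To make footnote [1] concrete, here is a minimal sketch of what wrapping already-existing futures in an ExecutorCompletionService could look like. The class name CompletionOrderSketch and the methods inCompletionOrder and demo are made up for illustration, and the sketch assumes it is acceptable to park one pool thread per pending future, which may not be what is wanted in practice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of any library.
class CompletionOrderSketch {

    // Returns the results of the given futures in the order they complete.
    static <T> List<T> inCompletionOrder(List<CompletableFuture<T>> futures) {
        if (futures.isEmpty()) {
            return new ArrayList<>();
        }
        // Assumption: one thread per future, each blocked in f.get().
        ExecutorService pool = Executors.newFixedThreadPool(futures.size());
        try {
            CompletionService<T> cs = new ExecutorCompletionService<>(pool);
            for (CompletableFuture<T> f : futures) {
                cs.submit(() -> f.get());
            }
            List<T> results = new ArrayList<>();
            for (int i = 0; i < futures.size(); i++) {
                // take() blocks until the next wrapped task has finished
                results.add(cs.take().get());
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    // Small demo: "slow" is listed first but completes about 300 ms later.
    static List<String> demo() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.schedule(() -> { slow.complete("slow"); }, 300, TimeUnit.MILLISECONDS);
        try {
            return inCompletionOrder(Arrays.asList(slow, fast));
        } finally {
            timer.shutdown();
        }
    }
}
```

In this demo the already completed future should come out before the slow one even though it was listed second. The obvious cost is the extra blocked threads, which is probably why it feels like a workaround.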

[edit]

  1. Changed the example to include filter()
  2. Added a comment
  3. Added the CompletionService link

Having more context would definitely help in tailoring the answer - I have a feeling that the problem is somewhere else and can be solved in an easier way.

But if your question is how to somehow keep completed futures at the beginning, there are a few options:


Sorting the Stream using a custom Comparator:

    .sorted(Comparator.comparing(f -> !f.isDone()))

Keep in mind that isDone returns true not only when a future completes successfully, but also when it completes exceptionally or is cancelled.
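A small self-contained sketch of the sorting option, assuming three hand-made futures (one pending, one completed normally, one completed exceptionally). The class name SortedByDoneDemo and the labels are invented for the example. It also shows the isDone caveat: the failed future is moved ahead just like the successful one.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class.
class SortedByDoneDemo {

    static List<String> order() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> ok = CompletableFuture.completedFuture("ok");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        // false sorts before true, so futures with isDone() == true come first
        Comparator<CompletableFuture<String>> doneFirst =
                Comparator.comparing(f -> !f.isDone());

        return Stream.of(pending, ok, failed)
                .sorted(doneFirst)
                // label each future by its state instead of calling get()
                .map(f -> !f.isDone() ? "pending"
                        : f.isCompletedExceptionally() ? "failed" : "ok")
                .collect(Collectors.toList());
    }
}
```

Note that sorted() has to see all elements before it emits anything, so the ordering reflects the state of the futures at the moment the sort runs.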


Storing futures in a PriorityQueue:

    PriorityQueue<CompletableFuture<String>> queue
     = new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));

When polling for elements, the queue will return them according to the provided ordering.

Here it is in action:

    PriorityQueue<CompletableFuture<String>> queue
     = new PriorityQueue<>(Comparator.comparing(f -> !f.isDone()));

    queue.add(CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }

        return "42";
    }));

    queue.add(CompletableFuture.completedFuture("completed"));

    queue.poll(); // the future holding "completed"
    queue.poll(); // the future that is still going on
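One thing worth checking before relying on this: as far as I know, PriorityQueue only consults the comparator while elements are being added or removed, so a future that completes while it is already sitting in the queue is not necessarily moved to the front. A minimal sketch of that situation, with a made-up class name QueueSnapshotDemo (the exact result for tied elements is an implementation detail, so treat it as an illustration rather than a guarantee):

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class.
class QueueSnapshotDemo {

    // Returns whether the first polled future is already done.
    static boolean firstPolledIsDone() {
        Comparator<CompletableFuture<String>> doneFirst =
                Comparator.comparing(f -> !f.isDone());
        PriorityQueue<CompletableFuture<String>> queue = new PriorityQueue<>(doneFirst);

        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();

        // Both futures are pending when they are inserted.
        queue.add(first);
        queue.add(second);

        // The second one completes only after it is already in the queue.
        second.complete("late");

        // The head was chosen at insertion time, so on OpenJDK this is
        // expected to hand back the still-pending 'first' future.
        return queue.poll().isDone();
    }
}
```

If futures can complete after insertion, it may be safer to build the queue (or re-sort) right before consuming it.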

It's important to remember that if you want to convert the PriorityQueue to a Stream, you can't do this using stream() - it will not preserve the priority order.

This is the right way to go:

    Stream.generate(queue::poll).limit(queue.size())
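As a runnable illustration of that last line, here is a small sketch that drains a PriorityQueue into a list in priority order. The class DrainInPriorityOrder and its drain method are names invented for this example, and it uses plain integers instead of futures just to keep the ordering easy to see.

```java
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class.
class DrainInPriorityOrder {

    // Polls the queue exactly queue.size() times, so elements arrive in
    // priority order; the queue is empty afterwards.
    static <T> List<T> drain(PriorityQueue<T> queue) {
        return Stream.generate(queue::poll)
                .limit(queue.size()) // size is read once, before any poll happens
                .collect(Collectors.toList());
    }
}
```

Usage would look like DrainInPriorityOrder.drain(new PriorityQueue<>(Arrays.asList(3, 1, 2))), which polls the smallest element first. Keep in mind that this consumes the queue, unlike stream().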
