Akka Http | Graph with feedback makes the response hang -
i using stream source in akka http route. stream uses graph has feedback loop in it. when try action using wget, hangs. if remove loop , make plain left right stream, response terminates expected. see both requests broadcast-2 in proxy client , there output in wget response, hangs although see expected output.
does know can causing this? actor "service" sends http xml request , transforms httpresponse. ive tried using connection pool , http.singlerequest, neither make difference, stream still hangs.
ive included graph below route.
//graph cycle def create(actor: actorref)(implicit log: loggingadapter, executioncontext: executioncontextexecutor, system: akka.actor.actorsystem, timeout: akka.util.timeout, materializer: akka.stream.materializer) = graphdsl.create() { implicit builder => import graphdsl.implicits._ import akka.pattern.ask val mergein = builder.add(merge[(int, boolean)](2)) val broadcast = builder.add(broadcast[either[int, nodeseq]](2)) // val a3 = builder.add(flow[(int, boolean)].mapasync(1)(x => { (actor ? x).mapto[either[int, nodeseq]] })) val e1 = builder.add(flow[either[int, nodeseq]].map(_.left.getorelse(-1) ).filter(_ > 0).map((_, false))) val e2 = builder.add(flow[either[int, nodeseq]].map(_.right.getorelse(nodeseq.empty))) val = builder.add(flow[int].map((_, true))) ~> mergein ~> a3 ~> broadcast ~> e2 broadcast ~> e1 mergein <~ e1 flowshape(a.in, e2.out) } ... lazy val service = system.actorof(props(new apiservice), "extra-retriever") ... ... // akka http route val importpath = path("import") { { complete { import clientexport._ httpentity(contenttypes.`text/plain(utf-8)`, source.fromfuture(future((1 3))) .mapconcat(toimmutable) .via(create(service)) .filternot(_.isempty).map(x => bytestring(s"${x.tostring}")) ) } }
}
Comments
Post a Comment