scala - Akka MergeHub and BroadcastHub via Actor to support multiple clients through websockets -
at moment use websocket with
- 1 client
- 2 sources on server side
the server pushes out keepalive messages , answers requests client.
now spice things bit , add possibility handle n clients.
so had at:
https://github.com/playframework/play-scala-chatroom-example
this n-inlet ~> n-outlet
if of n clients writes through respective websocket, of them notified (including itself).
what need bit more sophisticated server should
- still send keepalive messages all connected clients and
- if 1 of clients asks something/triggers server-side "event", again all clients should notified of this.
so it's 1 step in between in abstract way of thinking.
naive thought maybe done by:
type allowedwsmessage = string val myactor = system.actorof(props{new myactor}, "myactor") val myactorsink = sink.actorrefwithack(myactor, "init", "acknowledged", "completed") import scala.concurrent.duration._ val tickingsource: source[allowedwsmessage, cancellable] = source.tick(initialdelay = 1 second, interval = 10 seconds, tick = notused) .map(_ => "staying alive") val servermessagesource = source .queue[allowedwsmessage](10, overflowstrategy.backpressure) .mapmaterializedvalue { queue => myactor ! inittunnel(queue)} val serversource: source[allowedwsmessage, cancellable] = tickingsource.merge(servermessagesource) private val (clientsink, clientsource) = { // don't log mergehub$producerfailed error if client disconnects. // recoverwithretries -1 "recoverwith" val source = mergehub.source[allowedwsmessage] .log("source") .recoverwithretries(-1, { case _: exception ⇒ source.empty}) val sink: sink[allowedwsmessage, source[allowedwsmessage, notused]] = broadcasthub.sink[allowedwsmessage] source.via(serversource).tomat(sink)(keep.both).run() }
(note source.via(serversource)...
)
but of course it's not easy.
in end want basically:
(client -> websocket ->) mergehub ~> myactor ~> broadcasthub (-> websocket -> client)
now wonder, elegant way of doing this? or mergehub , broadcasthub wrong tools challenge?
you have server source , sink, said work, i'm not digging them.
val fanin = mergehub.source[allowedwsmessage].to(myactorsink).run() val fanout = serversource.tomat(broadcasthub.sink[allowedwsmessage])(keep.right).run() // now, somewhere in (route-)method handle websocket connection flow.fromsinkandsource(fanin, fanout)
easy that, hope knot in head unravels :)
Comments
Post a Comment