scala - Akka MergeHub and BroadcastHub via Actor to support multiple clients through websockets -


at moment use websocket with

  • 1 client
  • 2 sources on server side

the server pushes out keepalive messages , answers requests client.

now spice things bit , add possibility handle n clients.

so had at:

https://github.com/playframework/play-scala-chatroom-example

this n-inlet ~> n-outlet if of n clients writes through respective websocket, of them notified (including itself).

what need bit more sophisticated server should

  1. still send keepalive messages all connected clients and
  2. if 1 of clients asks something/triggers server-side "event", again all clients should notified of this.

so it's 1 step in between in abstract way of thinking.

naive thought maybe done by:

type allowedwsmessage = string  val myactor = system.actorof(props{new myactor}, "myactor") val myactorsink = sink.actorrefwithack(myactor, "init", "acknowledged", "completed") import scala.concurrent.duration._  val tickingsource: source[allowedwsmessage, cancellable] = source.tick(initialdelay = 1 second, interval = 10 seconds, tick = notused)   .map(_ => "staying alive")  val servermessagesource = source .queue[allowedwsmessage](10, overflowstrategy.backpressure) .mapmaterializedvalue { queue => myactor ! inittunnel(queue)}  val serversource: source[allowedwsmessage, cancellable] = tickingsource.merge(servermessagesource)  private val (clientsink, clientsource) = {     // don't log mergehub$producerfailed error if client disconnects.     // recoverwithretries -1 "recoverwith"     val source = mergehub.source[allowedwsmessage]       .log("source")       .recoverwithretries(-1, { case _: exception ⇒ source.empty})      val sink: sink[allowedwsmessage, source[allowedwsmessage, notused]] = broadcasthub.sink[allowedwsmessage]     source.via(serversource).tomat(sink)(keep.both).run()   } 

(note source.via(serversource)...)

but of course it's not easy.

in end want basically:

(client -> websocket ->) mergehub ~> myactor ~> broadcasthub (-> websocket -> client) 

now wonder, elegant way of doing this? or mergehub , broadcasthub wrong tools challenge?

you have server source , sink, said work, i'm not digging them.

val fanin = mergehub.source[allowedwsmessage].to(myactorsink).run() val fanout = serversource.tomat(broadcasthub.sink[allowedwsmessage])(keep.right).run()  // now, somewhere in (route-)method handle websocket connection  flow.fromsinkandsource(fanin, fanout) 

easy that, hope knot in head unravels :)


Comments

Popular posts from this blog

ubuntu - PHP script to find files of certain extensions in a directory, returns populated array when run in browser, but empty array when run from terminal -

php - How can i create a user dashboard -

javascript - How to detect toggling of the fullscreen-toolbar in jQuery Mobile? -