java - RabbitMQ Consumer only consumes a portion of the messages sent
I am sending a small number of messages to RabbitMQ, but the consumer only consumes 2 of them. Please find below the code for the producer and the consumer:
@Component
public class Producer implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;
    private final Consumer receiver;
    private final ConfigurableApplicationContext context;

    public Producer(Consumer receiver, RabbitTemplate rabbitTemplate,
                    ConfigurableApplicationContext context) {
        this.receiver = receiver;
        this.rabbitTemplate = rabbitTemplate;
        this.context = context;
    }

    // Reads the list of messages from a JSON resource on the classpath.
    private List<Message> extractPhoneNumberNumbers(String path) {
        List<Message> messages = new ArrayList<>();
        try {
            BufferedReader fileReader = new BufferedReader(
                    new InputStreamReader(new ClassPathResource(path).getInputStream()));
            JsonReader reader = new JsonReader(fileReader);
            Gson gson = new GsonBuilder().create();
            messages = gson.fromJson(reader, new TypeToken<List<Message>>() {}.getType());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
        return messages;
    }

    @Override
    public void run(String... args) throws Exception {
        List<Message> messagesToPublish = extractPhoneNumberNumbers("phonenumbers");
        AtomicInteger messageId = new AtomicInteger();
        for (Message message : messagesToPublish) {
            System.out.println("sent " + message.toString());
            rabbitTemplate.convertAndSend(App.queueName,
                    new Message(messageId.incrementAndGet(), message.getPhoneNumber()));
        }
        // Wait up to 10 seconds for the consumer's latch, then shut down.
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
        context.close();
    }
}

@Component
public class Consumer {

    private CountDownLatch latch = new CountDownLatch(1);
    private HashMap<String, Integer> phoneCountryCode = new HashMap<String, Integer>();

    public void receiveJsonMessages(Message message) {
        System.out.println("received " + message.toString());
        // Count the messages per country code and print the running totals.
        phoneCountryCode.put(message.getCountryCode(),
                phoneCountryCode.getOrDefault(message.getCountryCode(), 0) + 1);
        for (Map.Entry<String, Integer> entry : phoneCountryCode.entrySet()) {
            System.out.println(entry.getKey() + " " + entry.getValue());
        }
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}
The output is:

received 5 - +40722579707 - ro
sent 0 - +447984627687 - gb
ro 1
received 6 - +40722579717 - ro
sent 0 - +447984627657 - gb
ro 2
sent 0 - +447984627667 - gb
sent 0 - +447984627677 - gb
sent 0 - +40722579707 - ro
sent 0 - +40722579717 - ro
sent 0 - +40722579727 - ro
The goal is to group and count the phone numbers by country. I'm playing around with RabbitMQ at the moment, having no experience with it, so the grouping ... is silly for now.
The question, though, is: why are only 2 messages received, rather than 7?
Edit: I believe this is due to the fact that the beans stop once the workers (the producer) have finished. How can I configure the application to stop only after the consumers have finished?
@SpringBootApplication
public class App {

    final static String queueName = "rtap-queue";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("rtap-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                             MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        container.setConcurrentConsumers(10);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Consumer receiver) {
        return new MessageListenerAdapter(receiver, "receiveJsonMessages");
    }

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(App.class, args);
    }
}
Follow the sample here: https://www.rabbitmq.com/tutorials/tutorial-one-spring-amqp.html. The important piece is:
@RabbitHandler
public void receive(String in) {
    System.out.println(" [x] Received '" + in + "'");
}
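The method runs in a loop behind the scenes and consumes messages whenever they are available. If you want to close the context as you do above, you need to know the number of messages that are going to be consumed and configure the CountDownLatch with that number. In your code the latch is created with a count of 1, so the await in the producer returns as soon as one message has been received and the context is closed while the remaining messages are still waiting to be delivered. Whenever you receive a message, you can count down, and await on the latch in the main thread. However, this does not guarantee that you receive all of the messages, since something such as a network failure can always happen. For that reason you may want to keep a timeout on the latch wait.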
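A minimal sketch of that idea, using only the JDK so that it runs without a broker. The names LatchDemo, CountingReceiver and deliver are hypothetical and not part of the question's code or of Spring AMQP; a thread pool stands in for the listener container's consumer threads. It only illustrates sizing the latch to the expected message count and waiting with a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LatchDemo {

    /** Hypothetical stand-in for the Consumer bean: one latch count per expected message. */
    static final class CountingReceiver {
        private final CountDownLatch latch;

        CountingReceiver(int expectedMessages) {
            this.latch = new CountDownLatch(expectedMessages);
        }

        void receive(String message) {
            System.out.println("received " + message);
            latch.countDown(); // one count per consumed message
        }

        /** Returns true if every expected message arrived before the timeout. */
        boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }

    /**
     * Simulates delivering `delivered` messages to a receiver that expects
     * `expected` messages, then waits on the latch for at most timeoutMillis.
     */
    static boolean deliver(int expected, int delivered, long timeoutMillis)
            throws InterruptedException {
        CountingReceiver receiver = new CountingReceiver(expected);
        // The pool plays the role of the container's concurrent consumers (assumption).
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int i = 1; i <= delivered; i++) {
                String message = "message-" + i;
                pool.submit(() -> receiver.receive(message));
            }
            return receiver.awaitAll(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // All 7 expected messages arrive: the latch reaches zero.
        System.out.println("all received: " + deliver(7, 7, 2000));
        // Only 2 of 7 arrive: the wait gives up after the timeout.
        System.out.println("all received: " + deliver(7, 2, 200));
    }
}
```

In the question's code this would roughly correspond to creating the latch with the number of messages to be published instead of 1, and closing the context only after the await returns, whether it returns because all messages arrived or because the timeout expired.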