java - RabbitMQ Consumer only consumes a portion of the messages sent -


i sending small number of messages rabbitmq, consumer consumes 2 of them. find below code producer & consumer:

@component public class producer implements commandlinerunner {  private final rabbittemplate rabbittemplate; private final consumer receiver; private final configurableapplicationcontext context;  public producer(consumer receiver, rabbittemplate rabbittemplate,         configurableapplicationcontext context) {     this.receiver = receiver;     this.rabbittemplate = rabbittemplate;     this.context = context; }  private list<message> extractphonenumbernumbers(string path){     list<message> messages = new arraylist();     try{         bufferedreader filereader = new bufferedreader(                   new inputstreamreader(new classpathresource(path).getinputstream()));         jsonreader reader = new jsonreader(filereader);         gson gson = new gsonbuilder().create();         messages = new gson().fromjson(reader, new typetoken<list<message>>() {}.gettype());     }catch(exception e){         system.out.println(e.getmessage());     }     return messages; }  public void run(string... args) throws exception {     list<message> messagestopublish = extractphonenumbernumbers("phonenumbers");     atomicinteger messageid = new atomicinteger();     for(message message : messagestopublish){         system.out.println("sent " + message.tostring());         rabbittemplate.convertandsend(app.queuename, new message(messageid.incrementandget(), message.getphonenumber()));     }     receiver.getlatch().await(10000, timeunit.milliseconds);      context.close(); }  }   @component public class consumer { private countdownlatch latch = new countdownlatch(1); private hashmap<string, integer> phonecountrycode = new hashmap<string, integer>(); private int = 0;  public void receivejsonmessages(message message){     system.out.println("received " + message.tostring());     phonecountrycode.put(message.getcountrycode(), phonecountrycode.getordefault(message.getcountrycode(), 0) + 1);      for(map.entry<string, integer> entry : phonecountrycode.entryset()){         system.out.println(entry.getkey() + " " + entry.getvalue());     }      latch.countdown(); }  public countdownlatch getlatch() {     return latch; } } 

received 5 - +40722579707 - ro sent 0 - +447984627687 - gb ro 1 received 6 - +40722579717 - ro sent 0 - +447984627657 - gb ro 2 sent 0 - +447984627667 - gb sent 0 - +447984627677 - gb sent 0 - +40722579707 - ro sent 0 - +40722579717 - ro sent 0 - +40722579727 - ro 

goal group , count phone numbers country. i'm playing around rabbitmq @ moment, having no experience it, grouping ... silly now.

the question though, why there 2 messages received, rather 7?

edit: believe due fact beans stop once workers (the producer) finished. how can configure stop after consumers finished?

@springbootapplication public class app { final static string queuename = "rtap-queue";  @bean queue queue() {     return new queue(queuename, false); }  @bean topicexchange exchange() {     return new topicexchange("rtap-exchange"); }  @bean binding binding(queue queue, topicexchange exchange) {     return bindingbuilder.bind(queue).to(exchange).with(queuename); }  @bean simplemessagelistenercontainer container(connectionfactory connectionfactory,         messagelisteneradapter listeneradapter) {     simplemessagelistenercontainer container = new simplemessagelistenercontainer();     container.setconnectionfactory(connectionfactory);     container.setqueuenames(queuename);     container.setmessagelistener(listeneradapter);     container.setconcurrentconsumers(10);     container.setacknowledgemode(acknowledgemode.manual);     return container; }  @bean messagelisteneradapter listeneradapter(consumer receiver) {     return new messagelisteneradapter(receiver, "receivejsonmessages"); }  public static void main(string[] args) throws interruptedexception {     springapplication.run(app.class, args); } } 

follow sample here, https://www.rabbitmq.com/tutorials/tutorial-one-spring-amqp.html . important piece

@rabbithandler public void receive(string in) {     system.out.println(" [x] received '" + in + "'"); } 

the method runs in loop behind scene , consumes messages when ever available. if want close context above, need know number of messages going consumed , configure countdownlatch number of messages expected. when ever receive message, can count down , await on latch in main thread. however, not guaranty receive messages, there no guaranty may happen such network failure. may configure timeout on latch wait.


Comments

Popular posts from this blog

ubuntu - PHP script to find files of certain extensions in a directory, returns populated array when run in browser, but empty array when run from terminal -

php - How can i create a user dashboard -

javascript - How to detect toggling of the fullscreen-toolbar in jQuery Mobile? -