java.nio.BufferUnderflowException while reading from kafka -


I am running into a BufferUnderflowException error while consuming events from Kafka. I am able to read 100k messages, and after that I hit this error.

I have come across a few Stack Overflow posts saying this can happen when the server version and the consumer version do not match, as per this link. But my consumer and server versions match.

I would appreciate any thoughts on how to debug this.

org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'topic': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:877)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
    at com.howtoprogram.kafka.ConsumerTest.main(ConsumerTest.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

kafka server version

kafka_2.10-0.9.0.0 

Kafka consumer version

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
</dependency>

consumer jar

kafka_2.10-0.9.0.0.jar 

code

import java.io.printstream; import java.util.arrays; import java.util.map; import java.util.properties; import java.util.concurrent.atomic.atomiclong;  import org.apache.kafka.clients.consumer.consumerrecord; import org.apache.kafka.clients.consumer.consumerrecords; import org.apache.kafka.clients.consumer.kafkaconsumer;  public class consumertest {   public static atomiclong totalmessages = new atomiclong(0);     static string bootstrapservers = "10.0.1.1:9092";   static string consumergroup = consumertest.class.getsimplename();   static string topic = "events.topic";     public static void main(string[] args) {     properties props = new properties();     props.put("bootstrap.servers", bootstrapservers);     props.put("group.id", consumergroup);     props.put("enable.auto.commit", "true");     props.put("auto.commit.interval.ms", "1000");     props.put("auto.offset.reset", "earliest");     props.put("session.timeout.ms", "30000");     props.put("key.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");     props.put("value.deserializer", "org.apache.kafka.common.serialization.stringdeserializer");       system.out.println("init kafka consumer kafka - 0.9.0.0" );     try {         kafkaconsumer<string, string> kafkaconsumer = new kafkaconsumer<>(props);          kafkaconsumer.subscribe(arrays.aslist(topic));           system.out.println("subscribed kafka topic " + topic );          while (true) {           consumerrecords<string, string> records = kafkaconsumer.poll(1000);           totalmessages.addandget(records.count());           system.out.println("totalmessages = " + totalmessages.get());           (consumerrecord<string, string> record : records) {           }     }}     catch (exception e){         system.out.println("something went wrong");         e.printstacktrace(new printstream(system.out));     }    }  } 

Not sure if it matters, but the producer is a Spark job using Kafka version 0.8.2, so it is a bit older.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.1</version>
</dependency>

-----update------

I was able to reproduce this with a producer that I wrote. The producer is based on code using kafka_2.10-0.9.0.0.jar (the same as the consumer).

import java.io.printstream; import java.util.properties; import java.util.concurrent.atomic.atomiclong;  import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producer; import org.apache.kafka.clients.producer.producerrecord;  import com.google.gson.gson; import com.google.gson.gsonbuilder;  public class producertest {      static string topic = "events.topic";     static string bootstrapservers = "10.0.1.1:9092";      static atomiclong counter = new atomiclong();    public static void main(string[] args) {     properties props = new properties();     props.put("bootstrap.servers", bootstrapservers);     props.put("acks", "all");     props.put("retries", 0);     props.put("batch.size", 16384);     props.put("linger.ms", 1);     props.put("buffer.memory", 33554432);     props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");     props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");     producer<string, string> producer = null;      dummyobject event = new dummyobject();     event.id = "somerandompsid";     event.country_cd = "us";     event.cont_id = "programid";     event.language = "en";     event.category = "category";     event.subcategory = "sub category";     event.source_id  = "12234";     gson gson = new gsonbuilder().create();     string json = gson.tojson(event);      try {       producer = new kafkaproducer<>(props);       system.out.println("kafka producer " + bootstrapservers + ",topic= " + topic);       (int = 0; < 1000000; i++) {         producer.send(new producerrecord<string, string>(topic, json));         counter.addandget(1); //        system.out.println("sent:" + json);       }       system.out.println("sent messages " + counter.get());     } catch (exception e) {       e.printstacktrace();         e.printstacktrace(new printstream(system.out));     } {       producer.close();     }    }  } 

-------another update-------

I downgraded the consumer to use kafka_2.11-0.8.2.1.jar and changed it to use Kafka streams. Although I didn't hit the above BufferUnderflowException, the consumer gets stuck at times and does not read the messages on Kafka. I don't see any exception either. Below is the code:

import java.util.hashmap; import java.util.list; import java.util.map; import java.util.properties; import java.util.concurrent.atomic.atomiclong;  import kafka.consumer.consumerconfig; import kafka.consumer.consumeriterator; import kafka.consumer.kafkastream; import kafka.javaapi.consumer.consumerconnector; import kafka.message.messageandmetadata;  public class kafkaconsumerthread extends thread {     private final consumerconnector consumer;     private final string topic;     atomiclong counter = new atomiclong(0);      public kafkaconsumerthread(string topic) {         consumer = kafka.consumer.consumer.createjavaconsumerconnector(createconsumerconfig());         this.topic = topic;     }      private static consumerconfig createconsumerconfig() {         properties props = new properties();         props.put("zookeeper.connect", kafkaproperties.zkconnect);         props.put("group.id", kafkaproperties.groupid);         props.put("zookeeper.session.timeout.ms", "4000");         props.put("zookeeper.sync.time.ms", "200");         props.put("auto.commit.interval.ms", "1000");          system.out.println("listening zookeper " + kafkaproperties.zkconnect);          return new consumerconfig(props);      }      public void run() {         system.out.println("listening topic " + topic);         try {             map<string, integer> topiccountmap = new hashmap<string, integer>();             topiccountmap.put(topic, new integer(1));             map<string, list<kafkastream<byte[], byte[]>>> consumermap = consumer.createmessagestreams(topiccountmap);             kafkastream<byte[], byte[]> stream = consumermap.get(topic).get(0);             system.out.println("got stream");             consumeriterator<byte[], byte[]> = stream.iterator();             system.out.println("iterating stream");             while (it.hasnext()) {                 messageandmetadata<byte[], byte[]> next = it.next();                 string msg = new string(next.message());                 // 
system.out.println(msg);                 counter.incrementandget();                 system.out.println("consumed = " + counter.get());             }         } catch (exception e) {             system.out.println("something went wrong ");             e.printstacktrace(system.out);         }     }      public static void main(string args[]) {         try {             kafkaconsumerthread consumerthread = new kafkaconsumerthread(kafkaproperties.topic);             consumerthread.start();         } catch (exception e) {             system.out.println("something went wrong main loop ");             e.printstacktrace(system.out);         }         (;;) {          }     } } 


Comments

Popular posts from this blog

ubuntu - PHP script to find files of certain extensions in a directory, returns populated array when run in browser, but empty array when run from terminal -

php - How can i create a user dashboard -

javascript - How to detect toggling of the fullscreen-toolbar in jQuery Mobile? -