java.nio.BufferUnderflowException while reading from Kafka
I am running into a BufferUnderflowException while consuming events from Kafka. The consumer is able to read about 100K messages, and after that it hits the error.
I have come across a few Stack Overflow posts saying this can happen when the server version and the consumer version do not match (per the linked post). However, my consumer and server versions do match.
I would appreciate any thoughts on how to debug this.
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'topic': java.nio.BufferUnderflowException
    at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
    at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:464)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:877)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:829)
    at com.howtoprogram.kafka.ConsumerTest.main(ConsumerTest.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
Kafka server version
kafka_2.10-0.9.0.0
Kafka consumer version
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.9.0.0</version>
</dependency>
Consumer jar
kafka_2.10-0.9.0.0.jar
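One thing worth ruling out is a classpath conflict: a SchemaException/BufferUnderflowException like the one above can also show up when an older client jar is pulled in transitively and shadows the intended one. A minimal sketch to check which jar KafkaConsumer is actually loaded from at runtime (plain JDK, no extra dependencies; the class name KafkaJarCheck is just for illustration):

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaJarCheck {
    public static void main(String[] args) {
        // Prints the location of the jar that KafkaConsumer was loaded from.
        // If this points at an unexpected (older) artifact, the classpath
        // has a conflicting Kafka client on it.
        System.out.println(KafkaConsumer.class
                .getProtectionDomain()
                .getCodeSource()
                .getLocation());
    }
}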
Code
import java.io.PrintStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {

    public static AtomicLong totalMessages = new AtomicLong(0);
    static String bootstrapServers = "10.0.1.1:9092";
    static String consumerGroup = ConsumerTest.class.getSimpleName();
    static String topic = "events.topic";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", consumerGroup);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        System.out.println("Init Kafka consumer, Kafka 0.9.0.0");
        try {
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
            kafkaConsumer.subscribe(Arrays.asList(topic));
            System.out.println("Subscribed to Kafka topic " + topic);
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                totalMessages.addAndGet(records.count());
                System.out.println("totalMessages = " + totalMessages.get());
                for (ConsumerRecord<String, String> record : records) {
                    // intentionally empty: messages are only counted
                }
            }
        } catch (Exception e) {
            System.out.println("Something went wrong");
            e.printStackTrace(new PrintStream(System.out));
        }
    }
}
Not sure if it matters, but the producer is a Spark job using Kafka version 0.8.2, so it is a bit older.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
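For what it is worth, spark-streaming-kafka-0-8 pulls in an 0.8.x Kafka client transitively. If the consumer's build shares any dependencies with the Spark job, it is worth confirming with mvn dependency:tree that only the 0.9.0.0 client ends up on the consumer's classpath. A sketch of an exclusion, assuming the transitive artifact is kafka_2.11 (check the dependency tree output to be sure):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.0.1</version>
    <exclusions>
        <!-- assumption: exclude the transitive 0.8.x Kafka client so it
             cannot shadow the 0.9.0.0 client used by the consumer -->
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
        </exclusion>
    </exclusions>
</dependency>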
----- Update -----
I was able to reproduce the issue with a producer I wrote. The producer is based on the code below and uses kafka_2.10-0.9.0.0.jar (the same jar as the consumer).
import java.io.PrintStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class ProducerTest {

    static String topic = "events.topic";
    static String bootstrapServers = "10.0.1.1:9092";
    static AtomicLong counter = new AtomicLong();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = null;

        DummyObject event = new DummyObject();
        event.id = "somerandompsid";
        event.country_cd = "us";
        event.cont_id = "programid";
        event.language = "en";
        event.category = "category";
        event.subcategory = "sub category";
        event.source_id = "12234";

        Gson gson = new GsonBuilder().create();
        String json = gson.toJson(event);
        try {
            producer = new KafkaProducer<>(props);
            System.out.println("Kafka producer " + bootstrapServers + ", topic = " + topic);
            for (int i = 0; i < 1000000; i++) {
                producer.send(new ProducerRecord<String, String>(topic, json));
                counter.addAndGet(1);
                // System.out.println("Sent: " + json);
            }
            System.out.println("Sent messages " + counter.get());
        } catch (Exception e) {
            e.printStackTrace();
            e.printStackTrace(new PrintStream(System.out));
        } finally {
            producer.close();
        }
    }
}
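The DummyObject class is not shown in the post; for completeness, a minimal reconstruction based on the field assignments above (all fields assumed to be plain String, since every assigned value is a string literal):

public class DummyObject {
    // Reconstructed from the assignments in ProducerTest; field names are
    // taken verbatim, types are an assumption.
    public String id;
    public String country_cd;
    public String cont_id;
    public String language;
    public String category;
    public String subcategory;
    public String source_id;
}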
----- Another update -----
I downgraded the consumer to use kafka_2.11-0.8.2.1.jar and changed it to use the old high-level consumer API (KafkaStream). Although I no longer hit the BufferUnderflowException above, the consumer gets stuck at times and does not read messages from Kafka. I don't see any exception either. Below is the code:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaConsumerThread extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;
    AtomicLong counter = new AtomicLong(0);

    public KafkaConsumerThread(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        System.out.println("Listening to ZooKeeper " + KafkaProperties.zkConnect);
        return new ConsumerConfig(props);
    }

    public void run() {
        System.out.println("Listening to topic " + topic);
        try {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            System.out.println("Got stream");
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("Iterating stream");
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> next = it.next();
                String msg = new String(next.message());
                // System.out.println(msg);
                counter.incrementAndGet();
                System.out.println("Consumed = " + counter.get());
            }
        } catch (Exception e) {
            System.out.println("Something went wrong");
            e.printStackTrace(System.out);
        }
    }

    public static void main(String[] args) {
        try {
            KafkaConsumerThread consumerThread = new KafkaConsumerThread(KafkaProperties.topic);
            consumerThread.start();
        } catch (Exception e) {
            System.out.println("Something went wrong in main loop");
            e.printStackTrace(System.out);
        }
        for (;;) {
        }
    }
}
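KafkaProperties is likewise not shown in the post; a minimal placeholder consistent with the rest of the code might look like this (the topic matches the earlier ConsumerTest/ProducerTest code; the ZooKeeper address and group id are assumptions):

public class KafkaProperties {
    // Placeholder values for illustration only: zkConnect and groupId
    // are assumptions, not taken from the original post.
    public static final String zkConnect = "10.0.1.1:2181";
    public static final String groupId = "KafkaConsumerThread";
    public static final String topic = "events.topic";
}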