java - Storm Example WordCount Error InvalidTopologyException
I'm fairly new to programming and to Storm. I'm following the example from the book "Getting Started with Apache Storm". I'm using Storm 1.1.0 and JDK 1.8. When I try to run the code, either in Eclipse Neon or with the command "storm jar ...", I get this error:

[main] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[main,5,main] died org.apache.storm.generated.InvalidTopologyException: null

Does anyone know what this error is and how I can fix it?
I've written the code like this:

Spout:
public class WordReader implements IRichSpout {
    TopologyContext context;
    SpoutOutputCollector collector;
    FileReader fileReader;
    private boolean completed = false;

    public void ack(Object msgId) {
        System.out.println("ok: " + msgId);
    }

    public void fail(Object msgId) {
        System.out.println("fail: " + msgId);
    }

    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                // do nothing
            }
            // it should return from the function
            return;
        }
        String str;
        try {
            while ((str = reader.readLine()) != null) {
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.context = context;
            this.fileReader = new FileReader(conf.get("words").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("error!");
        }
        this.collector = collector;
    }

    public void declareOutputFileds(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }

    public void close() {
        // TODO Auto-generated method stub
    }

    public void activate() {
        // TODO Auto-generated method stub
    }

    public void deactivate() {
        // TODO Auto-generated method stub
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    }

    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}
I have 2 bolts. The first one is:
public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;

    public void execute(Tuple input) {
        String sentence = input.getString(0);
        String words[] = sentence.split(" ");
        // this foreach structure navigates the array
        for (String word : words) {
            word = word.trim();
            word = word.toLowerCase();
            this.collector.emit(new Values(word));
        }
        collector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void cleanup() {
        // TODO Auto-generated method stub
    }

    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}

And the last one is:
public class WordCounter implements IRichBolt {
    String name;
    Integer id;
    Map<String, Integer> counters;
    private OutputCollector collector;

    public void execute(Tuple input) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        collector.ack(input);
    }

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.counters = new HashMap<String, Integer>();
        this.collector = collector;
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    public void cleanup() {
        System.out.println("-- word counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    }

    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }
}

And the main class is:
public class TopologyMain {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

        Config conf = new Config();
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        conf.put("word", 0);
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word", conf, builder.createTopology());
        try {
            Thread.sleep(2000);
        } catch (Exception e) {
            // TODO: handle exception
        }
        cluster.shutdown();
    }
}
You have a typo in the WordNormalizer name when constructing the topology. It should be word-normalizer connected to word-counter:
builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
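
To see why a misspelled component ID breaks the wiring, here is a minimal plain-Java sketch of the kind of check involved: every ID passed to a grouping call has to match an ID that was registered with setSpout or setBolt. It does not use Storm's API, and Storm's own validation covers more than this. The class TopologyWiringCheck, the method findDanglingSources, and the misspelled ID "word-normalzer" are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java illustration only: this does not use or reproduce Storm's API.
class TopologyWiringCheck {

    /**
     * Returns one message per subscription whose source ID was never declared.
     *
     * @param declaredIds   component IDs registered via setSpout/setBolt
     * @param subscriptions map from a bolt ID to the source ID it subscribes to
     */
    static List<String> findDanglingSources(Set<String> declaredIds,
                                            Map<String, String> subscriptions) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, String> e : subscriptions.entrySet()) {
            if (!declaredIds.contains(e.getValue())) {
                problems.add(e.getKey() + " subscribes to undeclared component " + e.getValue());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        // IDs as they would be registered in the topology above.
        Set<String> ids = new LinkedHashSet<>();
        ids.add("word-reader");
        ids.add("word-normalizer");
        ids.add("word-counter");

        Map<String, String> wiring = new LinkedHashMap<>();
        wiring.put("word-normalizer", "word-reader");
        // Hypothetical typo: "word-normalizer" misspelled in the grouping call.
        wiring.put("word-counter", "word-normalzer");

        // Print every subscription that points at an ID nobody declared.
        for (String problem : findDanglingSources(ids, wiring)) {
            System.out.println(problem);
        }
    }
}
```

Running a check like this over your own IDs before submitting the topology makes a mismatch between the setBolt name and the shuffleGrouping name easy to spot.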