Reading binaryFile with Spark Streaming
Does anyone know how to set up
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
to consume binary files?
Where can I find the available InputFormatClass options? The documentation gives no links for that. I imagine the ValueClass is related to the InputFormatClass somehow.
In the non-streaming version, using the method binaryFiles, I can get byte arrays for each file. Is there a way I can do the same with Spark Streaming? If not, where can I find those details, meaning which input formats are supported and which value class each one produces? And can one pick the KeyClass freely, or aren't all of those elements connected?
It would help if someone could clarify the use of this method.
Edit 1
I have tried the following:
val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/casalini_streamed")
However, the compiler complains as follows:
[error] /xxxxxxxxx/src/main/scala/estimatorstreamingapp.scala:14: type arguments [org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat] conform to the bounds of none of the overloaded alternatives of
[error] value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$10: scala.reflect.ClassTag[K], implicit evidence$11: scala.reflect.ClassTag[V], implicit evidence$12: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence$7: scala.reflect.ClassTag[K], implicit evidence$8: scala.reflect.ClassTag[V], implicit evidence$9: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$4: scala.reflect.ClassTag[K], implicit evidence$5: scala.reflect.ClassTag[V], implicit evidence$6: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
[error] val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/casalini_streamed")
What am I doing wrong?
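Read up on the Hadoop input formats first. I found the sequence file format to be well documented.
You are facing a compilation issue because of an import mismatch: Hadoop mapred vs mapreduce. fileStream requires an input format that extends org.apache.hadoop.mapreduce.InputFormat[K, V] (the new API), but the SequenceFileAsBinaryInputFormat you imported comes from org.apache.hadoop.mapred (the old API). Import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat instead.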
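For example, in Java:
// SequenceFileAsBinaryInputFormat produces BytesWritable keys and values, so both type arguments are BytesWritable.
JavaPairInputDStream<BytesWritable, BytesWritable> dstream = sc.fileStream("/somepath", org.apache.hadoop.io.BytesWritable.class, org.apache.hadoop.io.BytesWritable.class, org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat.class);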
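I didn't try it in Scala, but it should be similar. Note that the Scala fileStream takes the key, value and input format classes as type parameters rather than as classOf arguments:
val dstream = sc.fileStream[org.apache.hadoop.io.BytesWritable, org.apache.hadoop.io.BytesWritable, org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat]("/somepath")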
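To illustrate why a class from the mapred package is rejected even though it has the same simple name, here is a small self-contained Java sketch. It does not use Hadoop or Spark at all: OldApi, NewApi, fileStream and satisfiesBound are hypothetical stand-ins made up for this example, mirroring only the shape of a bound like F extends org.apache.hadoop.mapreduce.InputFormat<K, V>, and byte[] stands in for BytesWritable.

```java
// Stand-in types only: OldApi and NewApi are hypothetical placeholders for the
// org.apache.hadoop.mapred and org.apache.hadoop.mapreduce packages.
class InputFormatBoundDemo {

    // Placeholder for the old API (mapred), where InputFormat is an interface.
    static class OldApi {
        interface InputFormat<K, V> {}
        static class SequenceFileAsBinaryInputFormat implements InputFormat<byte[], byte[]> {}
    }

    // Placeholder for the new API (mapreduce), where InputFormat is an abstract class.
    static class NewApi {
        static abstract class InputFormat<K, V> {}
        static class SequenceFileAsBinaryInputFormat extends InputFormat<byte[], byte[]> {}
    }

    // Hypothetical method with the same kind of bound as fileStream:
    // F must be a new-API InputFormat<K, V>.
    static <K, V, F extends NewApi.InputFormat<K, V>> String fileStream(
            String directory, Class<K> keyClass, Class<V> valueClass, Class<F> formatClass) {
        return directory + " read with " + formatClass.getName();
    }

    // Runtime counterpart of the compile-time bound check.
    static boolean satisfiesBound(Class<?> formatClass) {
        return NewApi.InputFormat.class.isAssignableFrom(formatClass);
    }

    public static void main(String[] args) {
        // Compiles: the new-API class satisfies the bound.
        System.out.println(fileStream("/somepath", byte[].class, byte[].class,
                NewApi.SequenceFileAsBinaryInputFormat.class));

        // Would not compile: the old-API class only shares the simple name.
        // fileStream("/somepath", byte[].class, byte[].class,
        //         OldApi.SequenceFileAsBinaryInputFormat.class);

        System.out.println(satisfiesBound(NewApi.SequenceFileAsBinaryInputFormat.class)); // true
        System.out.println(satisfiesBound(OldApi.SequenceFileAsBinaryInputFormat.class)); // false
    }
}
```

The two classes are unrelated as far as the type system is concerned, so only the import decides whether the call compiles.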