Applying a schema to a Spark Dataset of Java objects
There is a similar issue here: "How to add a schema to a Dataset in Spark?"
However, the issue I'm facing is that I already have a predefined Dataset<Obj1>, and I want to define a schema that matches its data members. The end goal is to be able to join between two Java objects.
Sample code:

    Dataset<Row> rowDataset = spark.getSpark().sqlContext().createDataFrame(rowRDD, schema).toDF();
    Dataset<MyObj> objResult = rowDataset.map((MapFunction<Row, MyObj>) row ->
        new MyObj(
            row.getInt(row.fieldIndex("field1")),
            row.isNullAt(row.fieldIndex("field2")) ? "" : row.getString(row.fieldIndex("field2")),
            row.isNullAt(row.fieldIndex("field3")) ? "" : row.getString(row.fieldIndex("field3")),
            row.isNullAt(row.fieldIndex("field4")) ? "" : row.getString(row.fieldIndex("field4"))
        ), Encoders.javaSerialization(MyObj.class));

If I print the schema of the row Dataset, it is as expected:

    rowDataset.printSchema();
    root
     |-- field1: integer (nullable = false)
     |-- field2: string (nullable = false)
     |-- field3: string (nullable = false)
     |-- field4: string (nullable = false)

If I print the schema of the object Dataset, the actual schema is lost:

    objResult.printSchema();
    root
     |-- value: binary (nullable = true)
The question is: how can I apply a schema to Dataset<MyObj>?
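Below is a code snippet I tried, and Spark behaves as expected, so the root cause of your problem seems to be something other than the map function. Note that this snippet uses Encoders.bean, whereas the code in the question uses Encoders.javaSerialization, which stores each object in a single binary column.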
    SparkSession session = SparkSession.builder().config(conf).getOrCreate();
    Dataset<Row> ds = session.read().text("<some path>");
    Encoder<Employee> employeeEncode = Encoders.bean(Employee.class);
    ds.map(new MapFunction<Row, Employee>() {
        @Override
        public Employee call(Row value) throws Exception {
            return new Employee(value.getString(0).split(","));
        }
    }, employeeEncode).printSchema();

Output:

    root
     |-- age: integer (nullable = true)
     |-- name: string (nullable = true)

    // Employee bean
    public class Employee {
        public String name;
        public Integer age;

        public Employee() {
        }

        public Employee(String[] args) {
            this.name = args[0];
            this.age = Integer.parseInt(args[1]);
        }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }
    }
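The columns in the output above (age, name) line up with the getter/setter pairs of the Employee bean. As a rough illustration of why a bean-style class yields one column per property, here is a minimal, Spark-free sketch that lists a class's JavaBean properties with the standard java.beans.Introspector. It is an assumption that this approximates how Encoders.bean discovers fields; it is not Spark's own code, and the class name BeanPropertySketch and the helper beanProperties are hypothetical names used only for this example.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Map;
import java.util.TreeMap;

class BeanPropertySketch {

    // Same shape as the Employee bean above: no-arg constructor plus getters and setters.
    public static class Employee {
        private String name;
        private Integer age;

        public Employee() { }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }
    }

    // Hypothetical helper: maps each property name to its simple type name,
    // keeping only properties that have both a getter and a setter.
    static Map<String, String> beanProperties(Class<?> beanClass) {
        try {
            // Stopping at Object.class leaves out the inherited "class" property.
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            Map<String, String> props = new TreeMap<>(); // sorted by property name
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                if (pd.getReadMethod() != null && pd.getWriteMethod() != null) {
                    props.put(pd.getName(), pd.getPropertyType().getSimpleName());
                }
            }
            return props;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Could not introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        // prints {age=Integer, name=String}
        System.out.println(beanProperties(Employee.class));
    }
}
```

If MyObj from the question exposes its fields the same way (public no-arg constructor, getters and setters), switching its encoder to Encoders.bean(MyObj.class) would be the natural thing to try. Whether MyObj can be shaped like that is an assumption, since its definition is not shown.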