Tuesday, February 7, 2017

Bulk insert into a Cassandra UDT using Spark


I am using Spark to bulk insert Facebook data into Cassandra for analysis. Comments are modeled as a UDT in Cassandra, and the fbpost table (analytics.posts in the schema below) has a set of comments as a column. Below is the schema:

CREATE TYPE analytics.comment (
    commentid text,
    commenttext varchar,
    username text,
    commentdatetime timestamp
);

CREATE TABLE analytics.posts (
    postid text,
    username text,
    comments set<frozen<comment>>,
    datetime timestamp,
    posttext varchar,
    PRIMARY KEY (datetime, username)
);

From the Spark job, which is written in Java, I create a JavaRDD<FbPost> (fbPostFromMysql) that I need to save to the Cassandra table. I have POJOs for both FbPost and Comment.

 javaFunctions(fbPostFromMysql).writerBuilder("analytics", "posts", fbPostWriter).saveToCassandra(); 

Maybe I am not mapping my Java Comment POJO to UDTValue correctly, which is why I am getting a TypeConversionException. Here is the stack trace:

com.datastax.spark.connector.types.TypeConversionException: Cannot convert object Comment [commentId=642528915901340_642701759217389, commenttext=Prix, username=Angel Nannousa, commentdatetime=2016-07-0  class com.prophecy.spark.cassandra.model.Comment to com.datastax.driver.core.UDTValue

Here is the Java code for the RowWriter:

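import java.io.Serializable;

import com.datastax.spark.connector.ColumnRef;
import com.datastax.spark.connector.cql.TableDef;
import com.datastax.spark.connector.writer.RowWriter;
import com.datastax.spark.connector.writer.RowWriterFactory;

import scala.collection.IndexedSeq;
import scala.collection.Seq;

public class FbPostRowWriter implements RowWriter<FbPost> {
    private static final long serialVersionUID = 1L;
    private static RowWriter<FbPost> writer = new FbPostRowWriter();

    // Factory passed to writerBuilder(); always returns the shared writer
    public static class FbPostRowWriterFactory implements RowWriterFactory<FbPost>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public RowWriter<FbPost> rowWriter(TableDef tableDef, IndexedSeq<ColumnRef> arg1) {
            return writer;
        }
    }

    // Column names; must be in the same order readColumnValues fills the buffer
    @Override
    public Seq<String> columnNames() {
        return scala.collection.JavaConversions.asScalaBuffer(FbPost.columns()).toList();
    }

    @Override
    public void readColumnValues(FbPost summary, Object[] buffer) {
        buffer[0] = summary.getPostId();
        buffer[1] = summary.getUsername();
        buffer[2] = summary.getComments(); // java.util.Set<Comment>, passed as-is
        buffer[3] = summary.getDatetime();
        buffer[4] = summary.getPosttext();
    }
}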
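The writer relies on an implicit contract: each index used in readColumnValues must match the position of the same column in columnNames(), which here comes from FbPost.columns() (not shown in the post). A stdlib-only sketch of that contract, assuming a hypothetical FbPostColumns helper whose list follows the order the buffer indices above imply (postid, username, comments, datetime, posttext), and looking indices up by name so a typo fails fast:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: a single source of truth for column order, so the
// names returned by columnNames() and the buffer indices cannot drift apart.
final class FbPostColumns {
    private FbPostColumns() {}

    // Assumed order, matching buffer[0]..buffer[4] in readColumnValues.
    static final List<String> COLUMNS = Collections.unmodifiableList(
            Arrays.asList("postid", "username", "comments", "datetime", "posttext"));

    // Returns the buffer index for a column name, failing on unknown names.
    static int indexOf(String column) {
        int i = COLUMNS.indexOf(column);
        if (i < 0) {
            throw new IllegalArgumentException("Unknown column: " + column);
        }
        return i;
    }

    // Fills a buffer the way readColumnValues does, but addressed by name.
    static Object[] fill(String postId, String username, Object comments,
                         Object datetime, String posttext) {
        Object[] buffer = new Object[COLUMNS.size()];
        buffer[indexOf("postid")] = postId;
        buffer[indexOf("username")] = username;
        buffer[indexOf("comments")] = comments;
        buffer[indexOf("datetime")] = datetime;
        buffer[indexOf("posttext")] = posttext;
        return buffer;
    }
}
```

In the real writer, FbPost.columns() could return COLUMNS and readColumnValues could use indexOf (or constants derived from it) instead of hard-coded positions.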

Please suggest: how do I map the set of Comment objects so that the data can be inserted into the UDT column?
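The exception says the connector does not know how to turn the custom Comment class into a UDT value, and readColumnValues passes the Set<Comment> from getComments() into buffer[2] unchanged. One possible approach is to convert each Comment into values keyed by the UDT's field names before it reaches the buffer. The stdlib-only sketch below covers just that step; the Comment stand-in and the CommentUdtFields helper are hypothetical. Turning each field map into a value type the connector accepts for a UDT column (for example the connector's own com.datastax.spark.connector.UDTValue, a different class from the driver's com.datastax.driver.core.UDTValue named in the error) is not shown and should be checked against the connector version in use.

```java
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Minimal stand-in for the author's Comment POJO; field names are taken
// from the exception message, the real class is not shown in the post.
class Comment {
    private final String commentId;
    private final String commenttext;
    private final String username;
    private final Date commentdatetime;

    Comment(String commentId, String commenttext, String username, Date commentdatetime) {
        this.commentId = commentId;
        this.commenttext = commenttext;
        this.username = username;
        this.commentdatetime = commentdatetime;
    }

    String getCommentId() { return commentId; }
    String getCommenttext() { return commenttext; }
    String getUsername() { return username; }
    Date getCommentdatetime() { return commentdatetime; }
}

// Hypothetical helper mapping Comment objects onto analytics.comment fields.
final class CommentUdtFields {
    private CommentUdtFields() {}

    // One comment -> ordered map keyed by the UDT's (lowercase) field names.
    static Map<String, Object> toFields(Comment c) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("commentid", c.getCommentId()); // Java name is commentId
        fields.put("commenttext", c.getCommenttext());
        fields.put("username", c.getUsername());
        fields.put("commentdatetime", c.getCommentdatetime());
        return Collections.unmodifiableMap(fields);
    }

    // Whole set of comments -> set of field maps; null becomes an empty set.
    static Set<Map<String, Object>> toFieldSet(Set<Comment> comments) {
        Set<Map<String, Object>> result = new LinkedHashSet<>();
        if (comments == null) {
            return result;
        }
        for (Comment c : comments) {
            result.add(toFields(c));
        }
        return result;
    }
}
```

Because unquoted CQL identifiers are folded to lowercase, the Java property commentId has to be mapped to the UDT field commentid by hand. Comments with identical field values collapse into a single entry, which matches the semantics of a set<frozen<comment>> column.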
