I am building a Spark-Cassandra application (Spark 1.6.0 with spark-cassandra-connector 1.6.0-M1) in which multiple users enter their own Cassandra connection properties, such as host, username, password, keyspace and table.
To set these properties dynamically and create a DataFrame from a Cassandra table, I searched around and found the following approach:
http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/
val csc = new CassandraSQLContext(SparkConnection._sc)
csc.setConf(s"${cluster}/spark.cassandra.connection.host", host)
csc.setConf(s"${cluster}/spark.cassandra.connection.port", port)
csc.setConf(s"${cluster}/spark.cassandra.auth.username", username)
csc.setConf(s"${cluster}/spark.cassandra.auth.password", password)
csc.read.format("org.apache.spark.sql.cassandra")
  .options(Map("cluster" -> cluster, "keyspace" -> keySpace, "table" -> table))
  .load()
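For context, this is roughly how I apply those settings for each user in my application. This is a simplified sketch only: the method name readForUser and the case class UserCassandraConfig are illustrative, not my exact code.

import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.cassandra.CassandraSQLContext

// Illustrative holder for the properties each user enters
case class UserCassandraConfig(cluster: String, host: String, port: String,
                               username: String, password: String,
                               keySpace: String, table: String)

// Simplified sketch: apply the per-cluster settings for one user's input
// and load that user's table as a DataFrame
def readForUser(sc: SparkContext, cfg: UserCassandraConfig): DataFrame = {
  val csc = new CassandraSQLContext(sc)
  csc.setConf(s"${cfg.cluster}/spark.cassandra.connection.host", cfg.host)
  csc.setConf(s"${cfg.cluster}/spark.cassandra.connection.port", cfg.port)
  csc.setConf(s"${cfg.cluster}/spark.cassandra.auth.username", cfg.username)
  csc.setConf(s"${cfg.cluster}/spark.cassandra.auth.password", cfg.password)

  csc.read
    .format("org.apache.spark.sql.cassandra")
    .options(Map("cluster" -> cfg.cluster,
                 "keyspace" -> cfg.keySpace,
                 "table" -> cfg.table))
    .load()
}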
I tried the properties above. Clusters that do not require authentication connect successfully, but when I try to connect to a secured cluster using the username and password properties, I get the following error:
Exception in thread "Thread-10" java.io.IOException: Failed to open native connection to Cassandra at {192.168.1.17}:9042 at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148) at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148) at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31) at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81) at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109) at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:184) at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:267) at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:57) at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119) at com.bdbizviz.pa.spark.util.ServiceUtil$.readData(ServiceUtil.scala:97) at com.bdbizviz.pa.spark.services.SparkServices$$anon$1.run(SparkServices.scala:114) at java.lang.Thread.run(Thread.java:745) Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host /192.168.1.17:9042: Host /192.168.1.17:9042 requires authentication, but no authenticator found in Cluster configuration at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:40) at com.datastax.driver.core.Connection$5.apply(Connection.java:250) at com.datastax.driver.core.Connection$5.apply(Connection.java:234) at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861) at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297) at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156) at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145) at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185) at com.datastax.driver.core.Connection$Future.onSet(Connection.java:1174) at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005) at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846) at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:831) at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:346) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:254) at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ... 1 more