Thursday, March 8, 2018

Kafka Twitter streaming TwitterException error


I am trying the Kafka Twitter streaming sample code from the following tutorial:

https://www.tutorialspoint.com/apache_kafka/apache_kafka_real_time_application.htm

Here is my code:

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.LinkedBlockingQueue;

    import twitter4j.*;
    import twitter4j.conf.*;
    import twitter4j.StatusListener;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KafkaTwitterProducer {
       public static void main(String[] args) throws Exception {
          LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000);

          String consumerKey = "XXXXXXXXXXXXXXXXX"; //args[0].toString();
          String consumerSecret = "XXXXXXXXXXXXXXXXX"; //args[1].toString();
          String accessToken = "XXXXXXXXXXXXXXXXX"; //args[2].toString();
          String accessTokenSecret = "XXXXXXXXXXXXXXXXX"; //args[3].toString();
          String topicName = "twittertest"; //args[4].toString();
          //String[] arguments = args.clone();
          String[] keyWords = {"Hello", "Hi", "Welcome"}; //Arrays.copyOfRange(arguments, 5, arguments.length);

          ConfigurationBuilder cb = new ConfigurationBuilder();
          cb.setDebugEnabled(true)
             .setOAuthConsumerKey(consumerKey)
             .setOAuthConsumerSecret(consumerSecret)
             .setOAuthAccessToken(accessToken)
             .setOAuthAccessTokenSecret(accessTokenSecret);

          TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

          StatusListener listener = new StatusListener() {

             @Override
             public void onStatus(Status status) {
                queue.offer(status);
                System.out.println("@" + status.getUser().getScreenName()
                   + " - " + status.getText());
                // System.out.println("@" + status.getUser().getScreenName());

                /*for(URLEntity urle : status.getURLEntities()) {
                   System.out.println(urle.getDisplayURL());
                }*/

                /*for(HashtagEntity hashtage : status.getHashtagEntities()) {
                   System.out.println(hashtage.getText());
                }*/
             }

             @Override
             public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                System.out.println("Got a status deletion notice id:"
                   + statusDeletionNotice.getStatusId());
             }

             @Override
             public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                System.out.println("Got track limitation notice:" +
                   numberOfLimitedStatuses);
             }

             @Override
             public void onScrubGeo(long userId, long upToStatusId) {
                System.out.println("Got scrub_geo event userId:" + userId +
                   " upToStatusId:" + upToStatusId);
             }

             @Override
             public void onStallWarning(StallWarning warning) {
                // System.out.println("Got stall warning:" + warning);
             }

             @Override
             public void onException(Exception ex) {
                ex.printStackTrace();
             }
          };
          twitterStream.addListener(listener);

          FilterQuery query = new FilterQuery().track(keyWords);
          twitterStream.filter(query);

          Thread.sleep(5000);

          //Add Kafka producer config settings
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("client.id", "SampleProducer");
          // The next two are consumer settings; the producer does not use them.
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

          //props.put("key.serializer",
             // "org.apache.kafka.common.serialization.StringSerializer");
          //props.put("value.serializer",
             // "org.apache.kafka.common.serialization.StringSerializer");

          Producer<String, String> producer = new KafkaProducer<String, String>(props);
          int i = 0;
          int j = 0;

          while (i < 10) {
             Status ret = queue.poll();

             if (ret == null) {
                Thread.sleep(100);
                i++;
             } else {
                for (HashtagEntity hashtage : ret.getHashtagEntities()) {
                   System.out.println("Hashtag: " + hashtage.getText());
                   producer.send(new ProducerRecord<String, String>(
                      topicName, Integer.toString(j++), hashtage.getText()));
                }
             }
          }
          producer.close();
          Thread.sleep(5000);
          twitterStream.shutdown();
       }
    }

When I run this as a Java application, I get the following runtime error (this is not a compile/build error):

    Read timed out
    Relevant discussions can be found on the Internet at:
        http://www.google.co.jp/search?q=1169356e or
        http://www.google.co.jp/search?q=c04b39f0
    TwitterException{exceptionCode=[1169356e-c04b39f0 c2863472-491bffd7], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=4.0.4}
        at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:179)
        at twitter4j.HttpClientBase.request(HttpClientBase.java:57)
        at twitter4j.HttpClientBase.post(HttpClientBase.java:86)
        at twitter4j.TwitterStreamImpl.getFilterStream(TwitterStreamImpl.java:346)
        at twitter4j.TwitterStreamImpl$8.getStream(TwitterStreamImpl.java:322)
        at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:552)
    Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:170)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
        at sun.security.ssl.InputRecord.read(InputRecord.java:503)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973)
        at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930)
        at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
        at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
        at twitter4j.HttpResponseImpl.<init>(HttpResponseImpl.java:35)
        at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:143)
        ... 5 more

I am not sure what the problem is. Could someone suggest a solution or fix, please?

Update: it works now if the keywords are generic, for example String[] keyWords = {"USA", "Basketball", "Sports"};

If I change them to the specific keywords I actually need, such as my company name or product names (for example String[] keyWords = {"XXX", "YYY", "ZZZ"};), the Java application terminates. What could be the reason, and how can I fix it in this code? Please advise.

1 Answer

The Twitter4J source code shows that this exception is thrown when the HTTP connection times out (the underlying cause is java.net.SocketTimeoutException: Read timed out).
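To see what "Read timed out" means at the socket level, here is a minimal plain-Java sketch. It does not use Twitter4J, and the class and method names are made up for illustration. A local server accepts the TCP connection but never sends a byte, so a read with a short timeout fails with java.net.SocketTimeoutException, the same exception that appears under "Caused by" in the stack traces here. I am assuming that Twitter4J's streaming read timeout ends up as this kind of socket read timeout on its HTTPS connection.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

// Illustrative only: plain java.net, no Twitter4J involved.
class ReadTimeoutDemo {

    /**
     * Connects to a local server that never writes anything and tries to read
     * with the given read timeout. Returns true if the read failed with
     * SocketTimeoutException.
     */
    static boolean readTimesOut(int readTimeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 = any free port. The OS completes the TCP handshake via the
        // backlog even though accept() is never called, so the client connects.
        try (ServerSocket silentServer = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, silentServer.getLocalPort())) {
            client.setSoTimeout(readTimeoutMillis); // socket read timeout
            InputStream in = client.getInputStream();
            try {
                in.read(); // blocks: the server never sends anything
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        boolean timedOut = readTimesOut(200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("timed out: " + timedOut + " after ~" + elapsedMs + " ms");
    }
}
```

The same thing happens whenever the remote side is slower than the configured read timeout, which is why raising the timeout helps when the server is merely slow to answer.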

I get a similar exception by setting a very low value (10 ms) for the streaming read timeout:

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey(consumerKey)
      .setOAuthConsumerSecret(consumerSecret)
      .setOAuthAccessToken(accessToken)
      .setOAuthAccessTokenSecret(accessTokenSecret)
      .setHttpStreamingReadTimeout(10);

This is the stack trace I get.

    TwitterException{exceptionCode=[1169356e-c3c3770e 1169356e-c3c376e4], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=4.0.6}
        at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:179)
        at twitter4j.HttpClientBase.request(HttpClientBase.java:57)
        at twitter4j.HttpClientBase.post(HttpClientBase.java:86)
        at twitter4j.TwitterStreamImpl.getFilterStream(TwitterStreamImpl.java:347)
        at twitter4j.TwitterStreamImpl$8.getStream(TwitterStreamImpl.java:323)
        at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:554)
    Caused by: java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
        at java.net.SocketInputStream.read(SocketInputStream.java:171)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
        at sun.security.ssl.InputRecord.read(InputRecord.java:503)
        at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
        at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
        at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
        at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
        at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
        at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1316)
        at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1291)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:250)
        at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:137)
        ... 5 more

For your case, try setting a higher value with setHttpStreamingReadTimeout. The default in the Twitter4J code is 40 seconds (40,000 ms); try 120,000 ms or higher. That should avoid the read timeout.
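Regarding the update about specific keywords: if I read the posted code correctly, the final loop increments i only when queue.poll() returns null, never resets it, and exits after 10 empty polls spaced 100 ms apart. So after roughly one second of accumulated waiting with an empty queue (following the initial 5-second sleep), it closes the producer and shuts down the stream. With rarely tweeted keywords such as company or product names, that can happen before any matching tweet arrives. A minimal sketch of an alternative, assuming you want to stop only after a longer quiet period: drainUntilIdle below is a hypothetical helper (not a Twitter4J or Kafka API) built on BlockingQueue.poll(timeout, unit), so every arriving item restarts the idle wait.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of Twitter4J or Kafka.
class IdleDrain {

    /**
     * Passes queued items to handler until none arrives for idleTimeoutMillis
     * in a row (or the thread is interrupted). Returns the number handled.
     * Unlike a counter of empty polls that is never reset, each item that
     * arrives restarts the idle wait.
     */
    static <T> int drainUntilIdle(BlockingQueue<T> queue, long idleTimeoutMillis,
                                  Consumer<? super T> handler) {
        int handled = 0;
        try {
            while (true) {
                // Waits up to idleTimeoutMillis; null means nothing arrived in time.
                T item = queue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                if (item == null) {
                    return handled;
                }
                handler.accept(item);
                handled++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return handled;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> hashtags = new LinkedBlockingQueue<>();
        hashtags.offer("kafka");
        hashtags.offer("twitter4j");
        // In the question's program the handler would call producer.send(...)
        // and the idle timeout would be much longer, e.g. 60_000 ms.
        int n = drainUntilIdle(hashtags, 200, tag -> System.out.println("Hashtag: " + tag));
        System.out.println("Handled " + n + " item(s)");
    }
}
```

A longer idle timeout (or looping until the process is stopped) keeps the stream open while matching tweets are rare; the trade-off is that the program no longer exits quickly when nothing matches.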
