I am trying the sample code on Kafka Twitter streaming from the following tutorial.
https://www.tutorialspoint.com/apache_kafka/apache_kafka_real_time_application.htm
Here is my code:
import java.util.Arrays; import java.util.Properties; import java.util.concurrent.LinkedBlockingQueue; import twitter4j.*; import twitter4j.conf.*; import twitter4j.StatusListener; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaTwitterProducer { public static void main(String[] args) throws Exception { LinkedBlockingQueue<Status> queue = new LinkedBlockingQueue<Status>(1000); String consumerKey = “XXXXXXXXXXXXXXXXX”; //args[0].toString(); String consumerSecret = "XXXXXXXXXXXXXXXXX"; //args[1].toString(); String accessToken = "XXXXXXXXXXXXXXXXX" ; //args[2].toString(); String accessTokenSecret = "XXXXXXXXXXXXXXXXX" ; //args[3].toString(); String topicName = "twittertest" ; //args[4].toString(); //String[] arguments = args.clone(); String[] keyWords = {“Hello”,”Hi”,”Welcome”}; //Arrays.copyOfRange(arguments, 5, arguments.length); ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret); TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance(); StatusListener listener = new StatusListener() { @Override public void onStatus(Status status) { queue.offer(status); System.out.println("@" + status.getUser().getScreenName() + " - " + status.getText()); // System.out.println("@" + status.getUser().getScreen-Name()); /*for(URLEntity urle : status.getURLEntities()) { System.out.println(urle.getDisplayURL()); }*/ /*for(HashtagEntity hashtage : status.getHashtagEntities()) { System.out.println(hashtage.getText()); }*/ } @Override public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId()); } @Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) { System.out.println("Got track limitation notice:" + numberOfLimitedStatuses); } @Override public void onScrubGeo(long userId, long upToStatusId) { System.out.println("Got scrub_geo event userId:" + userId + "upToStatusId:" + upToStatusId); } @Override public void onStallWarning(StallWarning warning) { // System.out.println("Got stall warning:" + warning); } @Override public void onException(Exception ex) { ex.printStackTrace(); } }; twitterStream.addListener(listener); FilterQuery query = new FilterQuery().track(keyWords); twitterStream.filter(query); Thread.sleep(5000); //Add Kafka producer config settings Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("client.id", "SampleProducer"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //props.put("key.serializer", // "org.apache.kafka.common.serialization.StringSerializer"); //props.put("value.serializer", // "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); int i = 0; int j = 0; while(i < 10) { Status ret = queue.poll(); if (ret == null) { Thread.sleep(100); i++; }else { for(HashtagEntity hashtage : ret.getHashtagEntities()) { System.out.println("Hashtag: " + hashtage.getText()); producer.send(new ProducerRecord<String, String>( topicName, Integer.toString(j++), hashtage.getText())); } } } producer.close(); Thread.sleep(5000); twitterStream.shutdown(); } }
When I run this as Java application, I am getting the following error: (this is not compile/build error)
Read timed out Relevant discussions can be found on the Internet at: http://www.google.co.jp/search?q=1169356e or http://www.google.co.jp/search?q=c04b39f0 TwitterException{exceptionCode=[1169356e-c04b39f0 c2863472-491bffd7], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=4.0.4} at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:179) at twitter4j.HttpClientBase.request(HttpClientBase.java:57) at twitter4j.HttpClientBase.post(HttpClientBase.java:86) at twitter4j.TwitterStreamImpl.getFilterStream(TwitterStreamImpl.java:346) at twitter4j.TwitterStreamImpl$8.getStream(TwitterStreamImpl.java:322) at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:552) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:170) at java.net.SocketInputStream.read(SocketInputStream.java:141) at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) at sun.security.ssl.InputRecord.read(InputRecord.java:503) at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:930) at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) at java.io.BufferedInputStream.read(BufferedInputStream.java:345) at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704) at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647) at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1536) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441) at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480) at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338) at twitter4j.HttpResponseImpl.<init>(HttpResponseImpl.java:35) at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:143) ... 5 more
I am not sure what is the problem here. Could someone suggest me the solution or fix please?
Ok Update here: It is working now if key words are generic like String[] keyWords = {"USA","Basketball","Sports};
If I change this to my requirement with specific keywords like my company name, product name etc., for ex: String[] keyWords = {"XXX","YYY","ZZZ"}; then the java application is getting terminated. What could be the reason? How to fix it in this code? Please advise?
1 Answers
Answers 1
The Twitter4J source code shows that this exception is thrown because of Http connection time out.
I get similar exception by setting a low value for connection timeout.
ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey(consumerKey) .setOAuthConsumerSecret(consumerSecret) .setOAuthAccessToken(accessToken) .setOAuthAccessTokenSecret(accessTokenSecret) .setHttpStreamingReadTimeout(10);
This is the stack trace I get.
TwitterException{exceptionCode=[1169356e-c3c3770e 1169356e-c3c376e4], statusCode=-1, message=null, code=-1, retryAfter=-1, rateLimitStatus=null, version=4.0.6} at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:179) at twitter4j.HttpClientBase.request(HttpClientBase.java:57) at twitter4j.HttpClientBase.post(HttpClientBase.java:86) at twitter4j.TwitterStreamImpl.getFilterStream(TwitterStreamImpl.java:347) at twitter4j.TwitterStreamImpl$8.getStream(TwitterStreamImpl.java:323) at twitter4j.TwitterStreamImpl$TwitterStreamConsumer.run(TwitterStreamImpl.java:554) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) at sun.security.ssl.InputRecord.read(InputRecord.java:503) at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983) at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385) at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413) at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397) at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559) at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185) at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1316) at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1291) at sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:250) at twitter4j.HttpClientImpl.handleRequest(HttpClientImpl.java:137) ... 5 more
For your example, please try setting a higher value for HttpStreamingReadTimeout. The default value in the code is 40 seconds. Try setting it to 120,000 (milliseconds) or higher. That should work.
0 comments:
Post a Comment