If I publish several messages in a row to a Kafka cluster (using the new Producer API), I get a Future
from the producer for each message.
Now, assuming I have configured my producer to have max.in.flight.requests.per.connection = 1
and retries > 0
can I just wait on the last future and be certain that all previous have also been delivered (and in order)? Or do I need to wait on all Futures? In code, can I do this:
Producer<String, String> producer = new KafkaProducer<>(myConfig); Future<?> f = null; for(MessageType message : messages){ f = producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue()); } try { f.get(); } catch(ExecutionException e) { //handle exception }
instead of this:
Producer<String, String> producer = new KafkaProducer<>(myConfig); List<Future<?>> futureList = new ArrayList<>(); for(MessageType message : messages){ futureList.add(producer.send(new ProducerRecord<String,String>("myTopic", message.getKey(), message.getValue())); } try { for(Future<?> f : futureList) { f.get(); } } catch(ExecutionException e) { //handle exception }
and be assured that if nothing is caught here (from first snippet):
try { f.get(); } catch(ExecutionException e) {
then all my messages have been stored in the cluster in order (whether or not the producer performed any retries under the hood) and if something goes wrong then I WILL get an exception there even if it was not the last future (that I'm waiting on) that first encountered the problem?
Are there any more strange corner cases to be aware of?
0 comments:
Post a Comment