java.net.ConnectException: Connection refused: no further information

Re: Client Error: "java.net.ConnectException: Connection refused: no further information" after updating HBase 0.20.6 to HBase 0.90.2
hbase-user mailing list archives
陈加俊 <cjjvict...@gmail.com>
Sorry, I had made a mistake in hbase.zookeeper.property.clientPort.
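(For reference: this property is read from hbase-site.xml on the client side and must match the port the ZooKeeper quorum actually listens on. A minimal sketch; the value 2181 is ZooKeeper's default port, shown only as an example:)

```xml
<!-- hbase-site.xml (client side): must match the ZooKeeper server's clientPort -->
<property>
  <name>hbase.zookeeper.property.clientPort</name>
  <value>2181</value> <!-- ZooKeeper's default; adjust to your quorum's actual port -->
</property>
```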
On Tue, Apr 12, 2011 at 1:08 PM, 陈加俊 <cjjvict...@gmail.com> wrote:
> WARN : 04-12 12:54:13 Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused: no further information
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:574)
>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119)
> WARN : 04-12 12:54:14 Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused: no further information
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:574)
>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119)
> WARN : 04-12 12:54:16 Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused: no further information
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:574)
>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1119)
> Thanks & Best regards
Thanks & Best regards
Expert tip: This was a bug in HBase 1.0.1; it has been patched, so updating to 1.0.2 or later should solve the problem. URL for the Jira thread:
Call From localhost.localdomain/127.0.0.1 to 0.0.0.0:8032 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at (SocketChannelImpl.java:739)
    at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529)
    at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:493)
    at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:547)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:642)
    at org.apache.hadoop.ipc.Client$Connection.access$2600(Client.java:314)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1399)
    at org.apache.hadoop.ipc.Client.call(Client.java:1318)
    at org.apache.hadoop.ipc.Client.call(Client.java:1300)
    at (ProtobufRpcEngine.java:206)
    at com.sun.proxy.$Proxy21.getNewApplication(Unknown Source)
    at (ApplicationClientProtocolPBClientImpl.java:167)
    at sun.reflect.GeneratedMethodAccessor45.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at (Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
    at (RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy22.getNewApplication(Unknown Source)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNewApplication(YarnClientImpl.java:127)
    at (YarnClientImpl.java:135)
    at org.apache.hadoop.mapred.ResourceMgrDelegate.getNewJobID(ResourceMgrDelegate.java:175)
    at org.apache.hadoop.mapred.YARNRunner.getNewJobID(YARNRunner.java:229)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:355)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1268)
    at (Job.java:1265)
    at java.security.AccessController.doPrivileged(Native Method)
    at (Subject.java:415)
    at (UserGroupInformation.java:1491)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1265)
    at org.apache.hadoop.mapred.JobClient$1.run(JobClient.java:562)
    at (JobClient.java:557)
    at java.security.AccessController.doPrivileged(Native Method)
    at (Subject.java:415)
    at (UserGroupInformation.java:1491)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:557)
    at (JobClient.java:548)
    at org.apache.hadoop.hive.ql.exec.mr.ExecDriver.execute(ExecDriver.java:420)
    at org.apache.hadoop.hive.ql.exec.mr.MapRedTask.execute(MapRedTask.java:136)
    at org.apache.hadoop.hive.ql.exec.Task.executeTask(Task.java:153)
    at org.apache.hadoop.hive.ql.exec.TaskRunner.runSequential(TaskRunner.java:85)
    at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1503)
    at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1270)
    at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1088)
    at org.apache.hadoop.hive.ql.Driver.run(Driver.java:911)
    at (Driver.java:901)
    at org.apache.hadoop.hive.cli.CliDriver.processLocalCmd(CliDriver.java:268)
    at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:220)
    at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:423)
    at org.apache.hadoop.hive.cli.CliDriver.executeDriver(CliDriver.java:792)
    at org.apache.hadoop.hive.cli.CliDriver.run(CliDriver.java:686)
    at org.apache.hadoop.hive.cli.CliDriver.main(CliDriver.java:625)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at (Method.java:606)
    at (RunJar.java:212)

All,

I'm having an issue with an integration test I've set up.
This is using 0.8-beta1.

The test is to verify that no messages are dropped (or the sender gets an exception thrown back on failure) while doing a rolling restart of a cluster of 2 brokers. The producer is configured to use 'request.required.acks' = '1'. The servers are set up to run locally on localhost, on different ports, and with different data dirs.
The producer connects with a metadata broker list like:
"localhost:2024,localhost:1025" (so no vip).
The servers are set up with a default replication factor of 2. The servers have controlled shutdown enabled, as well.

The producer code looks like this:

    ...
    Producer<Integer, T> producer = getProducer();
    try {
      KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
      producer.send(msg);
    } catch (RuntimeException e) {
      logger.warn("Error sending data to kafka", e);
    }
    ...

The test sends groups of messages at each stage of the test (e.g. servers up, first server going down, first server coming up, second server going down, etc.).
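(For reference, the producer configuration described above would look roughly like this as a properties file. This is a sketch: the broker list and acks value are taken from this message; the serializer class and sync mode are assumptions, not stated by the author.)

```properties
# Hypothetical producer config for the test described above (0.8 "old" producer)
metadata.broker.list=localhost:2024,localhost:1025
request.required.acks=1
# assumptions, not stated in the message:
producer.type=sync
serializer.class=kafka.serializer.StringEncoder
```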
Then a consumer connects and consumes all the messages, to make sure they all arrived ok. It seems that, intermittently, a single message gets dropped, right after one of the servers starts going down. It doesn't always happen; it seems to happen about 1 out of every 20 test runs or so.

Here's some sample output. I see the exception inside the producer code, but I don't see the producer.send method ever having an exception thrown back out to the caller (the log line "Error sending data to kafka" is never triggered). What's interesting is that it looks like the exceptions are happening on message 3, but when the consumer subsequently consumes back all the messages in the broker cluster, it seems message 2 (and not message 3) is missing:

......
7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 98
7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 99
7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Shutting down server2
7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 0
7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting down KafkaServer
7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 1
7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 2
7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 3
7394 [kafka-request-handler-5] WARN state.change.logger - Broker received update metadata request with correlation id 7 from an old controller with epoch 2. Latest known controller epoch is 3
7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - [KafkaApi-] error when handling request Name:UpdateMetadataRVersion:0;Controller:;ControllerEpoch:2;CorrelationId:7;ClientId:id_-host_null-port_1026;PartitionState:[test-topic,0]->(LeaderAndIsrInfo:(Leader:,ISR:,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:);AliveBrokers:id:,host:192.168.1.105,port:1025,id:,host:192.168.1.105,port:1026
kafka.common.ControllerMovedException: Broker received update metadata request with correlation id 7 from an old controller with epoch 2. Latest known controller epoch is 3
    at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
    at java.lang.Thread.run(Thread.java:724)
8039 [Controller--to-broker--send-thread] WARN kafka.controller.RequestSendThread - [Controller--to-broker--send-thread], Controller fails to send a request to broker
java.nio.channels.AsynchronousCloseException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
    at kafka.utils.Utils$.read(Utils.scala:394)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer - Shut down complete for KafkaServer
17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 810 to broker with data for partitions [test-topic,0]
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:394)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17319 [Thread-1] ERROR kafka.producer.SyncProducer - Producer connection to localhost:1026 unsuccessful
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17322 [Thread-1] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 4
17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 5
17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 6
......
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 98'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 99'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 0'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 1'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 3'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 4'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 5'
The occasional single message loss could happen since required.request.acks=1 and the leader is shut down before the follower gets a chance to copy the message. Can you try your test with num acks set to -1?

Thanks,
Neha
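(Neha's suggestion amounts to a one-line change in the producer configuration: in the 0.8 producer, request.required.acks=-1 makes the leader wait for acknowledgment from all in-sync replicas before responding to the send. Sketch:)

```properties
# Wait for all in-sync replicas to acknowledge, not just the leader
request.required.acks=-1
```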
Neha,

I'm not sure I understand. I would have thought that if the leader acknowledges receipt of a message, and is then shut down cleanly (with controlled shutdown enabled), it would be able to reliably persist any in-memory buffered messages (and replicate them) before shutting down. Shouldn't this be part of the contract? It should be able to make sure this happens before shutting down, no?

I would understand a message being dropped if it were a hard shutdown.

I'm not sure then how to implement reliable delivery semantics while allowing a rolling restart of the broker cluster (or even tolerating a single node failure, where one node might be down for a while and need to be replaced or have a disk repaired). In this case, if we need to use required.request.acks=-1, that will pretty much prevent any successful message producing while any of the brokers for a partition is unavailable. So, I don't think that's an option. (Not to mention the performance degradation.)

Is there not a way to make this work more reliably with leader-only acknowledgment, and clean/controlled shutdown?

My test does succeed, as expected, with acks = -1, at least for the 100 or so iterations I've let it run so far. It does on occasion send duplicates (but that's ok with me).

Jason

On Fri, Oct 4, 2013 at 6:38 PM, Neha Narkhede wrote:
> The occasional single message loss could happen since required.request.acks=1 and the leader is shut down before the follower gets a chance to copy the message. Can you try your test with num acks set to -1?
PM, "Jason Rosenberg" wrote:

All,

I'm having an issue with an integration test I've set up. This is using 0.8-beta1. The test is to verify that no messages are dropped (or the sender gets an exception thrown back if failure) while doing a rolling restart of a cluster of 2 brokers.

The producer is configured to use 'request.required.acks' = '1'. The servers are set up to run locally on localhost, on different ports, and different data dirs. The producer connects with a metadata broker list like: "localhost:2024,localhost:1025" (so no vip). The servers are set up with a default replication factor of 2. The servers have controlled shutdown enabled, as well.

The producer code looks like this:

...
Producer<Integer, T> producer = getProducer();
try {
  KeyedMessage<Integer, T> msg = new KeyedMessage<Integer, T>(topic, message);
  producer.send(msg);
} catch (RuntimeException e) {
  logger.warn("Error sending data to kafka", e);
}
...

The test sends groups of messages at each stage of the test (e.g. servers up, first server going down, first server coming up, second server going down, etc.). Then a consumer connects and consumes all the messages, to make sure they all arrived ok.

It seems intermittently a single message gets dropped, right after one of the servers starts going down. It doesn't always happen; it seems to happen 1 out of every 20 test runs or so. Here's some sample output. I see the exception inside the producer code, but I don't see the producer.send method ever having an exception thrown back out to the caller (the log line "Error sending data to kafka" is never triggered).

What's interesting is that it looks like the exceptions are happening on message 3, but when the consumer subsequently consumes back all the messages in the broker cluster, it seems message 2 (and not message 3) is missing:

...
7136 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest
- Sending message: test-stage: 3, message: 98
7150 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 3, message: 99
7163 [Thread-2] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Shutting down server2
7163 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 0
7164 [Thread-20] INFO com.squareup.kafka.ng.server.KafkaServer - Shutting down KafkaServer
7176 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 1
7189 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 2
7203 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 3
7394 [kafka-request-handler-5] WARN state.change.logger - Broker received update metadata request with correlation id 7 from an old controller with epoch 2. Latest known controller epoch is 3
7397 [kafka-request-handler-5] ERROR kafka.server.KafkaApis - [KafkaApi-] error when handling request Name:UpdateMetadataRequest;Version:0;Controller:;ControllerEpoch:2;CorrelationId:7;ClientId:id_-host_null-port_1026;PartitionState:[test-topic,0] -> (LeaderAndIsrInfo:(Leader:,ISR:,LeaderEpoch:2,ControllerEpoch:2),ReplicationFactor:2),AllReplicas:);AliveBrokers:id:,host:192.168.1.105,port:1025,id:,host:192.168.1.105,port:1026
kafka.common.ControllerMovedException: Broker received update metadata request with correlation id 7 from an old controller with epoch 2. Latest known controller epoch is 3
    at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:114)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
    at java.lang.Thread.run(Thread.java:724)
8039 [Controller--to-broker--send-thread] WARN kafka.controller.RequestSendThread - [Controller--to-broker--send-thread], Controller fails to send a request to broker
java.nio.channels.AsynchronousCloseException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:205)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:387)
    at kafka.utils.Utils$.read(Utils.scala:394)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:127)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
8041 [Thread-2] INFO com.squareup.kafka.ng.server.KafkaServer - Shut down complete for KafkaServer
17209 [Thread-1] WARN kafka.producer.async.DefaultEventHandler - Failed to send producer request with correlation id 810 to broker with data for partitions [test-topic,0]
java.net.SocketTimeoutException
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:226)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:394)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:102)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:101)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:100)
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:244)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:107)
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:101)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17319 [Thread-1] ERROR kafka.producer.SyncProducer - Producer connection to localhost:1026 unsuccessful
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17322 [Thread-1] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 811 for topics [Set(test-topic)] from broker [id:1,host:localhost,port:1026] failed
java.net.ConnectException: Connection refused
    at sun.nio.ch.Net.connect0(Native Method)
    at sun.nio.ch.Net.connect(Net.java:465)
    at sun.nio.ch.Net.connect(Net.java:457)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
    at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
    at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
    at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.Producer.send(Producer.scala:74)
    at kafka.javaapi.producer.Producer.send(Producer.scala:32)
    at com.squareup.kafka.ng.producer.KafkaProducer.sendMessage(KafkaProducer.java:138)
    at com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest$9.run(AbstractKafkaProducerWithServerTest.java:845)
    at java.lang.Thread.run(Thread.java:724)
17340 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 4
17353 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 5
17365 [Thread-1] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Sending message: test-stage: 4, message: 6
...
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 98'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 3, message: 99'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 0'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 1'
23410 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 3'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 4'
23411 [Thread-28] INFO com.squareup.kafka.ng.producer.AbstractKafkaProducerWithServerTest - Received msg 'test-stage: 4, message: 5'
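For reference, the setup described in the test above (acks=1 producer, two local brokers with replication factor 2 and controlled shutdown) maps roughly to configuration along these lines. This is a sketch only: the property names are from the 0.8.x producer/broker config references (0.8-beta1 may differ slightly, e.g. controlled shutdown was originally driven by an admin tool), and the ports, ids, and paths are illustrative rather than taken from the actual test.

```properties
# Producer (0.8.x) -- illustrative values
metadata.broker.list=localhost:1025,localhost:1026
request.required.acks=1
producer.type=sync

# Broker (0.8.x) -- one instance per port/data dir
broker.id=1
port=1025
log.dirs=/tmp/kafka-logs-1
default.replication.factor=2
controlled.shutdown.enable=true
```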
Shouldn't this be part of the contract? It should be able to make sure this happens before shutting down, no?

The leader writes messages to its local log, and then the replicas consume messages from the leader and write those to their local logs. If you set request.required.acks=1, the ack is sent to the producer only after the leader has written messages to its local log. What you are asking for is part of the contract if request.required.acks=-1.

In this case, if we need to use required.request.acks=-1, that will pretty much prevent any successful message producing while any of the brokers for a partition is unavailable. So, I don't think that's an option. (Not to mention the performance degradation.)

You can implement reliable delivery semantics while allowing rolling restart of brokers by setting request.required.acks=-1. When one of the replicas is shut down, the ISR reduces to remove the replica being shut down, and the messages will be committed using the new ISR.

Thanks,
Neha

On Fri, Oct 4, 2013 at 11:51 PM, Jason Rosenberg wrote:
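Neha's description of the acks contract can be illustrated with a toy model (plain Java, not Kafka code; every name below is invented for illustration): with acks=1 the leader may acknowledge before any follower has fetched the message, so losing the leader can lose an acknowledged message; with acks=-1 the acknowledgment is withheld until the ISR has replicated it.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of producer ack semantics -- NOT Kafka code.
class AckDemo {
    // Returns true iff a message acked to the producer is still present
    // on the surviving replica after the leader goes away.
    static boolean messageSurvivesLeaderLoss(int requiredAcks) {
        List<String> leaderLog = new ArrayList<>();
        List<String> followerLog = new ArrayList<>();

        leaderLog.add("m1");            // leader appends to its local log
        boolean acked;
        if (requiredAcks == -1) {
            followerLog.add("m1");      // ack waits for the full ISR to copy
            acked = true;
        } else {
            acked = true;               // acks=1: acked before any follower fetch
        }
        // Leader is lost before the follower can fetch; follower becomes leader.
        boolean survived = followerLog.contains("m1");
        return acked && survived;
    }

    public static void main(String[] args) {
        System.out.println("acks=1  survives: " + messageSurvivesLeaderLoss(1));
        System.out.println("acks=-1 survives: " + messageSurvivesLeaderLoss(-1));
    }
}
```

This only models the ordering of the ack relative to replication; real controlled shutdown also moves leadership before stopping, but as Neha notes, with acks=1 the ack can still race the follower's fetch.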
Thanks for the explanation Neha.....still holding out hope.....

So, if request.required.acks=-1, how does the leader confirm that the other brokers have consumed the message, before acking to the producer? Does the leader just wait for the followers in the ISR to consume? Or does the leader have a way to push, or ping the followers to consume?

Couldn't that mechanism be used during a clean shutdown, even if the messages were initially produced with acks=1? That is, when shutting down, get acks from all ISR members for each partition, before shutting down. I'm just a bit leery about using -1 across the board, because of the performance hit (but for now it seems the best option to use for reliable sending).

A separate question: can request.required.acks be set to a higher positive integer, say "2", to indicate that 2 of say 3 replicas have acked? ("request.required.acks" in the name would seem to indicate this.) I'm not saying I'd want to use this (we are hoping to use only a replication factor of 2).

Jason

On Sat, Oct 5, 2013 at 1:00 PM, Neha Narkhede wrote:
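The verification step the test performs (consume everything back, then check each stage for missing sequence numbers) could be sketched as a small helper. This is hypothetical code, not from the actual test; only the payload format ("test-stage: S, message: N") is taken from the log lines above.

```java
import java.util.*;

// Hypothetical gap checker for the consumed-back messages described above.
class GapCheck {
    // received: payloads like "test-stage: 4, message: 2"
    // perStage: how many messages each stage was expected to produce
    static Map<Integer, List<Integer>> findGaps(List<String> received, int perStage) {
        Map<Integer, Set<Integer>> seen = new HashMap<>();
        for (String msg : received) {
            String[] parts = msg.split(", message: ");
            int stage = Integer.parseInt(parts[0].replace("test-stage: ", ""));
            int n = Integer.parseInt(parts[1]);
            seen.computeIfAbsent(stage, k -> new TreeSet<>()).add(n);
        }
        Map<Integer, List<Integer>> gaps = new TreeMap<>();
        for (Map.Entry<Integer, Set<Integer>> e : seen.entrySet()) {
            List<Integer> missing = new ArrayList<>();
            for (int i = 0; i < perStage; i++)
                if (!e.getValue().contains(i)) missing.add(i);
            if (!missing.isEmpty()) gaps.put(e.getKey(), missing);
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Mirrors the observed output: stage 4 received 0, 1, 3 -- message 2 dropped.
        List<String> got = Arrays.asList(
            "test-stage: 4, message: 0",
            "test-stage: 4, message: 1",
            "test-stage: 4, message: 3");
        System.out.println(GapCheck.findGaps(got, 4)); // prints {4=[2]}
    }
}
```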
