public class ShortCircuit { statjava public booleann test1(int val) ......

hadoop git commit: HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the DFSClient about it because of a network error (cmccabe)
hadoop-common-commits mailing list archives
Message view
cmcc...@apache.org
hadoop git commit: HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the DFSClient about it because of a network error (cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk 6fdef76cc -& 5aa892ed4
HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and fail to tell the
DFSClient about it because of a network error (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5aa892ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5aa892ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5aa892ed
Branch: refs/heads/trunk
Commit: 5aa892ed486d42ae6b94cb382ea640
Parents: 6fdef76
Author: Colin Patrick Mccabe &&
Authored: Fri Mar 13 18:29:49
Committer: Colin Patrick Mccabe &&
Committed: Fri Mar 13 18:29:49
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
.../apache/hadoop/hdfs/BlockReaderFactory.java
| 23 ++++-
.../java/org/apache/hadoop/hdfs/DFSClient.java
.../datatransfer/DataTransferProtocol.java
.../hdfs/protocol/datatransfer/Receiver.java
.../hdfs/protocol/datatransfer/Sender.java
.../hdfs/server/datanode/DataXceiver.java
| 95 ++++++++++++--------
.../server/datanode/ShortCircuitRegistry.java
.../src/main/proto/datatransfer.proto
.../shortcircuit/TestShortCircuitCache.java
| 63 +++++++++++++
10 files changed, 178 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c3f9367..ff00b0c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -77,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7722. DataNode#checkDiskError should also remove Storage when error
is found. (Lei Xu via Colin P. McCabe)
HDFS-7915. The DataNode can sometimes allocate a ShortCircuitShm slot and
fail to tell the DFSClient about it because of a network error (cmccabe)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
index ba48c79..1e915b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import java.io.BufferedOutputS
import java.io.DataInputS
import java.io.DataOutputS
@@ -69,6 +71,12 @@ import mon.base.P
public class BlockReaderFactory implements ShortCircuitReplicaCreator {
static final Log LOG = LogFactory.getLog(BlockReaderFactory.class);
public static class FailureInjector {
public void injectRequestFileDescriptorsFailure() throws IOException {
// do nothing
@VisibleForTesting
static ShortCircuitReplicaCreator
createShortCircuitReplicaInfoCallback =
@@ -76,6 +84,11 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
private final DFSClient.C
* Injects failures into specific operations during unit tests.
private final FailureInjector failureI
* The file name, for logging and debugging purposes.
private String fileN
@@ -169,6 +182,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator
public BlockReaderFactory(DFSClient.Conf conf) {
this.conf =
this.failureInjector = conf.brfFailureI
this.remainingCacheTries = conf.nCachedConnR
@@ -518,11 +532,12 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
SlotId slotId = slot == null ? null : slot.getSlotId();
new Sender(out).requestShortCircuitFds(block, token, slotId, 1);
new Sender(out).requestShortCircuitFds(block, token, slotId, 1, true);
DataInputStream in = new DataInputStream(peer.getInputStream());
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelper.vintPrefixed(in));
DomainSocket sock = peer.getDomainSocket();
failureInjector.injectRequestFileDescriptorsFailure();
switch (resp.getStatus()) {
case SUCCESS:
byte buf[] = new byte[1];
@@ -532,8 +547,13 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator
ExtendedBlockId key =
new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
if (buf[0] == USE_RECEIPT_VERIFICATION.getNumber()) {
LOG.trace("Sending receipt verification byte for slot " + slot);
sock.getOutputStream().write(0);
replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
Time.monotonicNow(), slot);
return new ShortCircuitReplicaInfo(replica);
} catch (IOException e) {
// This indicates an error reading from disk, or a format error.
// it's not a socket communication problem, we return null rather than
@@ -545,7 +565,6 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator
IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]);
return new ShortCircuitReplicaInfo(replica);
case ERROR_UNSUPPORTED:
if (!resp.hasShortCircuitAccessVersion()) {
LOG.warn("short-circuit read access is disabled for " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index aac7b51..f970fef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -337,6 +337,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final long shortCircuitCacheStaleThresholdMs;
final long keyProviderCacheExpiryMs;
public BlockReaderFactory.FailureInjector brfFailureInjector =
new BlockReaderFactory.FailureInjector();
public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
index 4be42a8..48e931d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -138,10 +138,13 @@ public interface DataTransferProtocol {
to use no slot id.
* @param maxVersion
Maximum version of the block data the client
can understand.
* @param supportsReceiptVerification
True if the client supports
receipt verification.
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token&BlockTokenIdentifier& blockToken,
SlotId slotId, int maxVersion) throws IOE
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOE
* Release a pair of short-circuit FDs requested earlier.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
index bdc5e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -186,7 +186,7 @@ public abstract class Receiver implements DataTransferProtocol {
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()),
slotId, proto.getMaxVersion());
slotId, proto.getMaxVersion(), true);
} finally {
if (traceScope != null) traceScope.close();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
index 7fea33e..df
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -181,7 +181,8 @@ public class Sender implements DataTransferProtocol {
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token&BlockTokenIdentifier& blockToken,
SlotId slotId, int maxVersion) throws IOException {
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException {
OpRequestShortCircuitAccessProto.Builder builder =
OpRequestShortCircuitAccessProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildBaseHeader(
@@ -189,6 +190,7 @@ public class Sender implements DataTransferProtocol {
if (slotId != null) {
builder.setSlotId(PBHelper.convert(slotId));
builder.setSupportsReceiptVerification(supportsReceiptVerification);
OpRequestShortCircuitAccessProto proto = builder.build();
send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index e504fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ER
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import static org.apache.hadoop.util.Time.
@@ -291,64 +293,83 @@ class DataXceiver extends Receiver implements Runnable {
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token&BlockTokenIdentifier& token,
SlotId slotId, int maxVersion) throws IOException {
SlotId slotId, int maxVersion, boolean supportsReceiptVerification)
throws IOException {
updateCurrentThreadName("Passing file descriptors for block " + blk);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] =
SlotId registeredSlotId =
boolean success =
if (peer.getDomainSocket() == null) {
throw new IOException("You cannot pass file descriptors over " +
"anything but a UNIX domain socket.");
if (slotId != null) {
boolean isCached = datanode.data.
isCached(blk.getBlockPoolId(), blk.getBlockId());
datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
} finally {
if ((fis == null) && (slotId != null)) {
datanode.shortCircuitRegistry.unregisterSlot(slotId);
if (peer.getDomainSocket() == null) {
throw new IOException("You cannot pass file descriptors over " +
"anything but a UNIX domain socket.");
if (slotId != null) {
boolean isCached = datanode.data.
isCached(blk.getBlockPoolId(), blk.getBlockId());
datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
registeredSlotId = slotId;
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
Preconditions.checkState(fis != null);
bld.setStatus(SUCCESS);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
} catch (ShortCircuitFdsVersionException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
bld.setMessage(e.getMessage());
} catch (ShortCircuitFdsUnsupportedException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setMessage(e.getMessage());
} catch (InvalidToken e) {
bld.setStatus(ERROR_ACCESS_TOKEN);
bld.setMessage(e.getMessage());
} catch (IOException e) {
bld.setStatus(ERROR);
bld.setMessage(e.getMessage());
bld.setStatus(SUCCESS);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
} catch (ShortCircuitFdsVersionException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
bld.setMessage(e.getMessage());
} catch (ShortCircuitFdsUnsupportedException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setMessage(e.getMessage());
} catch (InvalidToken e) {
bld.setStatus(ERROR_ACCESS_TOKEN);
bld.setMessage(e.getMessage());
} catch (IOException e) {
bld.setStatus(ERROR);
bld.setMessage(e.getMessage());
bld.build().writeDelimitedTo(socketOut);
if (fis != null) {
FileDescriptor fds[] = new FileDescriptor[fis.length];
for (int i = 0; i & fds. i++) {
fds[i] = fis[i].getFD();
byte buf[] = new byte[] { (byte)0 };
peer.getDomainSocket().
sendFileDescriptors(fds, buf, 0, buf.length);
byte buf[] = new byte[1];
if (supportsReceiptVerification) {
buf[0] = (byte)USE_RECEIPT_VERIFICATION.getNumber();
buf[0] = (byte)DO_NOT_USE_RECEIPT_VERIFICATION.getNumber();
DomainSocket sock = peer.getDomainSocket();
sock.sendFileDescriptors(fds, buf, 0, buf.length);
if (supportsReceiptVerification) {
LOG.trace("Reading receipt verification byte for " + slotId);
int val = sock.getInputStream().read();
if (val & 0) {
throw new EOFException();
LOG.trace("Receipt verification is not enabled on the DataNode.
"Not verifying " + slotId);
} finally {
if ((!success) && (registeredSlotId != null)) {
("Unregistering " + registeredSlotId + " because the " +
"requestShortCircuitFdsForRead operation failed.");
datanode.shortCircuitRegistry.unregisterSlot(registeredSlotId);
if (ClientTraceLog.isInfoEnabled()) {
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId());
(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
" blockid: %s, srvID: %s, success: %b",
blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
blk.getBlockId(), dnR.getDatanodeUuid(), success));
if (fis != null) {
IOUtils.cleanup(LOG, fis);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
index 32906f4..b32c0d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
@@ -30,6 +30,7 @@ import java.util.HashS
import java.util.I
import java.util.S
+import mon.annotations.VisibleForT
import mons.io.IOU
import mons.logging.L
import mons.logging.LogF
@@ -83,7 +84,7 @@ public class ShortCircuitRegistry {
private static final int SHM_LENGTH = 8192;
private static class RegisteredShm extends ShortCircuitShm
public static class RegisteredShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
private final String clientN
private final ShortCircuitR
@@ -383,4 +384,14 @@ public class ShortCircuitRegistry {
IOUtils.closeQuietly(watcher);
public static interface Visitor {
void accept(HashMap&ShmId, RegisteredShm& segments,
HashMultimap&ExtendedBlockId, Slot& slots);
@VisibleForTesting
public synchronized void visit(Visitor visitor) {
visitor.accept(segments, slots);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
index d72bb5e1..644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
@@ -179,6 +179,12 @@ message OpRequestShortCircuitAccessProto {
* The shared memory slot to use, if we are using one.
optional ShortCircuitShmSlotProto slotId = 3;
* True if the client supports verifying that the file descriptor has been
* sent successfully.
optional bool supportsReceiptVerification = 4 [default = false];
message ReleaseShortCircuitAccessRequestProto {
@@ -230,6 +236,11 @@ enum Status {
IN_PROGRESS = 12;
+enum ShortCircuitFdResponse {
DO_NOT_USE_RECEIPT_VERIFICATION = 0;
USE_RECEIPT_VERIFICATION = 1;
message PipelineAckProto {
required sint64 seqno = 1;
repeated uint32 reply = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5aa892ed/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index bfa871c..7daabd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -36,13 +36,16 @@ import java.util.HashM
import java.util.I
import java.util.M
+import mon.collect.HashM
import mons.lang.mutable.MutableB
import mons.logging.L
import mons.logging.LogF
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.P
+import org.apache.hadoop.hdfs.BlockReaderF
import org.apache.hadoop.hdfs.BlockReaderTestU
+import org.apache.hadoop.hdfs.DFSC
import org.apache.hadoop.hdfs.DFSInputS
import org.apache.hadoop.hdfs.DFSTestU
import org.apache.hadoop.hdfs.DistributedFileS
@@ -52,11 +55,14 @@ import org.apache.hadoop.hdfs.net.DomainP
import org.apache.hadoop.hdfs.protocol.DatanodeI
import org.apache.hadoop.hdfs.protocol.ExtendedB
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataH
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitR
+import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredS
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorI
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.V
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.CacheV
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitCache.ShortCircuitReplicaC
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.S
+import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.io.IOU
import org.apache.hadoop.net.unix.DomainS
import org.apache.hadoop.net.unix.TemporarySocketD
@@ -615,4 +621,61 @@ public class TestShortCircuitCache {
cluster.shutdown();
sockDir.close();
public static class TestCleanupFailureInjector
extends BlockReaderFactory.FailureInjector {
public void injectRequestFileDescriptorsFailure() throws IOException {
throw new IOException("injected I/O error");
// Regression test for HDFS-7915
@Test(timeout=60000)
public void testDataXceiverCleansUpSlotsOnFailure() throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testDataXceiverCleansUpSlotsOnFailure", sockDir);
conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final Path TEST_PATH1 = new Path("/test_file1");
final Path TEST_PATH2 = new Path("/test_file2");
final int TEST_FILE_LEN = 4096;
final int SEED = 0xFADE1;
DFSTestUtil.createFile(fs, TEST_PATH1, TEST_FILE_LEN,
(short)1, SEED);
DFSTestUtil.createFile(fs, TEST_PATH2, TEST_FILE_LEN,
(short)1, SEED);
// The first read should allocate one shared memory segment and slot.
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
// The second read should fail, and we should only have 1 segment and 1 slot
fs.getClient().getConf().brfFailureInjector =
new TestCleanupFailureInjector();
DFSTestUtil.readFileBuffer(fs, TEST_PATH2);
} catch (Throwable t) {
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
"testing, but we failed to do a non-TCP read.", t);
ShortCircuitRegistry registry =
cluster.getDataNodes().get(0).getShortCircuitRegistry();
registry.visit(new ShortCircuitRegistry.Visitor() {
public void accept(HashMap&ShmId, RegisteredShm& segments,
HashMultimap&ExtendedBlockId, Slot& slots) {
Assert.assertEquals(1, segments.size());
Assert.assertEquals(1, slots.size());
cluster.shutdown();
sockDir.close();
(inline, 7-Bit, 26226 bytes)

我要回帖

更多关于 static boolean 的文章

 

随机推荐