凭证录入的详细过程详细些

查看: 13637|回复: 5
最后登录阅读权限10在线时间55 小时UID93787帖子第纳尔17 精华0互助0 荣誉0 贡献0 魅力0 鸡蛋0 注册时间
平民, 积分 7, 距离下一级还需 3 积分
UID93787帖子第纳尔17 精华0互助0 荣誉0 贡献0 魅力0 注册时间
我现在的任务已经提示为帮助王定六和三打祝家庄了
可是打完王定六的任务后 也没有什么提示
跑到祝家庄也没有什么反应
怎么开始触发三打祝家庄的任务啊
最后登录阅读权限20在线时间248 小时UID122188帖子第纳尔193 精华0互助3 荣誉0 贡献0 魅力3 鸡蛋0 注册时间
扈从, 积分 63, 距离下一级还需 36 积分
UID122188帖子第纳尔193 精华0互助3 荣誉0 贡献0 魅力3 注册时间
三打祝家庄的任务触发条件~
1、身上要有无字天书
2、队伍里要有扈三娘、顾大嫂、祝彪(注:读心术的使用CD是一天)
3、在祝家庄里休息,就会自动触发任务。
前传:过关条件1、敌人全灭&&2、石秀、杨雄、时迁至少有一人存活
敌人基本上都是义勇军系列的难度不大,只要玩家操作好一点,基本上可以过去。
一打祝家庄:过关条件1、找到7个宝箱&&2、石秀(这个时候由玩家扮演)要跑到城外的旗帜处
不要恋战,速度找到7个宝箱,然后速度去门口干掉小栾,出城跑到旗帜处就OK了,宝箱具体位置论坛上已经有介绍,可以搜索看一下。
二打祝家庄:过关条件1、敌人将领全灭(宋江、李逵、林冲、秦明、马麟、欧鹏、王英)2、扈三娘(玩家扮演)不能挂
没什么说的,挨着打就行,就是李黑子血多点
三打祝家庄:过关条件1、敌人全灭&&2、顾大嫂(玩家扮演)不能挂
出来以后速度去楼下监狱门口搞掉小栾,然后会放出来很多援军,再去门口打开城门就万事OK了,很简单。
完成三打祝家庄后会得到小栾,在你的队伍里
(把战斗AI和伤害都调低一点,不难过)
乐于助人, 品德高尚!
总评分:&互助 + 1&
最后登录阅读权限30在线时间657 小时UID63324帖子第纳尔444 精华0互助2 荣誉0 贡献0 魅力1 鸡蛋0 注册时间
见习骑士, 积分 118, 距离下一级还需 281 积分
UID63324帖子第纳尔444 精华0互助2 荣誉0 贡献0 魅力1 注册时间
ls得很详细了。一打的帖子,简单已经变过了,建议在渭州城里先练练。三大是最简单的。个人认为二大需要注意下下就ok了。
一面憧憬未来,一面回望过往。
最后登录阅读权限10在线时间55 小时UID93787帖子第纳尔17 精华0互助0 荣誉0 贡献0 魅力0 鸡蛋0 注册时间
平民, 积分 7, 距离下一级还需 3 积分
UID93787帖子第纳尔17 精华0互助0 荣誉0 贡献0 魅力0 注册时间
非常感谢以上二位 哈哈
最后登录阅读权限10在线时间48 小时UID97749帖子第纳尔14 精华0互助0 荣誉0 贡献0 魅力0 鸡蛋0 注册时间
平民, 积分 4, 距离下一级还需 6 积分
UID97749帖子第纳尔14 精华0互助0 荣誉0 贡献0 魅力0 注册时间
二打 有难度 但是好像后面可以选择跟上追宋江 于是你身后的家丁众会围殴林冲李逵 那就简单多了
最后登录阅读权限30在线时间410 小时UID116987帖子第纳尔872 精华0互助18 荣誉11 贡献3 魅力16 鸡蛋0 注册时间
见习骑士, 积分 391, 距离下一级还需 8 积分
UID116987帖子第纳尔872 精华0互助18 荣誉11 贡献3 魅力16 注册时间
一大朱家莊我就失敗了。真難啊,難道是我裝備不行?
原版正版用户勋章
原版正版用户勋章
Powered by奥数题95-94希望过程详细些
☆异鸣★1493
=1994×(5)-1995×(4)=×××-=0
为您推荐:
其他类似问题
95-94=**()=0对吧
=001-001=1*94)=0
95-94=*()=0
95-94 =**() =0
扫描下载二维码编译器优化过程具体是做了些什么,优化后的程序速度能提高多少百分比?
按投票排序
题主问的“编译器优化”有点指代不明啊,如果“编译器优化”是指对编译器自身的优化,那么Bootstrap(中文译为自举?)可能是一个例子。但我觉得题主问得更可能是编译器对程序的优化,类似于gcc -o3这样子的,试着举几个简单的课堂例子,用C语言代码给出例子帮助理解(实际上应该用IR),抛砖引玉了。高阶的还请蓝色大大和 大大补充了。编译器对程序的优化,大体上(我知道的)有这样几个原则/思路:编译器优化原则之一:能在编译阶段(compile time)做的工作,就不要在程序运行时(runtime)做。例1: Constant Propagation (常数传递?) & Constant Folding (中文常数折叠?)int freq = 32768;
int secs = 15;
int ticks = freq *
//假设后面freq和secs两个变量没有别重新赋值
Constant Propagation 优化后:(实际的优化应该在Intermediate Representation里,这里为了方便理解还是用C语言了)int ticks = 32768*15;
Constant Folding 优化后:int ticks = 491520;
//编译器将常数运算结果计算后赋值
这个例子很简单,相信大家都能看懂。程序优化前,有3个变量需要3个寄存器,一次乘法运算。程序优化后,只有1个变量需要一个寄存器,没有乘法运算。这个优化看起来很微不足道,但实际上用途很广。为了程序的可读性和可维护性,大多数程序员应该还是会选用第一种方式写3行程序而不是直接甩下一行int ticks = 491520让后来读程序的人摸不到头脑。有了编译器的优化,程序员既可以写出易读的程序又不必担心性能受影响。尤其是在嵌入式领域,很多低端芯片根本就没有硬件乘法器,如果程序不做上述优化可能这3行代码需要几十个cycle,优化过后一个cycle就搞定,你说性能差了多少? 而付出的代价,只是编译程序的时间稍微长了那么一点点,who cares?类似的优化:Dead code elimination, Common subexpression elimination等编译器优化原则之二:对循环优化,优化,再优化!例2:Loop-invariant code motion (例子来自Wiki)for (int i = 0; i & i++) {
a[i] = 6 * i + x *
很显然这个循环里,x这个变量被毫无疑义地赋值了n次,这一步骤完全可以在循环之前做。优化后:x = y +
for (int i = 0; i & i++) {
a[i] = 6 * i + t1;
100个循环,每个循环少执行了一条语句,就相当于是少执行了100条语句啊。要是循环1000次,1000次呢?有的仿真程序都是几十上百万次的循环,手贱多写了一条这样的语句你说对程序性能影响多大?你说你技术高超不会写出这样的程序,但你能保证所有人都不这样写吗?例3:Loop Unrolling (循环展开)int array[1024] = ...
for (int i = 0; i & 1024; i++) {
array[i] *= 2;
优化后:int array[1024] = ...
for (int i = 0; i & 1024; i+=4) {
array[i] *=2;
array[i+1] *=2;
array[i+2] *=2;
array[i+3] *=2;
看起来程序优化之后比以前还长了。但实际上,每执行一次循环,都要有一次条件的判断(i&1024)。如果不做优化,那么循环1024次,就要对条件判断1024次。优化后,只循环256次,那么判断的次数就减少到256次。而且这个优化可以将程序并行化,在运算资源充足的时候可以大幅提高运算速度和效率。当然代价是编译后代码Size变大。类似的优化:Software Pipelining, Strength Reduction等等。之所以对Loop进行着重优化,是因为有统计表明世界上的程序大部分时间都是跑在loop里的。loop里面优化了一小点,就会对整个程序的性能有很大提升。最后给一个实际应用的例子:最近在用某厂DSP芯片做项目,搞DSP的同学都知道DSP上的硬件浮点运算资源非常丰富,但因为程序是从别处扒来的没有针对DSP进行优化,编译器又比较蠢,所以DSP并没有发挥出实际性能。然后我们手动展开了几个比较核心的loop,这样运算资源得到充分利用,程序性能提升了3倍。嗯,就做了这一件事儿,性能提升了3倍。懂一点编译器的知识就是好~利益相关:不是搞编译器的,各位轻拍。
o0和o3差别可是相当的大
像Haskell的编译器,开了优化选项,能把指数级的复杂度优化成多项式…
优秀程序员让编译器无事可做:)
已有帐号?
无法登录?
社交帐号登录服务之星的过程怎样写-39的微博|图片|游记|
还没有收听的途说,到这里看
TA收听了 4 人
3 人收听了TA
TA放入行囊:
还没旅游计划
TA到此一游:
神马地方都还没去
TA参加的活动
TA还没有参加任何活动
来自 未知星球
发过1个地点途说,还没地盘呀,要学习盘古开天辟地!
服务之星的过程怎样写【老师QQ:16485492 】服务之星的过程怎样写我们是一个论文发表服务平台。主要提供代写论文和论文代写,发表论文,代写硕士论文等服务,成立至今写作论文五万余篇,赢得客户的广泛好评。以庞大的期刊库为依托,为用户提供论文发表期刊信息咨询服务。期刊几乎涵盖所有学术领域,包括核心、SCI、国家级、省级等各个种类。代理多家省级期刊、国家级期刊、统计源科技核心期刊、北大中文核心期刊、南大CSSCI期刊,可以推荐发表多专业职称论文,欢迎索取最新代理刊物清单。ylpxjcdljqdfr
发私信给:
私信内容:下次自动登录
现在的位置:
& 综合 & 正文
Hadoop读写过程详细代码
一、文件的打开
1.1、客户端
HDFS打开一个文件,需要在客户端调用DistributedFileSystem.open(Path f, int bufferSize),其实现为:
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return new DFSClient.DFSDataInputStream(
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
其中dfs为DistributedFileSystem的成员变量DFSClient,其open函数被调用,其中创建一个DFSInputStream(src, buffersize, verifyChecksum)并返回。
在DFSInputStream的构造函数中,openInfo函数被调用,其主要从namenode中得到要打开的文件所对应的blocks的信息,实现如下:
synchronized void openInfo() throws IOException {
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
this.locatedBlocks = newI
this.currentNode =
private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
String src, long start, long length) throws IOException {
return namenode.getBlockLocations(src, start, length);
LocatedBlocks主要包含一个链表的List&LocatedBlock& blocks,其中每个LocatedBlock包含如下信息:
· Block b:此block的信息
· long offset:此block在文件中的偏移量
· DatanodeInfo[] locs:此block位于哪些DataNode上
上面namenode.getBlockLocations是一个RPC调用,最终调用NameNode类的getBlockLocations函数。
1.2、NameNode
NameNode.getBlockLocations实现如下:
public LocatedBlocks getBlockLocations(String src,
long offset,
long length) throws IOException {
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
namesystem是NameNode一个成员变量,其类型为FSNamesystem,保存的是NameNode的name space树,其中一个重要的成员变量为FSDirectory dir。
FSDirectory和Lucene中的FSDirectory没有任何关系,其主要包括FSImage fsImage,用于读写硬盘上的fsimage文件,FSImage类有成员变量FSEditLog editLog,用于读写硬盘上的edit文件,这两个文件的关系在上一篇中已经解释过。
FSDirectory还有一个重要的成员变量INodeDirectoryWithQuota rootDir,INodeDirectoryWithQuota的父类为INodeDirectory,实现如下:
public class INodeDirectory extends INode {
private List&INode&
由此可见INodeDirectory本身是一个INode,其中包含一个链表的INode,此链表中,如果仍为文件夹,则是类型INodeDirectory,如果是文件,则是类型INodeFile,INodeFile中有成员变量BlockInfo blocks[],是此文件包含的block的信息。显然这是一棵树形的结构。
FSNamesystem.getBlockLocations函数如下:
public LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime) throws IOException {
final LocatedBlocks ret = getBlockLocationsInternal(src, dir.getFileINode(src),
offset, length, Integer.MAX_VALUE, doAccessTime);
dir.getFileINode(src)通过路径名从文件系统树中找到INodeFile,其中保存的是要打开的文件的INode的信息。
getBlockLocationsInternal的实现如下:
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
INodeFile inode,
long offset,
long length,
int nrBlocksToReturn,
boolean doAccessTime)
throws IOException {
//得到此文件的block信息
Block[] blocks = inode.getBlocks();
List&LocatedBlock& results = new ArrayList&LocatedBlock&(blocks.length);
//计算从offset开始,长度为length所涉及的blocks
int curBlk = 0;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.
for (curBlk = 0; curBlk & nrB curBlk++) {
blkSize = blocks[curBlk].getNumBytes();
if (curPos + blkSize & offset) {
//当offset在curPos和curPos + blkSize之间的时候,curBlk指向offset所在的block
curPos += blkS
long endOff = offset +
//循环,依次遍历从curBlk开始的每个block,直到当前位置curPos越过endOff
int numNodes = blocksMap.numNodes(blocks[curBlk]);
int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]);
boolean blockCorrupt = (numCorruptNodes == numNodes);
int numMachineSet = blockCorrupt ? numNodes :
(numNodes - numCorruptNodes);
//依次找到此block所对应的datanode,将其中没有损坏的放入machineSet中
DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
if (numMachineSet & 0) {
numNodes = 0;
for(Iterator&DatanodeDescriptor& it =
blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
DatanodeDescriptor dn = it.next();
boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
machineSet[numNodes++] =
//使用此machineSet和当前的block构造一个LocatedBlock
results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
blockCorrupt));
curPos += blocks[curBlk].getNumBytes();
} while (curPos & endOff
&& curBlk & blocks.length
&& results.size() & nrBlocksToReturn);
//使用此LocatedBlock链表构造一个LocatedBlocks对象返回
return inode.createLocatedBlocks(results);
1.3、客户端
通过RPC调用,在NameNode得到的LocatedBlocks对象,作为成员变量构造DFSInputStream对象,最后包装为FSDataInputStream返回给用户。
二、文件的读取
2.1、客户端
文件读取的时候,客户端利用文件打开的时候得到的FSDataInputStream.read(long position, byte[] buffer, int offset, int length)函数进行文件读操作。
FSDataInputStream会调用其封装的DFSInputStream的read(long position, byte[] buffer, int offset, int length)函数,实现如下:
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
long filelen = getFileLength();
int realLen =
if ((position + length) & filelen) {
realLen = (int)(filelen - position);
//首先得到包含从offset到offset + length内容的block列表
//比如对于64M一个block的文件系统来说,欲读取从100M开始,长度为128M的数据,则block列表包括第2,3,4块block
List&LocatedBlock& blockRange = getBlockRange(position, realLen);
int remaining = realL
//对每一个block,从中读取内容
//对于上面的例子,对于第2块block,读取从36M开始,读取长度28M,对于第3块,读取整一块64M,对于第4块,读取从0开始,长度为36M,共128M数据
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
fetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset);
remaining -= bytesToR
position += bytesToR
offset += bytesToR
assert remaining == 0 : "Wrong number of bytes read.";
if (stats != null) {
stats.incrementBytesRead(realLen);
return realL
其中getBlockRange函数如下:
private synchronized List&LocatedBlock& getBlockRange(long offset,
long length)
throws IOException {
List&LocatedBlock& blockRange = new ArrayList&LocatedBlock&();
//首先从缓存的locatedBlocks中查找offset所在的block在缓存链表中的位置
int blockIdx = locatedBlocks.findBlock(offset);
if (blockIdx & 0) { // block is not cached
blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
long remaining =
long curOff =
while(remaining & 0) {
LocatedBlock blk =
//按照blockIdx的位置找到block
if(blockIdx & locatedBlocks.locatedBlockCount())
blk = locatedBlocks.get(blockIdx);
//如果block为空,则缓存中没有此block,则直接从NameNode中查找这些block,并加入缓存
if (blk == null || curOff & blk.getStartOffset()) {
LocatedBlocks newB
newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);
locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
//如果block找到,则放入结果集
blockRange.add(blk);
long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curO
remaining -= bytesR
curOff += bytesR
//取下一个block
blockIdx++;
return blockR
其中fetchBlockByteRange实现如下:
private void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset) throws IOException {
Socket dn =
int numAttempts = block.getLocations().
//此while循环为读取失败后的重试次数
while (dn == null && numAttempts-- & 0 ) {
//选择一个DataNode来读取数据
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = ;
InetSocketAddress targetAddr = retval.
BlockReader reader =
//创建Socket连接到DataNode
dn = socketFactory.createSocket();
dn.connect(targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
int len = (int) (end - start + 1);
//利用建立的Socket链接,生成一个reader负责从DataNode读取数据
reader = BlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(),
block.getBlock().getGenerationStamp(),
start, len, buffersize,
verifyChecksum, clientName);
//读取数据
int nread = reader.readAll(buf, offset, len);
} finally {
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
//如果读取失败,则将此DataNode标记为失败节点
addToDeadNodes(chosenNode);
BlockReader.newBlockReader函数实现如下:
public static BlockReader newBlockReader( Socket sock, String file,
long blockId,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
throws IOException {
//使用Socket建立写入流,向DataNode发送读指令
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong( genStamp );
out.writeLong( startOffset );
out.writeLong( len );
Text.writeString(out, clientName);
out.flush();
//使用Socket建立读入流,用于从DataNode读取数据
DataInputStream in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
DataChecksum checksum = DataChecksum.newDataChecksum( in );
long firstChunkOffset = in.readLong();
//生成一个reader,主要包含读入流,用于读取数据
return new BlockReader( file, blockId, in, checksum, verifyChecksum,
startOffset, firstChunkOffset, sock );
BlockReader的readAll函数就是用上面生成的DataInputStream读取数据。
2.2、DataNode
在DataNode启动的时候,会调用函数startDataNode,其中与数据读取有关的逻辑如下:
void startDataNode(Configuration conf,
AbstractList&File& dataDirs
) throws IOException {
// 建立一个ServerSocket,并生成一个DataXceiverServer来监控客户端的链接
ServerSocket ss = (socketWriteTimeout & 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, socAddr, 0);
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
// adjust machine name with the actual port
tmpPort = ss.getLocalPort();
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
this.dnRegistration.setName(machineName + ":" + tmpPort);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
DataXceiverServer.run()函数如下:
public void run() {
while (datanode.shouldRun) {
//接受客户端的链接
Socket s = ss.accept();
s.setTcpNoDelay(true);
//生成一个线程DataXceiver来对建立的链接提供服务
new Daemon(datanode.threadGroup,
new DataXceiver(s, datanode, this)).start();
ss.close();
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "
+ StringUtils.stringifyException(ie));
DataXceiver.run()函数如下:
public void run() {
DataInputStream in=
//建立一个输入流,读取客户端发送的指令
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
short version = in.readShort();
boolean local = s.getInetAddress().equals(s.getLocalAddress());
byte op = in.readByte();
// Make sure the xciver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
long startTime = DataNode.now();
switch ( op ) {
case DataTransferProtocol.OP_READ_BLOCK:
//真正的读取数据
readBlock( in );
datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
if (local)
datanode.myMetrics.readsFromLocalClient.inc();
datanode.myMetrics.readsFromRemoteClient.inc();
case DataTransferProtocol.OP_WRITE_BLOCK:
//真正的写入数据
writeBlock( in );
datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
if (local)
datanode.myMetrics.writesFromLocalClient.inc();
datanode.myMetrics.writesFromRemoteClient.inc();
//其他的指令
} catch (Throwable t) {
LOG.error(datanode.dnRegistration + ":DataXceiver",t);
} finally {
IOUtils.closeStream(in);
IOUtils.closeSocket(s);
dataXceiverServer.childSockets.remove(s);
private void readBlock(DataInputStream in) throws IOException {
//读取指令
long blockId = in.readLong();
Block block = new Block( blockId, 0 , in.readLong());
long startOffset = in.readLong();
long length = in.readLong();
String clientName = Text.readString(in);
//创建一个写入流,用于向客户端写数据
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
//生成BlockSender用于读取本地的block的数据,并发送给客户端
//BlockSender有一个成员变量InputStream blockIn用于读取本地block的数据
BlockSender blockSender = new BlockSender(block, startOffset, length,
true, true, false, datanode, clientTraceFmt);
out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
//向客户端写入数据
long read = blockSender.sendBlock(out, baseStream, null);
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(blockSender);
三、文件的写入
下面解析向hdfs上传一个文件的过程。
3.1、客户端
上传一个文件到hdfs,一般会调用DistributedFileSystem.create,其实现如下:
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite,
int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return new FSDataOutputStream
(dfs.create(getPathName(f), permission,
overwrite, replication, blockSize, progress, bufferSize),
statistics);
其最终生成一个FSDataOutputStream用于向新生成的文件中写入数据。其成员变量dfs的类型为DFSClient,DFSClient的create函数如下:
public OutputStream create(String src,
FsPermission permission,
boolean overwrite,
short replication,
long blockSize,
Progressable progress,
int buffersize
) throws IOException {
checkOpen();
if (permission == null) {
permission = FsPermission.getDefault();
FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
OutputStream result = new DFSOutputStream(src, masked,
overwrite, replication, blockSize, progress, buffersize,
conf.getInt("io.bytes.per.checksum", 512));
leasechecker.put(src, result);
其中构造了一个DFSOutputStream,在其构造函数中,同过RPC调用NameNode的create来创建一个文件。
当然,构造函数中还做了一件重要的事情,就是streamer.start(),也即启动了一个pipeline,用于写数据,在写入数据的过程中,我们会仔细分析。
DFSOutputStream(String src, FsPermission masked, boolean overwrite,
short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum) throws IOException {
this(src, blockSize, progress, bytesPerChecksum);
computePacketChunkSize(writePacketSize, bytesPerChecksum);
namenode.create(
src, masked, clientName, overwrite, replication, blockSize);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
QuotaExceededException.class);
streamer.start();
3.2、NameNode
NameNode的create函数调用namesystem.startFile函数,其又调用startFileInternal函数,实现如下:
private synchronized void startFileInternal(String src,
PermissionStatus permissions,
String holder,
String clientMachine,
boolean overwrite,
boolean append,
short replication,
long blockSize
) throws IOException {
//创建一个新的文件,状态为under construction,没有任何data block与之对应
long genstamp = nextGenerationStamp();
INodeFileUnderConstruction newNode = dir.addFile(src, permissions,
replication, blockSize, holder, clientMachine, clientNode, genstamp);
3.3、客户端
下面轮到客户端向新创建的文件中写入数据了,一般会使用FSDataOutputStream的write函数,最终会调用DFSOutputStream的writeChunk函数:
按照hdfs的设计,对block的数据写入使用的是pipeline的方式,也即将数据分成一个个的package,如果需要复制三分,分别写入DataNode 1, 2, 3,则会进行如下的过程:
· 首先将package 1写入DataNode 1
· 然后由DataNode 1负责将package 1写入DataNode 2,同时客户端可以将pacage 2写入DataNode 1
· 然后DataNode 2负责将package 1写入DataNode 3, 同时客户端可以讲package 3写入DataNode 1,DataNode 1将package 2写入DataNode 2
· 就这样将一个个package排着队的传递下去,直到所有的数据全部写入并复制完毕
protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)
throws IOException {
//创建一个package,并写入数据
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock);
currentPacket.writeChecksum(checksum, 0, cklen);
currentPacket.writeData(b, offset, len);
currentPacket.numChunks++;
bytesCurBlock +=
//如果此package已满,则放入队列中准备发送
if (currentPacket.numChunks == currentPacket.maxChunks ||
bytesCurBlock == blockSize) {
dataQueue.addLast(currentPacket);
//唤醒等待dataqueue的传输线程,也即DataStreamer
dataQueue.notifyAll();
currentPacket =
DataStreamer的run函数如下:
public void run() {
while (!closed && clientRunning) {
Packet one =
synchronized (dataQueue) {
//如果队列中没有package,则等待
while ((!closed && !hasError && clientRunning
&& dataQueue.size() == 0) || doSleep) {
dataQueue.wait(1000);
} catch (InterruptedException e) {
//得到队列中的第一个package
one = dataQueue.getFirst();
long offsetInBlock = one.offsetInB
//由NameNode分配block,并生成一个写入流指向此block
if (blockStream == null) {
nodes = nextBlockOutputStream(src);
response = new ResponseProcessor(nodes);
response.start();
ByteBuffer buf = one.getBuffer();
//将package从dataQueue移至ackQueue,等待确认
dataQueue.removeFirst();
dataQueue.notifyAll();
synchronized (ackQueue) {
ackQueue.addLast(one);
ackQueue.notifyAll();
//利用生成的写入流将数据写入DataNode中的block
blockStream.write(buf.array(), buf.position(), buf.remaining());
if (one.lastPacketInBlock) {
blockStream.writeInt(0); //表示此block写入完毕
blockStream.flush();
} catch (Throwable e) {
其中重要的一个函数是nextBlockOutputStream,实现如下:
private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
LocatedBlock lb =
boolean retry =
DatanodeInfo[]
int count = conf.getInt("dfs.client.block.write.retries", 3);
//由NameNode为文件分配DataNode和block
lb = locateFollowingBlock(startTime);
block = lb.getBlock();
nodes = lb.getLocations();
//创建向DataNode的写入流
success = createBlockOutputStream(nodes, clientName, false);
} while (retry && --count &= 0);
locateFollowingBlock中通过RPC调用namenode.addBlock(src, clientName)函数
3.4、NameNode
NameNode的addBlock函数实现如下:
public LocatedBlock addBlock(String src,
String clientName) throws IOException {
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
return locatedB
FSNamesystem的getAdditionalBlock实现如下:
public LocatedBlock getAdditionalBlock(String src,
String clientName
) throws IOException {
long fileLength, blockS
DatanodeDescriptor clientNode =
Block newBlock =
//为新的block选择DataNode
DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
clientNode,
blockSize);
//得到文件路径中所有path的INode,其中最后一个是新添加的文件对的INode,状态为under construction
INode[] pathINodes = dir.getExistingPathINodes(src);
int inodesLen = pathINodes.
INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction)
pathINodes[inodesLen - 1];
//为文件分配block, 并设置在那写DataNode上
newBlock = allocateBlock(src, pathINodes);
pendingFile.setTargets(targets);
return new LocatedBlock(newBlock, targets, fileLength);
3.5、客户端
在分配了DataNode和block以后,createBlockOutputStream开始写入数据。
private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
boolean recoveryFlag) {
//创建一个socket,链接DataNode
InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
s = socketFactory.createSocket();
int timeoutValue = 3000 * nodes.length + socketT
s.connect(target, timeoutValue);
s.setSoTimeout(timeoutValue);
s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
datanodeWriteT
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout),
DataNode.SMALL_BUFFER_SIZE));
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
//写入指令
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_WRITE_BLOCK );
out.writeLong( block.getBlockId() );
out.writeLong( block.getGenerationStamp() );
out.writeInt( nodes.length );
out.writeBoolean( recoveryFlag );
Text.writeString( out, client );
out.writeBoolean(false);
out.writeInt( nodes.length - 1 );
//注意,次循环从1开始,而非从0开始。将除了第一个DataNode以外的另外两个DataNode的信息发送给第一个DataNode, 第一个DataNode可以根据此信息将数据写给另两个DataNode
for (int i = 1; i & nodes. i++) {
nodes[i].write(out);
checksum.writeHeader( out );
out.flush();
firstBadLink = Text.readString(blockReplyStream);
if (firstBadLink.length() != 0) {
throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
blockStream =
客户端在DataStreamer的run函数中创建了写入流后,调用blockStream.write将数据写入DataNode
3.6、DataNode
DataNode的DataXceiver中,收到指令DataTransferProtocol.OP_WRITE_BLOCK则调用writeBlock函数:
private void writeBlock(DataInputStream in) throws IOException {
DatanodeInfo srcDataNode =
//读入头信息
Block block = new Block(in.readLong(),
dataXceiverServer.estimateBlockSize, in.readLong());
int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
boolean isRecovery = in.readBoolean(); // is this part of recovery?
String client = Text.readString(in); // working on behalf of this client
boolean hasSrcDataNode = in.readBoolean(); // is src node info present
if (hasSrcDataNode) {
srcDataNode = new DatanodeInfo();
srcDataNode.readFields(in);
int numTargets = in.readInt();
if (numTargets & 0) {
throw new IOException("Mislabelled incoming datastream.");
//读入剩下的DataNode列表,如果当前是第一个DataNode,则此列表中收到的是第二个,第三个DataNode的信息,如果当前是第二个DataNode,则受到的是第三个DataNode的信息
DatanodeInfo targets[] = new DatanodeInfo[numTargets];
for (int i = 0; i & targets. i++) {
DatanodeInfo tmp = new DatanodeInfo();
tmp.readFields(in);
targets[i] =
DataOutputStream mirrorOut = // stream to next target
DataInputStream mirrorIn = // reply from next target
DataOutputStream replyOut = // stream to prev target
Socket mirrorSock = // socket to next target
BlockReceiver blockReceiver = // responsible for data handling
String mirrorNode = // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
//生成一个BlockReceiver, 其有成员变量DataInputStream in为从客户端或者上一个DataNode读取数据,还有成员变量DataOutputStream mirrorOut,用于向下一个DataNode写入数据,还有成员变量OutputStream out用于将数据写入本地。
blockReceiver = new BlockReceiver(block, in,
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
isRecovery, client, srcDataNode, datanode);
// get a connection back to the previous target
replyOut = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
//如果当前不是最后一个DataNode,则同下一个DataNode建立socket连接
if (targets.length & 0) {
InetSocketAddress mirrorTarget =
// Connect to backup machine
mirrorNode = targets[0].getName();
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
int timeoutValue = numTargets * datanode.socketT
int writeTimeout = datanode.socketWriteTimeout +
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
mirrorSock.connect(mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
//创建向下一个DataNode写入数据的流
mirrorOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(mirrorSock, writeTimeout),
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
mirrorOut.writeLong( block.getBlockId() );
mirrorOut.writeLong( block.getGenerationStamp() );
mirrorOut.writeInt( pipelineSize );
mirrorOut.writeBoolean( isRecovery );
Text.writeString( mirrorOut, client );
mirrorOut.writeBoolean(hasSrcDataNode);
if (hasSrcDataNode) { // pass src node information
srcDataNode.write(mirrorOut);
mirrorOut.writeInt( targets.length - 1 );
//此出也是从1开始,将除了下一个DataNode的其他DataNode信息发送给下一个DataNode
for ( int i = 1; i & targets. i++ ) {
targets[i].write( mirrorOut );
blockReceiver.writeChecksumHeader(mirrorOut);
mirrorOut.flush();
//使用BlockReceiver接受block
String mirrorAddr = (mirrorSock == null) ? null : mirrorN
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets.length);
} finally {
// close all opened streams
IOUtils.closeStream(mirrorOut);
IOUtils.closeStream(mirrorIn);
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
BlockReceiver的receiveBlock函数中,一段重要的逻辑如下:
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, BlockTransferThrottler throttlerArg,
int numTargets) throws IOException {
//不断的接受package,直到结束
while (receivePacket() & 0) {}
if (mirrorOut != null) {
mirrorOut.writeInt(0); // mark the end of the block
mirrorOut.flush();
} catch (IOException e) {
handleMirrorOutError(e);
BlockReceiver的receivePacket函数如下:
private int receivePacket() throws IOException {
//从客户端或者上一个节点接收一个package
int payloadLen = readNextPacket();
buf.mark();
//read the header
buf.getInt(); // packet length
offsetInBlock = buf.getLong(); // get offset of packet in block
long seqno = buf.getLong(); // get seqno
boolean lastPacketInBlock = (buf.get() != 0);
int endOfHeader = buf.position();
buf.reset();
setBlockPosition(offsetInBlock);
//将package写入下一个DataNode
if (mirrorOut != null) {
mirrorOut.write(buf.array(), buf.position(), buf.remaining());
mirrorOut.flush();
} catch (IOException e) {
handleMirrorOutError(e);
buf.position(endOfHeader);
int len = buf.getInt();
offsetInBlock +=
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
int checksumOff = buf.position();
int dataOff = checksumOff + checksumL
byte pktBuf[] = buf.array();
buf.position(buf.limit()); // move to the end of the data.
//将数据写入本地的block
out.write(pktBuf, dataOff, len);
/// flush entire packet before sending ack
// put in queue for pending acks
if (responder != null) {
((PacketResponder)responder.getRunnable()).enqueue(seqno,
lastPacketInBlock);
return payloadL
&&&&推荐文章:
【上篇】【下篇】

我要回帖

更多关于 自制狗粮详细过程 的文章

 

随机推荐