An experience of fixing HDFS with a Hadoop patch package

Background:

Our company's platform holds roughly 30 PB of data on Hadoop 3.1.2 with the erasure coding feature enabled. Reads began failing with the following error:

java.io.IOException: Unexpected EOS from the reader
at org.apache.hadoop.hdfs.StripeReader.readToBuffer(StripeReader.java:241)
at org.apache.hadoop.hdfs.StripeReader.lambda$readCells$0(StripeReader.java:281)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Checking upstream showed that this is a known bug in vanilla Hadoop: when the last, incomplete cell of a block group falls inside an AlignedStripe, the erasure-coding decode path fails.

Bug report: https://issues.apache.org/jira/browse/HDFS-14373

Download the patch for the 3.1.x branch (HDFS-14373.branch-3.1.patch).
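If you want to fetch the attachment from the command line, the general form is shown below. This is only a sketch; <attachment-id> is a placeholder for the numeric id shown on the JIRA attachment link.

# Download the branch-3.1 patch attached to HDFS-14373 (replace <attachment-id> with the real id from the JIRA page)
wget https://issues.apache.org/jira/secure/attachment/<attachment-id>/HDFS-14373.branch-3.1.patch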


The fix

Open the patch file to see which paths it touches and what code it changes.
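A quick way to see which files the patch touches without applying it is a plain grep over the diff headers (a simple sketch):

# List every file the patch modifies, relative to the root of the source tree
grep '^diff --git' HDFS-14373.branch-3.1.patch
# Count the hunks to gauge the size of the change
grep -c '^@@' HDFS-14373.branch-3.1.patch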


diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 168b48c..e840da9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -247,6 +247,8 @@ private int readToBuffer(BlockReader blockReader,
DFSClient.LOG.warn("Found Checksum error for "
+ currentBlock + " from " + currentNode
+ " at " + ce.getPos());
+ //Clear buffer to make next decode success
+ strategy.getReadBuffer().clear();
// we want to remember which block replicas we have tried
corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
throw ce;
@@ -254,6 +256,8 @@ private int readToBuffer(BlockReader blockReader,
DFSClient.LOG.warn("Exception while reading from "
+ currentBlock + " of " + dfsStripedInputStream.getSrc() + " from "
+ currentNode, e);
+ //Clear buffer to make next decode success
+ strategy.getReadBuffer().clear();
throw e;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 4c2ff92..2b09a7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -355,7 +355,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);

// Step 3: merge into stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);

// Step 4: calculate each chunk's position in destination buffer. Since the
// whole read range is within a single stripe, the logic is simpler here.
@@ -416,7 +417,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);

// Step 3: merge into at most 5 stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);

// Step 4: calculate each chunk's position in destination buffer
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
@@ -512,7 +514,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
* {@link AlignedStripe} instances.
*/
private static AlignedStripe[] mergeRangesForInternalBlocks(
- ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
+ ErasureCodingPolicy ecPolicy, VerticalRange[] ranges,
+ LocatedStripedBlock blockGroup, int cellSize) {
int dataBlkNum = ecPolicy.getNumDataUnits();
int parityBlkNum = ecPolicy.getNumParityUnits();
List<AlignedStripe> stripes = new ArrayList<>();
@@ -524,6 +527,17 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
}
}

+ // Add block group last cell offset in stripePoints if it is fall in to read
+ // offset range.
+ int lastCellIdxInBG = (int) (blockGroup.getBlockSize() / cellSize);
+ int idxInInternalBlk = lastCellIdxInBG / ecPolicy.getNumDataUnits();
+ long lastCellEndOffset = (idxInInternalBlk * (long)cellSize)
+ + (blockGroup.getBlockSize() % cellSize);
+ if (stripePoints.first() < lastCellEndOffset
+ && stripePoints.last() > lastCellEndOffset) {
+ stripePoints.add(lastCellEndOffset);
+ }
+
long prev = -1;
for (long point : stripePoints) {
if (prev >= 0) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 48ecf9a..d50d482 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -19,8 +19,11 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -561,6 +564,50 @@ public void testCloseDoesNotAllocateNewBuffer() throws Exception {
}
}

+ @Test
+ public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe()
+ throws IOException {
+ DataNodeProperties stopDataNode = null;
+ try {
+ cluster.waitActive();
+ ErasureCodingPolicy policy = getEcPolicy();
+ DistributedFileSystem filesystem = cluster.getFileSystem();
+ filesystem.enableErasureCodingPolicy(policy.getName());
+ Path dir = new Path("/tmp");
+ filesystem.mkdirs(dir);
+ filesystem.getClient().setErasureCodingPolicy(dir.toString(),
+ policy.getName());
+ Path f = new Path(dir, "file");
+
+ //1. File with one stripe, last data cell should be half filed.
+ long fileLength = (policy.getCellSize() * policy.getNumDataUnits())
+ - (policy.getCellSize() / 2);
+ DFSTestUtil.createFile(filesystem, f, fileLength, (short) 1, 0);
+
+ //2. Stop first DN from stripe.
+ LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations(
+ f.toString(), 0, fileLength);
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+ final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg,
+ cellSize, dataBlocks, parityBlocks);
+ cluster.stopDataNode(blocks[0].getLocations()[0].getName());
+
+ //3. Do pread for fist cell, reconstruction should happen
+ try (FSDataInputStream in = filesystem.open(f)) {
+ DFSStripedInputStream stripedIn = (DFSStripedInputStream) in
+ .getWrappedStream();
+ byte[] b = new byte[policy.getCellSize()];
+ stripedIn.read(0, b, 0, policy.getCellSize());
+ }
+ } catch (HadoopIllegalArgumentException e) {
+ fail(e.getMessage());
+ } finally {
+ if (stopDataNode != null) {
+ cluster.restartDataNode(stopDataNode, true);
+ }
+ }
+ }
+
/**
* Empties the pool for the specified buffer type, for the current ecPolicy.
* <p>

Note: this patch fixes code under the /hadoop-hdfs-project/ module.

The paths in the diff carry git's a/ and b/ prefixes. Make sure the a and b paths exist and copy the project into each of those folders; otherwise, when you apply the fix, patch will report that it cannot find the paths.


Applying the fix

Run: patch -p1 < HDFS-14373.branch-3.1.patch

Note: the patch file must be placed in the hadoop-rel-release-3.1.2 source directory; otherwise patch will again report that it cannot find the paths.
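Before touching any files it is worth confirming that every path in the patch resolves; assuming GNU patch, a dry run looks like this:

cd hadoop-rel-release-3.1.2
# --dry-run only checks that the target files exist and the hunks apply; nothing is written
patch -p1 --dry-run < HDFS-14373.branch-3.1.patch
# If the dry run is clean, apply the patch for real
patch -p1 < HDFS-14373.branch-3.1.patch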

If patch prints a "patching file ..." line for each file without errors, the fix has been applied successfully.


The fixed code in the source tree

The changes in StripedBlockUtil.java:

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 4c2ff92..2b09a7f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -355,7 +355,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);

// Step 3: merge into stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);

// Step 4: calculate each chunk's position in destination buffer. Since the
// whole read range is within a single stripe, the logic is simpler here.
@@ -416,7 +417,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
cells);

// Step 3: merge into at most 5 stripes
- AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges);
+ AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges,
+ blockGroup, cellSize);

// Step 4: calculate each chunk's position in destination buffer
calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf);
@@ -512,7 +514,8 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
* {@link AlignedStripe} instances.
*/
private static AlignedStripe[] mergeRangesForInternalBlocks(
- ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) {
+ ErasureCodingPolicy ecPolicy, VerticalRange[] ranges,
+ LocatedStripedBlock blockGroup, int cellSize) {
int dataBlkNum = ecPolicy.getNumDataUnits();
int parityBlkNum = ecPolicy.getNumParityUnits();
List<AlignedStripe> stripes = new ArrayList<>();
@@ -524,6 +527,17 @@ public static long spaceConsumedByStripedBlock(long numDataBlkBytes,
}
}

[Screenshots: a small excerpt of the fixed Java code, and the original code for comparison]

Packaging

Locate the Apache Hadoop HDFS Client module in Maven and build it.
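The equivalent command-line build, run from the root of the source tree, is sketched below; the module path comes from the diff, and -DskipTests is assumed only to shorten the build.

cd hadoop-rel-release-3.1.2
# Build only the HDFS client module plus whatever it depends on, skipping tests
mvn clean package -pl hadoop-hdfs-project/hadoop-hdfs-client -am -DskipTests
# The rebuilt jar lands under the module's target/ directory
ls hadoop-hdfs-project/hadoop-hdfs-client/target/hadoop-hdfs-client-*.jar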


The build succeeds and produces the patched hadoop-hdfs-client jar.

Once the build succeeds, replace the jar in the deployment and roll the change out.
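The on-disk location of the client jar differs between distributions, so a cautious per-node sketch (the paths below are assumptions, not the exact layout used above) is:

# Find every copy of the old client jar under the Hadoop installation
find "$HADOOP_HOME" -name 'hadoop-hdfs-client-3.1.2.jar'
# For each copy: keep a backup, drop in the rebuilt jar, then restart the services that load it
cp /opt/hadoop/share/hadoop/hdfs/hadoop-hdfs-client-3.1.2.jar{,.bak}
cp hadoop-hdfs-project/hadoop-hdfs-client/target/hadoop-hdfs-client-3.1.2.jar /opt/hadoop/share/hadoop/hdfs/

Running clients and services only pick up the fix after they are restarted with the new jar on their classpath.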

