package cn.cqc3073.tool.hdfsmerger;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

@Slf4j
public class HDFSMerger {

    private static String HomePath;
    static String storageType;
    static DateTimeFormatter dsFmt;
    static Pattern dsRegex;
    static long startDS;
    static long endDS;
    static Set<String> excludeDirnameSet = new HashSet<>();
    static long mergeMaxSize;
    static long mergeSizeThrehold; // files larger than this threshold are not merged
    static Configuration conf;
    static FileSystem fs;
    static Path tmpMergeHome = new Path("/tmp/merge");
    static ThreadPoolExecutor findExecutor;      // thread pool that searches for directories to merge
    static ThreadPoolExecutor mergePlanExecutor; // thread pool that computes merge plans
    static ThreadPoolExecutor doMergeExecutor;   // thread pool that performs the actual merges

    public static void main(String[] args) throws ParseException, IOException {
        parseCommandLine(args);

        conf = new Configuration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            log.error("Please set the HADOOP_CONF_DIR environment variable so that hdfs-site.xml can be located");
            System.exit(-1);
        }
        log.info("Loading configuration file: {}", new Path(Paths.get(hadoopConfDir, "core-site.xml").toString()));
        log.info("Loading configuration file: {}", new Path(Paths.get(hadoopConfDir, "hdfs-site.xml").toString()));
        conf.addResource(new Path(Paths.get(hadoopConfDir, "core-site.xml").toString()));
        conf.addResource(new Path(Paths.get(hadoopConfDir, "hdfs-site.xml").toString()));

        fs = FileSystem.get(conf);
        if (!fs.exists(tmpMergeHome)) {
            fs.mkdirs(tmpMergeHome);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            findExecutor.shutdownNow();
            mergePlanExecutor.shutdownNow();
            doMergeExecutor.shutdownNow();
            try {
                findExecutor.awaitTermination(10, TimeUnit.MINUTES);
                mergePlanExecutor.awaitTermination(10, TimeUnit.MINUTES);
                doMergeExecutor.awaitTermination(10, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                log.error("", e);
            }
            try {
                fs.close();
            } catch (IOException e) {
                log.error("", e);
            }
        }));

        Stopwatch stopwatch = new Stopwatch().start();
        findExecutor.submit(new FindRunner(new Path(HomePath)));
        monitor();
        log.info("Merge finished, total time {} seconds", stopwatch.stop().elapsedTime(TimeUnit.SECONDS));
    }

    // Wait until all submitted tasks have completed, then shut the application down
    private static void monitor() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            log.error("", e);
        }
        monitor(findExecutor, "find thread pool");
        monitor(mergePlanExecutor, "plan thread pool");
        monitor(doMergeExecutor, "merge thread pool");
        try {
            fs.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }

    private static void monitor(ThreadPoolExecutor threadPoolExecutor, String threadPoolName) {
        while (threadPoolExecutor.getActiveCount() > 0 || threadPoolExecutor.getQueue().size() > 0) {
            try {
                TimeUnit.SECONDS.sleep(5);
                log.info("{} remaining tasks: {}", threadPoolName, threadPoolExecutor.getQueue().size());
            } catch (InterruptedException e) {
                log.error("", e);
            }
        }
        log.info("{} has finished, preparing to release it", threadPoolName);
        threadPoolExecutor.shutdown();
        try {
            threadPoolExecutor.awaitTermination(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            log.error("", e);
        }
        log.info("{} has been released", threadPoolName);
    }

    private static void parseCommandLine(String[] args) throws ParseException {
        Options options = new Options()
                .addOption("h", "help", false, "Show command line usage")
                .addOption(null, "path", true, "Root HDFS path to merge. Defaults to /user/hive/warehouse/rawdata")
                .addOption(null, "exclude-dirname", true, "Directory names to exclude from merging. Separate multiple names with commas; names may come from different directory levels")
                .addOption(null, "storage-type", true, "Storage format of the files to merge. Currently supports snappy and orc; defaults to snappy")
                .addOption(null, "ds-fmt", true, "Date format. Defaults to yyyy-MM-dd")
                .addOption(null, "ds-regex", true, "Regular expression used to extract the date from an HDFS path. Defaults to .*_(\\d{4}-\\d{2}-\\d{2})")
                .addOption(null, "start-ds", true, "Required: start date")
                .addOption(null, "end-ds", true, "End date (inclusive). If omitted, only the start-ds day is merged")
                .addOption(null, "merge-max-size", true, "Maximum size of a merged file, in MB. Defaults to 32 MB")
                .addOption(null, "find-threads-count", true, "Size of the thread pool that searches for directories matching the date range. Defaults to 3")
                .addOption(null, "plan-threads-count", true, "Size of the thread pool that computes merge plans. Defaults to 2")
                .addOption(null, "merge-threads-count", true, "Size of the thread pool that performs the merges. Defaults to 3");

        CommandLineParser parser = new DefaultParser();
        CommandLine cmd = parser.parse(options, args);
        if (cmd.hasOption('h')) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("merge-hdfs", options);
            System.exit(0);
        }

        HomePath = cmd.getOptionValue("path", "/user/hive/warehouse/rawdata");
        storageType = cmd.getOptionValue("storage-type", "snappy");
        dsFmt = DateTimeFormatter.ofPattern(cmd.getOptionValue("ds-fmt", "yyyy-MM-dd"));
        dsRegex = Pattern.compile(cmd.getOptionValue("ds-regex", ".*_(\\d{4}-\\d{2}-\\d{2})"));
        if (!cmd.hasOption("start-ds")) {
            throw new MissingOptionException("Missing required option: start-ds");
        }
        startDS = LocalDate.parse(cmd.getOptionValue("start-ds"), dsFmt).toEpochDay();
        endDS = cmd.hasOption("end-ds") ? LocalDate.parse(cmd.getOptionValue("end-ds"), dsFmt).toEpochDay() : startDS;
        if (cmd.hasOption("exclude-dirname")) {
            Arrays.stream(cmd.getOptionValue("exclude-dirname").split(","))
                    .map(String::trim)
                    .forEach(excludeDirnameSet::add);
            log.info("Directories to skip: {}", excludeDirnameSet);
        }
        mergeMaxSize = Long.parseLong(cmd.getOptionValue("merge-max-size", "32")) << 20;
        mergeSizeThrehold = (long) (mergeMaxSize * 0.8);

        findExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(
                Integer.parseInt(cmd.getOptionValue("find-threads-count", "3")),
                new ThreadFactoryBuilder().setNameFormat("find-executor-%d")
                        .setUncaughtExceptionHandler((t, e) -> log.error("thread name [" + t.getName() + "] catch exception", e)).build());
        mergePlanExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(
                Integer.parseInt(cmd.getOptionValue("plan-threads-count", "2")),
                new ThreadFactoryBuilder().setNameFormat("mergeplan-executor-%d")
                        .setUncaughtExceptionHandler((t, e) -> log.error("thread name [" + t.getName() + "] catch exception", e)).build());
        doMergeExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(
                Integer.parseInt(cmd.getOptionValue("merge-threads-count", "3")),
                new ThreadFactoryBuilder().setNameFormat("domerge-executor-%d")
                        .setUncaughtExceptionHandler((t, e) -> log.error("thread name [" + t.getName() + "] catch exception", e)).build());
    }
}
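For reference, a typical invocation of the packaged tool might look like the following. The jar name is an assumption based on the pom's finalName and the executable classifier, and the config directory, path, dates and excluded directory names are purely illustrative; HADOOP_CONF_DIR must be set because main() refuses to start without it.

export HADOOP_CONF_DIR=/etc/hadoop/conf
./merge-hdfs-executable.jar --path /user/hive/warehouse/rawdata --storage-type snappy --start-ds 2018-01-01 --end-ds 2018-01-07 --exclude-dirname tmp,backup --merge-max-size 64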
package cn.cqc3073.tool.hdfsmerger;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.time.LocalDate;
import java.util.regex.Matcher;

import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.dsFmt;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.dsRegex;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.endDS;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.excludeDirnameSet;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.findExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.fs;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergePlanExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.startDS;

@Slf4j
public class FindRunner implements Runnable {

    private Path parent;

    FindRunner(Path path) {
        this.parent = path;
    }

    @Override
    public void run() {
        log.debug("Scanning directory: {}", this.parent);
        try {
            // Keep only the sub-directories of the current directory
            FileStatus[] fileStatuses = fs.listStatus(this.parent, p -> {
                try {
                    return fs.isDirectory(p);
                } catch (IOException e) {
                    log.error("", e);
                    return false;
                }
            });
            for (FileStatus fileStatuse : fileStatuses) {
                Path path = fileStatuse.getPath();
                String pathName = path.getName();
                if (excludeDirnameSet.contains(pathName)) {
                    log.info("{} is on the exclude list, skipping this directory", path);
                    continue;
                }
                Matcher m = dsRegex.matcher(pathName);
                if (m.find()) {
                    // The directory name carries a date: plan a merge if it falls inside the requested range
                    String ds = m.group(1);
                    long pathEpochDay = LocalDate.parse(ds, dsFmt).toEpochDay();
                    if (pathEpochDay >= startDS && pathEpochDay <= endDS) {
                        log.info("Directory to merge: {}", path);
                        mergePlanExecutor.submit(MergePlanRunner.getMergePlanRunner(path));
                    }
                    continue;
                }
                // No date in the name: keep descending recursively
                findExecutor.submit(new FindRunner(path));
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
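To make the date matching above concrete, here is a minimal standalone sketch of how the default ds-regex and ds-fmt settings resolve a directory name to an epoch day. The directory name event_log_2018-01-05 is a hypothetical example, not taken from the source.

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DsRegexDemo {
    public static void main(String[] args) {
        // Defaults from HDFSMerger: date format and extraction regex
        DateTimeFormatter dsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        Pattern dsRegex = Pattern.compile(".*_(\\d{4}-\\d{2}-\\d{2})");

        String dirName = "event_log_2018-01-05"; // hypothetical partition directory name
        Matcher m = dsRegex.matcher(dirName);
        if (m.find()) {
            // Group 1 is the date suffix; epoch day is what FindRunner compares against startDS/endDS
            long epochDay = LocalDate.parse(m.group(1), dsFmt).toEpochDay();
            System.out.println("extracted ds=" + m.group(1) + ", epochDay=" + epochDay);
        }
    }
}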
package cn.cqc3073.tool.hdfsmerger;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.doMergeExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.fs;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergeMaxSize;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergePlanExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergeSizeThrehold;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.storageType;

@Slf4j
public class MergePlanRunner implements Runnable {

    private Path parent;

    private MergePlanRunner(Path path) {
        this.parent = path;
    }

    @Override
    public void run() {
        log.debug("Computing merge plan for: {}", this.parent);
        try {
            List<FileStatus> fileList = new LinkedList<>(); // files that are candidates for merging
            FileStatus[] fileStatuses = fs.listStatus(this.parent);
            for (FileStatus fileStatuse : fileStatuses) {
                Path path = fileStatuse.getPath();
                if (fileStatuse.isDirectory()) {
                    mergePlanExecutor.submit(getMergePlanRunner(path));
                    continue;
                }
                if (fileStatuse.isFile()) {
                    fileList.add(fileStatuse);
                }
            }

            /* Merge planning algorithm:
               1. Sort the files by size in descending order.
               2. Take the files in order and add each one to the first plan that still has room,
                  so larger files are matched first and pairs of large files tend to be merged together. */
            fileList.sort((o1, o2) -> Long.compare(o2.getLen(), o1.getLen())); // Long.compare avoids int overflow on large files
            Map<List<FileStatus>, Long> planMap = new LinkedHashMap<>();
            for (FileStatus fileStatus : fileList) {
                long fileLenth = fileStatus.getLen();
                if (fileLenth > mergeSizeThrehold) continue; // do not merge files that already exceed the threshold

                boolean isMatch = false;
                for (Map.Entry<List<FileStatus>, Long> entry : planMap.entrySet()) {
                    long sumFileLenth = entry.getValue() + fileLenth;
                    if (sumFileLenth < mergeMaxSize) {
                        entry.getKey().add(fileStatus);
                        entry.setValue(sumFileLenth);
                        isMatch = true;
                        break;
                    }
                }
                if (!isMatch) {
                    List<FileStatus> list = new LinkedList<>();
                    list.add(fileStatus);
                    planMap.put(list, fileLenth);
                }
            }

            for (List<FileStatus> plan : planMap.keySet()) {
                if (plan.size() <= 1) continue; // nothing to do for single-file plans
                doMergeExecutor.submit(MergeRunner.getMergeRunner(plan));
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }

    static MergePlanRunner getMergePlanRunner(Path path) {
        if ("snappy".equalsIgnoreCase(storageType) || "orc".equalsIgnoreCase(storageType)) {
            return new MergePlanRunner(path);
        } else {
            throw new RuntimeException("unsupported storage type [" + storageType + "]");
        }
    }
}
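The planning logic above is essentially a first-fit-decreasing grouping of file sizes under the merge-max-size cap. A minimal standalone sketch of the same idea, using hypothetical sizes in MB and a 32 MB target (the numbers are illustrative only), might look like this:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergePlanSketch {
    public static void main(String[] args) {
        long maxSize = 32;                        // target size of a merged file (MB, illustrative)
        long threshold = (long) (maxSize * 0.8);  // files above this are left alone
        List<Long> sizes = new ArrayList<>(Arrays.asList(30L, 20L, 12L, 10L, 6L, 3L)); // hypothetical file sizes

        sizes.sort(Comparator.reverseOrder());          // largest first
        Map<List<Long>, Long> plans = new LinkedHashMap<>(); // plan -> current total size
        for (long size : sizes) {
            if (size > threshold) continue;             // the 30 MB file is skipped
            boolean placed = false;
            for (Map.Entry<List<Long>, Long> plan : plans.entrySet()) {
                if (plan.getValue() + size < maxSize) { // first plan with room wins
                    plan.getKey().add(size);
                    plan.setValue(plan.getValue() + size);
                    placed = true;
                    break;
                }
            }
            if (!placed) {
                List<Long> plan = new ArrayList<>();
                plan.add(size);
                plans.put(plan, size);
            }
        }
        // Prints [20, 10] -> 30 MB and [12, 6, 3] -> 21 MB for the sizes above
        plans.forEach((files, total) -> System.out.println(files + " -> " + total + " MB"));
    }
}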
package cn.cqc3073.tool.hdfsmerger;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.ReaderImpl;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcProto;
import org.apache.orc.StripeInformation;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.SocketTimeoutException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.conf;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.fs;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.storageType;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.tmpMergeHome;

@Slf4j
public abstract class MergeRunner implements Runnable {

    private static AtomicInteger counter = new AtomicInteger();
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");

    private List<FileStatus> needMergeFiles;

    MergeRunner(List<FileStatus> needMergeFiles) {
        this.needMergeFiles = needMergeFiles;
    }

    @Override
    public void run() {
        String msg = needMergeFiles.stream()
                .map(fileStatus -> fileStatus.getPath() + "(" + fileStatus.getLen() + ")")
                .collect(Collectors.joining(","));
        long totalSize = needMergeFiles.stream()
                .mapToLong(FileStatus::getLen)
                .sum();
        if (log.isDebugEnabled()) {
            log.debug("Number of files to merge: {}, expected merged size: {}, files: {}",
                    needMergeFiles.size(), FileUtils.byteCountToDisplaySize(totalSize), msg);
        }
        try {
            Path srcPath = doMerge(needMergeFiles);
            Path destPath = new Path(needMergeFiles.get(0).getPath().getParent(), srcPath.getName());
            log.info("Merged file {} is ready, moving it to {}. Expected size {}, actual size {}",
                    srcPath, destPath, totalSize, fs.getFileStatus(srcPath).getLen());
            if (fs.rename(srcPath, destPath)) {
                // Only move the originals to the trash once the merged file is in place
                for (FileStatus needMergeFile : needMergeFiles) {
                    Trash.moveToAppropriateTrash(fs, needMergeFile.getPath(), conf);
                }
            } else {
                log.error("Failed to move merged file {} to {}; the original files are left untouched", srcPath, destPath);
            }
        } catch (SocketTimeoutException e) {
            log.error("[Timeout] Merge failed. Files involved: " + msg, e);
        } catch (Exception e) {
            log.error("Merge failed. Files involved: " + msg, e);
        }
    }

    abstract Path doMerge(List<FileStatus> needMergeFiles) throws IOException;

    static MergeRunner getMergeRunner(List<FileStatus> needMergeFiles) {
        if ("snappy".equalsIgnoreCase(storageType)) {
            return new SnappyMergeRunner(needMergeFiles);
        } else if ("orc".equalsIgnoreCase(storageType)) {
            return new OrcMergeRunner(needMergeFiles);
        } else {
            throw new RuntimeException("unsupported storage type [" + storageType + "]");
        }
    }

    private static class SnappyMergeRunner extends MergeRunner {

        SnappyMergeRunner(List<FileStatus> needMergeFiles) {
            super(needMergeFiles);
        }

        @Override
        Path doMerge(List<FileStatus> needMergeFiles) throws IOException {
            String mergedFileName = String.format("merged%d.%s.snappy",
                    counter.incrementAndGet(),
                    LocalDateTime.now().format(formatter));
            Path tmpFile = new Path(tmpMergeHome, mergedFileName);

            SnappyCodec codec = new SnappyCodec();
            codec.setConf(conf);
            // Decompress every input file and recompress the concatenated stream into a single snappy file
            try (OutputStream out = codec.createOutputStream(fs.create(tmpFile))) {
                for (FileStatus needMergeFile : needMergeFiles) {
                    try (InputStream in = codec.createInputStream(fs.open(needMergeFile.getPath()))) {
                        copyBytes(in, out, 4096);
                    }
                }
            }
            return tmpFile;
        }

        void copyBytes(InputStream in, OutputStream out, int buffSize) throws IOException {
            PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
            byte[] buf = new byte[buffSize];
            int bytesRead = in.read(buf);
            if (bytesRead < 0) {
                throw new IOException("read stream fail");
            }
            while (bytesRead >= 0) {
                out.write(buf, 0, bytesRead);
                if ((ps != null) && ps.checkError()) {
                    throw new IOException("Unable to write to output stream.");
                }
                bytesRead = in.read(buf);
            }
        }
    }

    // The ORC merge logic is adapted from Hive's own OrcFileMergeOperator class
    private static class OrcMergeRunner extends MergeRunner {

        private CompressionKind compression = null;
        private int compressBuffSize = 0;
        private OrcFile.Version version;
        private int columnCount = 0;
        private int rowIndexStride = 0;

        OrcMergeRunner(List<FileStatus> needMergeFiles) {
            super(needMergeFiles);
        }

        @Override
        Path doMerge(List<FileStatus> needMergeFiles) throws IOException {
            String mergedFileName = String.format("merged%d.%s.orc",
                    counter.incrementAndGet(),
                    LocalDateTime.now().format(formatter));
            Path tmpFile = new Path(tmpMergeHome, mergedFileName);

            Writer orcWriter = null;
            try {
                for (FileStatus needMergeFile : needMergeFiles) {
                    Path path = needMergeFile.getPath();
                    ReaderImpl orcReader = (ReaderImpl) OrcFile.createReader(fs, path);
                    if (orcWriter == null) {
                        // The first input file defines the schema, compression and version of the output
                        orcWriter = createOrcWriter(tmpFile, orcReader);
                    }
                    if (!checkCompatibility(orcReader, path)) {
                        continue;
                    }
                    // Copy each stripe of the input file into the output as raw bytes
                    List<StripeInformation> stripes = orcReader.getStripes();
                    List<OrcProto.StripeStatistics> stripeStatistics = orcReader.getOrcProtoStripeStatistics();
                    int stripeIdx = 0;
                    try (FSDataInputStream fdis = fs.open(path)) {
                        for (StripeInformation stripe : stripes) {
                            byte[] buffer = new byte[(int) stripe.getLength()];
                            fdis.readFully(stripe.getOffset(), buffer, 0, (int) stripe.getLength());
                            orcWriter.appendStripe(buffer, 0, buffer.length, stripe, stripeStatistics.get(stripeIdx++));
                        }
                    }
                    orcWriter.appendUserMetadata(orcReader.getOrcProtoUserMetadata());
                }
            } finally {
                if (orcWriter != null) {
                    orcWriter.close();
                }
            }
            return tmpFile;
        }

        private Writer createOrcWriter(Path tmpFile, Reader orcReader) throws IOException {
            compression = orcReader.getCompressionKind();
            compressBuffSize = orcReader.getCompressionSize();
            version = orcReader.getFileVersion();
            //noinspection deprecation
            columnCount = orcReader.getTypes().get(0).getSubtypesCount();
            rowIndexStride = orcReader.getRowIndexStride();

            OrcFile.WriterOptions options = OrcFile.writerOptions(conf)
                    .compress(compression)
                    .version(version)
                    .rowIndexStride(rowIndexStride)
                    .inspector(orcReader.getObjectInspector());
            if (compression != CompressionKind.NONE) {
                options.bufferSize(compressBuffSize).enforceBufferSize();
            }
            return OrcFile.createWriter(tmpFile, options);
        }

        private boolean checkCompatibility(Reader orcReader, Path inputPath) {
            // Check compatibility against the file the writer was created from
            //noinspection deprecation
            if ((orcReader.getTypes().get(0).getSubtypesCount() != columnCount)) {
                log.warn("Incompatible ORC file merge! Column counts mismatch for " + inputPath);
                return false;
            }
            if (orcReader.getCompressionKind() != compression) {
                log.warn("Incompatible ORC file merge! Compression codec mismatch for " + inputPath);
                return false;
            }
            if (orcReader.getCompressionSize() != compressBuffSize) {
                log.warn("Incompatible ORC file merge! Compression buffer size mismatch for " + inputPath);
                return false;
            }
            if (!orcReader.getFileVersion().equals(version)) {
                log.warn("Incompatible ORC file merge! Version mismatch for " + inputPath);
                return false;
            }
            if (orcReader.getRowIndexStride() != rowIndexStride) {
                log.warn("Incompatible ORC file merge! Row index stride mismatch for " + inputPath);
                return false;
            }
            return true;
        }
    }
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>tools</artifactId>
        <groupId>cn.cqc3073</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hdfs-merger</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.16</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>merge-hdfs</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>executable</shadedClassifierName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <manifestEntries>
                                        <Main-Class>cn.cqc3073.tool.hdfsmerger.HDFSMerger</Main-Class>
                                    </manifestEntries>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>core-site.xml</exclude>
                                        <exclude>hdfs-site.xml</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.skife.maven</groupId>
                <artifactId>really-executable-jar-maven-plugin</artifactId>
                <configuration>
                    <flags>-Xmx5G -Djava.library.path=/usr/lib/hadoop-current/lib/native</flags>
                    <classifier>executable</classifier>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>really-executable-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
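Assuming the parent POM cn.cqc3073:tools is available locally (it supplies the plugin versions not pinned here), building is a standard package run:

mvn clean package

The shade plugin attaches a shaded jar with the executable classifier, and the really-executable-jar plugin turns it into a directly runnable file with the -Xmx5G and java.library.path flags baked in, which is what the invocation example earlier assumes.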