An HDFS small-file merge tool with support for the snappy and ORC formats
2018-08-31 18:56:58
cqc
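
A small command-line tool that walks an HDFS directory tree, picks out the date-partitioned directories that fall inside a given date range, and merges their small files into larger ones. Assuming the build below produces an executable named merge-hdfs (the name printed by its --help output), an invocation could look like the following; the concrete path, dates and excluded directory names are only placeholders:

merge-hdfs --path /user/hive/warehouse/rawdata --storage-type orc --start-ds 2018-08-01 --end-ds 2018-08-31 --merge-max-size 128 --exclude-dirname tmp,bak

The option names come from parseCommandLine() in HDFSMerger.java below.
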
HDFSMerger.java

package cn.cqc3073.tool.hdfsmerger;

import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.MissingOptionException;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

@Slf4j
public class HDFSMerger {
    private static String HomePath;
    static String storageType;
    static DateTimeFormatter dsFmt;
    static Pattern dsRegex;
    static long startDS;
    static long endDS;
    static Set<String> excludeDirnameSet = new HashSet<>();
    static long mergeMaxSize;
    static long mergeSizeThreshold; // files larger than this threshold are not merged
    static Configuration conf;
    static FileSystem fs;
    static Path tmpMergeHome = new Path("/tmp/merge");

    static ThreadPoolExecutor findExecutor;      // thread pool that scans for directories to merge
    static ThreadPoolExecutor mergePlanExecutor; // thread pool that computes file merge plans
    static ThreadPoolExecutor doMergeExecutor;   // thread pool that performs the actual merges

    public static void main(String[] args) throws ParseException, IOException {
        parseCommandLine(args);
        conf = new Configuration();
        String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
        if (hadoopConfDir == null) {
            log.error("请设置HADOOP_CONF_DIR环境变量,用于识别hdfs-site.xml配置的位置");
            System.exit(-1);
        }
        log.info("加载配置文件:{}", new Path(Paths.get(hadoopConfDir, "core-site.xml").toString()));
        log.info("加载配置文件:{}", new Path(Paths.get(hadoopConfDir, "hdfs-site.xml").toString()));
        conf.addResource(new Path(Paths.get(hadoopConfDir, "core-site.xml").toString()));
        conf.addResource(new Path(Paths.get(hadoopConfDir, "hdfs-site.xml").toString()));
        fs = FileSystem.get(conf);
        if (!fs.exists(tmpMergeHome)) {
            fs.mkdirs(tmpMergeHome);
        }

        Runtime.getRuntime().addShutdownHook(new Thread(()->{
            findExecutor.shutdownNow();
            mergePlanExecutor.shutdownNow();
            doMergeExecutor.shutdownNow();
            try {
                findExecutor.awaitTermination(10, TimeUnit.MINUTES);
                mergePlanExecutor.awaitTermination(10, TimeUnit.MINUTES);
                doMergeExecutor.awaitTermination(10, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                log.error("",e);
            }
            try {
                fs.close();
            } catch (IOException e) {
                log.error("", e);
            }
        }));

        Stopwatch stopwatch = new Stopwatch().start();
        findExecutor.submit(new FindRunner(new Path(HomePath)));
        monitor();
        log.info("合并已完成,总共耗时{}秒", stopwatch.stop().elapsedTime(TimeUnit.SECONDS));
    }

    // Monitor whether all tasks have finished; once they have, shut the application down.
    private static void monitor() {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            log.error("", e);
        }
        monitor(findExecutor, "检索线程池");
        monitor(mergePlanExecutor, "计划线程池");
        monitor(doMergeExecutor, "合并线程池");
        try {
            fs.close();
        } catch (IOException e) {
            log.error("", e);
        }
    }

    private static void monitor(ThreadPoolExecutor threadPoolExecutor, String threadPoolName) {
        while (threadPoolExecutor.getActiveCount() > 0 || threadPoolExecutor.getQueue().size() > 0) {
            try {
                TimeUnit.SECONDS.sleep(5);
                log.info("{} 剩余任务:{}", threadPoolName, threadPoolExecutor.getQueue().size());
            } catch (InterruptedException e) {
                log.error("", e);
            }
        }

        log.info("{}已执行完闭,准备释放", threadPoolName);
        threadPoolExecutor.shutdown();
        try {
            threadPoolExecutor.awaitTermination(5, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            log.error("", e);
        }
        log.info("{}已释放", threadPoolName);
    }

    private static void parseCommandLine(String[] args) throws ParseException {
        Options options = new Options()
                .addOption("h", "help", false, "显示命令行参数信息")
                .addOption(null, "path", true, "需要合并的hdfs根路径。默认为 /user/hive/warehouse/rawdata")
                .addOption(null, "exclude-dirname", true, "需要排除不合并的目录名称。多个目录名称之间用逗号隔开,允许不同目录层级的目录")
                .addOption(null, "storage-type", true, "待合并文件的存储格式。目前支持snappy,默认snappy")
                .addOption(null, "ds-fmt", true, "日期的格式。默认为yyyy-MM-dd")
                .addOption(null, "ds-regex", true, "从hdfs路径中提取日期的正则表达式。默认为.*_(\\d{4}-\\d{2}-\\d{2})")
                .addOption(null, "start-ds", true, "必填:起始日期")
                .addOption(null, "end-ds", true, "结束日期(包含),当未提供结束日期时,则只合并start-ds当天的数据")
                .addOption(null, "merge-max-size", true, "合并后文件的最大大小。单位:MB, 默认32M")
                .addOption(null, "find-threads-count", true, "执行检索符合合并条件(日期区间)的主目录的线程池大小,默认为3个")
                .addOption(null, "plan-threads-count", true, "用于计算文件合并计划的线程池大小,默认为2个")
                .addOption(null, "merge-threads-count", true, "用于执行文件合并的线程池大小,默认为3个");
        CommandLineParser parser = new DefaultParser();
        CommandLine cmd = parser.parse(options, args);

        if (cmd.hasOption('h')) {
            HelpFormatter formatter = new HelpFormatter();
            formatter.printHelp("merge-hdfs", options);
            System.exit(0);
        }

        HomePath = cmd.getOptionValue("path", "/user/hive/warehouse/rawdata");
        storageType = cmd.getOptionValue("storage-type", "snappy");
        dsFmt = DateTimeFormatter.ofPattern(cmd.getOptionValue("ds-fmt", "yyyy-MM-dd"));
        dsRegex = Pattern.compile(cmd.getOptionValue("ds-regex", ".*_(\\d{4}-\\d{2}-\\d{2})"));
        if (!cmd.hasOption("start-ds")) {
            throw new MissingOptionException("Missing required option: start-ds");
        }
        startDS = LocalDate.parse(cmd.getOptionValue("start-ds"), dsFmt).toEpochDay();
        endDS = cmd.hasOption("end-ds") ? LocalDate.parse(cmd.getOptionValue("end-ds"), dsFmt).toEpochDay() : startDS;

        if (cmd.hasOption("exclude-dirname")) {
            Arrays.stream(cmd.getOptionValue("exclude-dirname").split(","))
                    .map(String::trim)
                    .forEach(excludeDirnameSet::add);
            log.info("需要跳过的目录有:{}", excludeDirnameSet);
        }
        mergeMaxSize = Long.parseLong(cmd.getOptionValue("merge-max-size", "32")) << 20;
        mergeSizeThreshold = (long) (mergeMaxSize * 0.8);
        findExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Integer.parseInt(cmd.getOptionValue("find-threads-count", "3"))
                , new ThreadFactoryBuilder().setNameFormat("find-executor-%d")
                        .setUncaughtExceptionHandler((t, e) -> log.error("thread name ["+t.getName()+"] catch exception", e)).build());
        mergePlanExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Integer.parseInt(cmd.getOptionValue("plan-threads-count", "2"))
                , new ThreadFactoryBuilder().setNameFormat("mergeplan-executor-%d")
                        .setUncaughtExceptionHandler((t, e) -> log.error("thread name ["+t.getName()+"] catch exception", e)).build());
        doMergeExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Integer.parseInt(cmd.getOptionValue("merge-threads-count", "3"))
                , new ThreadFactoryBuilder().setNameFormat("domerge-executor-%d")
                        .setUncaughtExceptionHandler((t, e) -> log.error("thread name ["+t.getName()+"] catch exception", e)).build());
    }
}

 

FindRunner.java

package cn.cqc3073.tool.hdfsmerger;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.time.LocalDate;
import java.util.regex.Matcher;

import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.dsFmt;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.dsRegex;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.endDS;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.excludeDirnameSet;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.findExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.fs;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergePlanExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.startDS;

@Slf4j
public class FindRunner implements Runnable {
    private Path parent;

    FindRunner(Path path){
        this.parent = path;
    }
    @Override
    public void run() {
        log.debug("正在检索的目录:{}", this.parent);
        try {
            FileStatus[] fileStatuses = fs.listStatus(this.parent, p -> {
                try {
                    return fs.isDirectory(p);
                } catch (IOException e) {
                    log.error("",e);
                    return false;
                }
            });
            for (FileStatus fileStatus : fileStatuses) {
                Path path = fileStatus.getPath();
                String pathName = path.getName();
                if (excludeDirnameSet.contains(pathName)) {
                    log.info("{}在排除名单中,跳过该目录", path);
                    continue;
                }

                Matcher m = dsRegex.matcher(pathName);
                if (m.find()) {
                    String ds = m.group(1);
                    long pathEpochDay = LocalDate.parse(ds, dsFmt).toEpochDay();
                    if (pathEpochDay >= startDS && pathEpochDay <= endDS) {
                        log.info("待合并目录:{}", path);
                        mergePlanExecutor.submit(MergePlanRunner.getMergePlanRunner(path));
                    }
                    continue;
                }
                findExecutor.submit(new FindRunner(path));
            }
        } catch (Exception e) {
            log.error("", e);
        }
    }
}
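
To make the directory-matching rule concrete: with the default --ds-regex and --ds-fmt, FindRunner treats a directory as a dated partition when the regex captures a date and that date falls inside [start-ds, end-ds]. A minimal, self-contained illustration (the directory name event_log_2018-08-31 and the date range are invented examples):

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DsMatchSketch {
    public static void main(String[] args) {
        Pattern dsRegex = Pattern.compile(".*_(\\d{4}-\\d{2}-\\d{2})");      // default --ds-regex
        DateTimeFormatter dsFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd"); // default --ds-fmt
        long startDS = LocalDate.parse("2018-08-01", dsFmt).toEpochDay();
        long endDS = LocalDate.parse("2018-08-31", dsFmt).toEpochDay();

        Matcher m = dsRegex.matcher("event_log_2018-08-31");                 // invented directory name
        if (m.find()) {
            long day = LocalDate.parse(m.group(1), dsFmt).toEpochDay();
            System.out.println("merge this directory: " + (day >= startDS && day <= endDS)); // true
        }
    }
}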

 

MergePlanRunner.java

package cn.cqc3073.tool.hdfsmerger;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.doMergeExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.fs;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergeMaxSize;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergePlanExecutor;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.mergeSizeThreshold;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.storageType;

@Slf4j
public class MergePlanRunner implements Runnable {
    private Path parent;

    private MergePlanRunner(Path path){
        this.parent = path;
    }
    @Override
    public void run() {
        log.debug("计算合并计划:{}", this.parent);
        try {
            List<FileStatus> fileList = new LinkedList<>(); // files that are candidates for merging
            FileStatus[] fileStatuses = fs.listStatus(this.parent);
            for (FileStatus fileStatus : fileStatuses) {
                Path path = fileStatus.getPath();
                if (fileStatus.isDirectory()) {
                    mergePlanExecutor.submit(getMergePlanRunner(path));
                    continue;
                }

                if (fileStatus.isFile()) {
                    fileList.add(fileStatus);
                }
            }

            /*
            Merge planning algorithm:
            1. Sort the files by size in descending order.
            2. Walk the sorted list and place each file into the first group that still has room,
               so the largest files get matched with each other first.
            (A standalone sketch of this grouping follows the class.)
             */
            fileList.sort((o1, o2) -> Long.compare(o2.getLen(), o1.getLen()));
            Map<List<FileStatus>, Long> planMap = new LinkedHashMap<>();
            for (FileStatus fileStatus : fileList) {
                long fileLength = fileStatus.getLen();
                if (fileLength > mergeSizeThreshold) continue; // files above the threshold are not merged
                boolean isMatch = false;
                for (Map.Entry<List<FileStatus>, Long> entry : planMap.entrySet()) {
                    long sumFileLength = entry.getValue() + fileLength;
                    if (sumFileLength < mergeMaxSize) {
                        entry.getKey().add(fileStatus);
                        entry.setValue(sumFileLength);
                        isMatch = true;
                        break;
                    }
                }

                if (!isMatch) {
                    List<FileStatus> list = new LinkedList<>();
                    list.add(fileStatus);
                    planMap.put(list, fileLength);
                }
            }

            for (List<FileStatus> plan : planMap.keySet()) {
                if (plan.size() <= 1) continue;
                doMergeExecutor.submit(MergeRunner.getMergeRunner(plan));
            }

        } catch (Exception e) {
            log.error("", e);
        }
    }
    static MergePlanRunner getMergePlanRunner(Path path){
        if ("snappy".equalsIgnoreCase(storageType) || "orc".equalsIgnoreCase(storageType)) {
            return new MergePlanRunner(path);
        } else {
            throw new RuntimeException("unsupport storge type ["+storageType+"]");
        }
    }

}
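
As a standalone illustration of the planning step above, the sketch below applies the same first-fit-decreasing grouping to plain file sizes instead of FileStatus objects. The sizes and the 32 MB cap are made-up example values; in the tool itself, groups containing a single file are simply left alone.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergePlanSketch {
    public static void main(String[] args) {
        long mergeMaxSize = 32L << 20;                          // cap for a merged file (32 MB)
        long mergeSizeThreshold = (long) (mergeMaxSize * 0.8);  // files above this are not touched
        // Example input sizes in bytes; purely illustrative.
        List<Long> sizes = new ArrayList<>(Arrays.asList(
                30L << 20, 20L << 20, 10L << 20, 5L << 20, 3L << 20, 1L << 20));

        sizes.sort(Comparator.reverseOrder());                  // 1. sort descending by size
        Map<List<Long>, Long> plan = new LinkedHashMap<>();     // group -> accumulated size
        for (long size : sizes) {
            if (size > mergeSizeThreshold) continue;            // 2. skip files already near the cap
            boolean placed = false;
            for (Map.Entry<List<Long>, Long> entry : plan.entrySet()) {
                long sum = entry.getValue() + size;
                if (sum < mergeMaxSize) {                       // first group with enough room wins
                    entry.getKey().add(size);
                    entry.setValue(sum);
                    placed = true;
                    break;
                }
            }
            if (!placed) {                                      // no group fits: open a new one
                List<Long> group = new ArrayList<>();
                group.add(size);
                plan.put(group, size);
            }
        }
        // Two groups result: {20 MB, 10 MB, 1 MB} and {5 MB, 3 MB}; the 30 MB file is skipped.
        plan.forEach((group, total) -> System.out.println(group + " -> " + total + " bytes"));
    }
}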

 

MergeRunner.java

package cn.cqc3073.tool.hdfsmerger;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.ReaderImpl;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcProto;
import org.apache.orc.StripeInformation;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.SocketTimeoutException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.conf;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.fs;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.storageType;
import static cn.cqc3073.tool.hdfsmerger.HDFSMerger.tmpMergeHome;

@Slf4j
public abstract class MergeRunner implements Runnable {
    private static AtomicInteger counter = new AtomicInteger();
    private static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmm");
    private List<FileStatus> needMergeFiles;

    MergeRunner(List<FileStatus> needMergeFiles) {
        this.needMergeFiles = needMergeFiles;
    }

    @Override
    public void run() {
        String msg = needMergeFiles.stream()
                .map(fileStatus -> fileStatus.getPath() + "(" + fileStatus.getLen() + ")")
                .collect(Collectors.joining(","));
        long totalSize = needMergeFiles.stream()
                .mapToLong(FileStatus::getLen)
                .sum();
        if (log.isDebugEnabled()) {
            log.debug("预计合并文件数量:{},合并后总大小:{}, 合并的文件有:{}", needMergeFiles.size(), FileUtils.byteCountToDisplaySize(totalSize), msg);
        }

        try {
            Path srcPath = doMerge(needMergeFiles);
            Path destPath = new Path(needMergeFiles.get(0).getPath().getParent(), srcPath.getName());
            log.info("已合并文件:{}, 准备迁入{}。 预计文件大小{},实际文件大小{}"
                    , srcPath, destPath, totalSize, fs.getFileStatus(srcPath).getLen());
            if (!fs.rename(srcPath, destPath)) {
                log.error("rename {} -> {} failed, keeping the original files", srcPath, destPath);
                return;
            }
            for (FileStatus needMergeFile : needMergeFiles) {
                Trash.moveToAppropriateTrash(fs, needMergeFile.getPath(), conf);
            }
        } catch (SocketTimeoutException e) {
            log.error("[Timeout]合并失败,参与合并的文件有:" + msg, e);
        } catch (Exception e) {
            log.error("合并失败,参与合并的文件有:" + msg, e);
        }

    }

    abstract Path doMerge(List<FileStatus> needMergeFiles) throws IOException;

    static MergeRunner getMergeRunner(List<FileStatus> needMergeFiles) {
        if ("snappy".equalsIgnoreCase(storageType)) {
            return new SnappyMergeRunner(needMergeFiles);
        } else if ("orc".equalsIgnoreCase(storageType)) {
            return new OrcMergeRunner(needMergeFiles);
        } else {
            throw new RuntimeException("unsupport storge type [" + storageType + "]");
        }
    }

    private static class SnappyMergeRunner extends MergeRunner {
        SnappyMergeRunner(List<FileStatus> needMergeFiles) {
            super(needMergeFiles);
        }

        @Override
        Path doMerge(List<FileStatus> needMergeFiles) throws IOException {
            String mergedFileName = String.format("merged%d.%s.snappy"
                    , counter.incrementAndGet()
                    , LocalDateTime.now().format(formatter));
            Path tmpFile = new Path(tmpMergeHome, mergedFileName);
            SnappyCodec codec = new SnappyCodec();
            codec.setConf(conf);
            try (OutputStream out = codec.createOutputStream(fs.create(tmpFile))) {
                for (FileStatus needMergeFile : needMergeFiles) {
                    try (InputStream in = codec.createInputStream(fs.open(needMergeFile.getPath()))) {
                        copyBytes(in, out, 4096);
                    }
                }
            }
            return tmpFile;
        }

        // Copies everything from in to out; an empty input stream is treated as an error.
        void copyBytes(InputStream in, OutputStream out, int buffSize)
                throws IOException {
            PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
            byte buf[] = new byte[buffSize];
            int bytesRead = in.read(buf);
            if (bytesRead < 0) {
                throw new IOException("read stream fail");
            }
            while (bytesRead >= 0) {
                out.write(buf, 0, bytesRead);
                if ((ps != null) && ps.checkError()) {
                    throw new IOException("Unable to write to output stream.");
                }
                bytesRead = in.read(buf);
            }
        }
    }

    // The ORC merge logic is adapted from Hive's built-in OrcFileMergeOperator class.
    private static class OrcMergeRunner extends MergeRunner {
        private CompressionKind compression = null;
        private int compressBuffSize = 0;
        private OrcFile.Version version;
        private int columnCount = 0;
        private int rowIndexStride = 0;
        OrcMergeRunner(List<FileStatus> needMergeFiles) {
            super(needMergeFiles);
        }

        @Override
        Path doMerge(List<FileStatus> needMergeFiles) throws IOException {
            String mergedFileName = String.format("merged%d.%s.orc"
                    , counter.incrementAndGet()
                    , LocalDateTime.now().format(formatter));
            Path tmpFile = new Path(tmpMergeHome, mergedFileName);
            Writer orcWriter = null;
            try {
                for (FileStatus needMergeFile : needMergeFiles) {
                    Path path = needMergeFile.getPath();
                    ReaderImpl orcReader = (ReaderImpl) OrcFile.createReader(fs, path);
                    if (orcWriter == null){
                        orcWriter = createOrcWriter(tmpFile, orcReader);
                    }
                    if (!checkCompatibility(orcReader, path)) {
                        continue;
                    }
                    List<StripeInformation> stripes  = orcReader.getStripes();
                    List<OrcProto.StripeStatistics> stripeStatistics = orcReader.getOrcProtoStripeStatistics();
                    int stripeIdx = 0;
                    try(FSDataInputStream fdis = fs.open(path)){
                        for (StripeInformation stripe : stripes) {
                            byte[] buffer = new byte[(int)stripe.getLength()];
                            fdis.readFully(stripe.getOffset(), buffer, 0, (int)stripe.getLength());
                            orcWriter.appendStripe(buffer, 0, buffer.length, stripe, stripeStatistics.get(stripeIdx++));
                        }
                    }
                    orcWriter.appendUserMetadata(orcReader.getOrcProtoUserMetadata());
                }
            } finally {
                if (orcWriter != null) {
                    orcWriter.close();
                }
            }

            return tmpFile;
        }

        private Writer createOrcWriter(Path tmpFile, Reader orcReader) throws IOException {
            compression = orcReader.getCompressionKind();
            compressBuffSize = orcReader.getCompressionSize();
            version = orcReader.getFileVersion();
            //noinspection deprecation
            columnCount = orcReader.getTypes().get(0).getSubtypesCount();
            rowIndexStride = orcReader.getRowIndexStride();

            OrcFile.WriterOptions options = OrcFile.writerOptions(conf)
                    .compress(compression)
                    .version(version)
                    .rowIndexStride(rowIndexStride)
                    .inspector(orcReader.getObjectInspector());
            if (compression != CompressionKind.NONE) {
                options.bufferSize(compressBuffSize).enforceBufferSize();
            }
            return OrcFile.createWriter(tmpFile, options);
        }

        private boolean checkCompatibility(Reader orcReader, Path inputPath) {
            // check compatibility with subsequent files
            //noinspection deprecation
            if ((orcReader.getTypes().get(0).getSubtypesCount() != columnCount)) {
                log.warn("Incompatible ORC file merge! Column counts mismatch for " + inputPath);
                return false;
            }

            if (orcReader.getCompressionKind() != compression) {
                log.warn("Incompatible ORC file merge! Compression codec mismatch for " +inputPath);
                return false;
            }

            if (orcReader.getCompressionSize() != compressBuffSize) {
                log.warn("Incompatible ORC file merge! Compression buffer size mismatch for " + inputPath);
                return false;

            }

            if (!orcReader.getFileVersion().equals(version)) {
                log.warn("Incompatible ORC file merge! Version mismatch for " + inputPath);
                return false;
            }

            if (orcReader.getRowIndexStride() != rowIndexStride) {
                log.warn("Incompatible ORC file merge! Row index stride mismatch for " + inputPath);
                return false;
            }

            return true;
        }
    }
}
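
Because the ORC path appends whole stripes without decoding any rows, while the snappy path decompresses and re-compresses the byte stream, a cheap sanity check after an ORC merge is to compare row counts. The sketch below is a hypothetical check rather than part of the tool; the paths are placeholders, and it reuses the same (deprecated) OrcFile.createReader(fs, path) call that MergeRunner itself uses.

package cn.cqc3073.tool.hdfsmerger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

import java.io.IOException;

public class OrcMergeCheck {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Placeholder paths: two of the original small files and the merged result.
        Path[] inputs = {new Path("/data/part-00000.orc"), new Path("/data/part-00001.orc")};
        Path merged = new Path("/data/merged1.201808312000.orc");

        long inputRows = 0;
        for (Path input : inputs) {
            Reader reader = OrcFile.createReader(fs, input);
            inputRows += reader.getNumberOfRows();   // row count recorded in the ORC footer
        }
        Reader mergedReader = OrcFile.createReader(fs, merged);
        // Counts match only if every input passed checkCompatibility and was appended.
        System.out.println("input rows=" + inputRows + ", merged rows=" + mergedReader.getNumberOfRows());
        fs.close();
    }
}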

 

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>tools</artifactId>
        <groupId>cn.cqc3073</groupId>
        <version>1.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>hdfs-merger</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.16</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>merge-hdfs</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>executable</shadedClassifierName>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <manifestEntries>
                                        <Main-Class>cn.cqc3073.tool.hdfsmerger.HDFSMerger</Main-Class>
                                    </manifestEntries>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>core-site.xml</exclude>
                                        <exclude>hdfs-site.xml</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.skife.maven</groupId>
                <artifactId>really-executable-jar-maven-plugin</artifactId>
                <configuration>
                    <flags>-Xmx5G -Djava.library.path=/usr/lib/hadoop-current/lib/native</flags>
                    <classifier>executable</classifier>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>really-executable-jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>

</project>
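
With this module in place, mvn package builds a shaded jar (classifier executable, with signature files and any bundled core-site.xml/hdfs-site.xml stripped), and the really-executable-jar plugin then makes that artifact directly runnable with the JVM flags shown above baked in. The -Djava.library.path value assumes the native snappy libraries live under /usr/lib/hadoop-current/lib/native, so adjust it to your cluster's layout, and remember that the tool exits immediately unless HADOOP_CONF_DIR is set.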

 
