阿川CH
学海无涯,上栽上栽!
Toggle navigation
阿川CH
主页
归档
标签
封装Hive Thrift Client ,解决一些通过JDBC处理不了的问题
2018-08-30 18:03:16
0
0
0
cqc
封装Hive Thrift Client的目的, 主要用于提交Hive作业(不涉及查询结果的提取,提取数据可自行扩展此类或直接使用HiveJDBC),解决一些通过JDBC处理不了的问题,如: 1. 取执行状态 2. 取执行进度 3. 取执行日志 4. 设置一些hive执行参数
```
package me.cqc.hive;

import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.service.auth.PlainSaslHelper;
import org.apache.hive.service.cli.FetchType;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.rpc.thrift.TCLIService;
import org.apache.hive.service.rpc.thrift.TCLIServiceConstants;
import org.apache.hive.service.rpc.thrift.TColumnDesc;
import org.apache.hive.service.rpc.thrift.TExecuteStatementReq;
import org.apache.hive.service.rpc.thrift.TExecuteStatementResp;
import org.apache.hive.service.rpc.thrift.TFetchOrientation;
import org.apache.hive.service.rpc.thrift.TFetchResultsReq;
import org.apache.hive.service.rpc.thrift.TFetchResultsResp;
import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq;
import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataReq;
import org.apache.hive.service.rpc.thrift.TGetResultSetMetadataResp;
import org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
import org.apache.hive.service.rpc.thrift.TOperationHandle;
import org.apache.hive.service.rpc.thrift.TOperationState;
import org.apache.hive.service.rpc.thrift.TPrimitiveTypeEntry;
import org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
import org.apache.hive.service.rpc.thrift.TProtocolVersion;
import org.apache.hive.service.rpc.thrift.TSessionHandle;
import org.apache.hive.service.rpc.thrift.TStatus;
import org.apache.hive.service.rpc.thrift.TStatusCode;
import org.apache.hive.service.rpc.thrift.TTableSchema;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import javax.security.sasl.SaslException;
import java.io.Serializable;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Thin wrapper around the Hive Thrift CLI service, mainly for submitting Hive jobs
 * and inspecting their status / progress / logs (things plain HiveJDBC does not expose).
 * Many of the call patterns here mirror the native ThriftCLIServiceClient class.
 */
@Slf4j
public class HiveThriftClient implements AutoCloseable {

    private TTransport transport;
    private TCLIService.Client client;

    private HiveThriftClient() {
    }

    /**
     * Creates a new client. NOTE: every instance holds its own connection to Hive.
     * Sessions for the same user may share one connection, but a connection is bound
     * to a single user (no known way to multiplex users over one connection), so
     * multi-user scenarios require multiple instances.
     *
     * @param host     HiveServer2 host
     * @param port     HiveServer2 thrift port (usually 10000)
     * @param username user the connection authenticates as
     * @param password password for plain SASL authentication
     * @throws SaslException if the plain SASL transport cannot be created
     */
    public static HiveThriftClient getInstance(String host, int port, String username, String password) throws SaslException {
        HiveThriftClient hiveThriftClient = new HiveThriftClient();
        hiveThriftClient.transport = new TSocket(host, port);
        hiveThriftClient.transport = PlainSaslHelper.getPlainTransport(username, password, hiveThriftClient.transport);
        TBinaryProtocol protocol = new TBinaryProtocol(hiveThriftClient.transport);
        hiveThriftClient.client = new TCLIService.Client(protocol);
        return hiveThriftClient;
    }

    /**
     * Opens a session, lazily opening the transport first.
     * On Tez, operations inside the same session can share an Application Master by default.
     *
     * @param configuration optional session configuration (e.g. hive execution parameters); may be null
     */
    public SessionHandle openSession(Map<String, String> configuration) throws TException {
        if (!transport.isOpen()) {
            transport.open();
        }
        TOpenSessionReq openReq = new TOpenSessionReq();
        if (configuration != null) {
            openReq.setConfiguration(configuration);
        }
        TOpenSessionResp openResp = client.OpenSession(openReq);
        return new SessionHandle(openResp.getSessionHandle(), openResp.getServerProtocolVersion());
    }

    /**
     * Executes SQL asynchronously and returns a handle for polling status / logs.
     *
     * @throws HiveException if the server rejects the statement
     */
    public OperationHandle execute(SessionHandle sessionHandle, String sql) throws TException, HiveException {
        TExecuteStatementReq req = new TExecuteStatementReq(sessionHandle.getTSessionHandle(), sql);
        req.setRunAsync(true);
        TExecuteStatementResp executeStatementResp = client.ExecuteStatement(req);
        // Use verifySuccess so any non-success status (not just ERROR_STATUS) is surfaced.
        if (!verifySuccess(executeStatementResp.getStatus())) {
            throw new HiveException(executeStatementResp.getStatus().getErrorMessage());
        }
        return new OperationHandle(executeStatementResp.getOperationHandle(), sessionHandle.getTProtocolVersion());
    }

    /**
     * Fetches the operation's execution state, optionally including job progress.
     *
     * @param getProgressUpdate when true, also request the server-side progress table
     */
    public OperationStatus getOperationStatus(OperationHandle operationHandle, boolean getProgressUpdate) throws TException {
        TGetOperationStatusReq getOperationStatusReq = new TGetOperationStatusReq(operationHandle.getTOperationHandle());
        getOperationStatusReq.setGetProgressUpdate(getProgressUpdate);
        TGetOperationStatusResp resp = client.GetOperationStatus(getOperationStatusReq);
        OperationStatus.OperationStatusBuilder operationStatusBuilder = OperationStatus.builder()
                .operationState(resp.getOperationState())
                .status(resp.getStatus())
                .operationStarted(resp.getOperationStarted())
                .operationCompleted(resp.getOperationCompleted());
        // NOTE(review): the hasResultSet field of OperationStatus is never populated here — confirm
        // whether it should be wired from the response before relying on it.
        if (resp.getOperationState() == TOperationState.ERROR_STATE) {
            operationStatusBuilder.operationException(
                    new HiveSQLException(resp.getErrorMessage(), resp.getSqlState(), resp.getErrorCode()));
        }
        TProgressUpdateResp tProgressUpdateResp;
        if (getProgressUpdate && (tProgressUpdateResp = resp.getProgressUpdateResponse()) != null) {
            JobProgressUpdate jobProgressUpdate = JobProgressUpdate.builder()
                    .progressedPercentage(tProgressUpdateResp.getProgressedPercentage())
                    .footerSummary(tProgressUpdateResp.getFooterSummary())
                    .headerNames(tProgressUpdateResp.getHeaderNames())
                    .rows(tProgressUpdateResp.getRows())
                    .status(tProgressUpdateResp.getStatus())
                    .build();
            operationStatusBuilder.jobProgressUpdate(jobProgressUpdate);
        }
        return operationStatusBuilder.build();
    }

    /**
     * Fetches the accumulated execution log for the operation (FetchType.LOG channel),
     * joined with the platform line separator. Empty lines are dropped.
     */
    public String getLog(OperationHandle operationHandle) throws TException {
        TFetchResultsReq tFetchResultsReq = new TFetchResultsReq(
                operationHandle.getTOperationHandle(), TFetchOrientation.FETCH_NEXT, Long.MAX_VALUE);
        tFetchResultsReq.setFetchType(FetchType.LOG.toTFetchType());
        TFetchResultsResp tFetchResultsResp = client.FetchResults(tFetchResultsReq);
        RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), operationHandle.getTProtocolVersion());
        return StreamSupport.stream(rowSet.spliterator(), false)
                .map(objects -> objects[0].toString())
                .filter(t -> !Strings.isNullOrEmpty(t))
                .collect(Collectors.joining(System.lineSeparator()));
    }

    /**
     * Retrieves result-set column names and type names.
     * If the metadata here is incomplete, see Hive's HiveQueryResultSet#retrieveSchema for reference.
     *
     * @return column metadata, or null when the server reports a non-success status
     * @throws TException if the schema is missing from a successful response
     */
    public ColumnMetaData getColumnMetaData(OperationHandle operationHandle) throws TException {
        TGetResultSetMetadataReq metadataReq = new TGetResultSetMetadataReq(operationHandle.getTOperationHandle());
        TGetResultSetMetadataResp metadataResp = client.GetResultSetMetadata(metadataReq);
        // BUGFIX: the original returned null on SUCCESS and parsed the schema on failure;
        // the check was inverted. Bail out only when the call did NOT succeed.
        if (!verifySuccess(metadataResp.getStatus())) {
            return null;
        }
        TTableSchema schema = metadataResp.getSchema();
        if (schema == null || !schema.isSetColumns()) {
            throw new TException("column meta data is not set");
        }
        List<String> columnNames = new ArrayList<>();
        List<String> columnTypes = new ArrayList<>();
        List<TColumnDesc> columns = schema.getColumns();
        for (TColumnDesc column : columns) {
            String columnName = column.getColumnName();
            columnNames.add(columnName);
            TPrimitiveTypeEntry primitiveTypeEntry = column.getTypeDesc().getTypes().get(0).getPrimitiveEntry();
            String columnTypeName = TCLIServiceConstants.TYPE_NAMES.get(primitiveTypeEntry.getType());
            columnTypes.add(columnTypeName);
        }
        return new ColumnMetaData(columnNames, columnTypes);
    }

    /**
     * Returns true when the RPC status indicates success (with or without info).
     */
    public static boolean verifySuccess(TStatus status) {
        return status.getStatusCode() == TStatusCode.SUCCESS_STATUS
                || status.getStatusCode() == TStatusCode.SUCCESS_WITH_INFO_STATUS;
    }

    /**
     * Returns true when the operation is no longer executing. "Finished" here does not
     * mean successful completion — only that the state is past INITIALIZED/PENDING/RUNNING.
     */
    public static boolean verifyFinish(TOperationState tOperationState) {
        return tOperationState != TOperationState.INITIALIZED_STATE
                && tOperationState != TOperationState.PENDING_STATE
                && tOperationState != TOperationState.RUNNING_STATE;
    }

    /**
     * Closes the underlying thrift transport. Added so callers can release the Hive
     * connection deterministically (the original class leaked it).
     */
    @Override
    public void close() {
        if (transport != null && transport.isOpen()) {
            transport.close();
        }
    }

    @AllArgsConstructor
    @Getter
    public static class ColumnMetaData {
        private List<String> columnNames;
        private List<String> columnTypes;
    }

    @AllArgsConstructor
    @Getter
    public static class SessionHandle implements Serializable {
        private TSessionHandle tSessionHandle;
        private TProtocolVersion tProtocolVersion;
    }

    @AllArgsConstructor
    @Getter
    public static class OperationHandle implements Serializable {
        private TOperationHandle tOperationHandle;
        private TProtocolVersion tProtocolVersion;
    }

    @Builder
    @Getter
    public static class OperationStatus {
        private TOperationState operationState;
        private TStatus status;
        private long operationStarted;
        private long operationCompleted;
        private boolean hasResultSet;
        private HiveSQLException operationException;
        private JobProgressUpdate jobProgressUpdate;

        @Override
        public String toString() {
            return "OperationState:" + operationState.name()
                    + (jobProgressUpdate != null && jobProgressUpdate.status != TJobExecutionStatus.NOT_AVAILABLE
                        ? ", JobProgress:" + NumberFormat.getPercentInstance().format(jobProgressUpdate.progressedPercentage)
                        : "")
                    + (verifyFinish(operationState)
                        ? ", JobStatus:" + status.getStatusCode().name()
                        : "");
        }
    }

    @Builder
    @Getter
    public static class JobProgressUpdate {
        public double progressedPercentage;
        public String footerSummary;
        private List<String> headerNames;
        private List<List<String>> rows;
        private TJobExecutionStatus status;
    }
}
```
简单的单元测试用例
```
package test.me.cqc.hive;

import lombok.extern.slf4j.Slf4j;
import me.cqc.hive.HiveThriftClient;
import me.cqc.hive.SerDeUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Test;

import javax.security.sasl.SaslException;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

@Slf4j
public class HiveThriftClientTest {

    private HiveThriftClient client;
    private HiveThriftClient.SessionHandle sessionHandle;
    private HiveThriftClient.OperationHandle operationHandle;

    @Before
    public void init() throws SaslException, TException, HiveException {
        System.out.println("init one time----------------------");
        client = HiveThriftClient.getInstance("10.206.19.188", 10000, "cqc", "cqc");
        sessionHandle = client.openSession(null);
        operationHandle = client.execute(sessionHandle, "select count(*) from default.events2");
    }

    @Test
    public void getOperationStatus() throws TException, InterruptedException {
        HiveThriftClient.OperationStatus operationStatus = client.getOperationStatus(operationHandle, true);
        System.out.println(operationStatus.toString());
        while (!HiveThriftClient.verifyFinish(operationStatus.getOperationState())) {
            TimeUnit.SECONDS.sleep(3);
            operationStatus = client.getOperationStatus(operationHandle, true);
            System.out.println(operationStatus.toString());
        }
        TimeUnit.SECONDS.sleep(60);
        System.out.println(client.getLog(operationHandle));
    }

    @Test
    public void getLog() throws TException, InterruptedException {
        HiveThriftClient.OperationStatus operationStatus = client.getOperationStatus(operationHandle, true);
        System.out.println(client.getLog(operationHandle));
        while (!HiveThriftClient.verifyFinish(operationStatus.getOperationState())) {
            TimeUnit.SECONDS.sleep(3);
            operationStatus = client.getOperationStatus(operationHandle, true);
            System.out.println(client.getLog(operationHandle));
        }
    }
}
```
上一篇:
Hive执行异常Premature EOF from inputStream问题及解决方案
下一篇:
springboot下读取jar包中文件内容的方式
文档导航