阿川CH
学海无涯,上栽上栽!
Toggle navigation
阿川CH
主页
归档
标签
Flume 写Channel的流程
Flume
2018-08-31 18:59:06
2
0
0
cqc
Flume
**流程图** ![](/api/file/getImage?fileId=5ae01dda418f8a54f60000e5) 1. InterceptorChain中的拦截器可以对事件进行修改、过滤 2. 通过ChannelSelector获得RequiredChannel列表和OptioinalChannel列表 3. 将事件往RequiredChannel列表中的所有Channel中写,任何一个Channel接收失败,都会导致这个批次的事件处理失败。这里可能会存在一个重复数据的问题,即当往Channel1中写是成功的,此时Channel1中已保存完成这个事件,而Channel2处理失败了,此时会向上游的Sink反馈Failed, 要求其重新再传 4. 将事件往OptioinalChannel列表中的所有Channel中写,不同RequiredChannel,其如果处理失败了,不影响其他OptioinalChannel的处理 **ChannelProcessor关键代码解析** ```java public void processEventBatch(List<Event> events) { Preconditions.checkNotNull(events, "Event list must not be null"); events = interceptorChain.intercept(events);//将事件传递给拦截器链进行修改、过滤 Map<Channel, List<Event>> reqChannelQueue = new LinkedHashMap<Channel, List<Event>>(); Map<Channel, List<Event>> optChannelQueue = new LinkedHashMap<Channel, List<Event>>(); for (Event event : events) { List<Channel> reqChannels = selector.getRequiredChannels(event); for (Channel ch : reqChannels) { List<Event> eventQueue = reqChannelQueue.get(ch); if (eventQueue == null) { eventQueue = new ArrayList<Event>(); reqChannelQueue.put(ch, eventQueue); } eventQueue.add(event); } List<Channel> optChannels = selector.getOptionalChannels(event); for (Channel ch : optChannels) { List<Event> eventQueue = optChannelQueue.get(ch); if (eventQueue == null) { eventQueue = new ArrayList<Event>(); optChannelQueue.put(ch, eventQueue); } eventQueue.add(event); } } // Process required channels // 在处理RequiredChannel中,任一的Channel处理失败都会向上层抛出异常,此时上层将会向上游的Sink反馈Failed for (Channel reqChannel : reqChannelQueue.keySet()) { Transaction tx = reqChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List<Event> batch = reqChannelQueue.get(reqChannel); for (Event event : batch) { reqChannel.put(event); } tx.commit(); } catch (Throwable t) { tx.rollback(); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; } else if (t instanceof ChannelException) { throw (ChannelException) t; } else { throw new ChannelException("Unable to put batch on required " + "channel: " + reqChannel, t); } } finally { if (tx != null) { tx.close(); } } } // Process optional channels for (Channel optChannel : optChannelQueue.keySet()) { Transaction tx = optChannel.getTransaction(); Preconditions.checkNotNull(tx, "Transaction object must not be null"); try { tx.begin(); List<Event> batch = optChannelQueue.get(optChannel); for (Event event : batch) { optChannel.put(event); } tx.commit(); } catch (Throwable t) { tx.rollback(); LOG.error("Unable to put batch on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; } } finally { if (tx != null) { tx.close(); } } } } ```
上一篇:
Hive参数说明
下一篇:
HDFS小文件合并工具,支持snappy、orc格式
文档导航