http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.15.1/FlumeDeveloperGuide.html
本节结合官方文档和源码来尝试定义自定义组件
源码
首先来看看源码是如何组件是如何定义:
TaildirSource
Taildir Source 源码是 org.apache.flume.source.taildir.TaildirSource,大致有几个操作
- 继承 AbstractSource并实现Configurable、PollableSource
- 定义私有属性接收 Source定义时属性:filePaths、positionFilePath、batchSize…
- configure函数初始化私有属性
- process函数处理真正的流程
loggerSink
` logger Sink ` 源码是 org.apache.flume.sink.LoggerSink,操作类似
- 集成 AbstractSink并实现Configurable
- 定义私有属性接收 Sink定义时属性:channel、name…
- process函数处理真正的流程,包含事务提交
HostInterceptor
Host Interceptor源码是 org.apache.flume.interceptor.HostInterceptor,类似但有几点变化
- 必须要有内部类 Builder
- Builder中获取属性和创建- Interceptor
- intercept函数处理流程
Source
自定义Source :随机生成 0-100,并可以加上自定义的前后缀
public class CustomSource extends AbstractSource implements Configurable, PollableSource{
    private String prefix;
    private String suffix;
    public void configure(Context context) {
        prefix = context.getString("prefix","danner");
        suffix = context.getString("suffix");
    }
    public Status process() throws EventDeliveryException {
        Status status = null;
        try {
            // 创建 Event
            SimpleEvent event = new SimpleEvent();
            // event 添加内容
            event.setBody(new String(prefix + new Random().nextInt(100) + suffix).getBytes());
            // event 向后传递到 Channel
            getChannelProcessor().processEvent(event);
            // 处理成功
            status = Status.READY;
        }catch (Throwable t){
            t.printStackTrace();
            status = Status.BACKOFF;
        }
        // 防止太快
        try {
            Thread.sleep(2*1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return status;
    }
    ......
conf 文件配置成 logger,但 source 配置有变化:type 不是自带需带类名,后面直接代码集成到 Flume 源码后可简写
ng1.sources.r1.type = com.flume.bidata.flume.CustomSource
ng1.sources.r1.suffix = source
执行后输出
Event: { headers:{} body: 64 61 6E 6E 65 72 39 33 73 6F 75 72 63 65      danner93source }
Sink
类似的简单自定义 Sink:加前后缀输出
public class CustomSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory
            .getLogger(CustomSink.class);
    private String prefix;
    private String suffix;
    public void configure(Context context) {
        prefix = context.getString("prefix","hz");
        suffix = context.getString("suffix","");
    }
    public Status process() throws EventDeliveryException {
        Status status = null;
        Event event = null;
        // 得到输入的 channel
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            transaction.begin();
            // 获取 event
            while (true){
                event = channel.take();
                if (event != null){
                    break;
                }
            }
            // 输出 event
            logger.info("custom sink: " + new String(event.getBody()));
			// 事务提交
            transaction.commit();
            status = Status.READY;
        }catch (Throwable t){
            t.printStackTrace();
            // event 处理失败,回滚
            transaction.rollback();
            status = Status.BACKOFF;
        }finally {
            // 关闭
            transaction.close();
        }
        return status;
    }
 }
conf 配置 netcat 输入
ng1.sinks.k1.type = com.flume.bidata.flume.CustomSink
ng1.sinks.k1.suffix = end
执行输出
[INFO - com.flume.bidata.flume.CustomSink.process(CustomSink.java:39)] hzxiend
事务
上节出现事务,本节来看看 Flume  中 事务是如何在 agent工作,以 MemoryChannel 为例
private LinkedBlockingDeque<Event> takeList; // sink 从 channel 取出 event 的记录,回滚使用
private LinkedBlockingDeque<Event> putList;	 // source 放入 channel 的 event
定义 putList 批量接收 Source 传递过来 event;takeList批量发送 event 到 Sink
source —> putList —> queue —> takeList —> sink
source –> channel
- 
    doPut:由ChannelProcessor发送event,Channel将event放入putList并检测队列是否溢出
- 
    doCommit:比较特殊涉及takeList和putList操作- 若 takeList.size() > putList.size()表示取的速度跟不上放的速度,需要判断channel能否放下 后续event
- 将 putList中所有的event取出并放入channel queue
- 清除 takeList、putList
 
- 若 
- 
    doRollback:回滚,与下文是同个函数
channel –> sink
- doTake:将- event从- channel queue取出并放入- takeList
- doCommit:与上面的- doCommit是同个函数
- doRollback:与上面的- doRollback是同个函数- 判断 channel queue>takeList,则运行回滚,否则报空间不足无法回滚
- 将takeList中event放回到channel queue
- 清除 takeList、putList
 
- 判断 
Interceptor
Interceptor 也是组件之一,它可以修改/过滤 event,与 selector配合使用
public class CustomInterceptor implements Interceptor {
    private List<Event> list;
    public void initialize() {
        list = new LinkedList<Event>();
    }
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String content = new String(event.getBody());
        // selector 需要配置 type  header
        if(content.contains("bejson")){
            headers.put("type","bejson");
        }else{
            headers.put("type","other");
        }
        return event;
    }
    public List<Event> intercept(List<Event> events) {
        list.clear();
        for(Event event :events){
            list.add(intercept(event));
        }
        return list;
    }
    public static class Builder implements Interceptor.Builder{
        public Interceptor build() {
            return new CustomInterceptor();
        }
        public void configure(Context context) {
        }
    }
    public void close() {
    }
}
需要三个 agent
- agent1
ng1.sources = r1
ng1.sinks = k1 k2
ng1.channels = c1 c2
ng1.sources.r1.type = netcat
ng1.sources.r1.bind = 192.168.22.147
ng1.sources.r1.port = 44444
ng1.sources.r1.interceptors = i1
ng1.sources.r1.interceptors.i1.type = com.flume.bidata.flume.CustomInterceptor$Builder
ng1.sources.r1.selector.type = multiplexing
ng1.sources.r1.selector.header = type   # type 字段是根据 CustomInterceptor 定义
ng1.sources.r1.selector.mapping.bejson = c1
ng1.sources.r1.selector.mapping.other = c2
ng1.channels.c1.type = memory
ng1.channels.c2.type = memory
ng1.sinks.k1.type = avro
ng1.sinks.k1.hostname = danner000
ng1.sinks.k1.port = 44445
ng1.sinks.k2.type = avro
ng1.sinks.k2.hostname = danner000
ng1.sinks.k2.port = 44446
ng1.sources.r1.channels = c1 c2
ng1.sinks.k1.channel = c1
ng1.sinks.k2.channel = c2
- agent2
ng1.sources = r1
ng1.sinks = k1
ng1.channels = c1
ng1.sources.r1.type = avro
ng1.sources.r1.bind = danner000
ng1.sources.r1.port = 44445
ng1.channels.c1.type = memory
ng1.sinks.k1.type = logger
ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1
- agent3
ng1.sources = r1
ng1.sinks = k1
ng1.channels = c1
ng1.sources.r1.type = avro
ng1.sources.r1.bind = 192.168.22.147
ng1.sources.r1.port = 44446
ng1.channels.c1.type = memory
ng1.sinks.k1.type = logger
ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1
三个agent 实现了根据 event 内容是否包含bejson 来划分 event 最终存储点。
