Flume Custom Components

Posted by danner on May 30, 2018

http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.15.1/FlumeDeveloperGuide.html

This post combines the official documentation with the Flume source code to walk through writing custom components.

Source Code

First, let's look at how a few built-in components are defined in the source code.

TaildirSource

The Taildir Source lives in org.apache.flume.source.taildir.TaildirSource and roughly does the following:

  • Extends AbstractSource and implements Configurable and PollableSource
  • Defines private fields to receive the properties set on the Source: filePaths, positionFilePath, batchSize
  • configure() initializes those private fields
  • process() carries out the actual work (a simplified skeleton of this pattern follows below)
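
A minimal sketch of this pattern, assuming the property names from the Taildir Source documentation (this is not the actual TaildirSource code):

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

public class TaildirStyleSourceSketch extends AbstractSource
        implements Configurable, PollableSource {

    private String filePaths;        // files to tail
    private String positionFilePath; // where read offsets are persisted
    private int batchSize;           // events per batch

    public void configure(Context context) {
        // initialize the private fields from the agent conf
        filePaths = context.getString("filegroups");
        positionFilePath = context.getString("positionFile");
        batchSize = context.getInteger("batchSize", 100);
    }

    public Status process() throws EventDeliveryException {
        // real code: read new lines, build events, hand them to getChannelProcessor()
        // (Flume 1.7+ additionally requires getBackOffSleepIncrement()/getMaxBackOffSleepInterval())
        return Status.READY;
    }
}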

LoggerSink

The Logger Sink lives in org.apache.flume.sink.LoggerSink; the pattern is similar:

  • Extends AbstractSink and implements Configurable
  • Defines private fields to receive the properties set on the Sink: channel, name
  • process() carries out the actual work, including the transaction commit (see the sketch after this list)
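
A simplified sketch of what the LoggerSink process() loop does (not the exact source; the class name is illustrative):

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerStyleSinkSketch extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(LoggerStyleSinkSketch.class);

    public void configure(Context context) {
        // LoggerSink itself has almost nothing to configure
    }

    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        txn.begin();
        try {
            Event event = channel.take();      // may be null when the channel is empty
            if (event != null) {
                logger.info("Event: " + new String(event.getBody()));
            }
            txn.commit();                      // the take becomes permanent
            return event != null ? Status.READY : Status.BACKOFF;
        } catch (Throwable t) {
            txn.rollback();                    // put the event back on failure
            return Status.BACKOFF;
        } finally {
            txn.close();
        }
    }
}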

HostInterceptor

The Host Interceptor lives in org.apache.flume.interceptor.HostInterceptor; it is similar again, but with a few differences:

  • There must be an inner Builder class
  • The Builder reads the properties and creates the Interceptor
  • intercept() carries out the processing (a minimal skeleton follows below)
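
A minimal skeleton of the Builder requirement (a sketch, not the actual HostInterceptor code):

import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class HostStyleInterceptorSketch implements Interceptor {

    public void initialize() { }

    public Event intercept(Event event) {
        // e.g. put the host address into the event headers
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }

    public void close() { }

    // Flume only knows the Builder; it reads the properties and creates the Interceptor
    public static class Builder implements Interceptor.Builder {
        public void configure(Context context) {
            // read interceptor properties here
        }
        public Interceptor build() {
            return new HostStyleInterceptorSketch();
        }
    }
}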

Source

Custom Source: generates a random number between 0 and 100 and wraps it with a configurable prefix and suffix.

import java.util.Random;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

public class CustomSource extends AbstractSource implements Configurable, PollableSource {
    private String prefix;
    private String suffix;

    public void configure(Context context) {
        prefix = context.getString("prefix", "danner");
        suffix = context.getString("suffix");
    }

    public Status process() throws EventDeliveryException {
        Status status = null;

        try {
            // create the Event
            SimpleEvent event = new SimpleEvent();

            // fill the event body: prefix + random number + suffix
            event.setBody((prefix + new Random().nextInt(100) + suffix).getBytes());

            // hand the event on to the Channel
            getChannelProcessor().processEvent(event);

            // processed successfully
            status = Status.READY;
        } catch (Throwable t) {
            t.printStackTrace();
            status = Status.BACKOFF;
        }

        // slow the source down a little
        try {
            Thread.sleep(2 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return status;
    }
    ......

The conf file still uses a logger sink, but the source configuration changes: because the type is not a built-in one, the fully qualified class name must be given (it could be written in short form if the code were later merged into the Flume source tree).

ng1.sources.r1.type = com.flume.bidata.flume.CustomSource
ng1.sources.r1.suffix = source
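
For reference, a complete conf for this test could look like the sketch below; only the two source lines above come from the original setup, the agent/channel/sink names and the rest are assumed:

ng1.sources = r1
ng1.channels = c1
ng1.sinks = k1

ng1.sources.r1.type = com.flume.bidata.flume.CustomSource
ng1.sources.r1.suffix = source

ng1.channels.c1.type = memory

ng1.sinks.k1.type = logger

ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1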

Output after running:

Event: { headers:{} body: 64 61 6E 6E 65 72 39 33 73 6F 75 72 63 65 danner93source }

Sink

A similarly simple custom Sink: logs each event body wrapped with a prefix and suffix.

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomSink extends AbstractSink implements Configurable {
    private static final Logger logger = LoggerFactory.getLogger(CustomSink.class);
    private String prefix;
    private String suffix;

    public void configure(Context context) {
        prefix = context.getString("prefix", "hz");
        suffix = context.getString("suffix", "");
    }

    public Status process() throws EventDeliveryException {
        Status status = null;
        Event event = null;
        // the channel this sink reads from
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        try {
            transaction.begin();
            // busy-wait until an event is available
            while (true) {
                event = channel.take();
                if (event != null) {
                    break;
                }
            }
            // log the event body wrapped with the prefix and suffix
            logger.info(prefix + new String(event.getBody()) + suffix);
            // commit the transaction
            transaction.commit();
            status = Status.READY;
        } catch (Throwable t) {
            t.printStackTrace();
            // processing failed, roll back
            transaction.rollback();
            status = Status.BACKOFF;
        } finally {
            // close the transaction
            transaction.close();
        }
        return status;
    }
}

The conf uses a netcat source as the input:

ng1.sinks.k1.type = com.flume.bidata.flume.CustomSink
ng1.sinks.k1.suffix = end
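
Again, a full conf for this test could look roughly like this sketch; the netcat bind address/port and the channel wiring are assumed for illustration:

ng1.sources = r1
ng1.channels = c1
ng1.sinks = k1

ng1.sources.r1.type = netcat
ng1.sources.r1.bind = 192.168.22.147
ng1.sources.r1.port = 44444

ng1.channels.c1.type = memory

ng1.sinks.k1.type = com.flume.bidata.flume.CustomSink
ng1.sinks.k1.suffix = end

ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1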

Output after running:

[INFO - com.flume.bidata.flume.CustomSink.process(CustomSink.java:39)] hzxiend

Transactions

Transactions showed up in the previous section; now let's look at how a Flume transaction works inside an agent, taking MemoryChannel as the example. MemoryChannel keeps two deques for the bookkeeping:

private LinkedBlockingDeque<Event> takeList; // events the sink has taken from the channel, kept for rollback
private LinkedBlockingDeque<Event> putList;  // events the source is putting into the channel

putList buffers the events handed over by the Source in batches, while takeList buffers the events handed on to the Sink in batches:

source —> putList —> queue —> takeList —> sink

source —> channel

  • doPut: the ChannelProcessor delivers the event to the Channel; the event is appended to putList, with a check that the queue will not overflow

  • doCommit: a bit special, since it touches both takeList and putList

    • if putList.size() > takeList.size(), the takes are not keeping up with the puts, so doCommit first checks whether the channel queue still has room for the extra events
    • all events in putList are then moved into the channel queue
    • takeList and putList are cleared
  • doRollback: rollback; the same method as the one described below

channel —> sink

  • doTake: takes an event from the channel queue and puts it into takeList
  • doCommit: the same method as the doCommit above
  • doRollback: the same method as the doRollback above (a simplified sketch of the whole cycle follows this list)
    • it first checks that the channel queue has enough free space left to hold everything in takeList; otherwise it fails with a "not enough space to rollback" error
    • the events in takeList are put back into the channel queue
    • takeList and putList are cleared
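
A heavily simplified sketch of the cycle described above; capacity semaphores, byte accounting and locking are left out, so this is a toy model rather than the real MemoryChannel code:

import java.util.LinkedList;
import java.util.Queue;

import org.apache.flume.Event;

/** Toy model of MemoryChannel's putList/takeList transaction bookkeeping. */
public class ToyMemoryTransaction {
    private final Queue<Event> queue;      // the channel's event queue
    private final LinkedList<Event> putList = new LinkedList<Event>();
    private final LinkedList<Event> takeList = new LinkedList<Event>();
    private final int capacity;

    public ToyMemoryTransaction(Queue<Event> queue, int capacity) {
        this.queue = queue;
        this.capacity = capacity;
    }

    // source side: buffer the event until commit
    public void doPut(Event event) {
        if (queue.size() + putList.size() >= capacity) {
            throw new IllegalStateException("channel is full");
        }
        putList.add(event);
    }

    // sink side: remember what was taken so a rollback can restore it
    public Event doTake() {
        Event event = queue.poll();
        if (event != null) {
            takeList.add(event);
        }
        return event;
    }

    // shared by both sides
    public void doCommit() {
        // only when there are more puts than takes can the queue grow,
        // so check the remaining room first
        if (putList.size() > takeList.size()
                && queue.size() + putList.size() > capacity) {
            throw new IllegalStateException("not enough room in the channel to commit");
        }
        queue.addAll(putList);   // publish the buffered puts
        putList.clear();
        takeList.clear();        // the takes become permanent
    }

    // shared by both sides
    public void doRollback() {
        // the taken events must fit back (the real code re-inserts them at the head)
        if (queue.size() + takeList.size() > capacity) {
            throw new IllegalStateException("not enough space to rollback takes");
        }
        queue.addAll(takeList);
        takeList.clear();
        putList.clear();         // pending puts are discarded
    }
}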

Interceptor

The Interceptor is another Flume component; it can modify or filter events and is typically used together with a channel selector.

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class CustomInterceptor implements Interceptor {
    private List<Event> list;

    public void initialize() {
        list = new LinkedList<Event>();
    }

    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        String content = new String(event.getBody());
        // the selector routes on the "type" header set here
        if (content.contains("bejson")) {
            headers.put("type", "bejson");
        } else {
            headers.put("type", "other");
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        list.clear();
        for (Event event : events) {
            list.add(intercept(event));
        }
        return list;
    }

    public static class Builder implements Interceptor.Builder {
        public Interceptor build() {
            return new CustomInterceptor();
        }
        public void configure(Context context) {
        }
    }

    public void close() {
    }
}

This test needs three agents:

  • agent1
ng1.sources = r1
ng1.sinks = k1 k2
ng1.channels = c1 c2

ng1.sources.r1.type = netcat
ng1.sources.r1.bind = 192.168.22.147
ng1.sources.r1.port = 44444

ng1.sources.r1.interceptors = i1
ng1.sources.r1.interceptors.i1.type = com.flume.bidata.flume.CustomInterceptor$Builder

ng1.sources.r1.selector.type = multiplexing
ng1.sources.r1.selector.header = type   # the type header is set by CustomInterceptor
ng1.sources.r1.selector.mapping.bejson = c1
ng1.sources.r1.selector.mapping.other = c2

ng1.channels.c1.type = memory
ng1.channels.c2.type = memory

ng1.sinks.k1.type = avro
ng1.sinks.k1.hostname = danner000
ng1.sinks.k1.port = 44445
ng1.sinks.k2.type = avro
ng1.sinks.k2.hostname = danner000
ng1.sinks.k2.port = 44446

ng1.sources.r1.channels = c1 c2
ng1.sinks.k1.channel = c1
ng1.sinks.k2.channel = c2
  • agent2
ng1.sources = r1
ng1.sinks = k1
ng1.channels = c1

ng1.sources.r1.type = avro
ng1.sources.r1.bind = danner000
ng1.sources.r1.port = 44445

ng1.channels.c1.type = memory

ng1.sinks.k1.type = logger

ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1
  • agent3
ng1.sources = r1
ng1.sinks = k1
ng1.channels = c1

ng1.sources.r1.type = avro
ng1.sources.r1.bind = 192.168.22.147
ng1.sources.r1.port = 44446

ng1.channels.c1.type = memory

ng1.sinks.k1.type = logger

ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1

Together, the three agents route each event to a different final destination depending on whether its body contains "bejson".

References

sql flume

Flume MemoryChannel 源码详解 (a walkthrough of the MemoryChannel source)