http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.15.1/FlumeDeveloperGuide.html
本节结合官方文档和源码来尝试定义自定义组件
源码
首先来看看源码是如何组件是如何定义:
TaildirSource
Taildir Source
源码是 org.apache.flume.source.taildir.TaildirSource
,大致有几个操作
- 继承
AbstractSource
并实现Configurable
、PollableSource
- 定义私有属性接收
Source
定义时属性:filePaths
、positionFilePath
、batchSize
… configure
函数初始化私有属性process
函数处理真正的流程
loggerSink
` logger Sink ` 源码是 org.apache.flume.sink.LoggerSink
,操作类似
- 集成
AbstractSink
并实现Configurable
- 定义私有属性接收
Sink
定义时属性:channel
、name
… process
函数处理真正的流程,包含事务提交
HostInterceptor
Host Interceptor
源码是 org.apache.flume.interceptor.HostInterceptor
,类似但有几点变化
- 必须要有内部类
Builder
Builder
中获取属性和创建Interceptor
intercept
函数处理流程
Source
自定义Source
:随机生成 0-100
,并可以加上自定义的前后缀
public class CustomSource extends AbstractSource implements Configurable, PollableSource{
private String prefix;
private String suffix;
public void configure(Context context) {
prefix = context.getString("prefix","danner");
suffix = context.getString("suffix");
}
public Status process() throws EventDeliveryException {
Status status = null;
try {
// 创建 Event
SimpleEvent event = new SimpleEvent();
// event 添加内容
event.setBody(new String(prefix + new Random().nextInt(100) + suffix).getBytes());
// event 向后传递到 Channel
getChannelProcessor().processEvent(event);
// 处理成功
status = Status.READY;
}catch (Throwable t){
t.printStackTrace();
status = Status.BACKOFF;
}
// 防止太快
try {
Thread.sleep(2*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return status;
}
......
conf
文件配置成 logger
,但 source
配置有变化:type
不是自带需带类名,后面直接代码集成到 Flume
源码后可简写
ng1.sources.r1.type = com.flume.bidata.flume.CustomSource
ng1.sources.r1.suffix = source
执行后输出
Event: { headers:{} body: 64 61 6E 6E 65 72 39 33 73 6F 75 72 63 65 danner93source }
Sink
类似的简单自定义 Sink
:加前后缀输出
public class CustomSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory
.getLogger(CustomSink.class);
private String prefix;
private String suffix;
public void configure(Context context) {
prefix = context.getString("prefix","hz");
suffix = context.getString("suffix","");
}
public Status process() throws EventDeliveryException {
Status status = null;
Event event = null;
// 得到输入的 channel
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
try {
transaction.begin();
// 获取 event
while (true){
event = channel.take();
if (event != null){
break;
}
}
// 输出 event
logger.info("custom sink: " + new String(event.getBody()));
// 事务提交
transaction.commit();
status = Status.READY;
}catch (Throwable t){
t.printStackTrace();
// event 处理失败,回滚
transaction.rollback();
status = Status.BACKOFF;
}finally {
// 关闭
transaction.close();
}
return status;
}
}
conf
配置 netcat
输入
ng1.sinks.k1.type = com.flume.bidata.flume.CustomSink
ng1.sinks.k1.suffix = end
执行输出
[INFO - com.flume.bidata.flume.CustomSink.process(CustomSink.java:39)] hzxiend
事务
上节出现事务,本节来看看 Flume
中 事务是如何在 agent
工作,以 MemoryChannel
为例
private LinkedBlockingDeque<Event> takeList; // sink 从 channel 取出 event 的记录,回滚使用
private LinkedBlockingDeque<Event> putList; // source 放入 channel 的 event
定义 putList
批量接收 Source
传递过来 event
;takeList
批量发送 event
到 Sink
source —> putList —> queue —> takeList —> sink
source –> channel
-
doPut
:由ChannelProcessor
发送event
,Channel
将event
放入putList
并检测队列是否溢出 -
doCommit
:比较特殊涉及takeList
和putList
操作- 若
takeList.size() > putList.size()
表示取的速度跟不上放的速度,需要判断channel
能否放下 后续event
- 将
putList
中所有的event
取出并放入channel queue
- 清除
takeList
、putList
- 若
-
doRollback
:回滚,与下文是同个函数
channel –> sink
doTake
:将event
从channel queue
取出并放入takeList
doCommit
:与上面的doCommit
是同个函数doRollback
:与上面的doRollback
是同个函数- 判断
channel queue
>takeList
,则运行回滚,否则报空间不足无法回滚 - 将
takeList
中event
放回到channel queue
- 清除
takeList
、putList
- 判断
Interceptor
Interceptor
也是组件之一,它可以修改/过滤 event
,与 selector
配合使用
public class CustomInterceptor implements Interceptor {
private List<Event> list;
public void initialize() {
list = new LinkedList<Event>();
}
public Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();
String content = new String(event.getBody());
// selector 需要配置 type header
if(content.contains("bejson")){
headers.put("type","bejson");
}else{
headers.put("type","other");
}
return event;
}
public List<Event> intercept(List<Event> events) {
list.clear();
for(Event event :events){
list.add(intercept(event));
}
return list;
}
public static class Builder implements Interceptor.Builder{
public Interceptor build() {
return new CustomInterceptor();
}
public void configure(Context context) {
}
}
public void close() {
}
}
需要三个 agent
agent1
ng1.sources = r1
ng1.sinks = k1 k2
ng1.channels = c1 c2
ng1.sources.r1.type = netcat
ng1.sources.r1.bind = 192.168.22.147
ng1.sources.r1.port = 44444
ng1.sources.r1.interceptors = i1
ng1.sources.r1.interceptors.i1.type = com.flume.bidata.flume.CustomInterceptor$Builder
ng1.sources.r1.selector.type = multiplexing
ng1.sources.r1.selector.header = type # type 字段是根据 CustomInterceptor 定义
ng1.sources.r1.selector.mapping.bejson = c1
ng1.sources.r1.selector.mapping.other = c2
ng1.channels.c1.type = memory
ng1.channels.c2.type = memory
ng1.sinks.k1.type = avro
ng1.sinks.k1.hostname = danner000
ng1.sinks.k1.port = 44445
ng1.sinks.k2.type = avro
ng1.sinks.k2.hostname = danner000
ng1.sinks.k2.port = 44446
ng1.sources.r1.channels = c1 c2
ng1.sinks.k1.channel = c1
ng1.sinks.k2.channel = c2
agent2
ng1.sources = r1
ng1.sinks = k1
ng1.channels = c1
ng1.sources.r1.type = avro
ng1.sources.r1.bind = danner000
ng1.sources.r1.port = 44445
ng1.channels.c1.type = memory
ng1.sinks.k1.type = logger
ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1
agent3
ng1.sources = r1
ng1.sinks = k1
ng1.channels = c1
ng1.sources.r1.type = avro
ng1.sources.r1.bind = 192.168.22.147
ng1.sources.r1.port = 44446
ng1.channels.c1.type = memory
ng1.sinks.k1.type = logger
ng1.sources.r1.channels = c1
ng1.sinks.k1.channel = c1
三个agent
实现了根据 event
内容是否包含bejson
来划分 event
最终存储点。