Flume

概述

Flume 分布式的日志收集系统 官网手册

Frame 1

  • agent:收集日志发送到目的地(运行在日志收集端的一个Java进程),包括三个核心组件:
    • source 收集日志(数据临时存放在channel中)
      • 可处理各种类型各种格式的日志数据
      • 例如日志类型:avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义
    • channel 缓冲数据(数据只有在sink发送成功之后才会被删除)
      • 例如存放在:memory、jdbc、file、自定义
    • sink 发送日志到目的地
      • 例如目的地:hdfs、logger、avro、thrift、ipc、file、null、hbase、solr、自定义
    • 特殊:interceptor 拦截器(在日志进入到source前,包装清洗过滤event)
      • chain形式:可对一个source指定多个拦截器,按先后顺序依次处理
      • 官方已有的拦截器:Timestamp/Host/Static/Regex Filtering/Regex Extractor/...
  • event:在整个数据传输过程中,流动的是event
  • 注意:
    • 事务保证在event级别
    • flume支持多级agent
    • flume支持扇入(fan-in),扇出(fan-out) Frame 2 Frame 3

使用示例

  1. 安装:直接下载解压即可
  2. 配置:在$FLUME_HOME/conf下添加一个配置文件(例如:flume-conf-test1.properties

     # agent1表示代理名称
     agent1.sources=source1
     agent1.sinks=sink1
     agent1.channels=channel1
    
     # 配置source1
     # 1. Spooling Directory是监控指定文件夹中新文件的变化
     # 一旦新文件出现,就解析该文件内容,然后写入到channle
     # 写入完成后,标记该文件已完成或者删除该文件
     # 2. 添加Timestamp Interceptor
     # 在event的header中添加一个key叫timestamp,value为当前的时间戳
     agent1.sources.source1.type=spooldir
     agent1.sources.source1.spoolDir=/home/hadoop/input/flume
     agent1.sources.source1.channels=channel1
     agent1.sources.source1.fileHeader = false
     agent1.sources.source1.interceptors = i1
     agent1.sources.source1.interceptors.i1.type = timestamp
    
     # 配置sink1
     agent1.sinks.sink1.type=hdfs
     agent1.sinks.sink1.hdfs.path=hdfs://cj.storm:9000/output/flume
     agent1.sinks.sink1.hdfs.fileType=DataStream
     agent1.sinks.sink1.hdfs.writeFormat=TEXT
     # agent1.sinks.sink1.hdfs.rollInterval=0
     # agent1.sinks.sink1.hdfs.rollSize=10485760
     agent1.sinks.sink1.channel=channel1
     agent1.sinks.sink1.hdfs.filePrefix=%Y-%m-%d
    
     # 配置channel1
     agent1.channels.channel1.type=file
     agent1.channels.channel1.checkpointDir=/home/hadoop/input/flume_tmp/checkpoint
     agent1.channels.channel1.dataDirs=/home/hadoop/input/flume_tmp/data
    
  3. 测试

     # 1. 创建被监控目录
     > mkdir -p /home/hadoop/input/flume
    
     # 2. 启动flume agent1
     # -n 指定agent名称
     # -c 指定配置文件目录
     # -f 指定配置文件
     > bin/flume-ng agent -n agent1 -c conf -f conf/flume-conf-test1.properties \
     -Dflume.root.logger=DEBUG,console
    
     # 3. 放入测试文件
     # 4. 查看运行结果
     > hadoop fs -lsr /output/flume