一、概述
“光说不练假把式。” 官网上的介绍多少让人迷迷糊糊的,各种高大上的词语仿佛让 NiFi 离我们越来越远。
实践是最好的老师。那就让我们试用一下 NiFi 吧!
二、安装
由于我的整个学习和使用过程都是在 Linux 下完成,所以,整个教程也是面向 Linux 用户的。
其他系统下的使用方法类似,如有其他系统的用户,那么还请有选择性的进行参考本教程。
NiFi 是免安装地,仅需从官网下载压缩包,然后解压,旋即完成了整个安装过程。
- 官网下载压缩包:nifi.apache.org/download,如何压缩包的选择参考文章 Apache NiFi 二进制包和源码包安装介绍
tar -xzf nifi-x.x.x-bin.tar.gz
命令解压 ,关于解命令烦请参考文章 Linux 下tar.gz、tar、bz2、zip 等解压缩、压缩命令小结- 解压结束即完成整个安装过程
三、配置
我们需要对 NiFi 的配置文件(存放在 [nifi_install_location]/config
)进行个性化配置。
由于目前我们只是简单的上手,所以大部分配置项都选择了默认的配置。我们仅对 NiFi 进行基本的配置。
nifi.properties
文件针对的是 NiFi 的配置,仅需做如下修改:
# web properties #
nifi.web.war.directory=./lib
nifi.web.http.host=192.168.203.7 # 设置成本机的 ip
nifi.web.http.port=9191
nifi.web.https.host=
nifi.web.https.port=
nifi.web.jetty.working.directory=./work/jetty
nifi.web.jetty.threads=200
其他文件在本教程场景下无需修改。
四、启动
通过 [nifi_install_location]/bin/nifi.sh
进行 start
、restart
、stop
、status
等操作。
./bin/nifi.sh start
启动 nifi。- 浏览器中访问
192.168.203.7:9191/nifi
(此处的ip与端口同你的配置项相关) - 即可出现 nifi 的操作界面
由于 nifi 加载配置信息较慢,稍作等待。
五、简单使用
设定一个这样的场景:
统计每次收到的消息中,单词出现的频次。WORD COUNT!!!
5.0 先修:操作面板与 Processor
熟悉操作面板的各个组件。如何添加、启动、停止、删除一个 Processor,如何对 Processor 进行基本配置?
对于上述问题,请自行在界面上进行尝试和练习。
官网上也提供了一些 Youtube 上的教学视频,请参考这里 nifi.apache.org/videos
5.1 涉及到的组件
- GenerateFlowFile
- ExecuteScript
- PutFile
5.2 部署组件
从 Processor 拖取上述组件到操作面板上,依照描述的数据流动方式将组件串起。
串联组件时,有时需要对组件进行关系的选择。即选择上一组件分发的正确/错误消息分发到下一组件。
同时,末端的组件你需要在它的 setting pannel 指定自己处理 Success、Failure。
对 Processor 的配置如下:
- GenerateFlowFile
- Profile -> Custom Text 填入需要统计单词频次的文章/内容
如:
Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic. Some of the high-level capabilities and objectives of Apache NiFi include:
- Scheduling -> Run Duration 调节至 2s,放缓数据生成的速度
- Profile -> Custom Text 填入需要统计单词频次的文章/内容
- ExecuteScript
- Profile -> ScriptEngine 选择 Groovy
- ScriptBody 填入如下 Groovy 代码:
// Code from http://funnifi.blogspot.com/2016/02/executescript-processor-replacing-flow.html
import org.apache.commons.io.IOUtils import java.nio.charset.* def flowFile = session.get() if(!flowFile) return flowFile = session.write(flowFile, {inputStream, outputStream -> def wordCount = [:] def tellTaleHeart = IOUtils.toString(inputStream, StandardCharsets.UTF_8) def words = tellTaleHeart.split(/(!|\?|-|\.|\"|:|;|,|\s)+/)*.toLowerCase() words.each { word -> def currentWordCount = wordCount.get(word) if(!currentWordCount) { wordCount.put(word, 1) } else { wordCount.put(word, currentWordCount + 1) } } def outputMapString = wordCount.inject("", {k,v -> k += "${v.key}: ${v.value}\n"}) Date latestdate = new Date(); outputStream.write(outputMapString.getBytes(StandardCharsets.UTF_8)) } as StreamCallback) Date date = new Date(); flowFile = session.putAttribute(flowFile, 'filename', 'telltale_heart_wordcount' + date.getTime()) session.transfer(flowFile, REL_SUCCESS)
- PutFile
- Profile -> Directory 填入存放结果的文件夹,如
/home/lbh/logs/result
- Profile -> Directory 填入存放结果的文件夹,如
启动各个 Processor 后,就能清晰地看到数据在 Processors 之间流动。
查看 PutFile 中设置的文件存放目录,能够看到存放着统计结果的文件 telltale_heart_wordcount