全国咨询热线:400-009-1906

如何在 Go 语言中用 Beats 开发 Logstash 插件

上周,我写了一篇关于一个java程序员如何开发一个Logstash插件。但是,随着Packetbeat的出现,Logstash现在有了Beats的帮助将数据推送到Elasticsearch。Beats使用Go语言开发,这是对传统java开发者的另一个挑战。这周,我试着将我的Logstash Reddit插件移植成专用的Beat。这篇文章记录了我的发现(剧透:这可比Ruby简单多了).

更多精彩内容以及学习资料,尚学堂论坛bbs.bjsxt.com免费下载。

配置环境

在OSX系统上很容易安装GO的可执行文件:

 brew install go

虽然Java或Ruby (或者任何我知道的语言) 可以在本地文件系统的任何地方使用命令,,但是Go项目必须使用单一专用的地址,,并且在$GOPATH环境变量下可用。

创建项目

对于Logstash插件,Beats项目可以从模板创建。官方文档的说明十分简单。鉴于Go对文件系统上的位置的严格要求,只需按照以下说明生成一个新的即可使用的Go项目。

默认模板代码将在控制台中重复发送带增量计数器的事件:

 ./redditbeat -e -d "*" 2016/12/13 22:55:56.013362 beat.go:267: INFO   Home path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat]   Config path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat]   Data path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/data]   Logs path: [/Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/logs] 2016/12/13 22:55:56.013390 beat.go:177: INFO Setup Beat: redditbeat; Version: 6.0.0-alpha1 2016/12/13 22:55:56.013402 processor.go:43: DBG  Processors:  2016/12/13 22:55:56.013413 beat.go:183: DBG  Initializing output plugins 2016/12/13 22:55:56.013417 logp.go:219: INFO Metrics logging every 30s 2016/12/13 22:55:56.013518 output.go:167: INFO Loading template enabled. Reading template file:   /Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/redditbeat.template.json 2016/12/13 22:55:56.013888 output.go:178: INFO Loading template enabled for Elasticsearch 2.x. Reading template file:   /Users/i303869/projects/private/go/src/github.com/nfrankel/redditbeat/redditbeat.template-es2x.json 2016/12/13 22:55:56.014229 client.go:120: INFO Elasticsearch url: http: 2016/12/13 22:55:56.014272 outputs.go:106: INFO Activated elasticsearch as output plugin. 2016/12/13 22:55:56.014279 publish.go:234: DBG  Create output worker 2016/12/13 22:55:56.014312 publish.go:276: DBG  No output is defined to store the topology.   The server fields might not be filled. 2016/12/13 22:55:56.014326 publish.go:291: INFO Publisher name: LSNM33795267A 2016/12/13 22:55:56.014386 async.go:63: INFO Flush Interval set to: 1s 2016/12/13 22:55:56.014391 async.go:64: INFO Max Bulk Size set to: 50 2016/12/13 22:55:56.014395 async.go:72: DBG  create bulk processing worker (interval=1s, bulk size=50) 2016/12/13 22:55:56.014449 beat.go:207: INFO redditbeat start running. 2016/12/13 22:55:56.014459 redditbeat.go:38: INFO redditbeat is running! Hit CTRL-C to stop it. 2016/12/13 22:55:57.370781 client.go:184: DBG  Publish: {   "@timestamp": "2016-12-13T22:54:47.252Z",   "beat": {     "hostname": "LSNM33795267A",     "name": "LSNM33795267A",     "version": "6.0.0-alpha1"   },   "counter": 1,   "type": "redditbeat" }

关于命令行参数:-e记录到标准err,而-d“*”启用所有调试选择器。有关参数的完整列表,请键入./redditbeat --help。

编码

Go代码位于.go文件中(令人惊讶...)在$ GOPATH / src文件夹的项目子文件夹中。

配置类型

第一个有趣的文件是config / config.go,它定义了一个结构来声明Beat的可能参数。至于前面的Logstash插件,让我们添加一个subreddit参数,并设置它的默认值:

 type Config struct {  Period time.Duration `config:"period"`  Subreddit string `config:"subreddit"` }  var DefaultConfig = Config {  Period: 15 * time.Second,  Subreddit: "elastic", }

Beater Type

Beat本身的代码在beater / redditbean.go中找到。默认模板为Beat和三个函数创建一个struct:

  1. Beat构造函数—用来读取配置: 
     func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { ... } 

  2. Run 函数- 需要覆盖Beat的主要功能: 
     func (bt *Redditbeat) Run(b *beat.Beat) error { ... } 

  3. Stop 函数管理优雅关闭: 
     func (bt *Redditbeat) Stop() { ... } 

Note 1:在Go中没有明确的接口实现。实现了 interface 中的所有方法,即创建一个隐式继承关系. 出于写文档的目的,这是 Beater 接口:

 type Beater interface {  Run(b *Beat) error  Stop() }

因此,由于Beat结构实现了Run和Stop,它是一个Beater。

Note 2: 在Go中没有类的概念,所以方法不能在一个具体类型上声明。但是,它存在扩展函数的概念:可以添加行为到一个类型(在单个包中)的函数。它需要声明receiver 类型:这是在fun关键字和函数名之间完成的 - 这里是指Redditbeat类型(或者更准确地说,是一个指向Redditbeat类型的指针,但是这里有一个隐式转换)。

构造函数和Stop函数可以保持不变,无论什么特性都应该在Run函数中。在这种情况下,功能是调用Reddit REST API并为每个Reddit帖子发送一条消息。

最终代码如下所示:

 func (bt *Redditbeat) Run(b *beat.Beat) error {  bt.client = b.Publisher.Connect()  ticker := time.NewTicker(bt.config.Period)  reddit := "https://www.reddit.com/r/" + bt.config.Subreddit + "/.json"  client := &http.Client {}  for {   select {   case <-bt.done:    return nil   case <-ticker.C:   }   req, reqErr := http.NewRequest("GET", reddit, nil)   req.Header.Add("User-Agent", "Some existing header to bypass 429 HTTP")   if (reqErr != nil) {    panic(reqErr)   }   resp, getErr := client.Do(req)   if (getErr != nil) {    panic(getErr)   }   body, readErr := ioutil.ReadAll(resp.Body)   defer resp.Body.Close()   if (readErr != nil) {    panic(readErr)   }   trimmedBody := body[len(prefix):len(body) - len(suffix)]   messages := strings.Split(string(trimmedBody), separator)   for i := 0; i < len(messages); i ++ {    event := common.MapStr{     "@timestamp": common.Time(time.Now()),     "type":       b.Name,     "message":    "{" + messages[i] + "}",    }    bt.client.PublishEvent(event)   }  } }

这里是对最重要的几部分的解释:

  • line 4: 通过连接字符串创建Reddit REST URL,包括配置Subreddit参数。记住,它的默认值已在config.go文件中定义。
  • line 5: 引用httpClient类型
  • line 12: 创建新的HTTP请求。注意Go允许多个返回值。
  • line 13: 如果没有设置标准请求头,Reddit的API将返回429状态码。
  • line 14: Go标准错误不通过异常处理,而是随着常规返回值返回。根据Golang wiki:

    指示调用者的错误条件,应通过返回错误值来完成

  • line 15: panic() 函数类似于在Java中抛出异常, 被处理时推到栈顶。 有关详细信息,请查看相关文档。
  • line 17: 执行HTTP请求。
  • line 21: 将响应主体读入字节数组。
  • line 22: 关闭主体流。注意defer关键字:

    defer语句延迟函数的执行,直到环绕的函数返回。

  • line 26: 创建整个响应主体字节数组的切片 - 对数组的一部分的引用。实质上,它删除了前缀和后缀以保持相关的JSON值。之后将字节数组解析成JSON。
  • line 27: 分割切片以单独获取每个JSON片段。
  • line 29: 将消息创建为简单的字典结构。
  • line 34: 发送。

配置, 构建, 运行

默认配置参数可以在项目根目录下的redditbeat.yml文件中找到。请注意,redditbeat.full.yml中列出了其他常见的Beat参数,以及相关注释。

关于Beats的一个有趣的事情是,他们的消息可以直接发送到Elasticsearch或Logstash进行进一步处理。这在上述配置文件中配置。

 redditbeat:   period: 10s output.elasticsearch:   hosts: ["localhost:9200"] output.logstash:   hosts: ["localhost:5044"]   enabled: true 

此配置片段将每10秒循环运行Run方法,并将消息发送到在localhost上运行的Logstash实例在端口5044上。这可以在运行Beat时被覆盖(见下文)。

注意:为了使Logstash接受来自Beats的消息,必须安装Logstash Beat插件,并且必须为Beats配置Logstash的input:

 input {   beats {     port => 5044   } } 

要构建项目,请在项目的根目录中键入make。它将创建一个可以运行的可执行文件。

 ./redditbeat -e -E redditbeat.subreddit=java 

-E参数可以覆盖在的redditbeat.yml配置文件中找到的参数(见上文)。在这里,它设置subreddit读为“java”,而不是默认的“elastic”。

输出如下所示:

 2016/12/17 14:51:19.748329 client.go:184: DBG  Publish: {   "@timestamp": "2016-12-17T14:51:19.748Z",   "beat": {     "hostname": "LSNM33795267A",     "name": "LSNM33795267A",     "version": "6.0.0-alpha1"   },   "message": "{     /"kind/": /"t3/", /"data/": {       /"contest_mode/": false, /"banned_by/": null,        /"domain/": /"blogs.oracle.com/", /"subreddit/": /"java/", /"selftext_html/": null,        /"selftext/": /"/", /"likes/": null, /"suggested_sort/": null, /"user_reports/": [],        /"secure_media/": null, /"saved/": false, /"id/": /"5ipzgq/", /"gilded/": 0,        /"secure_media_embed/": {}, /"clicked/": false, /"report_reasons/": null,        /"author/": /"pushthestack/", /"media/": null, /"name/": /"t3_5ipzgq/", /"score/": 11,        /"approved_by/": null, /"over_18/": false, /"removal_reason/": null, /"hidden/": false,        /"thumbnail/": /"/", /"subreddit_id/": /"t5_2qhd7/", /"edited/": false,        /"link_flair_css_class/": null, /"author_flair_css_class/": null, /"downs/": 0,        /"mod_reports/": [], /"archived/": false, /"media_embed/": {}, /"is_self/": false,        /"hide_score/": false, /"spoiler/": false,        /"permalink/": /"/r/java/comments/5ipzgq/jdk_9_will_no_longer_bundle_javadb//",        /"locked/": false, /"stickied/": false, /"created/": 1481943248.0,        /"url/": /"https:       /"author_flair_text/": null, /"quarantine/": false,        /"title/": /"JDK 9 will no longer bundle JavaDB/", /"created_utc/": 1481914448.0,        /"link_flair_text/": null, /"distinguished/": null, /"num_comments/": 4,        /"visited/": false, /"num_reports/": null, /"ups/": 11     }   }",   "type": "redditbeat" } 

总结

奇怪的是,我发现开发一个Beat比Logstash插件更容易。Go语言是一种是更低级的语言,其中一些概念感觉有些奇怪(如隐式接口实现),但整个生态系统更简单,更像是一门人类语言。此外,Beats更多样化,因为它们可以发送到 Elasticsearch 和/或 Logstash。

更多精彩内容以及学习资料,尚学堂论坛bbs.bjsxt.com免费下载。