Last week, I wrote about how one could start developing one’s Logstash plugin coming from a Java developer background. However, with the acquisition of Packetbeat, Logstash now has help from Beats to push data to Elasticsearch. Beats are developed in Go, another challenge for traditional Java developers. This week, I tried porting my Logstash Reddit plugin to a dedicated Beat. This post documents my findings (spoiler: I found it much easier than with Ruby).
Setting up the environment
On OSX, installing the go executable is easy as pie:
brew install go
While Java or Ruby projects (or projects in any language I know, for that matter) can reside anywhere on the local filesystem, Go projects must all be located in one single dedicated workspace, whose path is available in the $GOPATH environment variable.
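To make the layout concrete, here is a small sketch that computes where a project is expected to live inside the workspace. The projectDir helper is made up for illustration, and the sketch assumes $GOPATH holds a single entry (it may legally contain several).

```go
package main

import (
	"fmt"
	"go/build"
	"path/filepath"
)

// projectDir is a hypothetical helper: it returns the folder in which a
// project with the given import path is expected inside a GOPATH workspace.
func projectDir(gopath, importPath string) string {
	return filepath.Join(gopath, "src", filepath.FromSlash(importPath))
}

func main() {
	// build.Default.GOPATH is the workspace path resolved by the Go tooling.
	// Assumption: it contains one entry only.
	gopath := build.Default.GOPATH
	fmt.Println(projectDir(gopath, "github.com/ajavageek/redditbeat"))
}
```

The import path doubles as the folder path, which is why the project location cannot be chosen freely.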
Creating the project
As with Logstash plugins, Beats projects can be created from a template. The documentation for doing so is quite straightforward. Given Go’s rigid requirements regarding location on the filesystem, just following the instructions yields a new ready-to-use Go project.
The default template code repeatedly sends an event containing an incrementing counter, which shows up in the console:
./redditbeat -e -d "*"
2016/12/13 22:55:56.013362 beat.go:267: INFO Home path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat] Config path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat] Data path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/data] Logs path: [/Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/logs]
2016/12/13 22:55:56.013390 beat.go:177: INFO Setup Beat: redditbeat; Version: 6.0.0-alpha1
2016/12/13 22:55:56.013402 processor.go:43: DBG Processors:
2016/12/13 22:55:56.013413 beat.go:183: DBG Initializing output plugins
2016/12/13 22:55:56.013417 logp.go:219: INFO Metrics logging every 30s
2016/12/13 22:55:56.013518 output.go:167: INFO Loading template enabled. Reading template file: /Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/redditbeat.template.json
2016/12/13 22:55:56.013888 output.go:178: INFO Loading template enabled for Elasticsearch 2.x. Reading template file: /Users/i303869/projects/private/go/src/github.com/ajavageek/redditbeat/redditbeat.template-es2x.json
2016/12/13 22:55:56.014229 client.go:120: INFO Elasticsearch url: http://localhost:9200
2016/12/13 22:55:56.014272 outputs.go:106: INFO Activated elasticsearch as output plugin.
2016/12/13 22:55:56.014279 publish.go:234: DBG Create output worker
2016/12/13 22:55:56.014312 publish.go:276: DBG No output is defined to store the topology. The server fields might not be filled.
2016/12/13 22:55:56.014326 publish.go:291: INFO Publisher name: LSNM33795267A
2016/12/13 22:55:56.014386 async.go:63: INFO Flush Interval set to: 1s
2016/12/13 22:55:56.014391 async.go:64: INFO Max Bulk Size set to: 50
2016/12/13 22:55:56.014395 async.go:72: DBG create bulk processing worker (interval=1s, bulk size=50)
2016/12/13 22:55:56.014449 beat.go:207: INFO redditbeat start running.
2016/12/13 22:55:56.014459 redditbeat.go:38: INFO redditbeat is running! Hit CTRL-C to stop it.
2016/12/13 22:55:57.370781 client.go:184: DBG Publish: { "@timestamp": "2016-12-13T22:54:47.252Z", "beat": { "hostname": "LSNM33795267A", "name": "LSNM33795267A", "version": "6.0.0-alpha1" }, "counter": 1, "type": "redditbeat" }
Regarding command-line parameters: -e logs to standard error, while -d "*" enables all debugging selectors.
For the full list of parameters, type ./redditbeat --help.
The code
Go code is located in .go files (what a surprise…) in the project sub-folder of the $GOPATH/src folder.
Configuration type
The first interesting file is config/config.go, which defines a struct to declare possible parameters for the Beat.
As with the former Logstash plugin, let’s add a subreddit parameter and set its default value:
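package config

import "time"

// Config lists the parameters accepted under the redditbeat key.
type Config struct {
	Period    time.Duration `config:"period"`
	Subreddit string        `config:"subreddit"`
}

// DefaultConfig holds the values used when a parameter is not set.
var DefaultConfig = Config{
	Period:    15 * time.Second,
	Subreddit: "elastic",
}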
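How defaults and user-provided values combine can be sketched on its own. The snippet below does not use libbeat: withOverrides is a hypothetical helper that copies the defaults and replaces only what the caller sets, which is the general idea behind starting from DefaultConfig.

```go
package main

import (
	"fmt"
	"time"
)

// Config mirrors the struct above, without the libbeat-specific tags.
type Config struct {
	Period    time.Duration
	Subreddit string
}

var DefaultConfig = Config{Period: 15 * time.Second, Subreddit: "elastic"}

// withOverrides is a hypothetical helper, not part of the Beat template.
// Assigning a struct copies it, so DefaultConfig itself is never modified.
func withOverrides(subreddit string) Config {
	cfg := DefaultConfig
	if subreddit != "" {
		cfg.Subreddit = subreddit
	}
	return cfg
}

func main() {
	cfg := withOverrides("java")
	fmt.Println(cfg.Subreddit, cfg.Period) // java 15s
	fmt.Println(DefaultConfig.Subreddit)   // elastic
}
```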
Beater type
Code for the Beat itself is found in beater/redditbeat.go.
The default template creates a struct for the Beat and three functions:
- The Beat constructor - it reads the configuration:
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { ... }
- The Run function - should loop over the main feature of the Beat:
func (bt *Redditbeat) Run(b *beat.Beat) error { ... }
- The Stop function handles graceful shutdown:
func (bt *Redditbeat) Stop() { ... }
There’s no explicit interface implementation in Go.
Just implementing all the methods of an interface makes a type satisfy it implicitly; no inheritance is involved.
For documentation purposes, the interface of interest here is Beater, which declares the Run and Stop methods.
Hence, since the Beat struct implements both Run and Stop, it is a Beater.
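Implicit interface satisfaction can be tried out in isolation. In the sketch below, Beater is a simplified local stand-in (the real Run takes a *beat.Beat argument) and Demobeat is a hypothetical type that never mentions the interface by name.

```go
package main

import "fmt"

// Beater is a simplified stand-in for the interface discussed above.
type Beater interface {
	Run() error
	Stop()
}

// Demobeat is a hypothetical type; nothing declares that it "implements" Beater.
type Demobeat struct {
	stopped bool
}

func (d *Demobeat) Run() error { return nil }
func (d *Demobeat) Stop()      { d.stopped = true }

// shutdown accepts any value whose type has both Run and Stop.
func shutdown(b Beater) {
	b.Stop()
}

func main() {
	d := &Demobeat{}
	var b Beater = d // compiles only because *Demobeat has both methods
	shutdown(b)
	fmt.Println(d.stopped) // true
}
```

Removing either method from Demobeat turns the assignment to b into a compile-time error, which is as close as Go gets to an "implements" check.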
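There’s no concept of class in Go, so methods are not declared inside the body of a type.
Instead, there is a concept close to extension functions:
functions that add behavior to a type (provided they are declared in the same package as that type).
Such a function needs to declare its receiver type:
this is done between the func keyword and the function name, as with (bt *Redditbeat) in the signatures above.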
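A short sketch of receivers, using two hypothetical types unrelated to the Beat: one method with a value receiver on a non-struct type, and one with a pointer receiver that modifies its target.

```go
package main

import "fmt"

// Celsius is a hypothetical named type built on float64.
type Celsius float64

// Fahrenheit has a value receiver: it only reads c.
func (c Celsius) Fahrenheit() float64 {
	return float64(c)*9/5 + 32
}

// Counter is a hypothetical struct type.
type Counter struct {
	n int
}

// Increment has a pointer receiver, so the change is visible to the caller.
func (c *Counter) Increment() {
	c.n++
}

func main() {
	fmt.Println(Celsius(100).Fahrenheit()) // 212

	counter := &Counter{}
	counter.Increment()
	fmt.Println(counter.n) // 1
}
```

The Redditbeat methods use a pointer receiver for the same reason as Increment: Run stores the publisher client on the struct.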
The constructor and the Stop function can stay as they are; whatever feature should be developed must go in the Run function.
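The looping part of Run relies on a ticker and a channel that signals shutdown. Here is a standalone sketch of that pattern; runLoop and its parameters are names chosen for the illustration, with closing the done channel playing the role of Stop.

```go
package main

import (
	"fmt"
	"time"
)

// runLoop calls work on every tick until done is closed.
// It returns the number of times work was executed.
func runLoop(period time.Duration, done <-chan struct{}, work func()) int {
	ticker := time.NewTicker(period)
	defer ticker.Stop() // release the ticker once the loop is over

	runs := 0
	for {
		select {
		case <-done:
			return runs
		case <-ticker.C:
			work()
			runs++
		}
	}
}

func main() {
	done := make(chan struct{})
	count := 0

	// Ask the loop to stop during the third run, as Stop would do in a Beat.
	runs := runLoop(10*time.Millisecond, done, func() {
		count++
		if count == 3 {
			close(done)
		}
	})
	fmt.Println("work executed", runs, "times")
}
```

Closing the channel rather than sending a value on it means the shutdown signal never blocks, whatever the loop is doing at that moment.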
In this case, the feature is to call the Reddit REST API and send a message for every Reddit post.
The final code looks like the following:
func (bt *Redditbeat) Run(b *beat.Beat) error {
	bt.client = b.Publisher.Connect()
	ticker := time.NewTicker(bt.config.Period)
	reddit := "https://www.reddit.com/r/" + bt.config.Subreddit + "/.json" // (1)
	client := &http.Client{} // (2)
	for {
		select {
		case <-bt.done:
			return nil
		case <-ticker.C:
		}
		req, reqErr := http.NewRequest("GET", reddit, nil) // (3)
		if reqErr != nil { // (5)
			panic(reqErr) // (6)
		}
		// The header is set only once the request is known to be valid.
		req.Header.Add("User-Agent", "Some existing header to bypass 429 HTTP") // (4)
		resp, getErr := client.Do(req) // (7)
		if getErr != nil {
			panic(getErr)
		}
		body, readErr := ioutil.ReadAll(resp.Body) // (8)
		defer resp.Body.Close() // (9)
		if readErr != nil {
			panic(readErr)
		}
		// prefix, suffix and separator are not shown in this listing.
		trimmedBody := body[len(prefix) : len(body)-len(suffix)] // (10)
		messages := strings.Split(string(trimmedBody), separator) // (11)
		for i := 0; i < len(messages); i++ {
			event := common.MapStr{ // (12)
				"@timestamp": common.Time(time.Now()),
				"type":       b.Name,
				"message":    "{" + messages[i] + "}",
			}
			bt.client.PublishEvent(event) // (13)
		}
	}
}
Here’s an explanation of the most important pieces:
(1) Create the Reddit REST URL by concatenating strings, including the Subreddit configuration parameter. Remember that its default value has been defined in the config.go file.
(2) Get a reference to a new HTTP client.
(3) Create a new HTTP request. Note that Go allows for multiple return values.
(4) Set a User-Agent request header: without a standard one, Reddit’s API will return a 429 status code.
(5) Go errors are not handled through exceptions but are returned alongside regular return values. According to the Golang wiki: "Indicating error conditions to callers should be done by returning error value".
(6) The panic() function is similar to throwing an exception in Java: it climbs up the stack until it’s handled. For more information, check the relevant documentation.
(7) Execute the HTTP request.
(8) Read the response body into a byte array.
(9) Close the body stream. Note the defer keyword: "A defer statement defers the execution of a function until the surrounding function returns." Here, the surrounding function is Run, so the deferred calls only execute once the loop exits.
(10) Create a slice, i.e. a reference to a part of an array, over the whole response body byte array. In essence, it removes the prefix and the suffix to keep the relevant JSON value. It would be overkill to parse the byte array into JSON.
(11) Convert the slice to a string and split it to get each JSON fragment separately.
(12) Create the message as a simple dictionary structure.
(13) Send it.
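The request, trim and split steps can also be sketched outside of a Beat, as a variant that returns errors to the caller instead of panicking and closes each body as soon as its request is done. Everything here is an assumption made for the demonstration: the fetch and splitPosts helpers, the values of prefix, suffix and separator (the real ones are not shown above), and the local test server that stands in for Reddit.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Hypothetical markers, chosen to match the fake payload served in main.
const (
	prefix    = `{"children":[{`
	suffix    = `}]}`
	separator = `},{`
)

// fetch performs a GET request and returns the raw response body.
// Errors are handed back to the caller rather than triggering a panic.
func fetch(client *http.Client, url, userAgent string) ([]byte, error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Add("User-Agent", userAgent)
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	// The deferred call runs when fetch returns, i.e. once per request.
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

// splitPosts strips prefix and suffix, then splits the rest on separator.
func splitPosts(body []byte) ([]string, error) {
	if len(body) < len(prefix)+len(suffix) {
		return nil, fmt.Errorf("body too short: %d bytes", len(body))
	}
	trimmed := body[len(prefix) : len(body)-len(suffix)]
	return strings.Split(string(trimmed), separator), nil
}

func main() {
	// A local test server stands in for the Reddit API.
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, `{"children":[{"id":"a"},{"id":"b"}]}`)
	}))
	defer server.Close()

	body, err := fetch(server.Client(), server.URL, "redditbeat-sketch")
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	posts, err := splitPosts(body)
	if err != nil {
		fmt.Println("unexpected body:", err)
		return
	}
	for _, post := range posts {
		fmt.Println("{" + post + "}")
	}
}
```

Moving the request into its own function is what makes defer behave per request: the body is closed each time fetch returns, not when the whole loop ends.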
Configure, build, and launch
Default configuration parameters can be found in the redditbeat.yml file at the root of the project.
Note that additional common Beat parameters are listed in the redditbeat.full.yml file, along with relevant comments.
The interesting thing about Beats is that their messages can be sent directly to Elasticsearch or to Logstash for further processing. This is configured in the aforementioned configuration file.
redditbeat:
  period: 10s
output.elasticsearch:
  hosts: ["localhost:9200"]
output.logstash:
  hosts: ["localhost:5044"]
  enabled: true
With this configuration snippet, the body of the loop in the Run method executes every 10 seconds and sends messages to the Logstash instance running on localhost on port 5044.
This can be overridden when running the Beat (see below).
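The period value is a duration string. The sketch below shows how such a string maps to a time.Duration with the standard library. It assumes the configuration notation matches what time.ParseDuration accepts; it is not the code the Beat uses to load its configuration, and parsePeriod is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// parsePeriod is a hypothetical helper: it converts a duration string such as
// "10s" and falls back on the given default when the value is unusable.
func parsePeriod(raw string, fallback time.Duration) time.Duration {
	d, err := time.ParseDuration(raw)
	// time.NewTicker panics on non-positive durations, so reject them here.
	if err != nil || d <= 0 {
		return fallback
	}
	return d
}

func main() {
	fmt.Println(parsePeriod("10s", 15*time.Second))  // 10s
	fmt.Println(parsePeriod("oops", 15*time.Second)) // 15s
}
```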
For Logstash to accept messages from Beats, the Logstash Beats input plugin must be installed and a Logstash input must be configured for Beats:
input {
  beats {
    port => 5044
  }
}
To build the project, type make at the project’s root.
It will create an executable that can be run.
./redditbeat -e -E redditbeat.subreddit=java
The -E flag may override parameters found in the embedded redditbeat.yml configuration file (see above).
Here, it sets the subreddit to be read to "java" instead of the default "elastic".
The output looks like the following:
2016/12/17 14:51:19.748329 client.go:184: DBG Publish: { "@timestamp": "2016-12-17T14:51:19.748Z", "beat": { "hostname": "LSNM33795267A", "name": "LSNM33795267A", "version": "6.0.0-alpha1" }, "message": "{ \"kind\": \"t3\", \"data\": { \"contest_mode\": false, \"banned_by\": null, \"domain\": \"blogs.oracle.com\", \"subreddit\": \"java\", \"selftext_html\": null, \"selftext\": \"\", \"likes\": null, \"suggested_sort\": null, \"user_reports\": [], \"secure_media\": null, \"saved\": false, \"id\": \"5ipzgq\", \"gilded\": 0, \"secure_media_embed\": {}, \"clicked\": false, \"report_reasons\": null, \"author\": \"pushthestack\", \"media\": null, \"name\": \"t3_5ipzgq\", \"score\": 11, \"approved_by\": null, \"over_18\": false, \"removal_reason\": null, \"hidden\": false, \"thumbnail\": \"\", \"subreddit_id\": \"t5_2qhd7\", \"edited\": false, \"link_flair_css_class\": null, \"author_flair_css_class\": null, \"downs\": 0, \"mod_reports\": [], \"archived\": false, \"media_embed\": {}, \"is_self\": false, \"hide_score\": false, \"spoiler\": false, \"permalink\": \"/r/java/comments/5ipzgq/jdk_9_will_no_longer_bundle_javadb/\", \"locked\": false, \"stickied\": false, \"created\": 1481943248.0, \"url\": \"https://blogs.oracle.com/java-platform-group/entry/deferring_to_derby_in_jdk\", \"author_flair_text\": null, \"quarantine\": false, \"title\": \"JDK 9 will no longer bundle JavaDB\", \"created_utc\": 1481914448.0, \"link_flair_text\": null, \"distinguished\": null, \"num_comments\": 4, \"visited\": false, \"num_reports\": null, \"ups\": 11 } }", "type": "redditbeat" }
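Since the message field carries the Reddit post as a JSON string, a consumer has to decode it a second time to reach individual fields. A minimal sketch of that step, using a shortened copy of the message above; the post type and the decodePost helper are hypothetical and only map a handful of fields.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// post mirrors a few of the fields visible in the published message.
type post struct {
	Kind string `json:"kind"`
	Data struct {
		Subreddit string `json:"subreddit"`
		Title     string `json:"title"`
		Score     int    `json:"score"`
	} `json:"data"`
}

// decodePost parses the content of an event's message field.
func decodePost(message string) (post, error) {
	var p post
	err := json.Unmarshal([]byte(message), &p)
	return p, err
}

func main() {
	// Shortened version of the message shown in the output above.
	message := `{ "kind": "t3", "data": { "subreddit": "java", "score": 11, "title": "JDK 9 will no longer bundle JavaDB" } }`

	p, err := decodePost(message)
	if err != nil {
		fmt.Println("invalid message:", err)
		return
	}
	fmt.Printf("%s [%s] score=%d\n", p.Data.Title, p.Data.Subreddit, p.Data.Score)
}
```

Fields that are not declared in the struct are simply ignored by the decoder, so the sketch also works on the full message.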
Conclusion
Strangely enough, I found developing a Beat easier than a Logstash plugin. Go is more low-level and some concepts feel really foreign (like implicit interface implementation), but the ecosystem is much simpler - as the language is more recent. Also, Beats are more versatile, in that they can send to Elasticsearch and/or Logstash.