Brokers: A Tutorial Introduction

For this introduction we’ll start a stand-alone broker. First, we’ll need an instance of etcd running:

$ GO111MODULE=on go install github.com/etcd-io/etcd
$ ~/go/bin/etcd
2019-10-21 11:57:29.465501 I | etcdmain: etcd Version: 3.3.17
2019-10-21 11:57:29.465533 I | etcdmain: Git SHA: Not provided (use ./build instead of go build)
2019-10-21 11:57:29.465536 I | etcdmain: Go Version: go1.13
2019-10-21 11:57:29.465538 I | etcdmain: Go OS/Arch: linux/amd64
... (leave this running in a tab) ...

Next we’ll install the gazette broker and the gazctl command-line tool:

$ GO111MODULE=on go install go.gazette.dev/core/cmd/gazette
$ GO111MODULE=on go install go.gazette.dev/core/cmd/gazctl

Create a local directory which will stand in for a BLOB store, and start a broker:

$ mkdir -p fragment-store

$ ~/go/bin/gazette serve --broker.port 8080 --broker.file-root fragment-store/
INFO[0000] broker configuration                          buildDate=unknown config="&{{{{local}   8080} 1024 fragment-store/} {{http://localhost:2379 20s} /gazette/cluster} {info text} {}}" version=development
INFO[0000] starting broker                               endpoint="http://roland:8080" id=busy-walrus zone=local
INFO[0000] solved for maximum assignment                 assignments=0 desired=0 dur="29.044µs" hash=15853963547446721567 items=0 lastHash=0 members=1
... (leave this running in a tab) ...

Apply a journal specification for us to work with:

$ ~/go/bin/gazctl journals apply <<EOF
name: example/journal
replication: 1
labels:
- name: app.gazette.dev/message-type
  value: TestMessage
- name: app.gazette.dev/region
  value: local
- name: app.gazette.dev/tag
  value: demo
- name: content-type
  value: application/x-ndjson
fragment:
  length: 131072
  compression_codec: SNAPPY
  stores:
  - file:///
  refresh_interval: 1m0s
EOF
INFO[0000] successfully applied                          revision=8

Now let’s issue our first append request.

$ curl -X PUT --data-binary @- http://localhost:8080/example/journal << EOF
{"Msg": "Hello, Gazette!"}
{"Msg": "See you later alligator"}
EOF

We can read our written content.

$ curl http://localhost:8080/example/journal
{"Msg": "Hello, Gazette!"}
{"Msg": "See you later alligator"}

Read requests take an offset (which defaults to 0).

$ curl "http://localhost:8080/example/journal?offset=16"
Gazette!"}
{"Msg": "See you later alligator"}

Wait a tick, that’s not valid JSON. What happened?

Well, journals are byte-oriented: even though we happened to write tidy JSON payloads, the brokers see a journal as simply a sequence of bytes, so offsets are always byte offsets. A key take-away is that message formatting and representation are a concern of the client, and not of the broker. The broker doesn’t care whether journals contain lines of text, streaming video, binary digits of pi, /dev/urandom, or anything else.
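
To illustrate, we can compute a message boundary by hand: the first line plus its trailing newline totals 27 bytes, so reading from offset 27 should begin cleanly at the second message (a sketch; your exact offsets depend on what has been appended so far).

$ curl "http://localhost:8080/example/journal?offset=27"
{"Msg": "See you later alligator"}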

That’s not to say that journals aren’t eminently suited to JSON, Protobuf, or other delimited formats, however! The rule of thumb is that, so long as clients produce properly delimited sequences of serialized messages, the journal byte-stream in its entirety will be a well-formed stream of messages (because individual appends are atomic, and the broker will never interleave them).

Concurrent Appends

Let’s verify that the broker properly handles concurrent appends by issuing a bunch of raced requests (the trailing & tells the shell to start each command in the background).

$ for i in {1..20}
do
  DATA='{"Msg": "Race!", "N": '${i}$'}\n'
  curl -X PUT --data-binary "$DATA" http://localhost:8080/example/journal &
done && wait
[1] 9858
[2] 9859
[3] 9860
[1]   Done                    curl -X PUT --data-binary "$DATA" http://localhost:8080/example/journal
[2]   Done                    curl -X PUT --data-binary "$DATA" http://localhost:8080/example/journal
[4]   Done                    curl -X PUT --data-binary "$DATA" http://localhost:8080/example/journal

We expect that our raced messages landed in the journal intact. Let’s verify by piping to jq, which will error if it encounters invalid JSON. Notice also that our appends were sequenced into the journal in an arbitrary order.

$ curl -s http://localhost:8080/example/journal | jq -c '.'
{"Msg":"Hello, Gazette!"}
{"Msg":"See you later alligator"}
{"Msg":"Race!","N":2}
{"Msg":"Race!","N":1}
{"Msg":"Race!","N":13}
{"Msg":"Race!","N":8}
{"Msg":"Race!","N":7}
{"Msg":"Race!","N":5}
{... etc ...}

Of course, this is all running on a stand-alone broker. How do we ensure this total ordering in the general case, where lots of brokers handle requests from lots of clients?

Briefly: at any given time, a journal has exactly one broker which coordinates every append to that journal. The choice of broker is determined by a distributed assignment algorithm running atop etcd. Other brokers in the cluster proxy append requests to the current primary on the client’s behalf.

One implication is that every append to a journal must pass through an assigned broker (and usually multiple such brokers, spanning availability zones, which together make up the journal’s replication peerset). Collectively, a distributed system cannot append to a journal faster than those brokers can handle, no matter how many other brokers may exist. Journals are thus the unit of scaling for Gazette, and higher write volumes are accommodated by balancing across larger numbers of journals as well as brokers. We’ll see how to do this a bit later.

Don’t worry, though: journals are still plenty fast. For storage efficiency it’s usually a good idea to have Gazette compress journals on your behalf, and in practice the bottleneck of appending to a journal tends to be how quickly Snappy or Gzip can run.

Streaming Reads

We can also use blocking reads to have journal content streamed to us as it commits. Here we use offset=-1 to tell the broker we want to begin reading from the current write head. Note that curl and jq will run until we Ctrl-C them.

$ curl -sN "http://localhost:8080/example/journal?block=true&offset=-1" | jq -c '.'

Try appending to the journal. Notice how our curl updates with each journal write: the broker is pushing new content to us over a single long-lived HTTP response.
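
For example, appending from a second terminal while the blocking read above is still running (assuming the same journal and broker address) should cause a new line to appear immediately in the first terminal:

$ curl -X PUT --data-binary @- http://localhost:8080/example/journal << EOF
{"Msg": "Streamed to a blocked reader"}
EOF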

Brokers have no notion of subscriptions, consumer queues, or other state aside from that which serves an active read stream. It’s on readers to track the offset they’ve read through and, when their stream must eventually be restarted, to supply that offset to the broker. While this may appear tedious, it’s important for the construction of correct, stateful readers with exactly-once processing semantics that they “own” their consumption offsets. When building Gazette consumer framework applications, this is managed on your behalf.
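
A minimal sketch of what “owning” an offset looks like with curl: suppose a reader previously read through byte offset 62 of the journal; it can resume (and block for new content) by supplying that offset explicitly. The offset here is illustrative, so substitute whatever your reader last observed.

$ OFFSET=62
$ curl -sN "http://localhost:8080/example/journal?offset=${OFFSET}&block=true" | jq -c '.'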

gRPC API

As we’ve seen, brokers present journals over an HTTP API using familiar GET and PUT verbs. One callout is that journals are natively presented over a gRPC service, and what we’re actually interacting with here is an HTTP gateway that brokers offer, wrapping the gRPC Journal service.

The HTTP gateway is handy for building simple clients or reading journals from a web browser, but at high volumes in production a native gRPC client should be used instead (such as the Gazette Go client).

Gazette also offers a fully-featured tool gazctl which can often make quick work of efficiently integrating legacy or Gazette-unaware applications.

Gazctl: Gazette’s CLI Tool

Gazctl is a command-line tool for interacting with a Gazette cluster. Almost anything you can do with Gazette, you can do from gazctl.

Gazctl can be installed directly with go install, as we did above. Run it without arguments, or run any sub-command with the --help flag, for detailed documentation on the tool’s capabilities and usage.
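
For example (output omitted):

$ ~/go/bin/gazctl --help
$ ~/go/bin/gazctl journals read --help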

We’ll use gazctl for the rest of this tutorial. Gazctl understands the BROKER_ADDRESS environment variable, or we can create an optional configuration file at $HOME/.config/gazette/gazctl.ini.

$ mkdir -p ~/.config/gazette/ && cat > ~/.config/gazette/gazctl.ini << EOF
[journals.Broker]
Address = http://localhost:8080
EOF
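
Alternatively, skip the configuration file and supply the broker address per-invocation through the environment variable mentioned above:

$ BROKER_ADDRESS=http://localhost:8080 gazctl journals list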

We can append to our journal and stream its content from gazctl.

$ gazctl journals append -l name=example/journal << EOF
{"Msg": "Hello, Gazctl!"}
EOF
$ gazctl journals read -l name=example/journal --block
{"Msg": "Hello, Gazctl!"}

The simple examples in this tutorial belie how powerful and expressive the read, append, and other sub-commands really are. Be sure to look over their documentation.

Fragments

As setup for this section, let’s use gazctl to write a message with the current date every second.

$ while true; do sleep 1 && echo '{"Msg": "'$(date)'"}' ; done | \
        gazctl journals append -l name=example/journal --framing=lines

Now poke at read internals a bit by enabling debug logging. We see:

- That our --tail offset of -1 was resolved to an explicit offset 41172,
- That offsets increment with each chunk of read content, and
- That each chunk references a fragment that its offset falls within.

$ gazctl journals read -l name=example/journal --tail --block --log.level=debug
INFO[0000] read started                                  journal=example/journal offset=0
DEBU[0000] read is ready                                 fragment.Begin=14 fragment.End=41215 fragment.URL= journal=example/journal offset=41172
{"Msg": "Mon 29 Jul 2019 11:34:29 PM EDT"}
DEBU[0001] read is ready                                 fragment.Begin=14 fragment.End=41258 fragment.URL= journal=example/journal offset=41215
{"Msg": "Mon 29 Jul 2019 11:34:30 PM EDT"}
DEBU[0002] read is ready                                 fragment.Begin=14 fragment.End=41301 fragment.URL= journal=example/journal offset=41258
{"Msg": "Mon 29 Jul 2019 11:34:31 PM EDT"}

Gazette uses fragments to describe byte-ranges of a journal, formally defined by the tuple (journal-name, begin-offset, end-offset, SHA1-sum). A constraint of fragments is that their [begin, end) byte spans never subdivide a client append: fragments contain only whole appends, and if those appends each consist of properly delimited messages, then so does the fragment.

A fragment file is a file of raw journal content, persisted by brokers under a naming scheme which incorporates the fragment definition itself. gazctl has a fragments command for listing fragments of our journal.

$ gazctl journals fragments -l name=example/journal
+-----------------+--------+---------+---------------+-----------------+-------------+
|     JOURNAL     | OFFSET | LENGTH  |   PERSISTED   |      SHA1       | COMPRESSION |
+-----------------+--------+---------+---------------+-----------------+-------------+
| example/journal |      0 | 43 B    | 8 minutes ago | 92a7ee0e4be7... | SNAPPY      |
| example/journal |     43 | 2.3 KiB | 7 minutes ago | e3c86a45d870... | SNAPPY      |
| example/journal |   2365 | 2.5 KiB | 6 minutes ago | c06eb3b317c0... | SNAPPY      |
| example/journal |   4902 | 2.5 KiB | 5 minutes ago | 6c651e79c7fe... | SNAPPY      |
| example/journal |   7482 | 2.5 KiB | 4 minutes ago | 1eceb1b39740... | SNAPPY      |
| example/journal |  10062 | 2.5 KiB | 3 minutes ago | 579e03e6202f... | SNAPPY      |
| example/journal |  12599 | 2.5 KiB | 2 minutes ago | f65f0b59f423... | SNAPPY      |
| example/journal |  15179 | 2.5 KiB | 1 minute ago  | 49b43a078397... | SNAPPY      |
| example/journal |  17759 | 2.5 KiB |               | fd560d3b9033... | SNAPPY      |
| example/journal |  20296 | 1.9 KiB |               | 6882ce2d56fd... | SNAPPY      |
+-----------------+--------+---------+---------------+-----------------+-------------+

For this demo, we created a local fragment-store directory into which fragments are persisted and which we can inspect. In a real deployment a BLOB store or mounted NAS array would be used instead (and we would also configure much larger fragments). Fragments are named by their offsets and SHA1 sum, using zero-padding and hex-encoding so that the lexical ordering of file names preserves their offset ordering. Notice how the latest 6882ce fragment from our above listing doesn’t exist on disk yet: it’s actively being appended to by the broker. All of the others have been persisted.

$ ls -lR fragment-store/
fragment-store/example/journal:
total 36
-rw------- 1 johnny johnny  61 Jul 30 12:42 0000000000000000-000000000000002b-92a7ee0e4be7a03fd1a3224055a9d6b7bbd6125e.sz
-rw------- 1 johnny johnny 339 Jul 30 12:43 000000000000002b-000000000000093d-e3c86a45d87051716caa2b6b5dcc7be77d4e21bb.sz
-rw------- 1 johnny johnny 365 Jul 30 12:44 000000000000093d-0000000000001326-c06eb3b317c0e42696e2dd2bc2e07a589b5c4bf7.sz
-rw------- 1 johnny johnny 370 Jul 30 12:45 0000000000001326-0000000000001d3a-6c651e79c7fe8847c41264e90efaea8c28cacf59.sz
-rw------- 1 johnny johnny 370 Jul 30 12:46 0000000000001d3a-000000000000274e-1eceb1b39740fd0accb1de8d4654fafa2f20db24.sz
-rw------- 1 johnny johnny 365 Jul 30 12:47 000000000000274e-0000000000003137-579e03e6202f1fe7ae7c9eaeaa6342b4cfb1483e.sz
-rw------- 1 johnny johnny 370 Jul 30 12:48 0000000000003137-0000000000003b4b-f65f0b59f423266775e4d8ba075e56adba296b1f.sz
-rw------- 1 johnny johnny 370 Jul 30 12:49 0000000000003b4b-000000000000455f-49b43a0783974daee3ff4265b1e418097de1472a.sz
-rw------- 1 johnny johnny 365 Jul 30 12:50 000000000000455f-0000000000004f48-fd560d3b90331733704959f1c0608b4c7c690537.sz
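
As a sanity check of the naming scheme, we can decode the hex offsets of the second file by hand; they match the offsets reported by the fragments listing above (0x2b is 43, and 0x93d is 2365).

$ printf '%d %d\n' 0x2b 0x93d
43 2365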

The gazctl fragments sub-command can provide further help with enumerating fragments, including outputting URLs pre-signed for GET access that can be integrated into batch pipelines. See its documentation for more discussion.
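
As a hedged example (the exact flag name is an assumption here; consult gazctl journals fragments --help for the authoritative name and defaults), listing fragments along with signed GET URLs valid for an hour might look like:

$ gazctl journals fragments -l name=example/journal --url-ttl 1h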

From an architecture perspective, fragments and their stores are at the heart of how brokers themselves are able to stay ephemeral, disposable, and fast to scale. A broker can begin serving journal reads as soon as it completes a fragment store file listing. Or a new broker can be integrated into a journal’s replication peer set by having that peer set close its current fragment and “roll” to a new & empty one at the current write head. No data migrations are ever required to “catch up” a broker.

Nor must we ever wait for a faulted broker to restart and re-join the peer set, potentially gating new appends until it does: as soon as a broker has faulted, it’s immediately and permanently replaced. The broker’s one cardinal responsibility is to ensure that all fragments it previously replicated are promptly persisted to backing stores. Other than this, brokers can come and go freely. Brokers are cattle, not pets.

JournalSpecs

So far we’ve worked with a single journal, but an active production cluster will often serve hundreds or thousands of journals, or more. The list sub-command lists the journals of the cluster and their currently assigned brokers. Right now we have just one journal. Let’s fix that. But first, we’ll talk about interacting with JournalSpecs.

$ gazctl journals list --primary
+-----------------+-------------+
|      NAME       |   PRIMARY   |
+-----------------+-------------+
| example/journal | busy-walrus |
+-----------------+-------------+

As mentioned, Gazette relies on etcd for consensus over the distributed state of the system, such as current broker-to-journal assignments and the set of JournalSpecs.
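
If you’re curious, you can peek at this shared state directly. The broker configuration above used the etcd prefix /gazette/cluster, so an etcdctl binary (v3 API; not installed by the go install command earlier, and purely illustrative, since you never need to touch these keys yourself) can list the keys the broker maintains there:

$ ETCDCTL_API=3 etcdctl get --prefix /gazette/cluster --keys-only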

Specs define the existence and desired behavior of entities in Gazette. If you come from Kubernetes, this will feel familiar and indeed Gazette uses specs in analogous ways. We can use gazctl to fetch our single JournalSpec in YAML form:

$ gazctl journals list --format yaml
name: example/journal
replication: 1
labels:
- name: app.gazette.dev/message-type
  value: TestMessage
- name: app.gazette.dev/region
  value: local
- name: app.gazette.dev/tag
  value: demo
- name: content-type
  value: application/x-ndjson
fragment:
  length: 131072
  compression_codec: SNAPPY
  stores:
  - file:///
  refresh_interval: 1m0s
  retention: 1h0m0s
  flush_interval: 1m0s
revision: 3

Gazctl has an apply sub-command for modifying JournalSpecs. Here we modify the above output to switch from SNAPPY to GZIP compression.

$ gazctl journals apply << EOF
name: example/journal
replication: 1
labels:
- name: app.gazette.dev/message-type
  value: TestMessage
- name: app.gazette.dev/region
  value: local
- name: app.gazette.dev/tag
  value: demo
- name: content-type
  value: application/x-ndjson
fragment:
  length: 131072
  compression_codec: GZIP
  stores:
  - file:///
  refresh_interval: 1m0s
  retention: 1h0m0s
  flush_interval: 1m0s
revision: 3
EOF
INFO[0000] successfully applied                          revision=5

Gazctl also has an edit sub-command which will be familiar to kubectl users, and is convenient shorthand for this common “list, modify, then apply” workflow.

$ gazctl journals edit -l name=example/journal

Finally, let’s use apply to create some new journals.

$ gazctl journals apply << EOF
name: foobar/
replication: 1
labels:
- name: content-type
  value: application/x-ndjson
- name: my-label
fragment:
  length: 4096
  compression_codec: GZIP
  stores:
  - file:///
  refresh_interval: 1m0s
  flush_interval: 1m0s
children:
  - name: foobar/part-000
  - name: foobar/part-001
  - name: foobar/part-002
EOF
INFO[0000] successfully applied                          revision=7

Our new journals now appear in list, assigned to our broker.

$ gazctl journals list --primary
+-----------------+-------------+
|      NAME       |   PRIMARY   |
+-----------------+-------------+
| example/journal | busy-walrus |
| foobar/part-000 | busy-walrus |
| foobar/part-001 | busy-walrus |
| foobar/part-002 | busy-walrus |
+-----------------+-------------+

Try starting another broker instance (this time, omitting the --broker.port flag). You’ll see that the brokers re-assign journals to balance them across the available broker processes; use gazctl journals list to confirm this. Reads and appends of any journal may be directed to any broker: if a request reaches a broker which cannot serve it, that broker will proxy on our behalf to one that can.
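
For example, in a new tab (the same command as before, minus the fixed port, so it doesn’t collide with the first broker):

$ ~/go/bin/gazette serve --broker.file-root fragment-store/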

Labels and Selectors

Since journals are the unit of scale for brokers, you’ll sometimes want to spread a collection of like records across many journals. This is commonly called a “topic”, where individual journals serve as partitions of the topic. Indeed, topics and partitioning are an essential strategy for building highly-scaled systems.

However, you’ll find that brokers have no APIs for managing topics, nor do JournalSpecs have a topic field. We arguably defined a grouping above by using a common foobar/ prefix, but this is purely convention: journal names are a flat key-space and the / has no special meaning. In fact, topics have no formal definition anywhere in the Gazette codebase. What gives?

A key insight is that a topic, and the data which is referred to by that topic, is really in the eye of the beholder. By way of example, we might have a collection of QueryLog events that we want to model as a topic. Suppose these are generated by servers running in various regions, like us-east-1 or eu-west-1. Further suppose we have distinct web and mobile apps which both generate this event type. It becomes a bit messy to define what the topic(s) of QueryLogs should be. Is it all of them? Segregated by serving region? Or by whether they came from the web vs mobile app? Both? What about the query sub-type? It’s hard (or impossible!) to define precise topics ahead of time, without perfect knowledge of how they’ll ultimately be used. Fortunately, we don’t have to.

Gazette uses a concept of labels to capture metadata of a journal, such as its message type, serving region, or anything else, and selectors for querying sets of journals by their labels. If you’re familiar with Kubernetes Labels and Selectors, their implementation in Gazette works almost identically.

When creating or editing a journal, best practice is to also populate labels for that journal. The choice of labels and values is arbitrary and teams can evolve their own meanings over time, but Gazette does provide conventions.

Having done this, it turns out that label selectors become an excellent way to define “topics” on an ex post facto basis. Each application that consumes QueryLogs can define for itself which dimensions matter for its use-case and, by crafting an appropriate selector, be assured of processing the set of partitions that exist now or in the future.

Gazette labels have one deviation from the Kubernetes implementation worth calling out: labels are a multi-map, meaning a label can be repeated with distinct values. A selector matches a journal if any of its label values matches an included selector value, and excludes the journal if any of its values matches an excluded one.
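
For instance (an illustrative spec fragment only, using a hypothetical second tag value), a journal could carry the same label twice:

labels:
- name: app.gazette.dev/tag
  value: demo
- name: app.gazette.dev/tag
  value: nightly

A selector of app.gazette.dev/tag=demo would match this journal, as would app.gazette.dev/tag=nightly, while "app.gazette.dev/tag not in (nightly)" would exclude it.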

We’ve actually been using label selectors this whole time via the -l flag. Every journal has two labels which are implicitly defined: name, which is the exact journal name, and prefix, which matches any prefix of the journal name that ends in /. Let’s close out this tutorial by trying out some examples.

$ gazctl journals list -l prefix=example/
+-----------------+
|      NAME       |
+-----------------+
| example/journal |
+-----------------+
$ gazctl journals list -l prefix=foobar/
+-----------------+
|      NAME       |
+-----------------+
| foobar/part-000 |
| foobar/part-001 |
| foobar/part-002 |
+-----------------+
$ gazctl journals list -l app.gazette.dev/message-type=TestMessage
+-----------------+
|      NAME       |
+-----------------+
| example/journal |
+-----------------+
$ gazctl journals list -l my-label
+-----------------+
|      NAME       |
+-----------------+
| foobar/part-000 |
| foobar/part-001 |
| foobar/part-002 |
+-----------------+
$ gazctl journals list -l "name in (example/journal, foobar/part-001)"
+-----------------+
|      NAME       |
+-----------------+
| example/journal |
| foobar/part-001 |
+-----------------+
$ gazctl journals list -l "prefix=foobar/, name not in (foobar/part-001)"
+-----------------+
|      NAME       |
+-----------------+
| foobar/part-000 |
| foobar/part-002 |
+-----------------+