Part 1
I've been working on a storage engine for my database Hobbes, and as a result I've been spending a lot of time dealing with iterators. Hobbes at its core is a distributed key/value database, and its storage engine, affectionately termed XKS (for ExKeyStore, a stupid pun), is a multiversion Log-Structured Merge Tree (LSM Tree). That's a lot of words, but all it means is that XKS can store binary keys and values on disk and return them in sorted order when asked.
Let's keep our focus on the keys and conjure an iterator from the void.
defmodule KeyIterator do
  defstruct [:table, :current_key]

  def new(table, start_key) do
    iterator = %KeyIterator{table: table, current_key: start_key}

    case :ets.member(table, start_key) do
      true -> iterator
      false -> next(iterator)
    end
  end

  def peek(%KeyIterator{current_key: key}) do
    key
  end

  def next(%KeyIterator{table: table, current_key: key} = iterator) do
    %{iterator | current_key: :ets.next(table, key)}
  end
end
And now we can sprinkle on a bit of recursion and traverse our table.
def scan(iterator, acc \\ []) do
  case KeyIterator.peek(iterator) do
    key when is_binary(key) ->
      iterator = KeyIterator.next(iterator)
      scan(iterator, [key | acc])

    :"$end_of_table" ->
      Enum.reverse(acc)
  end
end
table = :ets.new(:my_table, [:ordered_set])
:ets.insert(table, [{"a", nil}, {"b", nil}, {"c", nil}])
iterator = KeyIterator.new(table, "")
scan(iterator)
# ["a", "b", "c"]
Funny, this whole iterator thing seems like a waste. Couldn't we just scan the table directly?
def scan(table, prev, acc \\ []) do
  case :ets.next(table, prev) do
    key when is_binary(key) -> scan(table, key, [key | acc])
    :"$end_of_table" -> Enum.reverse(acc)
  end
end
scan(table, "")
# ["a", "b", "c"]
Much better! Nice and simple.
But it appears we may have lost something: the iterator encapsulated all of the state needed to traverse the table. We didn't have to tack on that extra prev key. Perhaps that was a useful property?
Merge Ahead
Hey, this isn't some abstract thought experiment. We're building a database here! How do LSM Trees work, anyway?
A log-structured MERGE tree takes a number of sorted runs and MERGES them together into a single stream. This architecture allows for reduced write and space amplification in exchange for...
Alright, my eyes are already glazing over. But that merge thing sounds important. Let's try that.
def merge(table1, prev1, table2, prev2, acc) do
  next1 = :ets.next(table1, prev1)
  next2 = :ets.next(table2, prev2)

  cond do
    next1 == :"$end_of_table" and next2 == :"$end_of_table" ->
      Enum.reverse(acc)
    next1 == :"$end_of_table" ->
      merge(table1, prev1, table2, next2, [next2 | acc])
    next2 == :"$end_of_table" ->
      merge(table1, next1, table2, prev2, [next1 | acc])
    next1 <= next2 ->
      merge(table1, next1, table2, prev2, [next1 | acc])
    next2 <= next1 ->
      merge(table1, prev1, table2, next2, [next2 | acc])
  end
end
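For the record, here's a quick spot check (the tables and their contents are invented for illustration):

t1 = :ets.new(:run1, [:ordered_set])
t2 = :ets.new(:run2, [:ordered_set])
:ets.insert(t1, [{"a", nil}, {"c", nil}])
:ets.insert(t2, [{"b", nil}, {"d", nil}])

merge(t1, "", t2, "", [])
# ["a", "b", "c", "d"]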
Well that's... not exactly beautiful, but it gets the job done. Certainly simpler than those pesky iterators. Not to mention the allocations! I think I'll stick to writing my traversals by hand.
It's okay if it's-
Now hold on a minute. Let's have a look at that description again.
A log-structured merge tree takes a number of sorted runs and merges them together
Doesn't that mean any number of runs? Let's try three.
def merge(table1, prev1, table2, prev2, table3, prev3, acc) do
  next1 = :ets.next(table1, prev1)
  next2 = :ets.next(table2, prev2)
  next3 = :ets.next(table3, prev3)

  cond do
    next1 == :"$end_of_table" and next2 == :"$end_of_table" and next3 == :"$end_of_table" ->
      Enum.reverse(acc)
    next2 == :"$end_of_table" and next3 == :"$end_of_table" ->
      merge(table1, next1, table2, prev2, table3, prev3, [next1 | acc])
    next1 == :"$end_of_table" and next3 == :"$end_of_table" ->
      merge(table1, prev1, table2, next2, table3, prev3, [next2 | acc])
    next1 == :"$end_of_table" and next2 == :"$end_of_table" ->
      merge(table1, prev1, table2, prev2, table3, next3, [next3 | acc])
    next1 == :"$end_of_table" and next2 <= next3 ->
      merge(table1, prev1, table2, next2, table3, prev3, [next2 | acc])
    next1 == :"$end_of_table" and next3 <= next2 ->
      merge(table1, prev1, table2, prev2, table3, next3, [next3 | acc])
    next2 == :"$end_of_table" and next1 <= next3 ->
      merge(table1, next1, table2, prev2, table3, prev3, [next1 | acc])
    next2 == :"$end_of_table" and next3 <= next1 ->
      merge(table1, prev1, table2, prev2, table3, next3, [next3 | acc])
    next3 == :"$end_of_table" and next1 <= next2 ->
      merge(table1, next1, table2, prev2, table3, prev3, [next1 | acc])
    next3 == :"$end_of_table" and next2 <= next1 ->
      merge(table1, prev1, table2, next2, table3, prev3, [next2 | acc])
    next1 <= next2 and next1 <= next3 ->
      merge(table1, next1, table2, prev2, table3, prev3, [next1 | acc])
    next2 <= next1 and next2 <= next3 ->
      merge(table1, prev1, table2, next2, table3, prev3, [next2 | acc])
    next3 <= next1 and next3 <= next2 ->
      merge(table1, prev1, table2, prev2, table3, next3, [next3 | acc])
  end
end
Oh... oh dear. That is not good. We'd better not try four.
Maybe the problem here is that we're just not structuring our code properly? All of those :"$end_of_table" checks sure are making a mess. Remember back when we had an iterator? It allowed us to separate the traversal from the iteration. The abstraction seemed so wasteful then, but now that things have gotten complicated...
K-way merge
Before we do anything else, let's modify the KeyIterator to return a sentinel key when it's empty. The sentinel should sort higher than any other key. We'll just use "\xFF"; byte 255 ought to be enough for anybody.
defmodule KeyIterator do
  # defstruct, new/2, and peek/1 are unchanged from before

  def next(%KeyIterator{table: table, current_key: key} = iterator) do
    case :ets.next(table, key) do
      :"$end_of_table" -> %{iterator | current_key: "\xFF"}
      next -> %{iterator | current_key: next}
    end
  end
end
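A quick sanity check against the three-key table from earlier:

iterator = KeyIterator.new(table, "c")
iterator = KeyIterator.next(iterator)
KeyIterator.peek(iterator)
# "\xFF"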
With that out of the way, let's conjure another iterator from the void.
defmodule MergeIterator do
  defstruct [:iterators, :current_key]

  def new(key_iterators) do
    next(%MergeIterator{iterators: key_iterators})
  end

  def peek(%MergeIterator{current_key: key}) do
    key
  end

  def next(%MergeIterator{iterators: iterators} = merge_iterator) do
    [it1, it2] = iterators
    key1 = KeyIterator.peek(it1)
    key2 = KeyIterator.peek(it2)

    case key1 <= key2 do
      true ->
        it1 = KeyIterator.next(it1)
        %{merge_iterator | iterators: [it1, it2], current_key: key1}

      false ->
        it2 = KeyIterator.next(it2)
        %{merge_iterator | iterators: [it1, it2], current_key: key2}
    end
  end
end
Now we can just re-use our traversal code from earlier as though we were iterating on a single table.
def scan(iterator, acc \\ []) do
  case MergeIterator.peek(iterator) do
    "\xFF" ->
      Enum.reverse(acc)

    key ->
      iterator = MergeIterator.next(iterator)
      scan(iterator, [key | acc])
  end
end
That sure is nice. Clearly it's a lot easier to understand. But weren't we trying to merge more than two tables?
As it turns out, all we need is a priority queue (sold separately).
defmodule MergeIterator do
  defstruct [:queue, :current_key]

  def new(key_iterators) do
    queue =
      Enum.reduce(key_iterators, :gb_trees.empty(), fn it, queue ->
        key = KeyIterator.peek(it)
        # gb_trees keys must be unique, so tag each key with a ref
        # in case two iterators are sitting on the same key
        :gb_trees.insert({key, make_ref()}, it, queue)
      end)

    next(%MergeIterator{queue: queue})
  end

  # peek/1 is unchanged from before

  def next(%MergeIterator{queue: queue} = iterator) do
    {{current_key, _ref}, it, queue} = :gb_trees.take_smallest(queue)
    it = KeyIterator.next(it)
    key = KeyIterator.peek(it)
    queue = :gb_trees.insert({key, make_ref()}, it, queue)
    %{iterator | queue: queue, current_key: current_key}
  end
end
Huh, would you look at that. Six lines of code and we can merge as many tables as we want, and we can scan them with the same function we were already using!
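For example (a throwaway sketch; the tables and keys are invented), merging three tables looks no different from scanning one:

t1 = :ets.new(:run1, [:ordered_set])
t2 = :ets.new(:run2, [:ordered_set])
t3 = :ets.new(:run3, [:ordered_set])
:ets.insert(t1, [{"a", nil}, {"d", nil}])
:ets.insert(t2, [{"b", nil}, {"e", nil}])
:ets.insert(t3, [{"c", nil}, {"f", nil}])

iterator =
  [t1, t2, t3]
  |> Enum.map(&KeyIterator.new(&1, ""))
  |> MergeIterator.new()

scan(iterator)
# ["a", "b", "c", "d", "e", "f"]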
There was no way to do this without the iterators. We needed to be able to encapsulate the state of an arbitrary number of table traversals and juggle them simultaneously. The iterators were not meaningless overhead, they were a control flow primitive in disguise.
And that is what we call foreshadowing.
Part 2
Alright, let's take a step back here. Why did I just spend half an article arguing with a strawman? Well, as it turns out, that strawman was me. Two weeks ago. When I had no idea what the hell I was doing.
See, when you're working on something you've never done before, you have to do a lot of what I would call "exploratory programming". You massage the code into shape, iteratively. It's usually not clear what it's going to look like from the start.
I got lucky with the merge. I had a rough idea of how the code was supposed to be laid out, and I knew iterators were involved because I have read other, similar code. So I pretty much one-shot the whole thing in a day and moved on.
But compaction... did not go so well.
Blocks on blocks
There is really no way to shield you from the complexity here so I'm just going to give it to you straight. LSM Trees compact sorted tables on disk into other sorted tables on disk. The source tables are merged into a single sorted stream using a K-way merge, as we have discussed. But how do you actually write things to disk? How does that work?
The tables will have to be read eventually, so we need the keys to be organized in some way. To that end, we sort them and then split them into chunks and write the chunks to disk. Each chunk gets an index entry to make lookups faster, and we write those to disk at the end as well. This effectively forms a two-level tree structure, where we binary search the index and then the chunk. Keeping up?
Sorry, but it gets worse. For reasons that are not worth getting into, we need to split the data into fixed-size blocks on disk. If a chunk doesn't fit in the block, the block is padded and the chunk is sent to the next block.
So pairs go into chunks and chunks go into blocks. If a pair doesn't fit in a chunk it goes in the next chunk, and if a chunk doesn't fit in a block it goes in the next block.
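As a purely illustrative sketch (the field names are invented; this is not XKS's actual on-disk layout), the nesting looks roughly like this:

# Invented field names, just to show the shape of the nesting.
chunk = [{"key1", "value1"}, {"key2", "value2"}]  # sorted pairs, capped at ~32 KiB

block = %{
  chunks: [chunk],         # as many whole chunks as fit in the fixed block size
  padding: <<0, 0, 0>>     # leftover space in the block is padded
}

index_entry = %{first_key: "key1", offset: 0}  # one per chunk, binary searched on reads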
Oh, and also blocks go into tables and tables go into levels. Don't even worry about that.
Building chunks
Let's bring back the merge iterator and try to build some chunks.
def build_chunk(iterator, acc) when byte_size(acc) > (32 * 1024) do
  {acc, iterator}
end

def build_chunk(iterator, acc) do
  case MergeIterator.peek(iterator) do
    "\xFF" ->
      {acc, iterator}

    key ->
      iterator = MergeIterator.next(iterator)
      build_chunk(iterator, acc <> key)
  end
end
{"abc", _iterator} = build_chunk(iterator, "")
This function takes in an iterator and returns the iterator plus a chunk. Once the chunk grows beyond 32 KiB we stop there and return the iterator. Don't worry about how we're supposed to find keys in the resulting byte soup; that's not important here.
What is important is that this code is wrong. We want to set hard limits on size. We don't want to be just over 32 KiB, we want to be under! That requires lookahead.
Fortunately, our iterator has a handy property: it buffers the next key.
def build_chunk(iterator, acc) do
  case MergeIterator.peek(iterator) do
    "\xFF" ->
      {acc, iterator}

    next_key ->
      case (byte_size(acc) + byte_size(next_key)) > (32 * 1024) do
        true ->
          {acc, iterator}

        false ->
          iterator = MergeIterator.next(iterator)
          build_chunk(iterator, acc <> next_key)
      end
  end
end
The iterator allows us to split the chunking logic from the iteration logic, just like the scanner before. It holds on to the key for us so that the next chunk can start with it. This is a very useful property; one which, at the time, I did not fully understand.
Speaking of the next chunk, what about the blocks?
Building blocks
We already know about lookahead, so let's just start there.
def build_block(iterator, acc) do
  case MergeIterator.peek(iterator) do
    "\xFF" ->
      {acc, iterator, nil}

    _ ->
      {chunk, iterator} = build_chunk(iterator, "")

      case (byte_size(acc) + byte_size(chunk)) > (512 * 1024) do
        true ->
          {acc, iterator, chunk}

        false ->
          build_block(iterator, acc <> chunk)
      end
  end
end
Hmm, wait a minute. Something weird has happened here. We perform the lookahead to see if the chunk will fit, but if it doesn't then we have nothing to do with it. We can't just throw it away; it was expensive! So we have to just... pass it on?
Let's build a table.
def build_table(iterator, acc, initial_chunk \\ nil) do
  case MergeIterator.peek(iterator) do
    "\xFF" ->
      {iterator, acc}

    _ ->
      {block, iterator, next_chunk} =
        case initial_chunk do
          nil -> build_block(iterator, "")
          chunk -> build_block(iterator, chunk)
        end

      build_table(iterator, [block | acc], next_chunk)
  end
end
Alright, something has gone seriously wrong here. Logic for block-building has leaked into the table code. That is definitely not supposed to be there.
Making matters worse, this example is of the simplest possible case. In reality we need to accumulate the index entries and keep track of a bunch of metadata. Plus, the tables themselves are organized into levels, so this isn't even the end of it!
Tragically, I wrote the entire first pass of compaction like this. That poor little chunk was passed through, I kid you not, like four different levels of abstraction. I wrote it, and I could tell it was wrong, but I could not for the life of me understand why. It sat like that for a month as I scaffolded out the rest of the engine before coming back for a second pass.
And then it hit me.
Iterators on iterators on iterators
What are we doing in build_block() anyway?
Are we iterating over keys?
Or are we iterating over chunks?
defmodule ChunkIterator do
  defstruct [:merge_iterator, :current_chunk]

  def new(merge_iterator) do
    next(%ChunkIterator{merge_iterator: merge_iterator})
  end

  def peek(%ChunkIterator{current_chunk: current_chunk}) do
    current_chunk
  end

  def next(%ChunkIterator{merge_iterator: merge_iterator} = iterator) do
    case MergeIterator.peek(merge_iterator) do
      "\xFF" ->
        %{iterator | current_chunk: :empty}

      _key ->
        {chunk, merge_iterator} = build_chunk(merge_iterator, "")
        %{iterator | merge_iterator: merge_iterator, current_chunk: chunk}
    end
  end
end
That looks nice. Let's try it out.
def build_block(iterator, acc) do
  case ChunkIterator.peek(iterator) do
    :empty ->
      {acc, iterator}

    chunk ->
      case (byte_size(acc) + byte_size(chunk)) > (512 * 1024) do
        true ->
          {acc, iterator}

        false ->
          iterator = ChunkIterator.next(iterator)
          build_block(iterator, acc <> chunk)
      end
  end
end
And the table?
def build_table(iterator, acc) do
  case ChunkIterator.peek(iterator) do
    :empty ->
      {Enum.reverse(acc), iterator}

    _ ->
      {block, iterator} = build_block(iterator, "")
      build_table(iterator, [block | acc])
  end
end
Well would you look at that.
The problem is gone!
The iterator buffers the next chunk, and build_table() passes it on to the next block none the wiser.
We have successfully decoupled chunking from iteration.
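And just to tie the whole stack together, here's the pipeline end to end (a rough sketch reusing the toy tables from the merge example; the real compaction path also has to collect index entries and table metadata along the way):

# t1, t2, and t3 as in the three-table merge example above
iterator =
  [t1, t2, t3]
  |> Enum.map(&KeyIterator.new(&1, ""))
  |> MergeIterator.new()
  |> ChunkIterator.new()

{blocks, _iterator} = build_table(iterator, [])
# blocks == ["abcdef"] — one block holding one chunk of all six keys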