Golang Priority Queue: Sorting at Scale
How to find top N selling items out of billions without sorting all of them? 🤔
Greetings friend 👋
A Priority Queue (we'll implement it from scratch) has a few interesting properties which make it indispensable for processing high volumes of data.
It's also known as a Heap or Heap Queue and is often available in a language's standard library. For example, Go defines a container/heap interface (pkg.go.dev/container/heap) and provides heap operations on any conformant container.
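For reference, using the standard container/heap looks roughly like this: you implement heap.Interface on your own slice type and the package provides the heap operations. This is just a minimal sketch, separate from the implementation we'll build below:

package main

import (
	"container/heap"
	"fmt"
)

// IntHeap is a min-heap of ints that satisfies heap.Interface.
type IntHeap []int

func (h IntHeap) Len() int           { return len(h) }
func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

// Push and Pop only append/remove at the end of the slice;
// heap.Push and heap.Pop do the actual reordering.
func (h *IntHeap) Push(x any) { *h = append(*h, x.(int)) }
func (h *IntHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

func main() {
	h := &IntHeap{100, 20, 80}
	heap.Init(h)
	heap.Push(h, 999)
	fmt.Println(heap.Pop(h)) // prints 20 – the minimum is always on top
}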
Priority Queue is often unfairly forgotten though. We don't use it as much as other fundamental containers (arrays, lists and dicts), but it can be useful in constrained environments with high volume processing.
Properties
Recall that a Priority Queue has the following properties:
- Constant-time minimal element lookup (it's always at the front of the queue)
- LogN time for adding and removing elements (the heap is a binary tree)
Sort Complexity
We're all familiar with sort. Even if you're rusty on the actual algorithms, you know that the most common sort implementations are:
- QuickSort – requires no additional memory and runs in N*LogN
- MergeSort – requires N additional memory and also runs in N*LogN
There are a few more, but these are the ones you'll see most often.
Now, tell me: what's your first instinct for getting the top 50 selling items out of a large unsorted list like this? 🙄🙋
100 giraffe spots
20 capybara toys
80 cat hugs
999 devops best practices
...
If you work on Unix, you'd likely instinctively write (and you'd be right!):
sort -n file.txt | tail -n 50
It's often convenient to write the equivalent code in a program as well.
However, sort does sort the entire list first, and only then do you select the last (or the first, for the smallest) items.
Similarly, you can sort with a Priority Queue by first inserting all elements into a heap and then removing them one by one, which yields ascending order.
This has the same time complexity, N*LogN, since you're doing N insert/delete operations and the cost of each operation is logarithmic.
Have you spotted the trick already? 😉
We often forget that it's actually N*LogM complexity:
- N is the total number of elements sorted
- M is the number of elements in the heap
These are equal when you do a full HeapSort. But you can make significant performance gains if you're only interested in the top M elements.
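To put some numbers on it: with N = 1,000,000,000 items and M = 50, Log2(N) ≈ 30 while Log2(M) ≈ 6, so the top-M heap does roughly 5x less work per element than a full sort – and it never needs to hold more than 50 items in memory.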
Here's a graph of what we expect to see, N*LogN vs N*LogM:
You can see a distinctive log shape. Now let's code this up and benchmark it!
Heap Implementation
We'll follow the most common and efficient Priority Queue implementation – an array-packed binary tree.
Conveniently, as you can see in the diagram, the indices work out such that for a node at position pos, the left child is always at index 2*pos and the right child at 2*pos+1. For example, the node at position 3 has its parent at 3/2 = 1 (integer division), its left child at 6 and its right child at 7.
In this arrangement, the smallest element is always at the root of the tree and child nodes are never smaller than their parent.
It remains then to implement addition and removal operations such that they preserve this invariant.
Note: for convenience and code clarity I'll waste the 0th element, since packed positions start at 1, but you can also re-map everything into 0-based positions if needed (there's a short aside on this after the Step 2 helpers).
Let's get our Go module started with:
mkdir go-heap && cd go-heap && go mod init codelab/heapq
And start an empty heap.go package file:
package heapq
...
Step 1: Container, Comparator and Go Generics
I'll allow any type of element in the container, even though we should technically enforce comparable and ordered.
However, the Ordered constraint hasn't made it into the standard library yet (it's still experimental), and I'm not a big fan of unnecessary dependencies in simple modules.
We can easily avoid it by taking a comparator function that tells us how to find the minimum of two elements.
type Heap[T any] struct { // accept any type into the queue
	// compare two elements of type T and return true if the first is smaller
	comparator Comparator[T]
	// number of elements in the priority queue
	size int
	// memory buffer of fixed capacity for storing the binary tree of items
	data []T
}

// Comparator returns true if a < b.
// Note: either pointer may be nil when a node has no right child (see percolate
// below) – a nil element should compare as larger than any real element.
type Comparator[T any] func(a *T, b *T) bool

func NewHeap[T any](capacity int, comparator Comparator[T]) *Heap[T] {
	return &Heap[T]{
		comparator: comparator,
		size:       0,
		data:       make([]T, capacity+1), // index 0 is intentionally unused
	}
}
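For instance, a min-heap of ints could be constructed like this (the nil checks in the comparator matter, as we'll see in the percolate step):

// min-heap of ints; a nil pointer is treated as "larger than anything"
hq := NewHeap[int](50, func(a, b *int) bool {
	return b == nil || a != nil && *a < *b
})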
Step 2: Helper Functions
It would be useful to define a few helper functions to navigate tree nodes in an array:
func parentIdx(pos int) int {
return pos / 2
}
func leftIdx(pos int) int {
return pos * 2
}
func rightIdx(pos int) int {
return pos*2 + 1
}
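As an aside, if you'd rather use the 0-based packing mentioned earlier, the index arithmetic simply shifts by one. A quick sketch (these helpers are illustrative only and not used below):

// 0-based packing variant: the root lives at index 0
func parentIdx0(pos int) int { return (pos - 1) / 2 }
func leftIdx0(pos int) int   { return pos*2 + 1 }
func rightIdx0(pos int) int  { return pos*2 + 2 }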
And a couple more helper methods: one to check whether we've reached a leaf and can't descend any further, and one to swap two nodes in the tree:
func (q *Heap[T]) isLeaf(pos int) bool {
return leftIdx(pos) > q.size
}
func (q *Heap[T]) swap(a int, b int) {
q.data[a], q.data[b] = q.data[b], q.data[a]
}
Step 3: Push / Peek / Pop
Now, Peek is the easy one: the minimal element is always at the root of the tree (element 1 in the array):
func (q *Heap[T]) Peek() (res T, err error) {
if q.size < 1 {
return res, fmt.Errorf("peeking into an empty queue")
}
res = q.data[1]
return res, nil
}
Adding elements is easy – we add a new leaf at the bottom of the tree and keep "bubbling" it up while it is smaller than its parent (or until it becomes the new root):
func (q *Heap[T]) Push(item T) error {
	if q.size >= len(q.data)-1 { // position 0 is unused, so usable capacity is len(data)-1
		return fmt.Errorf("pushing into a full container")
	}
	q.size++
	cur := q.size
	q.data[cur] = item
	// bubble the new element up while it is smaller than its parent
	for cur > 1 && q.comparator(&q.data[cur], &q.data[parentIdx(cur)]) {
		q.swap(cur, parentIdx(cur))
		cur = parentIdx(cur)
	}
	return nil
}
Removing is a little more involved: we take the first element (the root) out, move the last leaf into its place, and then let it "percolate" down until the heap invariant holds again:
func (q *Heap[T]) Pop() (res T, err error) {
if q.size < 1 {
return res, fmt.Errorf("popping from an empty queue")
}
res = q.data[1]
q.data[1] = q.data[q.size]
q.size--
q.percolate(1)
return res, nil
}
func (q *Heap[T]) percolate(pos int) {
	if q.isLeaf(pos) {
		return
	}
	var cur *T = &q.data[pos]
	var left *T = &q.data[leftIdx(pos)]
	var right *T // stays nil if there is no right child – the comparator must handle that
	if rightIdx(pos) <= q.size {
		right = &q.data[rightIdx(pos)]
	}
	// if either child is smaller than the current node, swap with the smaller child
	if q.comparator(left, cur) || q.comparator(right, cur) {
		if q.comparator(left, right) {
			q.swap(pos, leftIdx(pos))
			q.percolate(leftIdx(pos))
		} else {
			q.swap(pos, rightIdx(pos))
			q.percolate(rightIdx(pos))
		}
	}
}
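To sanity-check the whole thing, here's a small example test – a sketch, assuming it lives in a heap_test.go file in the same package. It pushes a few values and pops them back out, and they come out in ascending order, which is exactly the heap sort behaviour described earlier:

package heapq

import "fmt"

// ExampleHeap pushes a few values and pops them back in ascending order.
func ExampleHeap() {
	less := func(a, b *int) bool { return b == nil || a != nil && *a < *b }
	hq := NewHeap[int](10, less)
	for _, v := range []int{100, 20, 80, 999} {
		if err := hq.Push(v); err != nil {
			fmt.Println(err)
		}
	}
	for hq.size > 0 {
		v, _ := hq.Pop()
		fmt.Println(v)
	}
	// Output:
	// 20
	// 80
	// 100
	// 999
}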
Go Benchmark
Here's a simple benchmark to test the top-M approach against the full-size heap sort baseline:
func benchmarkTopM(topm int, maxsize int, b *testing.B) {
	var arr []int = make([]int, maxsize)
	hq := NewHeap[int](topm, func(a, b *int) bool { return b == nil || a != nil && *a < *b })
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		// random init
		for idx := 0; idx < len(arr); idx++ {
			arr[idx] = rand.Int()
		}
		b.StartTimer()
		// always pop+push so that every element costs LogM; a production top-M
		// filter would Peek first and skip elements smaller than the current minimum
		for idx := 0; idx < len(arr); idx++ {
			if hq.size >= topm {
				hq.Pop()
			}
			hq.Push(arr[idx])
		}
		for idx := 0; idx < topm; idx++ {
			hq.Pop()
		}
	}
}
func TestTopRuntimes(t *testing.T) {
file, err := os.Create("bench.csv")
if err != nil {
t.Fatal(err)
}
defer file.Close()
fmt.Fprintln(file, "M size, heap topM, heap baseline")
for topm := 1; topm < 1000; topm += 1 {
fnHeap := func(b *testing.B) {
benchmarkTopM(topm, 1000, b)
}
fnHeapBaseline := func(b *testing.B) {
benchmarkTopM(1000, 1000, b)
}
rHeap := testing.Benchmark(fnHeap)
rHeapBaseline := testing.Benchmark(fnHeapBaseline)
fmt.Fprintln(file, topm, ",",
int(rHeap.T)/rHeap.N, ",",
int(rHeapBaseline.T)/rHeapBaseline.N)
}
}
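Assuming the benchmark and test sit in a heap_test.go file next to heap.go (with fmt, math/rand, os and testing imported), they can be run with the standard tooling:

go test -v -run TestTopRuntimes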
And here are the real-life results on my Intel NUC; they're a little noisy, as expected, and align with the model we saw at the start of this post:
What we've learnt
Hopefully, you see how useful a Priority Queue can be given the right use case :)
Let's recap:
- It's more CPU-time efficient than a sort | head, because you only pay N*LogM complexity (where M < N) instead of the full N*LogN of a regular sort.
- It's more memory efficient: you only ever need to store M items in memory, which even lets you process an infinite online data stream and have the top M items ready at any point.
That's it, thank you for reading!
You've been awesome! 🙌