Running Sneakers workers on Kubernetes

These are some notes on stuff I discovered when delving into running Sneakers workers on Kube recently.

Allow Sneakers to receive SIGTERM

Sneakers has good graceful shutdown handling built in. When it receives SIGTERM, Sneakers will:

  1. Stop consuming the queue
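The sketch below is a rough, plain-Ruby illustration of what "graceful" means here. It is a toy model, not Sneakers' implementation. It assumes that once a stop is requested, nothing new is fetched from the queue, but messages already fetched (up to the prefetch limit) are still worked through. `DrainingConsumer` and its methods are hypothetical names, and an Array stands in for the RabbitMQ queue.

```ruby
# Toy model of graceful shutdown (hypothetical, not Sneakers' code):
# after a stop request, fetch nothing new but finish what was already fetched.
class DrainingConsumer
  attr_reader :processed

  def initialize(source, prefetch:)
    @source = source      # an Array standing in for the broker's queue
    @prefetch = prefetch  # max messages held locally but not yet processed
    @buffer = []          # fetched ("in-flight") messages
    @processed = []
    @stopping = false
  end

  # Invoked from the TERM trap; it only sets a flag, which is safe in a trap.
  def request_stop
    @stopping = true
  end

  # Process one message. Returns false once there is nothing left to do.
  def step
    fetch unless @stopping
    return false if @buffer.empty?

    @processed << @buffer.shift
    true
  end

  # Run until the source is exhausted, or until stopped and drained.
  def run
    Signal.trap("TERM") { request_stop }
    loop { break unless step }
  end

  private

  # Top the local buffer up to the prefetch limit.
  def fetch
    @buffer << @source.shift while @buffer.size < @prefetch && !@source.empty?
  end
end

source = (1..10).to_a
consumer = DrainingConsumer.new(source, prefetch: 3)
consumer.step          # fetches 1, 2, 3 and processes 1
consumer.request_stop  # what the TERM trap would do
consumer.run           # finishes 2 and 3, fetches nothing new
p consumer.processed   # => [1, 2, 3]
p source               # => [4, 5, 6, 7, 8, 9, 10]
```

The trap handler only flips a flag. The actual draining happens in the normal loop, which keeps signal handling small and predictable.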

In the context of Kube, it’s important that Sneakers runs as PID 1. When Kube stops a pod, only the process on PID 1 in each container receives the SIGTERM. To ensure this, make sure the command is:

```yaml
command: ["bundle", "exec", "rake", "sneakers:run"]
```

It’s important to use the array format; a single string (command: "bundle exec rake sneakers:run") WON’T work.

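It’s also important not to wrap the command in a shell:

```yaml
command: ["/bin/sh", "-c", "WORKERS=MyWorker rake sneakers:run"]
```

With this form the shell is PID 1 and may start Sneakers as a separate child process. The SIGTERM then goes to the shell, which doesn’t pass it on to Sneakers.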
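One optional safeguard is a boot-time check that warns when the worker isn’t PID 1. The sketch below uses a hypothetical helper, `pid1_warning`, which is not part of Sneakers. It is only meaningful inside a container: run locally, it will always warn. It would also be a false alarm if you deliberately run a signal-forwarding init process as PID 1.

```ruby
# Hypothetical boot-time check (not part of Sneakers). Inside a container,
# Kubernetes' SIGTERM goes to PID 1, so a worker on any other PID is
# probably sitting behind a wrapper such as `sh -c`.
def pid1_warning(pid = Process.pid)
  return nil if pid == 1

  "running as PID #{pid}, not PID 1: SIGTERM from Kubernetes may not reach " \
    "this process (check the container's command)"
end

# e.g. near the top of the worker's boot code:
if (message = pid1_warning)
  warn "[sneakers] #{message}"
end
```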

Instead, the WORKERS variable can be added to the container’s environment:

```yaml
env:
  - name: WORKERS
    value: MyWorker
```
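WORKERS holds the name of the worker class to run. As far as I know, the Sneakers rake task also accepts a comma-separated list such as `MyWorker,OtherWorker`. The sketch below shows how such a value can be resolved to Ruby classes. `resolve_workers` is a hypothetical helper, not Sneakers' own parsing code, and `MyWorker` is an empty stand-in class.

```ruby
# Hypothetical helper: resolve a WORKERS value such as "MyWorker,OtherWorker"
# into worker classes. Sneakers' rake task does its own parsing; this only
# illustrates the idea.
def resolve_workers(value)
  names = value.to_s.split(",").map(&:strip).reject(&:empty?)
  missing = names.reject { |name| Object.const_defined?(name) }
  raise ArgumentError, "unknown workers: #{missing.join(', ')}" unless missing.empty?

  names.map { |name| Object.const_get(name) }
end

class MyWorker; end # stand-in for the real worker class

# In the worker process this would be resolve_workers(ENV["WORKERS"]).
p resolve_workers("MyWorker") # => [MyWorker]
```

Failing loudly on an unknown name means a typo in the Deployment's env shows up at boot rather than as a worker that silently consumes nothing.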

Increase the grace period

The grace period is how long Kube waits, after sending SIGTERM, for the process on PID 1 to finish before sending SIGKILL (immediate termination).

The default grace period is 30 seconds, but if you expect work to take longer you may want to increase it. You may fall into this category if your Sneakers prefetch is high. Prefetch is how many unacknowledged messages RabbitMQ will hand to the worker at once. A high value means many messages can be in flight when SIGTERM arrives, and they may all need processing before the worker exits.

To increase the grace period, add this to your Deployment YAML:

```yaml
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 180
```
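One rough way to arrive at a number like 180 is to assume every prefetched message might still need processing when SIGTERM arrives. That work is spread across the worker’s threads, and you add some headroom on top. The sketch below is a hypothetical rule of thumb, not something Kubernetes or Sneakers calculates. `seconds_per_message` and the 30-second buffer are example assumptions you’d replace with measurements from your own workers.

```ruby
# Hypothetical rule of thumb for sizing terminationGracePeriodSeconds:
# assume every prefetched message may still need processing at shutdown,
# spread across the worker's threads, plus a fixed safety buffer.
def grace_period_seconds(prefetch:, seconds_per_message:, threads: 1, buffer: 30)
  rounds = (prefetch.to_f / threads).ceil # messages each thread may still handle
  (rounds * seconds_per_message).ceil + buffer
end

puts grace_period_seconds(prefetch: 100, seconds_per_message: 1.5)             # => 180
puts grace_period_seconds(prefetch: 100, seconds_per_message: 1.5, threads: 4) # => 68
```

With prefetch: 100 and a single thread, even 1.5 seconds per message is far beyond the default 30 seconds.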

Application level shutdown handling

You may want your Sneakers worker to do something special when the SIGTERM is received.

For example, I had a case where I was counting thousands of messages in memory and only incrementing a counter in a database once a certain number had been received. When SIGTERM arrived, I wanted to flush that count to the database immediately, before the in-memory counter was lost.

Sneakers makes this straightforward: define a stop method on your worker, and it is called when the worker shuts down:

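```ruby
class MyWorker
  include Sneakers::Worker

  from_queue :my_queue,
             durable: true,
             amqp: ENV['AMQP_CONNECTION'],
             exchange: 'domain_events',
             exchange_options: { type: :topic },
             prefetch: 100,
             threads: 1, # one thread, so @counter is never updated concurrently
             routing_key: [:EVENT_HAPPENED]

  def work(message)
    @counter ||= 0 # instance variables start as nil, so initialise on first use
    @counter += 1

    # Count in memory; only write to the database every 10,000 messages
    return ack! unless @counter >= 10_000

    Updater.increment(@counter)
    @counter = 0
    ack!
  end

  # Called during graceful shutdown (e.g. after SIGTERM)
  def stop
    # Run Sneakers' own stop logic first (assumed to unsubscribe and finish
    # in-flight messages), so no work increments @counter after the flush
    super
    # Persist the partial count so it isn't lost when the process exits
    Updater.increment(@counter) if @counter.to_i > 0
  end
end
```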
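To sanity-check the counting logic without RabbitMQ, here’s a plain-Ruby sketch that mirrors the work and stop methods from the worker above. `CountingWorker` and `FakeUpdater` are hypothetical stand-ins. `FakeUpdater` just records increments in place of the real `Updater`, and returning `:ack` stands in for `ack!`.

```ruby
# Plain-Ruby sketch of the buffer-and-flush pattern, with no Sneakers or
# RabbitMQ. FakeUpdater is a hypothetical stand-in for Updater; it just
# records each increment it receives.
module FakeUpdater
  @increments = []

  class << self
    attr_reader :increments

    def increment(n)
      @increments << n
    end
  end
end

class CountingWorker
  FLUSH_EVERY = 10_000

  def initialize(updater)
    @updater = updater
    @counter = 0
  end

  # Mirrors MyWorker#work: count in memory, write to the "database" in bulk.
  def work(_message)
    @counter += 1
    return :ack if @counter < FLUSH_EVERY

    flush
    :ack
  end

  # Mirrors MyWorker#stop: persist whatever hasn't been written yet.
  def stop
    flush
  end

  private

  def flush
    return if @counter.zero?

    @updater.increment(@counter)
    @counter = 0
  end
end

worker = CountingWorker.new(FakeUpdater)
25_000.times { |i| worker.work("message #{i}") }
worker.stop              # simulates SIGTERM arriving mid-batch
p FakeUpdater.increments # => [10000, 10000, 5000]
```

Without the call to stop, the final 5,000 counts would never reach the updater.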
