When developing Savant, we create auxiliary technologies that can be used in other projects. We would like to share with you a new project developed by the Savant team – RocksQ. We needed a high-performance persistent queue to buffer video frames and metadata in situations when the receiving party is out of order. Previously, we used persist-queue, a Pythonic persistent queue on top of Sqlite. However, it is obviously an overhead to use a full-scale embedded SQL database just for queueing.
We use Rust with PyO3 very broadly in Savant and used RocksDB previously in other high-performance solutions. So, we decided to use two proven technologies to implement a high-performance thread-safe persistent queue. In Savant, we also have an additional requirement: long-lasting operations must release GIL when possible to unlock the power of multithreading in Python.
Thus, we implemented a new Python library – RocksQ, providing two flavors for persistent queuing: synchronous blocking and asynchronous unblocking implementations allowing flexible usage models.
Currently, only a fixed capacity queue is implemented. Under “fixed capacity” I assume that the developer configured the maximum amount of elements to store and the queue doesn’t allow keeping extra elements until the capacity is freed by consuming.
- Code on GitHub: https://github.com/insight-platform/RocksQ
- License: Apache 2
Storage Design
RocksDB is a well-known high-performance key-value store widely used in the database world. To implement RocksQ we use a single namespace database with 64-bit unsigned int
keys and arbitrary binary values. The MAX and MAX-1 keys are used to keep write and read counters. Upon initialization, those values are loaded in the memory and synced atomically with every read/write operation within the RocksDB batch.
The storage supports batched read and write operations to help users build more efficient queues when possible. You can either write a single data element or multiple data elements at once. When reading, you specify the desired maximum amount of elements to retrieve.
From the Python perspective, the queue works with Python Bytes objects, so it is not limited to a certain serialization technology: a user decides how to serialize data on their side.
Blocking Queue
The blocking implementation helps build simple applications relying on a basic sequential computational model. When blocking write or read happens, the queue blocks the execution until the operation is complete. To optimize resource optimization developers can optionally release GIL allowing other threads to run. For the blocking implementation, GIL is released by default.
import time
import os
from rocksq import remove_queue
from rocksq.blocking import PersistentQueueWithCapacity
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/queue'
# if directory exists, remove it
if os.path.exists(PATH):
remove_queue(PATH)
q = PersistentQueueWithCapacity(PATH)
start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
q.push(data, no_gil=RELEASE_GIL)
for i in range(OPS):
v = q.pop(max_elements=1, no_gil=RELEASE_GIL)
assert len(v) == NUM
assert v == [bytes(str(i), 'utf-8')]
end = time.time()
print("Time taken: %f" % (end - start))
As you can see, the code is pretty straightforward. You can explore the documentation for the blocking queue in the RocksQ docs.
When the push
or pop
operation is complete the queue state is guaranteed to be consistent and will be recovered during the next launch.
Non-Blocking Queue
The non-blocking flavor helps build systems that don’t hold back while data are written or retrieved. Instead, the implementation returns a future-like object which can be checked on the operation readiness condition and the result is extracted when needed. The user can decide to block or not. The design makes it possible to use the non-blocking queue in asynchronous code which can yield
when the result is not yet ready.
import time
import os
from rocksq import remove_queue
from rocksq.nonblocking import PersistentQueueWithCapacity
NUM = 1
OPS = 1000
RELEASE_GIL = True
PATH = '/tmp/queue'
# if directory exists, remove it
if os.path.exists(PATH):
remove_queue(PATH)
q = PersistentQueueWithCapacity(PATH)
start = time.time()
for i in range(OPS):
data = [bytes(str(i), 'utf-8')]
fut = q.push(data, no_gil=RELEASE_GIL) # instantly returns
fut.get() # get() blocks, try_get() is non-blocking
fut = q.pop(max_elements=NUM, no_gil=RELEASE_GIL) # instantly returns
v = fut.get().data # get() blocks, try_get() is non-blocking
assert len(v) == NUM
assert v == data
end = time.time()
print("Time taken: %f" % (end - start))
Additional details for the non-blocking API are available on docs.
Max In-Flight Operations Limit
When the queue is created the user can pass an extra argumentmax_inflight_ops
(default is1000
). When the max is reached, thepush
operation blocks until the capacity is available.
When the future object is_ready()
the queue state is guaranteed to be consistent and will be restored during the next launch.
The get()
operation does not support GIL release intentionally. If you need it, you likely wanted to use the blocking implementation.
Performance (Blocking Implementation)
Hardware: Core(TM) i7-13700H, 16GB RAM, Kingston KC3000 NVME (with LUKS).
To Write 1M, and read 1M of small messages. The benchmark takes less than 8 seconds, giving ~120K
messages/sec.
To Write/Read 1M of small messages. The benchmark takes less than 6 seconds, giving ~160K
messages/sec.
To Write 1M, and read 1M of 1024-byte messages. The benchmark takes less than 12 seconds, giving 83K messages/sec.
To Write/Read 1M of 1024-byte messages. The benchmark takes less than 7 seconds, giving 142K messages/sec.
To Write 1K and read 1K of 1MB messages. The benchmark takes 1.7 seconds, giving 588 messages/sec (588MB/s).
To Write/Read 1K of 1MB messages. The benchmark takes 1.2 seconds, giving 833 messages/sec (833MB/s).
Supported Platforms
RocksQ CI builds automatically for the following platforms and Python interpreters. The users can use PIP
to set up the package in their environment:
pip install rocksq
Windows: Python versions: 3.7-3.12.
Linux: ManyLinux Python versions: 3.7-3.12. CI does not build for PyPy, but it should work if you build it manually.
MacOS: Currently, we do not have a MacOS environment to debug the build process in MacOS, volunteers are welcome.