https://www.pgcasts.com/episodes/the-skip-locked-feature-in-...
It’s not “web scale” but it easily extends to several thousand background jobs in my experience
Python has Celery, but maybe the author is looking for more choice between brokers. https://docs.celeryq.dev/en/stable/index.html
https://github.com/bensheldon/good_job
Had it in production for about a quarter and it’s worked well.
1. The main downside to using PostgreSQL as a pub/sub bus with LISTEN/NOTIFY is that LISTEN is a session-level feature, making it incompatible with statement-level connection pooling (a sketch of the dedicated-session pattern this forces follows the links below).
2. If you are going to do this use advisory locks [0]. Other forms of explicit locking put more pressure on the database while advisory locks are deliberately very lightweight.
My favorite example implementation is que [1] which is ported to several languages.
[0] https://www.postgresql.org/docs/current/explicit-locking.htm...
[1] https://github.com/que-rb/que
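As a rough illustration of point 1, here is a minimal psycopg2 sketch of the dedicated-session pattern LISTEN requires (the channel name new_job and the connection string are made up for the example); because the connection must stay open and be polled directly, it can't go through a statement-level pooler:

import select

import psycopg2
import psycopg2.extensions

# The listening connection has to be a real, long-lived session.
conn = psycopg2.connect('dbname=jobs user=jobsuser')  # placeholder DSN
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur = conn.cursor()
cur.execute('LISTEN new_job;')

while True:
    # Wait for the socket to become readable, then drain any notifications.
    if select.select([conn], [], [], 5) == ([], [], []):
        continue  # timed out, poll again
    conn.poll()
    while conn.notifies:
        notify = conn.notifies.pop(0)
        print('got notification:', notify.channel, notify.payload)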
I built a complete implementation in Python designed to work the same as SQS but simpler:
https://github.com/starqueue/starqueue
Alternatively if you just want to quickly hack something into your application, here is a complete solution in one Python function with retries (ask ChatGPT to tell you what the table structure is):
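For reference, here is a guess at the minimal schema the snippet below assumes; the column names are inferred from the queries, while the types and defaults are assumptions rather than anything the original author specified:

# Hypothetical DDL inferred from the queries below; run it once with the same
# cursor (cur.execute(SCHEMA_SQL); conn.commit()) or via psql.
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS jobs (
    id       bigserial PRIMARY KEY,
    status   text    NOT NULL DEFAULT 'new_waiting',
    attempts integer NOT NULL DEFAULT 0,
    payload  jsonb
);
CREATE TABLE IF NOT EXISTS message_queue (
    id        bigserial PRIMARY KEY,
    target_id bigint REFERENCES jobs (id),
    status    text NOT NULL DEFAULT 'new',
    created   timestamptz NOT NULL DEFAULT now()
);
"""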
import random

import psycopg2
import psycopg2.extras

db_params = {
    'database': 'jobs',
    'user': 'jobsuser',
    'password': 'superSecret',
    'host': '127.0.0.1',
    'port': '5432',
}

conn = psycopg2.connect(**db_params)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

def do_some_work(job_data):
    # Simulate work that randomly fails so the retry path gets exercised.
    if random.choice([True, False]):
        print('do_some_work FAILED')
        raise Exception
    else:
        print('do_some_work SUCCESS')

def process_job():
    # Claim and delete the oldest queue item, skipping rows other workers hold.
    sql = """DELETE FROM message_queue
    WHERE id = (
        SELECT id
        FROM message_queue
        WHERE status = 'new'
        ORDER BY created ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING *;
    """
    cur.execute(sql)
    queue_item = cur.fetchone()
    if queue_item is None:
        print('no queue items to process')
        conn.commit()
        return
    print('message_queue says to process job id: ', queue_item['target_id'])
    sql = """SELECT * FROM jobs WHERE id = %s AND status = 'new_waiting' AND attempts <= 3 FOR UPDATE;"""
    cur.execute(sql, (queue_item['target_id'],))
    job_data = cur.fetchone()
    if job_data:
        try:
            do_some_work(job_data)
            sql = """UPDATE jobs SET status = 'complete' WHERE id = %s;"""
            cur.execute(sql, (queue_item['target_id'],))
        except Exception:
            sql = """UPDATE jobs SET status = 'failed', attempts = attempts + 1 WHERE id = %s;"""
            # if we want the job to run again, insert a new item into the message queue with this job id
            cur.execute(sql, (queue_item['target_id'],))
    else:
        print('no job found, did not get job id: ', queue_item['target_id'])
    conn.commit()

process_job()
cur.close()
conn.close()

are you queuing jobs, or are you queuing messages?
that's a fuzzy distinction, so somewhat equivalently, what's the expected time it takes for a worker to process a given queue item?
at one end, an item on the queue may take several seconds to a minute or longer to process. at the other end, an item might take only a few milliseconds to process. in that latter case, it's often useful to do micro-batching, where a single worker pulls 100 or 1000 items off the queue at once, and processes them as a batch (such as by writing them to a separate datastore)
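as a sketch of what that micro-batching looks like on postgres (assuming a psycopg2 cursor and an items table with id/payload/created columns, all of which are placeholders for the example):

def claim_batch(cur, batch_size=1000):
    # claim up to batch_size items in one statement; SKIP LOCKED lets
    # concurrent workers grab disjoint batches without blocking each other
    cur.execute("""
        DELETE FROM items
        WHERE id IN (
            SELECT id FROM items
            ORDER BY created ASC
            FOR UPDATE SKIP LOCKED
            LIMIT %s
        )
        RETURNING id, payload;
    """, (batch_size,))
    return cur.fetchall()

# the whole batch can then be processed in one go, e.g. a single bulk write
# to a separate datastore, followed by conn.commit()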
the "larger" the items are (in terms of wall-clock processing time, not necessarily in terms of size in bytes of the serialized item payload) the more effective the database-as-a-queue solution is, in my experience.
as queue items get smaller / shorter to process, and start to feel more like "messages" rather than discrete "jobs", that's when I tend to reach for a queue system over a database.
for example, there's a RabbitMQ blog post [0] on cluster sizing where their recommendations start at 1000 messages/second. that same message volume on a database-as-a-queue would require, generally speaking, 3000 write transactions per second (if we assume one transaction to enqueue the message, one for a worker to claim it, and one for a worker to mark it as complete / delete it).
can Postgres and other relational databases be scaled & tuned to handle that write volume? yes, absolutely. however, how much write volume are you expecting from your queue workload, compared to the write volume from its "normal database" workload? [1]
I think that ends up being a useful heuristic when deciding whether or not to use a database-as-a-queue - will you have a relational database with a "side gig" of acting like a queue, or will you have a relational database that in terms of data volume is primarily acting like a queue, with "normal database" work relegated to "side gig" status?
0: https://blog.rabbitmq.com/posts/2020/06/cluster-sizing-and-o...
1: there's also a Postgres-specific consideration here where a lot of very short-lived "queue item" database rows can put excessive pressure on the autovacuum system.
QC (and equivs) use the same db, and same connection, so same transaction. Saves quite a bit of cruft.
My guess is that many people are implementing queuing mechanisms just for sending email.
You can see how this works in Arnie SMTP buffer server, a super simple queue just for emails, no database at all, just the file system.
delete from task
where task_id in
  ( select task_id
    from task
    order by random() -- use tablesample for better performance
    for update
    skip locked
    limit 1
  )
returning task_id, task_type, params::jsonb as params
[1] https://taylor.town/pg-task
Transactionally Staged Job Drains in Postgres - https://brandur.org/job-drain
It's about the challenge of matching up transactions with queues - where you want a queue to be populated reliably if a transaction completes, and also reliably NOT be populated if it doesn't.
Brandur's pattern is to have an outgoing queue in a database table that gets updated as part of that transaction, and can then be separately drained to whatever queue system you like.
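A rough sketch of the shape of that pattern (not Brandur's actual code; the staged_jobs table, its columns, and the publish callback are placeholders):

import psycopg2
import psycopg2.extras

def activate_user(conn, user_id):
    # The staged job is written in the same transaction as the business change,
    # so it exists if and only if the change committed.
    with conn, conn.cursor() as cur:
        cur.execute("UPDATE users SET activated = true WHERE id = %s", (user_id,))
        cur.execute(
            "INSERT INTO staged_jobs (job_name, job_args) VALUES (%s, %s)",
            ("send_activation_email", psycopg2.extras.Json({"user_id": user_id})),
        )

def drain_staged_jobs(conn, publish):
    # Runs in a loop elsewhere: move committed staged rows to the real queue.
    # If publish() fails, the transaction rolls back and the rows are retried.
    with conn, conn.cursor() as cur:
        cur.execute("""
            DELETE FROM staged_jobs
            WHERE id IN (
                SELECT id FROM staged_jobs
                ORDER BY id
                FOR UPDATE SKIP LOCKED
                LIMIT 100
            )
            RETURNING job_name, job_args;
        """)
        for job_name, job_args in cur.fetchall():
            publish(job_name, job_args)  # e.g. Sidekiq / RabbitMQ / SQS client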
Oban is able to run over 1m jobs a minute, and the ultimate bottleneck is throttling in application code to prevent thrashing the database: https://getoban.pro/articles/one-million-jobs-a-minute-with-...
Having transactions is quite handy.
https://wiki.postgresql.org/wiki/SkyTools
I did a few talks on this at Sydpy as I used it at work quite a bit. It's handy when you already have postgresql running well and supported.
This said, I'd use a dedicated queue these days. Anything but RabbitMQ.
Recently I had to stop using it because after a while all NOTIFY/LISTENS would stop working, and only a database restart would fix the issue https://dba.stackexchange.com/questions/325104/error-could-n...
In the beginning you can do a naive UPDATE ... SET, which locks way too much. You can make the locking more efficient by doing dequeues as UPDATEs with SELECT ... FOR UPDATE SKIP LOCKED subqueries, but eventually your dequeue queries will throttle each other's locks and your queue will grind to a halt. You can try to disable enqueues at that point to give your DB more breathing room, but then you have data loss on the dropped enqueues, and it'll mostly be your dequeues locking each other out anyway.
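For reference, the "more efficient" locking being described looks roughly like this (a sketch; the jobs table, its status values, and the claimed_at column are placeholders):

import psycopg2

conn = psycopg2.connect('dbname=jobs')  # placeholder DSN
cur = conn.cursor()

# claim one job without blocking on rows other workers have already locked
cur.execute("""
    UPDATE jobs
    SET status = 'running', claimed_at = now()
    WHERE id = (
        SELECT id FROM jobs
        WHERE status = 'queued'
        ORDER BY id
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING id, payload;
""")
job = cur.fetchone()
conn.commit()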
You can try very quickly to shard out your task tables to avoid locking and that may work but it's brittle to roll out across multiple workers and can result in data loss. You can of course drop a random subset of tasks but this will cause data loss. Any of these options is not only highly stressful in a production scenario but also very hard to recover from without a ground-up rearchitecture.
Is this kind of a nightmare production scenario really worth choosing Boring Technology? Maybe if you have a handful of customers and are confident you'll be working at tens of tasks per second forever. Having been in the hot seat for one of these I will always choose a real queue technology over a database when possible.
It's more like a few thousand per second, and enqueues win, not dequeues like you say... on very small hardware without tuning. If you're at tens of tasks per second, you have a whole lot of breathing room: don't build for 100x current requirements.
https://chbussler.medium.com/implementing-queues-in-postgres...
> eventually your dequeue queries will throttle each other's locks
This doesn't really make sense to me. To me, the main problem seems to be that you end up with having a lot of snapshots around.
This link is simply raw enqueue/dequeue performance. Factor in workers that perform work or execute remote calls and the numbers change. Also, I find when your jobs have high variance in times, performance degrades significantly.
> This doesn't really make sense to me. To me, the main problem seems to be that you end up with having a lot of snapshots around.
The dequeuer needs to know which tasks to "claim", so this requires some form of locking. Eventually this becomes a bottleneck.
> don't build for 100x current requirements
What happens if you get 100x traffic? Popularity spikes can do it, so can attacks. Is the answer to just accept data loss in those situations? Queue systems are super simple to use. I'm counting "NOTIFY/LISTEN" on Postgres as a queue, because it is a queue from the bottom up.
Oban's been great, especially if you pay for Web UI and Pro for the extra features [3]
The main issue we've noticed, though, is that because of its simple lock-based fetching mechanism, jobs aren't distributed evenly across your workers: the `SELECT...LIMIT X` is greedy [2]
If you have long-running and/or resource-intensive jobs, this can be problematic. Let's say you have 3 workers with a local limit of 10 per node. If there are only 10 jobs in the queue, the first node to fetch available jobs will grab and lock all 10, with the other 2 nodes sitting idle.
[1] https://github.com/sorentwo/oban [2] https://github.com/sorentwo/oban/blob/main/lib/oban/engines/... [3] https://getoban.pro/#feature-comparison
The nice thing about "boring" tech like Postgres is that it has great documentation. So just peruse https://www.postgresql.org/docs/current/sql-notify.html . No need for google-fu.
UPDATE ... SET status = 'locked' ... RETURNING message_id
Or you can just use an IMMEDIATE transaction, SELECT the next message ID to retrieve, and UPDATE the row.
On top of that, if you want to be extra safe, you can do:
UPDATE Queue SET status = 'locked' WHERE status = 'ready' AND message_id = '....'
To make sure that the message you are trying to retrieve hasn't already been locked by another worker.
[0]: https://github.com/litements/litequeue/
[1]: https://github.com/litements/litequeue/blob/3fece7aa9e9a31e4...
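A minimal sketch of that claim-then-verify pattern using Python's sqlite3 (the queue table and column names are placeholders, not litequeue's actual schema):

import sqlite3

conn = sqlite3.connect('queue.db', isolation_level=None)  # autocommit; we issue BEGIN ourselves

def claim_next():
    # BEGIN IMMEDIATE takes the write lock up front, so no other worker can
    # claim a message between our SELECT and our UPDATE.
    conn.execute('BEGIN IMMEDIATE')
    try:
        row = conn.execute(
            "SELECT message_id FROM queue WHERE status = 'ready' ORDER BY created LIMIT 1"
        ).fetchone()
        if row is None:
            conn.execute('COMMIT')
            return None
        cur = conn.execute(
            "UPDATE queue SET status = 'locked' WHERE status = 'ready' AND message_id = ?",
            (row[0],),
        )
        if cur.rowcount == 0:
            # the "extra safe" check: another worker locked it first
            conn.execute('COMMIT')
            return None
        conn.execute('COMMIT')
        return row[0]
    except Exception:
        conn.execute('ROLLBACK')
        raise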
https://www.postgresql.org/docs/current/transaction-iso.html...
Especially given the emphasis on YAGNI, you don’t need a UUID primary key, with all the problems it brings for B+trees (the thing an RDBMS is built on), nor do you need the collision resistance of SHA256 - the odds of you creating a dupe job hash with MD5 are vanishingly small.
As to the actual topic, it’s fine IFF you carefully monitor for accumulating dead tuples, and adjust auto-vacuum for that table as necessary. While not something you’d run into at the start, at a modest scale you may start to see issues. May. You may also opt to switch to Redis or something else before that point anyway.
EDIT: if you choose ULID, UUIDv7, or some other k-sortable key, the problem isn’t nearly as bad, but you still don’t need it in this situation. Save yourself 8 bytes per key.
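For what "adjust auto-vacuum for that table" can look like in practice, a hedged example (the table name and the numbers are placeholders; tune them against your observed dead-tuple churn):

import psycopg2

conn = psycopg2.connect('dbname=jobs')  # placeholder DSN
with conn, conn.cursor() as cur:
    # Trigger autovacuum after a fixed number of dead tuples instead of a
    # fraction of the table, and let it run without cost-based throttling.
    cur.execute("""
        ALTER TABLE jobs SET (
            autovacuum_vacuum_scale_factor = 0,
            autovacuum_vacuum_threshold    = 1000,
            autovacuum_vacuum_cost_delay   = 0
        );
    """)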
I’ve experimented with making this easier via libraries that provide high-level APIs for using Postgres as a queue and manage the schemas, listen/notify, etc for you: https://github.com/adriangb/pgjobq
I don't think that there's anything wrong with using a database as a queue; however, I think there probably could have been better ways to get the idea across than just dismissing an honest opinion as BS. I don't necessarily agree with all of what was said there, but at the same time I can see why those arguments would be reasonable: >>20022572
For example:
> Because it is hacky from the perspective of a distributed system architecture. It's coupling 2 components that probably ought not be coupled because it's perceived as "convenient" to do so. The idea that your system's control and data planes are tightly coupled is a dangerous one if your system grows quickly.
To me, this makes perfect sense, if you're using the same database instance for the typical RDBMS use case AND also for the queue. Then again, that could be avoided by having separate database instances/clusters and treating those as separate services: prod-app-database and prod-queue-database.
That said, using something like RabbitMQ or another specialized queue solution might also have the additional benefit of bunches of tutorials and libraries, as well as other resources available, which is pretty much the case whenever you have a well known and a more niche technology, even when the latter might be in some ways better! After all, there is a reason why many would use Sidekiq, resque, rq, Hangfire, asynq and other libraries that were mentioned and already have lots of content around them.
Though whether the inherent complexity of the system or the complexity of your code that's needed to integrate with it is more important, is probably highly situational.
Good thing I didn't listen to your advice... my DIY background task queue saved my website when Celery couldn't scale. Why are you against rolling your own task queue besides it seeming complicated?
https://wakatime.com/blog/56-building-a-background-task-queu...
https://github.com/wakatime/wakaq
We currently process ~20 million tasks per day, and I don't have to worry about running VACUUM on my queue ;)
TTL typically deletes expired items within a few days. Depending on the size and activity level of a table, the actual delete operation of an expired item can vary. Because TTL is meant to be a background process, the nature of the capacity used to expire and delete items via TTL is variable (but free of charge). [0]
Because of that limitation, I would not use that approach. Instead I would do Scheduled Lambdas to check for items every 15 minutes in a Serverless Aurora and then add them to SQS with delays.
I've had my eye on this problem for a few years and keep thinking that a simple SaaS that does one-shot scheduled actions would probably be a worthy side project. Not enough to build a company around, but maintenance would be low and there's probably some pricing that would attract enough customers to be sustainable.
[0] https://docs.aws.amazon.com/amazondynamodb/latest/developerg...
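A sketch of the SQS half of that, assuming boto3 (the queue URL and item shape are placeholders); per-message delays cap at 900 seconds, which is why a 15-minute scan interval lines up:

import json

import boto3

sqs = boto3.client('sqs')
QUEUE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/scheduled-actions'  # placeholder

def enqueue_due_item(item, seconds_until_due):
    # Items found by the periodic scan are pushed with a per-message delay;
    # anything due further out than 15 minutes waits for a later scan.
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody=json.dumps(item),
        DelaySeconds=min(max(int(seconds_until_due), 0), 900),
    )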
so pg_try_advisory_lock/pg_advisory_unlock can hold a lock across transactions while FOR UPDATE SKIP LOCKED can't; with SKIP LOCKED you would either need to keep a transaction open or use status + job_timeout columns (and in postgres you should not use long transactions)
basically we use c#, but we looked into https://github.com/que-rb/que which uses advisory locks. since our jobs take like 1 min to 2 hours it was a no-brainer to use advisory locks. it's just not the best thing if you have thousands of fast jobs per second, but for a more moderate queue where you have like 10000 jobs per minute/10 minutes/30 minutes and they take like 1 min to 2 hours it's fine.
we also do not delete jobs, we do not care about storage since the job table basically does not take up much space. and we have a lot of time to catch up at night since we are only in Europe
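roughly what the advisory-lock claim looks like with psycopg2 (a sketch; the jobs table, columns, and status values are made up, and que itself does this more carefully):

def claim_job(cur, candidates=20):
    # cur: a psycopg2 cursor on a dedicated worker connection (the lock is
    # tied to that session). grab a few candidate ids, then take the first
    # one whose advisory lock we win; session-level advisory locks survive
    # across transactions, so a worker can hold its claim for a 2-hour job
    # without keeping a transaction open.
    cur.execute(
        "SELECT id FROM jobs WHERE status = 'queued' ORDER BY created LIMIT %s",
        (candidates,),
    )
    for (job_id,) in cur.fetchall():
        cur.execute("SELECT pg_try_advisory_lock(%s)", (job_id,))
        if not cur.fetchone()[0]:
            continue  # another worker holds this one
        # re-check under the lock in case another worker already finished it
        cur.execute("SELECT 1 FROM jobs WHERE id = %s AND status = 'queued'", (job_id,))
        if cur.fetchone():
            return job_id
        cur.execute("SELECT pg_advisory_unlock(%s)", (job_id,))
    return None

def finish_job(cur, job_id, ok):
    cur.execute("UPDATE jobs SET status = %s WHERE id = %s",
                ('done' if ok else 'failed', job_id))
    # release the session-level lock once the job is finished
    cur.execute("SELECT pg_advisory_unlock(%s)", (job_id,))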
IDK maybe <1000 messages per minute
Not saying SKIP LOCKED can't work with that many. But you'll probably want to do something with lower overhead.
FWIW, Que uses advisory locks [1]
https://archive.org/download/completedictiona00falluoft
There's many uses in British literature of the 1800's, and a whole lot of uses in academic literature of the 70's to 80's. https://i.imgur.com/BhMv2nF.png "Disabuse" would fit into many of these slots, but not all.
Only common use now is RPG jargon; imbuing something with an attribute is something role playing nerds talk about, and it really needs an antonym.
I’m not confusing anything. I’ve seen random selection “job queues” implemented many times. As long as you truly don’t care about start order, it’s fine to trade it for increased throughout.