I used to use SQS but Postgres gives me everything I want. I can also do priority queueing and sorting.
I gave up on SQS when it couldn't be accessed from a VPC. AWS might have fixed that now.
All the other queueing mechanisms I investigated were dramatically more complex and heavyweight than Postgres SKIP LOCKED.
https://aws.amazon.com/about-aws/whats-new/2018/12/amazon-sq...
I think I have a rough idea about how it works, because I implemented something similar about four years ago in PostgreSQL but kept running into locking issues I couldn't resolve:
- https://stackoverflow.com/questions/33467813/concurrent-craw...
- https://stackoverflow.com/questions/29807033/postgresql-conc...
Also, what kind of queue size / concurrency on the polling side are you able to sustain for your current hardware?
As for acking, I see two common methods: an additional boolean column, something like is_processed, which consumers use to skip already-handled rows; or, after the work is done, simply deleting the entry or moving it elsewhere (e.g. for archival / auditing).
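A minimal sketch of both styles (is_processed and message_queue_archive here are just placeholder names):

-- style 1: flag the row; consumers only dequeue unprocessed rows
UPDATE message_queue SET is_processed = TRUE WHERE id = $1;
-- the dequeue filter then becomes: WHERE is_processed = FALSE ... FOR UPDATE SKIP LOCKED

-- style 2: copy to an archive table (optional), then delete the row outright
INSERT INTO message_queue_archive SELECT * FROM message_queue WHERE id = $1;
DELETE FROM message_queue WHERE id = $1;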
Of course one could delay the commit until all processing is completed, but then reasoning about the queue throughput becomes tricky.
If you ack before processing, and then you crash, those messages are lost (assuming you can't recover from the crash and you are not using something like a two-phase commit).
If you ack after processing, you may fail after the messages have been processed but before you've been able to ack them. This leads to duplicates, in which case you better hope your work units are idempotent. If they are not, you can always keep a separate table of message IDs that have been processed, and check against it.
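A rough sketch of that idempotency table (processed_messages is a made-up name), checked in the same transaction as the work itself:

CREATE TABLE processed_messages (message_id bigint PRIMARY KEY);

-- on each delivery, try to record the ID before doing the work;
-- if the insert affects 0 rows, this message was already handled, so skip it
INSERT INTO processed_messages (message_id) VALUES ($1)
ON CONFLICT (message_id) DO NOTHING;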
Either way, it's hard, complex and there are thousands of intermediate failure cases you have to think about. And for each possible solution (2pc, separate table of message IDs for idempotency, etc) you bring more complexity and problems to the table.
To be clear, it is not that the SKIP LOCKED solution is invalid, it is just that there are scenarios where it is not sufficient.
1. By combining services, 1 less service to manage in your stack (e.g. do your demo/local/qa envs all connect to Sqs?)
2. Postgres preserves your data if it goes down
3. You already have the tools on each machine and everybody knows the querying language to examine the stack
4. All your existing DB tools (e.g. backup solutions) automatically now cover your queue too, for free.
5. Performance is a non-issue for any company doing < 10m queue items a day.
import psycopg2
import psycopg2.extras
import random

db_params = {
    'database': 'jobs',
    'user': 'jobsuser',
    'password': 'superSecret',
    'host': '127.0.0.1',
    'port': '5432',
}

conn = psycopg2.connect(**db_params)
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

def do_some_work(job_data):
    # stand-in for the real work; fails half the time to exercise the failure path
    if random.choice([True, False]):
        print('do_some_work FAILED')
        raise Exception('simulated failure')
    else:
        print('do_some_work SUCCESS')

def process_job():
    # grab one unclaimed queue item, skipping rows other workers have locked,
    # and delete it in the same statement
    sql = """DELETE FROM message_queue
    WHERE id = (
        SELECT id
        FROM message_queue
        WHERE status = 'new'
        ORDER BY created ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
    )
    RETURNING *;
    """
    cur.execute(sql)
    queue_item = cur.fetchone()
    if queue_item is None:
        # nothing available right now
        print('no queue item found')
        conn.commit()
        return
    print('message_queue says to process job id: ', queue_item['target_id'])

    sql = """SELECT * FROM jobs WHERE id = %s AND status = 'new_waiting' AND attempts <= 3 FOR UPDATE;"""
    cur.execute(sql, (queue_item['target_id'],))
    job_data = cur.fetchone()

    if job_data:
        try:
            do_some_work(job_data)
            sql = """UPDATE jobs SET status = 'complete' WHERE id = %s;"""
            cur.execute(sql, (queue_item['target_id'],))
        except Exception:
            sql = """UPDATE jobs SET status = 'failed', attempts = attempts + 1 WHERE id = %s;"""
            # if we want the job to run again, insert a new item to the message queue with this job id
            cur.execute(sql, (queue_item['target_id'],))
    else:
        print('no job found, did not get job id: ', queue_item['target_id'])
    conn.commit()

process_job()

cur.close()
conn.close()

The SKIP LOCKED bit means that once a queue item has been grabbed FOR UPDATE by one worker, other workers' FOR UPDATE queries skip over it instead of blocking, so that worker effectively holds an exclusive lock on the queue item.
It's pretty robust and works fine for servicing multiple workers.
Because it is hacky from the perspective of a distributed system architecture. It's coupling 2 components that probably ought not be coupled because it's perceived as "convenient" to do so. The idea that your system's control and data planes are tightly coupled is a dangerous one if your system grows quickly. It forces a lot of complexity into areas where you'd rather not deal with it, later manifesting as technical debt as folks who need to solve the problem have to go hunting through the codebase to find where all the control and throttling decisions are being made and what they touch.
> 1. By combining services, 1 less service to manage in your stack (e.g. do your demo/local/qa envs all connect to Sqs?)
"Oh this SQS is such a pain to manage", is not something anyone who has used SQS is going to say. It's much less work than a database. It has other benefits too, like being easier for devs by being able to trivially replicate staging and production environments is a major selling point of cloud services in the first place. And unlike Postgres, you can do it from anywhere.
The decision to use the same Postgres you use for data as a queue also concentrates failure in your app and increases the blast radius of performance issues by piling everything onto one set of hardware interfaces. A bad day for your disk can now manifest in many more ways.
> 2. Postgres preserves your data if it goes down
Not if you're using a SKIP LOCKED queue (in the sense that it has exactly the same failure semantics as SQS, and you're more likely to lose your postgres instance than SQS with a whole big SRE team is to go down). Can you name a data loss scenario for SQS in the past 8 years? I can think of one that didn't hit every customer, and I'm not at AWS. Personally, I haven't lost data to it despite a fair amount of use since the time us-east-1 flooded. Certainly "reliability" isn't SQS's primary problem.
> 3. You already have the tools on each machine and everybody knows the querying language to examine the stack
SQS has a web console. You don't need to know a querying language. If you're querying off a queue, you have ceased to have an unqualified queue.
> 4. All your existing DB tools (e.g. backup solutions) automatically now cover your queue too, for free.
Given most folks who are using AWS are strongly encouraged and empowered to use RDS, this seems like a wash. But also: SHOULD you back up your queues? This seems to me like an important architectural question. If your system goes down hard, should/can it resume where it left off when restarting?
I'd argue the answer is almost always "no." But YMMV.
> 5. Performance is a non-issue for any company doing < 10m queue items a day.
You say, and then your marketing team says, "We're gonna be on Ellen next week, get ready!"
What does "hacky" even mean? If it means using side effects for a primary purpose then no, SKIP LOCKED is not a side effect.
I researched a lot of alternative queues to SQS and tried several of them, but all of them were complex, heavyweight, with questionable library support, and more trouble than they were worth.
The good thing about using a database as a queue is that you can easily customise queue behaviour to implement things like ordering, priority, and whatever else you want, and it's all as easy as SQL.
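For example, priority becomes just one more ORDER BY term (a rough sketch, assuming priority and payload columns on the queue table):

-- dequeue the highest-priority, oldest unclaimed item, skipping rows other workers hold
SELECT id, payload
FROM message_queue
WHERE status = 'new'
ORDER BY priority DESC, created ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;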
As you say, using Postgres as a queue cut out a lot of the complexity associated with using a standalone queueing system.
I think MySQL/Oracle/SQL server might also support SKIP LOCKED.
> Not if you're using a SKIP LOCKED queue
Can you provide a reference for this? I'm not aware that rows read with SKIP LOCKED are treated any differently from other Postgres data in terms of durability.
The rest of the sentence is a challenge: "show me a data loss bug in SQS that extends beyond the failure cases you've already implicitly agreed to by using postgres itself."
Personally, I would never use a relational database as a queue, but I'm already very familiar with SQS's semantics.
Especially for temporary batch processing jobs, where I really don't want to be modifying my production tables, SQS queues rock.
Definitely similar experience here. We handle ~10 million messages a day in a pubsub system quite similar in spirit to the above, running on AWS Aurora MySQL.
Our system isn't a queue. We track a little bit of short-lived state for groups of clients, and do low-latency, in-order message delivery between clients. But a lot of the architecture concerns are the same as with your queue implementation.
We switched over to our own pubsub code, implemented the simplest way we could think of, on top of vanilla SQL, after running for several months on a well-regarded SaaS NoSQL provider. After it became clear that both reliability and scaling were issues, we built several prototypes on top of other infrastructure offerings that looked promising.
We didn't want to run any infrastructure ourselves, and didn't want to write this "low-level" message delivery code. But, in the end, we felt that we could achieve better system observability, benchmarking, and modeling, with much less work, using SQL to solve our problems.
For us, the arguments are pretty much Dan McKinley's from the Choose Boring Technology paper.[0]
It's definitely been the right decision. We've had very few issues with this part of our codebase. Far, far fewer than we had before, when we were trying to trace down failures in code we didn't write ourselves on hardware that we had no visibility into at all. This has turned out to be a counter-data point to my learned aversion to writing any code if somebody else has already written and debugged code that I can use.
One caveat is that I've built three or four pubsub-ish systems over the course of my career, and built lots and lots of stuff on top of SQL databases. If I had 20 years of experience using specific NoSQL systems to solve similar problems, those would probably qualify as "boring" technology, to me, and SQL would probably seem exotic and full of weird corner cases. :-)
Same as SQS.
If you are using the queue as a log of events (i.e. user actions), you get an atomic guarantee that the db data is updated and the event describing this update has been recorded.
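Sketch of what that looks like (the accounts table and the payload shape are made up): the data change and the event that describes it commit or roll back as one unit:

BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 42;
INSERT INTO message_queue (status, created, payload)
VALUES ('new', now(), '{"event": "balance_changed", "account_id": 42}');
COMMIT;  -- either both the update and the event land, or neither does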
https://github.com/mbuhot/ecto_job for Elixir
https://github.com/timgit/pg-boss for Node.js
If you want a reliable system along those lines then you need to use SKIP LOCKED to SELECT one row to lock, then process it, and then DELETE the row. If your process dies, the lock will be released. You still have a new flavor of the same problem: you might process a message twice, because the process might die between completing processing and deleting the row. You could add complexity: first use SKIP LOCKED to SELECT one row to UPDATE to mark in-progress and LOCK the row, then later, if the process dies, another can go check whether the job was performed (then clean up the garbage) or not (pick up and perform the job) -- a two-phase commit, essentially.
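Roughly like this (a sketch; table and column names assumed), with the lock and the delete inside one transaction so a crash simply releases the row for another worker:

BEGIN;
SELECT id, payload FROM message_queue
WHERE status = 'new'
ORDER BY created ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;
-- ... process the message here; if the worker dies, the transaction aborts,
--     the lock is released, and another worker can pick the row up ...
DELETE FROM message_queue WHERE id = $1;  -- id returned by the SELECT above
COMMIT;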
Factor out PG, and you'll see that the problem is similar no matter the implementation.
If you don't want to keep the transaction open, you can go back to updating a column containing the message status; that avoids the long-lived transaction but might need a background process to check for stalled-out consumers.
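Something along these lines (sketch; claimed_at is an assumed column), with a periodic sweep that requeues anything a dead consumer left behind:

-- claim: mark one unclaimed row in-progress without blocking on other workers
UPDATE message_queue
SET status = 'in_progress', claimed_at = now()
WHERE id = (
    SELECT id FROM message_queue
    WHERE status = 'new'
    ORDER BY created ASC
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
RETURNING *;

-- background sweep: anything claimed too long ago goes back to 'new'
UPDATE message_queue
SET status = 'new'
WHERE status = 'in_progress'
  AND claimed_at < now() - interval '10 minutes';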
It is a very, very slippery slope, and it's seductive to put everything into Postgres at low scale, but I've seen more places sunk by technical debt (because they can't move fast enough to reach parity with market demands, can't scale operationally within budget, or can't address competitors' features) than places that never get enough traction for technical debt to become a problem. But perhaps my experience is perversely biased and I'm just driven to over-engineer stuff, because I've been routinely handed the reins of quite literally business-non-viable software systems that were over-sold repeatedly, while Silicon Valley sounds like it has the inverse problem of over-engineering for non-viable businesses.
That's hard to argue with.
The example code shows how it should be done - a simple table dedicated to messages which supplies the id of the work/job to be carried out.
Not much can be done architecturally to address developers messing that up.
Looking solely from a code perspective, how would this be done prior to Postgres 9.5 (before SKIP LOCKED)? Of the four examples I saw (either MySQL or Postgres), all of them choked constantly and had a pathological case of a job failing to execute expediently during peak usage which made on-call a nightmare. So what do we do about the places that can't upgrade their Postgres databases because it's too complicated to decouple messaging and business data now? Based upon my observations the answer is "it's never done" or the company goes under from lack of ability to enact changes caused by the paralysis.
The reality is that simple jobs are almost never enough outside toy examples. Soon, someone wants to be notified about job status changes - then comes an event table with a foreign key constraint on the job table's primary key. Also add in job digraphs with sub-tasks and workflows (AKA weak transactions) - these are non-trivial to do well. Depending upon your SQL variant and version combined with access patterns, row level locking and indexing can be an O(1) lookup or an O(n^2) nightmare that causes tons of contention depending (once again) upon the manner in which jobs are modified.
Instead of thinking about "what are my transaction boundaries and what do my asynchronous jobs look like?" -- questions which are much more invariant and important to the business -- people try to map their existing solution onto as many problems as possible. Answer those questions first, and then it should be more clear whether you would be fine with a RDBMS table, Airflow, SQS, Redis, RabbitMQ, JMS, etc. Operations-wise, more components is certainly a headache, but I've had more headaches in production due to inappropriate technology for the problem domain than "this is just too many parts!"
It's possible to implement SKIP LOCKED in userland in PostgreSQL (using NOWAIT, which has been in PG for a long time), although it's obviously a bit slower.
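One way that can look (purely illustrative; function and table names are made up): walk a batch of candidate rows and try each one with FOR UPDATE NOWAIT, treating a lock_not_available error as "skip this row":

CREATE OR REPLACE FUNCTION grab_next_message() RETURNS message_queue AS $$
DECLARE
    candidate  message_queue%ROWTYPE;
    locked_row message_queue%ROWTYPE;
BEGIN
    FOR candidate IN
        SELECT * FROM message_queue WHERE status = 'new' ORDER BY created ASC LIMIT 10
    LOOP
        BEGIN
            -- try to lock this specific row without waiting
            SELECT * INTO locked_row FROM message_queue
            WHERE id = candidate.id AND status = 'new'
            FOR UPDATE NOWAIT;
            IF FOUND THEN
                RETURN locked_row;  -- got an unlocked row
            END IF;
        EXCEPTION WHEN lock_not_available THEN
            NULL;  -- someone else holds it; move on to the next candidate
        END;
    END LOOP;
    RETURN NULL;  -- nothing available right now
END;
$$ LANGUAGE plpgsql;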
The very handy thing about the setup described is that your data tables are part of the same MVCC world-state as your message queue. So you do all the work for the job in the context of the same MVCC transaction that is holding the job locked; and anything that causes the job to fail will fail the entire transaction, and thus roll back any changes that the job's operation made to the data.
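Concretely, something like this (sketch; the orders table is a stand-in for whatever the job touches):

BEGIN;
-- claim the job, skipping anything another worker already holds
SELECT * FROM message_queue
WHERE status = 'new'
ORDER BY created ASC
FOR UPDATE SKIP LOCKED
LIMIT 1;
-- do the job's real data changes in the same transaction
UPDATE orders SET state = 'shipped' WHERE id = $1;
-- ack by deleting the queue row
DELETE FROM message_queue WHERE id = $2;
COMMIT;  -- any failure before this point rolls back the data changes and the ack together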