On SQS
1. andrew+7a 2019-05-27 08:51:45
>>mpweih+(OP)
I use Postgres SKIP LOCKED as a queue.

I used to use SQS but Postgres gives me everything I want. I can also do priority queueing and sorting.

I gave up on SQS when it couldn't be accessed from a VPC. AWS might have fixed that now.

All the other queueing mechanisms I investigated were dramatically more complex and heavyweight than Postgres SKIP LOCKED.
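
To illustrate the priority queueing mentioned above, here is a minimal sketch of a dequeue that orders by priority first and age second. It assumes a hypothetical integer `priority` column on `message_queue` (higher runs first); that column and the `claim_next` helper are illustrative names, and `cur` can be any DB-API cursor, such as one from psycopg2.

```python
# Hypothetical: assumes message_queue has an integer "priority" column.
CLAIM_SQL = """
DELETE FROM message_queue
WHERE id = (
  SELECT id
  FROM message_queue
  WHERE status = 'new'
  ORDER BY priority DESC, created ASC
  LIMIT 1
  FOR UPDATE SKIP LOCKED
)
RETURNING *;
"""

def claim_next(cur):
    """Claim and remove the highest-priority unlocked item.

    Returns the deleted row, or None when the queue is empty or every
    candidate row is currently locked by another worker.
    """
    cur.execute(CLAIM_SQL)
    return cur.fetchone()
```

Because rows locked by other transactions are skipped rather than waited on, several workers can run this at once without blocking each other. The claim only becomes permanent when the surrounding transaction commits.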

2. andrew+sF 2019-05-27 14:29:28
>>andrew+7a
Here is a complete implementation:

    import psycopg2
    import psycopg2.extras
    import random
    
    db_params = {
        'database': 'jobs',
        'user': 'jobsuser',
        'password': 'superSecret',
        'host': '127.0.0.1',
        'port': '5432',
    }
    
    conn = psycopg2.connect(**db_params)
    cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    
    def do_some_work(job_data):
        if random.choice([True, False]):
            print('do_some_work FAILED')
            raise Exception
        else:
            print('do_some_work SUCCESS')
    
    def process_job():
    
        sql = """DELETE FROM message_queue 
    WHERE id = (
      SELECT id
      FROM message_queue
      WHERE status = 'new'
      ORDER BY created ASC 
      FOR UPDATE SKIP LOCKED
      LIMIT 1
    )
    RETURNING *;
    """
        cur.execute(sql)
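        queue_item = cur.fetchone()
        if queue_item is None:
            # Queue is empty, or every 'new' row is locked by another worker.
            print('message_queue has no item to process')
            conn.commit()
            return
        print('message_queue says to process job id: ', queue_item['target_id'])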
        sql = """SELECT * FROM jobs WHERE id =%s AND status='new_waiting' AND attempts <= 3 FOR UPDATE;"""
        cur.execute(sql, (queue_item['target_id'],))
        job_data = cur.fetchone()
        if job_data:
            try:
                do_some_work(job_data)
                sql = """UPDATE jobs SET status = 'complete' WHERE id =%s;"""
                cur.execute(sql, (queue_item['target_id'],))
            except Exception:
                sql = """UPDATE jobs SET status = 'failed', attempts = attempts + 1 WHERE id =%s;"""
                # if we want the job to run again, insert a new item to the message queue with this job id
                cur.execute(sql, (queue_item['target_id'],))
        else:
            print('no job found, did not get job id: ', queue_item['target_id'])
        conn.commit()
    
    process_job()
    cur.close()
    conn.close()
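
The script above needs two tables that are not shown in the post. The following is only a guess at a schema that would satisfy the queries: the column names come from the code, while the types, defaults, index, and the `create_schema` helper are assumptions.

```python
# Assumed schema: column names are taken from the queries above,
# everything else (types, defaults, the partial index) is a guess.
SCHEMA_STATEMENTS = [
    """
    CREATE TABLE IF NOT EXISTS jobs (
        id        BIGSERIAL PRIMARY KEY,
        status    TEXT NOT NULL DEFAULT 'new_waiting',
        attempts  INTEGER NOT NULL DEFAULT 0
    );
    """,
    """
    CREATE TABLE IF NOT EXISTS message_queue (
        id         BIGSERIAL PRIMARY KEY,
        target_id  BIGINT NOT NULL REFERENCES jobs (id),
        status     TEXT NOT NULL DEFAULT 'new',
        created    TIMESTAMPTZ NOT NULL DEFAULT now()
    );
    """,
    # Partial index so the dequeue can find the oldest 'new' row cheaply.
    """
    CREATE INDEX IF NOT EXISTS message_queue_new_created_idx
        ON message_queue (created)
        WHERE status = 'new';
    """,
]

def create_schema(cur):
    """Run each DDL statement on a DB-API cursor; the caller commits."""
    for statement in SCHEMA_STATEMENTS:
        cur.execute(statement)
```

With psycopg2 this would be called as `create_schema(cur)` followed by `conn.commit()`.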
3. _sh+6R1 2019-05-28 03:32:52
>>andrew+sF
Interesting... How do you schedule this? If the queue is empty, do you back off and retry later, or spin the query until it returns a queue item, or some other way? It's a nice approach.
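
For reference, the "back off and retry later" option could look roughly like the sketch below. It is not the parent poster's scheduling method, which the thread does not describe. It assumes a variant of `process_job` that returns True when it handled an item and False when the queue was empty; `run_worker` and its parameters are hypothetical names.

```python
import time

def run_worker(process_job, min_delay=0.1, max_delay=5.0,
               sleep=time.sleep, max_idle_polls=None):
    """Poll the queue, doubling the wait after each empty poll.

    process_job: callable returning True if it processed an item,
                 False if the queue was empty (an assumed contract).
    max_idle_polls: stop after this many consecutive empty polls;
                    None means run forever.
    """
    delay = min_delay
    idle_polls = 0
    while max_idle_polls is None or idle_polls < max_idle_polls:
        if process_job():
            # Work was found: poll again immediately and reset the backoff.
            delay = min_delay
            idle_polls = 0
        else:
            # Queue was empty: wait, then lengthen the next wait up to the cap.
            sleep(delay)
            delay = min(delay * 2, max_delay)
            idle_polls += 1
```

Spinning the query without any sleep would also work, but it keeps a connection busy and loads the database while the queue is idle. The capped backoff trades a little latency on the first item after an idle period for far fewer empty polls.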