2. Delayed tasks are added to a Redis sorted set, with the eta datetime (when the task should run) as the score. The sorted set is then queried for all items with a score between zero and the current time, which returns the eta tasks that are ready to run. Those tasks are pushed onto the front of their equivalent non-eta queue and executed by the next available worker. Eta tasks might not run exactly at the eta datetime, but they shouldn't run early as long as all worker machines have clocks synced with time servers.
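A minimal sketch of that sorted-set approach, using redis-py method names (`zadd`, `zrangebyscore`, `zrem`, `lpush`). This is not WakaQ's code: the `<queue>-eta` key naming, the `enqueue_at`/`promote_ready` helpers, and the assumption that workers pop from the left of the list (so LPUSH puts a task at the front) are all hypothetical. `FakeRedis` is only there so the example runs without a server; pass a real `redis.Redis()` client instead.

```python
import time
from typing import Optional


def enqueue_at(r, queue: str, payload: str, eta: float) -> None:
    """Schedule `payload` to become ready at Unix time `eta` (hypothetical helper)."""
    # The score is the eta timestamp, so ZRANGEBYSCORE can find due tasks.
    r.zadd(f"{queue}-eta", {payload: eta})


def promote_ready(r, queue: str, now: Optional[float] = None) -> int:
    """Move every eta task with a score between 0 and `now` to the front of `queue`."""
    now = time.time() if now is None else now
    moved = 0
    for payload in r.zrangebyscore(f"{queue}-eta", 0, now):
        # ZREM returns 1 only for the caller that actually removed the member,
        # so two schedulers racing here won't both push the same task.
        if r.zrem(f"{queue}-eta", payload):
            # Front of the list, assuming workers pop from the left with BLPOP.
            r.lpush(queue, payload)
            moved += 1
    return moved


class FakeRedis:
    """In-memory stand-in for the four redis-py calls above (testing only)."""

    def __init__(self):
        self.zsets = {}
        self.lists = {}

    def zadd(self, name, mapping):
        zset = self.zsets.setdefault(name, {})
        added = sum(1 for member in mapping if member not in zset)
        zset.update(mapping)
        return added

    def zrangebyscore(self, name, min_score, max_score):
        # Redis orders by score, then lexicographically by member.
        items = sorted(self.zsets.get(name, {}).items(), key=lambda kv: (kv[1], kv[0]))
        return [member for member, score in items if min_score <= score <= max_score]

    def zrem(self, name, *values):
        zset = self.zsets.get(name, {})
        return sum(1 for v in values if zset.pop(v, None) is not None)

    def lpush(self, name, *values):
        lst = self.lists.setdefault(name, [])
        for v in values:
            lst.insert(0, v)
        return len(lst)


r = FakeRedis()  # swap in redis.Redis() to run against a real server
enqueue_at(r, "default", "task-a", eta=100.0)
enqueue_at(r, "default", "task-b", eta=200.0)
print(promote_ready(r, "default", now=150.0))  # 1
print(r.lists["default"])  # ['task-a']
```

Checking the return value of ZREM is one way to keep concurrent schedulers from promoting the same task twice; a Lua script or a MULTI/EXEC transaction would be alternatives.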
3. wakaq-info prints all queues with counts of pending tasks and pending eta tasks. Throughput isn't printed; you have to calculate it by sampling wakaq-info at least twice. You can also import info from wakaq.utils to use it from a Python script.
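Sampling twice gives the net change in each queue's pending count, which mixes finished tasks with newly enqueued ones. A small sketch of that arithmetic, assuming the per-queue pending counts have already been collected into plain dicts (the dict shape and the `net_drain_rate` helper are hypothetical; no particular return format of `info` is assumed):

```python
def net_drain_rate(before: dict, after: dict, seconds: float) -> dict:
    """Per-queue change in pending tasks per second between two samples.

    Positive means the queue is shrinking, negative means it is growing. This
    equals true throughput only if nothing was enqueued between the samples.
    """
    if seconds <= 0:
        raise ValueError("seconds must be positive")
    return {queue: (before[queue] - after.get(queue, 0)) / seconds for queue in before}


# Hypothetical pending counts copied from two wakaq-info runs 30 seconds apart.
first = {"default": 120, "slow": 40}
second = {"default": 60, "slow": 50}
print(net_drain_rate(first, second, 30.0)["default"])  # 2.0
```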
4. There's no built-in way to pause. I pause by sending a broadcast task to all workers, which sets exclude_queues to all queues and then restarts the worker. After that, the worker only listens for broadcast tasks and can be turned back on with another broadcast task.
> Congrats
Thanks!
https://github.com/tobymao/saq
Looks like you use BLPOP, but there could be race conditions or lost jobs if your workers crash, check out BLMOVE and RPOPLPUSH.
Awesome! I'll check out your implementation.
> Looks like you use BLPOP, but there could be race conditions or lost jobs if your workers crash, check out BLMOVE and RPOPLPUSH.
I prefer ack-first (BLPOP) and designing tasks to expect some amount of errors.
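For comparison with ack-first BLPOP, here is a minimal sketch of the reliable-queue pattern the BLMOVE/RPOPLPUSH suggestion refers to, using redis-py's `blmove`, `lmove`, and `lrem`. It is not how WakaQ works: the per-worker `<queue>-processing-<worker_id>` key and the `work_one`/`requeue_orphans` helpers are hypothetical, and `FakeRedis` is an in-memory stand-in that never blocks. BLMOVE needs Redis 6.2 or newer; on older servers BRPOPLPUSH plays the same role.

```python
from typing import Callable, Optional


def work_one(r, queue: str, worker_id: str,
             handler: Callable[[str], None], timeout: int = 5) -> Optional[str]:
    """Process one task, removing it from the processing list only after success."""
    processing = f"{queue}-processing-{worker_id}"
    # Atomically pop from the head of `queue` and append to this worker's list.
    payload = r.blmove(queue, processing, timeout, src="LEFT", dest="RIGHT")
    if payload is None:
        return None  # timed out with nothing to do
    # If handler raises or the process dies here, the task stays in `processing`.
    handler(payload)
    r.lrem(processing, 1, payload)  # ack
    return payload


def requeue_orphans(r, queue: str, worker_id: str) -> int:
    """On worker startup, push tasks left over from a crash back to the head of `queue`."""
    processing = f"{queue}-processing-{worker_id}"
    moved = 0
    while r.lmove(processing, queue, src="RIGHT", dest="LEFT") is not None:
        moved += 1
    return moved


class FakeRedis:
    """In-memory stand-in for the redis-py list calls used above (never blocks)."""

    def __init__(self):
        self.lists = {}

    def rpush(self, name, *values):
        lst = self.lists.setdefault(name, [])
        lst.extend(values)
        return len(lst)

    def lmove(self, first_list, second_list, src="LEFT", dest="RIGHT"):
        source = self.lists.get(first_list)
        if not source:
            return None
        value = source.pop(0) if src == "LEFT" else source.pop()
        target = self.lists.setdefault(second_list, [])
        if dest == "LEFT":
            target.insert(0, value)
        else:
            target.append(value)
        return value

    def blmove(self, first_list, second_list, timeout, src="LEFT", dest="RIGHT"):
        return self.lmove(first_list, second_list, src, dest)

    def lrem(self, name, count, value):
        # Only handles count >= 1 (remove up to `count` matches from the head).
        lst = self.lists.get(name, [])
        removed = 0
        while removed < count and value in lst:
            lst.remove(value)
            removed += 1
        return removed
```

The trade-off is the one described above: a crash between BLMOVE and LREM leaves the task in the processing list, so it runs again after recovery (at-least-once), whereas ack-first BLPOP drops it (at-most-once) and relies on the task itself tolerating that.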
and
https://github.com/wakatime/wakaq/blob/main/wakaq/worker.py
is the meat of it. The blog post talks about the Redis data structures used, and there's not much to it beyond that.
https://stackoverflow.com/questions/39740632/python-type-hin...
I do a combination of a few things to handle exceptions in tasks:
1. `@wakaq.task(max_retries=X)` automatically retries the task on soft timeouts, which are usually intermittent; the task is usually faster the second time it runs.
2. Use error handling to track task failure rates: https://gist.github.com/alanhamlett/365d48276ac054ae75e59525...
3. Build tasks to expect some failures by making them safe to re-run without, for example, sending an email twice. This usually means exclusive locking in tasks, or keeping counters of recent task runs or task progress somewhere.
4. Use try/except blocks inside tasks that roll back and re-enqueue the same task to retry later.
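Items 3 and 4 can be combined in a small guard: claim a key with Redis `SET` using `nx=True` and an expiry before doing the side effect, and on failure release the claim and re-enqueue. This is a sketch under assumptions, not WakaQ's API: `run_once`, the `email:<id>` key scheme, and the `requeue` callback (standing in for however your tasks get enqueued) are hypothetical, and `FakeRedis` replaces a real `redis.Redis()` client so the example runs standalone.

```python
from typing import Callable


def run_once(r, key: str, action: Callable[[], None],
             requeue: Callable[[], None], ttl: int = 86400) -> bool:
    """Run `action` at most once per `key` within `ttl` seconds (hypothetical helper).

    Returns True if the action ran successfully, False if it was skipped as a
    duplicate or failed and was handed to `requeue`.
    """
    # SET NX only succeeds for the first caller; EX lets the claim expire so a
    # task whose worker died mid-run isn't blocked forever.
    if not r.set(key, "1", nx=True, ex=ttl):
        return False
    try:
        action()
    except Exception:
        r.delete(key)  # release the claim so the retry can run
        requeue()      # e.g. re-enqueue the same task to try again later
        return False
    return True


class FakeRedis:
    """In-memory stand-in for redis-py's set(nx=True) and delete (ignores expiry)."""

    def __init__(self):
        self.store = {}

    def set(self, name, value, ex=None, nx=False):
        if nx and name in self.store:
            return None
        self.store[name] = value
        return True

    def delete(self, *names):
        return sum(1 for n in names if self.store.pop(n, None) is not None)
```

A claim still held when a worker dies means the task is skipped until the key expires, which is usually the right trade-off for something like emails. If `action` fails after its side effect already happened, the retry will repeat it, so the guard is only as precise as the step it wraps.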
Basically I handle all crashes at the application task level, not in WakaQ.
I wanted to use SSDB[1] instead of Redis for that reason, but it doesn't support the necessary data structures.