Also, I really wanted something simple and easy to maintain after working around so many bugs. Dramatiq isn't as complicated as Celery but it still has features I don't need.
1. What’s the error handling strategy for when a task/payload fails?
2. How exactly do delayed tasks work? For example, are you delaying execution until, say, 10 minutes later? How do you process delayed tasks in sequential timed order?
3. What kind of metrics/stats are available?
4. Is there a way to pause and resume, or is this the same as start and stop?
Congrats.
Workers don't run the scheduled tasks, only a scheduler does. You should start only one scheduler but as many workers as you want. wakaq-scheduler and wakaq-worker are the command line interfaces.
> Is there any communication between workers?
Someone I know is working on a solution for this as an open source project. I don't have the link off hand but I'll send him your comment.
It would be awesome if Airflow starts using this.
this very effort deserves praise. but i have seen it happen many times before. dramatiq, huey, rq, schedule, etc. all one needs to do is wait for those pesky corner cases to start piling up, and for maintainers to be unable to solve them, or not solve them fast enough.
we need primitives, building blocks, to build such systems when needed. we need more control.
i like what golang does.
Something like Erlang's OTP might have suitable primitives, but only because it's already a distributed system.
> Each queue is implemented using a Redis list.
That seems to imply that queue itself is not distributed.
But maybe "distributed" refers to the fact that workers are running on multiple machines? (Though that is a bit confusing to me; sort of like calling Postgres a distributed database because the clients are on multiple machines.)
2. Delayed tasks are added to a Redis sorted set, where the eta datetime when the task should run is the score. Then the sorted set is queried for all items/tasks between zero and the current time. That returns eta tasks which are ready to run. Those tasks are added to the front of their equivalent non-eta queue and executed by the next worker. Eta tasks might not run right at the eta datetime, but they shouldn't run too early if all worker machines have clocks synced with time servers. (See the sketch after this list.)
3. wakaq-info prints all queues with counts of pending tasks and pending eta tasks. Throughput isn't printed and has to be calculated by sampling wakaq-info at least twice. You can also import info from wakaq.utils to use it from a Python script.
4. No built-in way to pause. I pause by sending a broadcast task to all workers, which sets exclude_queues to all queues and then restarts the worker. Then it only listens for broadcast tasks and can be turned back on with another broadcast task.
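A minimal sketch of the eta mechanism from point 2, using redis-py directly. The key names, payload format, and loop structure are illustrative assumptions, not WakaQ's actual internals:

```python
import time

import redis

r = redis.Redis()


def enqueue_eta(payload: str, eta_unix: float) -> None:
    # A delayed task goes into a sorted set, scored by the unix time it should run.
    r.zadd("myqueue:eta", {payload: eta_unix})


def scheduler_tick() -> None:
    # Everything scored between 0 and "now" is ready to run.
    ready = r.zrangebyscore("myqueue:eta", 0, time.time())
    for payload in ready:
        # Push to the front of the normal queue so the next worker picks it up,
        # then drop it from the sorted set.
        r.lpush("myqueue", payload)
        r.zrem("myqueue:eta", payload)
```

Whether WakaQ does the move atomically I haven't checked; with a single scheduler process the simple version above roughly matches the description, and the point is just that a timestamp-scored sorted set gives you delayed tasks with nothing more exotic than ZADD and ZRANGEBYSCORE.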
> Congrats
Thanks!
These days I use Go when I need types.
https://github.com/tobymao/saq
Looks like you use BLPOP, but there could be race conditions or lost jobs if your workers crash; check out BLMOVE and RPOPLPUSH.
Awesome! I'll check out your implementation.
> Looks like you use BLPOP, but there could be race conditions or lost jobs if your workers crash; check out BLMOVE and RPOPLPUSH.
I prefer ack-first (blpop) and designing the tasks to expect some amount of errors.
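For anyone weighing the two approaches, here is a rough sketch of both worker loops using redis-py. The queue names and the `process` stub are made up; this is not WakaQ's actual worker code:

```python
import redis

r = redis.Redis()


def process(payload: bytes) -> None:
    ...  # run the task encoded in payload


def ack_first_worker() -> None:
    # Ack-first: BLPOP removes the task before it runs. Simple, but if the
    # worker dies mid-task the job is gone, so tasks must tolerate that.
    while True:
        _queue, payload = r.blpop("myqueue")
        process(payload)


def reliable_worker() -> None:
    # Ack-later: BLMOVE (Redis >= 6.2; BRPOPLPUSH on older servers) atomically
    # moves the task to a processing list. If the worker crashes, the job is
    # still sitting in "myqueue:processing" and a reaper can re-queue it.
    while True:
        payload = r.blmove("myqueue", "myqueue:processing", timeout=0)
        process(payload)
        r.lrem("myqueue:processing", 1, payload)
```

The trade-off is exactly what's described above: ack-first is simpler and leaves nothing behind to clean up, but a crash mid-task drops the job; the move-based pattern never loses the job but needs a reaper to re-queue entries stranded in the processing list.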
and
https://github.com/wakatime/wakaq/blob/main/wakaq/worker.py
is the meat of it. The blog post talks about the Redis data structures used, and there's not much to it beyond that.
https://stackoverflow.com/questions/39740632/python-type-hin...
But honestly, it seems like you did not read the article at all. That seems to be exactly what they do for the queue part, but you still need to manage the workers executing the tasks, their lifecycle, and the interfacing/serialization/deserialization for the tasks and results.
Old, from when I confused streams with pubsub: Streams aren't durable... a worker goes offline for a second and all of a sudden you've lost all your pending tasks. Redis lists are durable, so you can add tasks first, then boot up a worker later and the worker will process those waiting tasks. With streams, the worker wouldn't see any pending tasks when it boots.
There's nothing wrong with the trade off you made, but when I read distributed I assumed that you meant the task broker was distributed and therefore redundant, not the consumers.
Also, while plain XREAD doesn't give you pending tasks, using consumer groups and XREADGROUP (specifically with ID = 0 to get all pending tasks) should get you all pending tasks before continuing on to unseen tasks. There are also XCLAIM and XAUTOCLAIM, which let you filter pending tasks by how long they have gone unacknowledged and have another worker claim them.
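For reference, a hedged sketch of that consumer-group pattern with redis-py. The stream, group, and consumer names are made up, and WakaQ itself uses lists rather than streams:

```python
import redis

r = redis.Redis()
STREAM, GROUP, CONSUMER = "tasks", "workers", "worker-1"

# Create the group once, starting at ID 0 so entries added before any worker
# boots are still delivered; mkstream creates the stream if it doesn't exist.
try:
    r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
except redis.ResponseError:
    pass  # group already exists


def handle(fields: dict) -> None:
    ...  # run the task described by the stream entry


def drain_own_pending() -> None:
    # Reading from ID "0" returns entries already delivered to this consumer
    # but never XACKed (e.g. because it crashed mid-task).
    resp = r.xreadgroup(GROUP, CONSUMER, {STREAM: "0"})
    for _stream, entries in resp or []:
        for entry_id, fields in entries:
            handle(fields)
            r.xack(STREAM, GROUP, entry_id)


def run_worker() -> None:
    drain_own_pending()
    while True:
        # ">" asks for entries never delivered to any consumer in the group.
        resp = r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=10, block=5000)
        for _stream, entries in resp or []:
            for entry_id, fields in entries:
                handle(fields)
                r.xack(STREAM, GROUP, entry_id)
```

XAUTOCLAIM covers the remaining gap: entries stuck in a dead consumer's pending list can be claimed by a healthy worker after a configurable idle time.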
Otherwise, good product.
But the author expresses way too much confidence in the description of the project. It has no tests, no type hints, no pluggable error handling, no fallback when the Redis instance dies, and has been used in prod by apparently a single entity.
Stating "WakaQ is stable and ready to use in production" is irresponsible, as a task queue is critical infrastructure and this could lure devs into thinking this code is way more mature than it is.
Of course, one should do due diligence when evaluating a new part of the stack. I also appreciate the enthusiasm of the article, and I'm sure the code already produces many benefits for the author.
But no, an asynchronous execution system dealing with priority, resilience, and network messages, that has existed for a month, is in no way stable and ready. It's nascent and promising.
I'm one of TaskTiger's current maintainers. Just curious if any specific features are missing (or how it compares in benchmarks)?
I've changed it to "WakaQ is still a new project, so use at your own risk."
> how it compares in benchmarks
WakaQ processes many tasks per fork, while I think TaskTiger only processes one? That should make WakaQ slightly faster, but it still depends on too many factors to know without benchmarks.
I actually benchmarked TaskTiger in production a long time ago against Celery. From memory, I think it was slower than Celery. I can't find those benchmarks, and even if I could, WakaTime's scale is bigger now so I'm not sure how useful they would be. If you create some benchmarks, I'd love to see them.
I do a combination of a few things to handle exceptions in tasks:
1. `@wakaq.task(max_retries=X)` auto retries the task on soft timeouts, which are usually intermittent and faster the second time the task runs
2. use error handling to track task failure rates https://gist.github.com/alanhamlett/365d48276ac054ae75e59525...
3. build tasks to expect some failures by making them safe to re-run without, for example, sending an email twice. this usually means exclusive locking in tasks or keeping counters of recent task runs or task progress somewhere
4. try/except blocks inside tasks, which roll back and re-enqueue the same task to retry later (see the sketch below)
Basically I handle all crashes at the application task level, not in WakaQ.
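A rough illustration of points 3 and 4 together. The `@wakaq.task(max_retries=...)` decorator and `.delay()` come from the usage above; the WakaQ constructor arguments, the lock key, and the mailer stubs are my assumptions, so check the README for the exact API:

```python
import redis
from wakaq import WakaQ

r = redis.Redis()
wakaq = WakaQ(queues=[(0, "default")])  # constructor args are assumed; see the README


class TransientDeliveryError(Exception):
    """Stand-in for whatever recoverable error your mailer raises."""


def deliver_email(user_id: int) -> None:
    ...  # talk to your email provider


@wakaq.task(max_retries=3)
def send_welcome_email(user_id: int) -> None:
    # Point 3: make the task safe to re-run. An exclusive lock with a TTL
    # means a retried or duplicated enqueue won't send the email twice.
    lock_key = f"welcome-email-sent:{user_id}"  # illustrative key name
    if not r.set(lock_key, 1, nx=True, ex=24 * 3600):
        return  # already sent recently

    try:
        deliver_email(user_id)
    except TransientDeliveryError:
        # Point 4: on a recoverable failure, undo our marker and
        # re-enqueue the same task to retry later.
        r.delete(lock_key)
        send_welcome_email.delay(user_id)
```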
Your comment makes it seem like you haven't experienced Python types enough, or you wouldn't think it was so easy.
Generally yes, though it also supports "batching" tasks where tasks are `.delay`'d normally but the task is given batches of N sets of args at a time and the task's code has to understand batching logic.
:+1: I'd definitely believe the task handling code is slower than Celery. Honestly I haven't personally done too much optimization work on it (though perhaps previous maintainers have, I'm not sure). Performance of TaskTiger itself hasn't really been problematic at all for us in production with hundreds of workers. Usually it's been external things (like databases, third-party APIs, etc.) that impact our task throughput the most. Or wanting to shift the architecture entirely to avoid using memory-bound Redis as a queue with an overflow risk.
I wanted to use SSDB[1] instead of Redis for that reason, but it doesn't support the necessary data structures.
Oh trust me I did and I constantly slap on the wrists juniors who over-complicate their solutions to the problem :)
> Why would there exist a way in Python to conditionally import types, for the purpose of preventing cyclic imports, if cyclic imports weren't a problem?
Because it's easier to understand than the solution to cyclic imports without conditional imports.
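For readers following along, the mechanism in question is `typing.TYPE_CHECKING`: imports under that guard run only for the type checker, never at runtime, so two modules can annotate with each other's types without a runtime import cycle. A minimal example (module and class names are made up):

```python
# models.py
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by mypy/pyright; never executed, so it cannot create a
    # circular import even though services.py imports this module.
    from services import UserService


class User:
    name: str = "alice"

    def service(self) -> UserService:
        from services import UserService  # deferred runtime import

        return UserService(self)


# services.py
from models import User  # the only real runtime import between the two


class UserService:
    def __init__(self, user: User) -> None:
        self.user = user
```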