LTQ is a lightweight, async-first task queue built on Redis. It uses a middleware pattern for cross-cutting concerns like retries, rate limiting, and error tracking. The architecture emphasizes composability and developer ergonomics.
When working with this project, use these commands:
```bash
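# Install project dependencies
uv sync
# Start the worker defined in examples/github.py
uv run ltq examples.github:worker
# Start the same worker with higher concurrency and debug logging
uv run ltq examples.github:worker --concurrency 100 --log-level DEBUG
# Run the example script directly
uv run python examples/github.py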
```
Understanding the message flow is critical when debugging or extending LTQ:
1. `Task.send()` serializes args/kwargs into a `Message` and pushes it onto a Redis queue
2. `Worker` polls queues and retrieves batches of messages (up to `concurrency` count)
3. Messages pass through the middleware chain (applied in reverse order via `functools.partial`)
4. The innermost handler executes the actual task function
5. Messages are acknowledged after processing; `RetryMessage` exceptions trigger re-enqueue with delay
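Steps 3 and 4 of the flow above can be sketched in isolation. The following is a minimal illustration of composing a middleware chain in reverse order with `functools.partial`, not LTQ's actual worker code: the `Message` dataclass, the `Tag` middleware, and the `build_chain` helper are hypothetical stand-ins introduced only for this example.

```python
import asyncio
from dataclasses import dataclass, field
from functools import partial
from typing import Any, Awaitable, Callable


# Stand-in types for illustration only; LTQ's real Message and Handler may differ.
@dataclass
class Message:
    args: tuple = ()
    ctx: dict = field(default_factory=dict)


Handler = Callable[[Message], Awaitable[Any]]


class Tag:
    """Hypothetical middleware that records entry and exit order in message.ctx."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def handle(self, message: Message, next_handler: Handler) -> Any:
        message.ctx.setdefault("trace", []).append(f"enter {self.name}")
        result = await next_handler(message)
        message.ctx["trace"].append(f"exit {self.name}")
        return result


def build_chain(middlewares: list, task_handler: Handler) -> Handler:
    """Wrap task_handler so that the first middleware in the list runs outermost."""
    handler = task_handler
    # Iterate in reverse: the last middleware wraps the task directly,
    # and each earlier one wraps the handler built so far.
    for mw in reversed(middlewares):
        handler = partial(mw.handle, next_handler=handler)
    return handler


async def add(message: Message) -> int:
    # Innermost handler: stands in for the actual task function.
    return sum(message.args)


msg = Message(args=(1, 2))
chain = build_chain([Tag("a"), Tag("b")], add)
result = asyncio.run(chain(msg))
```

After the run, `msg.ctx["trace"]` shows that middleware `a` is entered first and exited last, which is why the chain has to be built from the innermost handler outward.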
When reading or modifying code, focus on these core components:
**Worker** (`worker.py`)
**Queue** (`q.py`)
**Middleware** (`middleware.py`)
**Message** (`message.py`)
**Task** (`task.py`)
When implementing custom middleware, follow this pattern:
```python
from abc import ABC, abstractmethod
from typing import Any

class Middleware(ABC):
    @abstractmethod
    async def handle(self, message: Message, next_handler: Handler) -> Any: ...
```
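As an illustration of the pattern, here is a sketch of one possible custom middleware. It assumes only the `handle(message, next_handler)` signature shown above; the `Message` dataclass (including its `task_name` field), the `Handler` alias, the `Timing` class, and the `elapsed_s` key are hypothetical stand-ins so that the example runs without LTQ installed.

```python
import asyncio
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable


@dataclass
class Message:  # stand-in; LTQ's real Message may carry different fields
    task_name: str
    ctx: dict = field(default_factory=dict)


Handler = Callable[[Message], Awaitable[Any]]


class Middleware(ABC):
    @abstractmethod
    async def handle(self, message: Message, next_handler: Handler) -> Any: ...


class Timing(Middleware):
    """Hypothetical middleware: stores the handler's duration in message.ctx."""

    async def handle(self, message: Message, next_handler: Handler) -> Any:
        start = time.perf_counter()
        try:
            # Always delegate to the next handler, otherwise the task never runs.
            return await next_handler(message)
        finally:
            # Recorded even if the handler raises.
            message.ctx["elapsed_s"] = time.perf_counter() - start


async def say_hello(message: Message) -> str:
    # Stands in for the innermost task handler.
    return f"hello from {message.task_name}"


timed_msg = Message(task_name="greet")
timed_result = asyncio.run(Timing().handle(timed_msg, say_hello))
```

Writing to `message.ctx` in a `finally` block keeps the recorded state available for inspection whether the task succeeds or fails.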
**Key guidelines:**
LTQ uses exception-based control flow for the task lifecycle. Raising `RetryMessage`, for example, causes the message to be re-enqueued with a delay.
When debugging failed tasks, check the worker logs for exceptions and inspect `message.ctx` for middleware state.
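The retry path of this exception-based control flow might look roughly like the sketch below. It is a simplified model, not LTQ's implementation: the `RetryMessage` class here is a local stand-in whose `delay` attribute is an assumption, an in-memory `asyncio.Queue` replaces Redis, and `process` and `flaky` are hypothetical helpers.

```python
import asyncio


class RetryMessage(Exception):
    """Stand-in for LTQ's RetryMessage; the `delay` attribute is an assumption."""

    def __init__(self, delay: float) -> None:
        super().__init__(f"retry in {delay}s")
        self.delay = delay


async def process(message: dict, handler, queue: asyncio.Queue) -> str:
    """Run the handler once; re-enqueue on RetryMessage, otherwise acknowledge."""
    try:
        await handler(message)
    except RetryMessage as exc:
        # A real worker would schedule the delayed re-enqueue in Redis
        # instead of sleeping in place.
        await asyncio.sleep(exc.delay)
        await queue.put(message)
        return "retried"
    return "acked"


async def flaky(message: dict) -> None:
    # Fails on the first attempt only, to exercise the retry path.
    message["attempts"] = message.get("attempts", 0) + 1
    if message["attempts"] < 2:
        raise RetryMessage(delay=0.01)


async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    await queue.put({"id": 1})
    outcomes = []
    while not queue.empty():
        outcomes.append(await process(await queue.get(), flaky, queue))
    return outcomes


outcomes = asyncio.run(main())
```

The message is processed twice: the first attempt raises and is re-enqueued, and the second completes and is acknowledged.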
When modifying or extending LTQ:
1. **Adding new middleware**: Create a subclass of `Middleware` in `middleware.py`, implement `async handle()`, and ensure it calls `await next_handler(message)`
2. **Adding new tasks**: Decorate async functions with `@worker.task(queue='name')` and ensure they accept serializable args/kwargs
3. **Debugging message flow**: Add logging in middleware `handle()` methods and inspect `message.ctx` for state
4. **Testing**: Write async tests using `pytest-asyncio` and mock Redis interactions with `fakeredis`
5. **Performance tuning**: Adjust the `--concurrency` flag for workers and use the `RateLimit` middleware to avoid overwhelming downstream services
The `examples/` directory contains reference implementations. The `github.py` example demonstrates:
Always refer to examples when implementing new features.