Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,25 @@ pip install -e .

## Configuration

FQ reads a simple INI config file. Intervals are in milliseconds.
```
[fq]
job_expire_interval : 5000
job_requeue_interval : 5000
default_job_requeue_limit : -1 ; -1 retries forever, 0 means no retries

[redis]
db : 0
key_prefix : queue_server
conn_type : tcp_sock ; or unix_sock
host : 127.0.0.1
port : 6379
password :
clustered : false
unix_socket_path : /tmp/redis.sock
FQ accepts a simple config mapping. Intervals are in milliseconds.
```python
config = {
"fq": {
"job_expire_interval": 5000,
"job_requeue_interval": 5000,
"default_job_requeue_limit": -1, # -1 retries forever, 0 means no retries
},
"redis": {
"db": 0,
"key_prefix": "queue_server",
"conn_type": "tcp_sock", # or "unix_sock"
"host": "127.0.0.1",
"port": 6379,
"password": "",
"clustered": False,
"unix_socket_path": "/tmp/redis.sock",
},
}
```

> If you connect via Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`:
Expand All @@ -66,8 +69,26 @@ from fq import FQ


async def main():
fq = FQ("config.conf")
await fq.initialize() # load config, connect to Redis, register Lua scripts
config = {
"fq": {
"job_expire_interval": 5000,
"job_requeue_interval": 5000,
"default_job_requeue_limit": -1,
},
"redis": {
"db": 0,
"key_prefix": "queue_server",
"conn_type": "tcp_sock",
"host": "127.0.0.1",
"port": 6379,
"password": "",
"clustered": False,
"unix_socket_path": "/tmp/redis.sock",
},
}

fq = FQ(config)
await fq.initialize() # connect to Redis and register Lua scripts

job_id = str(uuid.uuid4())
await fq.enqueue(
Expand Down Expand Up @@ -102,7 +123,7 @@ Common operations:

## Development

- Start Redis for local development: `make redis-up` (binds to `localhost:6379`; matches `tests/test.conf`).
- Start Redis for local development: `make redis-up` (binds to `localhost:6379`).
- Run the suite: `make test` (automatically starts and tears down Redis).
- Build a wheel: `make build`
- Install/uninstall from the build: `make install` / `make uninstall`
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ dynamic = ["version"]
description = "A simple Redis-based job queue system."
readme = "README.md"
license = {text = "MIT"}
authors = [{name = "Ochui Princewill", email = "ochui@flowdacity.com"}]
authors = [{name = "Flowdacity Development Team", email = "admin@flowdacity.com"}]
requires-python = ">=3.12"
dependencies = [
"msgpack>=1.1.2",
Expand Down
14 changes: 0 additions & 14 deletions src/fq/default.conf

This file was deleted.

172 changes: 119 additions & 53 deletions src/fq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncio
import os
import configparser
from collections.abc import Mapping

from redis.asyncio import Redis
from redis.asyncio.cluster import RedisCluster
Expand All @@ -25,51 +25,144 @@ class FQ(object):
"""The FQ object is the core of this queue.
FQ does the following.

1. Accepts a configuration file.
1. Accepts structured configuration.
2. Initializes the queue.
3. Exposes functions to interact with the queue.
"""

def __init__(self, config_path):
def __init__(self, config):
"""Construct a FQ object by doing the following.
1. Read the configuration path.
2. Load the config.
1. Store the queue configuration.
2. Validate the config shape.
"""
self.config_path = config_path
self._load_config()
self._r = None # redis client placeholder
if not isinstance(config, Mapping):
raise FQException("Config must be a mapping with redis and fq sections")

normalized = {}
for section_name, section_values in config.items():
if not isinstance(section_values, Mapping):
raise FQException(
"Config section '%s' must be a mapping" % section_name
)

normalized[str(section_name)] = {
str(option): value for option, value in section_values.items()
}

if "redis" not in normalized or "fq" not in normalized:
raise FQException("Config missing required sections: redis, fq")

redis_config = normalized["redis"]
fq_config = normalized["fq"]

if "key_prefix" not in redis_config:
raise FQException("Missing config: redis.key_prefix")
if not isinstance(redis_config["key_prefix"], str) or not redis_config[
"key_prefix"
]:
raise FQException(
"Invalid config: redis.key_prefix must be a non-empty string"
)

if "conn_type" not in redis_config:
raise FQException("Missing config: redis.conn_type")
if redis_config["conn_type"] not in {"tcp_sock", "unix_sock"}:
raise FQException(
"Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'"
)

if "db" not in redis_config:
raise FQException("Missing config: redis.db")
if isinstance(redis_config["db"], bool) or not isinstance(
redis_config["db"], int
):
raise FQException("Invalid config: redis.db must be an integer")

if "job_expire_interval" not in fq_config:
raise FQException("Missing config: fq.job_expire_interval")
if not is_valid_interval(fq_config["job_expire_interval"]):
raise FQException(
"Invalid config: fq.job_expire_interval must be a positive integer"
)

if "job_requeue_interval" not in fq_config:
raise FQException("Missing config: fq.job_requeue_interval")
if not is_valid_interval(fq_config["job_requeue_interval"]):
raise FQException(
"Invalid config: fq.job_requeue_interval must be a positive integer"
)

if "default_job_requeue_limit" not in fq_config:
raise FQException("Missing config: fq.default_job_requeue_limit")
if not is_valid_requeue_limit(fq_config["default_job_requeue_limit"]):
raise FQException(
"Invalid config: fq.default_job_requeue_limit must be an integer >= -1"
)

if redis_config["conn_type"] == "unix_sock":
if "unix_socket_path" not in redis_config:
raise FQException("Missing config: redis.unix_socket_path")
if not isinstance(redis_config["unix_socket_path"], str) or not redis_config[
"unix_socket_path"
]:
raise FQException(
"Invalid config: redis.unix_socket_path must be a non-empty string"
)

if redis_config["conn_type"] == "tcp_sock":
if "host" not in redis_config:
raise FQException("Missing config: redis.host")
if not isinstance(redis_config["host"], str) or not redis_config["host"]:
raise FQException(
"Invalid config: redis.host must be a non-empty string"
)

if "port" not in redis_config:
raise FQException("Missing config: redis.port")
if isinstance(redis_config["port"], bool) or not isinstance(
redis_config["port"], int
):
raise FQException("Invalid config: redis.port must be an integer")

if "clustered" in redis_config and not isinstance(
redis_config["clustered"], bool
):
raise FQException("Invalid config: redis.clustered must be a boolean")

if "password" in redis_config and redis_config["password"] is not None:
if not isinstance(redis_config["password"], str):
raise FQException("Invalid config: redis.password must be a string")

self.config = normalized

async def initialize(self):
"""Async initializer to set up redis and lua scripts."""
await self._initialize()

async def _initialize(self):
"""Read the FQ configuration and set up redis + Lua scripts."""
fq_config = self.config["fq"]
redis_config = self.config["redis"]

self._key_prefix = self._config.get("redis", "key_prefix")
self._job_expire_interval = int(self._config.get("fq", "job_expire_interval"))
self._default_job_requeue_limit = int(
self._config.get("fq", "default_job_requeue_limit")
)
self._key_prefix = redis_config["key_prefix"]
self._job_expire_interval = int(fq_config["job_expire_interval"])
self._default_job_requeue_limit = int(fq_config["default_job_requeue_limit"])

redis_connection_type = self._config.get("redis", "conn_type")
db = self._config.get("redis", "db")
redis_connection_type = redis_config["conn_type"]
db = redis_config["db"]
Comment on lines +141 to +149
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_initialize() indexes directly into fq_config / redis_config for required keys (e.g., redis_config["key_prefix"], fq_config["job_expire_interval"]). If a caller omits a key, this will raise a raw KeyError instead of an FQException, which makes misconfiguration harder to diagnose and is inconsistent with the rest of the error surface. Consider explicitly validating required keys (and basic types) up front and raising FQException with a clear path like redis.key_prefix / fq.job_expire_interval when missing/invalid.

Copilot uses AI. Check for mistakes.

if redis_connection_type == "unix_sock":
self._r = Redis(
db=db,
unix_socket_path=self._config.get("redis", "unix_socket_path"),
unix_socket_path=redis_config["unix_socket_path"],
)
elif redis_connection_type == "tcp_sock":
isclustered = False
if self._config.has_option("redis", "clustered"):
isclustered = self._config.getboolean("redis", "clustered")
if "clustered" in redis_config:
isclustered = redis_config["clustered"]

if isclustered:
startup_nodes = [
{
"host": self._config.get("redis", "host"),
"port": int(self._config.get("redis", "port")),
"host": redis_config["host"],
"port": int(redis_config["port"]),
}
]
self._r = RedisCluster(
Expand All @@ -80,9 +173,9 @@ async def _initialize(self):
else:
self._r = Redis(
db=db,
host=self._config.get("redis", "host"),
port=int(self._config.get("redis", "port")),
password=self._config.get("redis", "password"),
host=redis_config["host"],
port=int(redis_config["port"]),
password=redis_config.get("password"),
)
else:
raise FQException("Unknown redis conn_type: %s" % redis_connection_type)
Expand All @@ -107,36 +200,9 @@ async def _validate_redis_connection(self):
if result is False:
raise FQException("Failed to connect to Redis: ping returned False")

def _load_config(self):
"""Read the configuration file and load it into memory."""
if not os.path.isfile(self.config_path):
raise FQException("Config file not found: %s" % self.config_path)

self._config = configparser.ConfigParser()
read_files = self._config.read(self.config_path)

if not read_files:
raise FQException("Unable to read config file: %s" % self.config_path)

if not self._config.has_section("redis") or not self._config.has_section(
"fq"
):
raise FQException(
"Config file missing required sections: redis, fq (path: %s)"
% self.config_path
)

def redis_client(self):
return self._r

def reload_config(self, config_path=None):
"""Reload the configuration from the new config file if provided
else reload the current config file.
"""
if config_path:
self.config_path = config_path
self._load_config()

def _load_lua_scripts(self):
"""Loads all lua scripts required by FQ."""
# load lua scripts
Expand Down
30 changes: 30 additions & 0 deletions tests/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-

from copy import deepcopy


TEST_CONFIG = {
"fq": {
"job_expire_interval": 5000,
"job_requeue_interval": 5000,
"default_job_requeue_limit": -1,
},
"redis": {
"db": 0,
"key_prefix": "test_fq",
"conn_type": "tcp_sock",
"unix_socket_path": "/tmp/redis.sock",
"port": 6379,
"host": "127.0.0.1",
"clustered": False,
"password": "",
},
}


def build_test_config(**section_overrides):
config = deepcopy(TEST_CONFIG)
for section_name, overrides in section_overrides.items():
config.setdefault(section_name, {})
config[section_name].update(overrides)
return config
17 changes: 0 additions & 17 deletions tests/test.conf

This file was deleted.

Loading
Loading