diff --git a/README.md b/README.md index 82d150b..625cd66 100644 --- a/README.md +++ b/README.md @@ -33,22 +33,25 @@ pip install -e . ## Configuration -FQ reads a simple INI config file. Intervals are in milliseconds. -``` -[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 ; -1 retries forever, 0 means no retries - -[redis] -db : 0 -key_prefix : queue_server -conn_type : tcp_sock ; or unix_sock -host : 127.0.0.1 -port : 6379 -password : -clustered : false -unix_socket_path : /tmp/redis.sock +FQ accepts a simple config mapping. Intervals are in milliseconds. +```python +config = { + "fq": { + "job_expire_interval": 5000, + "job_requeue_interval": 5000, + "default_job_requeue_limit": -1, # -1 retries forever, 0 means no retries + }, + "redis": { + "db": 0, + "key_prefix": "queue_server", + "conn_type": "tcp_sock", # or "unix_sock" + "host": "127.0.0.1", + "port": 6379, + "password": "", + "clustered": False, + "unix_socket_path": "/tmp/redis.sock", + }, +} ``` > If you connect via Unix sockets, uncomment the `unixsocket` lines in your `redis.conf`: @@ -66,8 +69,26 @@ from fq import FQ async def main(): - fq = FQ("config.conf") - await fq.initialize() # load config, connect to Redis, register Lua scripts + config = { + "fq": { + "job_expire_interval": 5000, + "job_requeue_interval": 5000, + "default_job_requeue_limit": -1, + }, + "redis": { + "db": 0, + "key_prefix": "queue_server", + "conn_type": "tcp_sock", + "host": "127.0.0.1", + "port": 6379, + "password": "", + "clustered": False, + "unix_socket_path": "/tmp/redis.sock", + }, + } + + fq = FQ(config) + await fq.initialize() # connect to Redis and register Lua scripts job_id = str(uuid.uuid4()) await fq.enqueue( @@ -102,7 +123,7 @@ Common operations: ## Development -- Start Redis for local development: `make redis-up` (binds to `localhost:6379`; matches `tests/test.conf`). +- Start Redis for local development: `make redis-up` (binds to `localhost:6379`). - Run the suite: `make test` (automatically starts and tears down Redis). - Build a wheel: `make build` - Install/uninstall from the build: `make install` / `make uninstall` diff --git a/pyproject.toml b/pyproject.toml index 3c1c34b..80a1492 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ dynamic = ["version"] description = "A simple Redis-based job queue system." readme = "README.md" license = {text = "MIT"} -authors = [{name = "Ochui Princewill", email = "ochui@flowdacity.com"}] +authors = [{name = "Flowdacity Development Team", email = "admin@flowdacity.com"}] requires-python = ">=3.12" dependencies = [ "msgpack>=1.1.2", diff --git a/src/fq/default.conf b/src/fq/default.conf deleted file mode 100644 index 6163c9b..0000000 --- a/src/fq/default.conf +++ /dev/null @@ -1,14 +0,0 @@ -[fq] -job_expire_interval : 120000 ; in milliseconds -job_requeue_interval : 5000 ; in milliseconds -default_job_requeue_limit : 0 ; value of -1 retries infinitely - -[redis] -db = 0 -key_prefix = sharq_server -conn_type = tcp_sock -unix_socket_path = /tmp/redis.sock -port = 6379 -host = 127.0.0.1 -clustered = false -password = diff --git a/src/fq/queue.py b/src/fq/queue.py index 6434dae..cde545c 100644 --- a/src/fq/queue.py +++ b/src/fq/queue.py @@ -4,7 +4,7 @@ import asyncio import os -import configparser +from collections.abc import Mapping from redis.asyncio import Redis from redis.asyncio.cluster import RedisCluster @@ -25,51 +25,144 @@ class FQ(object): """The FQ object is the core of this queue. FQ does the following. - 1. Accepts a configuration file. + 1. Accepts structured configuration. 2. Initializes the queue. 3. Exposes functions to interact with the queue. """ - def __init__(self, config_path): + def __init__(self, config): """Construct a FQ object by doing the following. - 1. Read the configuration path. - 2. Load the config. + 1. Store the queue configuration. + 2. Validate the config shape. """ - self.config_path = config_path - self._load_config() self._r = None # redis client placeholder + if not isinstance(config, Mapping): + raise FQException("Config must be a mapping with redis and fq sections") + + normalized = {} + for section_name, section_values in config.items(): + if not isinstance(section_values, Mapping): + raise FQException( + "Config section '%s' must be a mapping" % section_name + ) + + normalized[str(section_name)] = { + str(option): value for option, value in section_values.items() + } + + if "redis" not in normalized or "fq" not in normalized: + raise FQException("Config missing required sections: redis, fq") + + redis_config = normalized["redis"] + fq_config = normalized["fq"] + + if "key_prefix" not in redis_config: + raise FQException("Missing config: redis.key_prefix") + if not isinstance(redis_config["key_prefix"], str) or not redis_config[ + "key_prefix" + ]: + raise FQException( + "Invalid config: redis.key_prefix must be a non-empty string" + ) + + if "conn_type" not in redis_config: + raise FQException("Missing config: redis.conn_type") + if redis_config["conn_type"] not in {"tcp_sock", "unix_sock"}: + raise FQException( + "Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'" + ) + + if "db" not in redis_config: + raise FQException("Missing config: redis.db") + if isinstance(redis_config["db"], bool) or not isinstance( + redis_config["db"], int + ): + raise FQException("Invalid config: redis.db must be an integer") + + if "job_expire_interval" not in fq_config: + raise FQException("Missing config: fq.job_expire_interval") + if not is_valid_interval(fq_config["job_expire_interval"]): + raise FQException( + "Invalid config: fq.job_expire_interval must be a positive integer" + ) + + if "job_requeue_interval" not in fq_config: + raise FQException("Missing config: fq.job_requeue_interval") + if not is_valid_interval(fq_config["job_requeue_interval"]): + raise FQException( + "Invalid config: fq.job_requeue_interval must be a positive integer" + ) + + if "default_job_requeue_limit" not in fq_config: + raise FQException("Missing config: fq.default_job_requeue_limit") + if not is_valid_requeue_limit(fq_config["default_job_requeue_limit"]): + raise FQException( + "Invalid config: fq.default_job_requeue_limit must be an integer >= -1" + ) + + if redis_config["conn_type"] == "unix_sock": + if "unix_socket_path" not in redis_config: + raise FQException("Missing config: redis.unix_socket_path") + if not isinstance(redis_config["unix_socket_path"], str) or not redis_config[ + "unix_socket_path" + ]: + raise FQException( + "Invalid config: redis.unix_socket_path must be a non-empty string" + ) + + if redis_config["conn_type"] == "tcp_sock": + if "host" not in redis_config: + raise FQException("Missing config: redis.host") + if not isinstance(redis_config["host"], str) or not redis_config["host"]: + raise FQException( + "Invalid config: redis.host must be a non-empty string" + ) + + if "port" not in redis_config: + raise FQException("Missing config: redis.port") + if isinstance(redis_config["port"], bool) or not isinstance( + redis_config["port"], int + ): + raise FQException("Invalid config: redis.port must be an integer") + + if "clustered" in redis_config and not isinstance( + redis_config["clustered"], bool + ): + raise FQException("Invalid config: redis.clustered must be a boolean") + + if "password" in redis_config and redis_config["password"] is not None: + if not isinstance(redis_config["password"], str): + raise FQException("Invalid config: redis.password must be a string") + + self.config = normalized async def initialize(self): """Async initializer to set up redis and lua scripts.""" - await self._initialize() - - async def _initialize(self): - """Read the FQ configuration and set up redis + Lua scripts.""" + fq_config = self.config["fq"] + redis_config = self.config["redis"] - self._key_prefix = self._config.get("redis", "key_prefix") - self._job_expire_interval = int(self._config.get("fq", "job_expire_interval")) - self._default_job_requeue_limit = int( - self._config.get("fq", "default_job_requeue_limit") - ) + self._key_prefix = redis_config["key_prefix"] + self._job_expire_interval = int(fq_config["job_expire_interval"]) + self._default_job_requeue_limit = int(fq_config["default_job_requeue_limit"]) - redis_connection_type = self._config.get("redis", "conn_type") - db = self._config.get("redis", "db") + redis_connection_type = redis_config["conn_type"] + db = redis_config["db"] if redis_connection_type == "unix_sock": self._r = Redis( db=db, - unix_socket_path=self._config.get("redis", "unix_socket_path"), + unix_socket_path=redis_config["unix_socket_path"], ) elif redis_connection_type == "tcp_sock": isclustered = False - if self._config.has_option("redis", "clustered"): - isclustered = self._config.getboolean("redis", "clustered") + if "clustered" in redis_config: + isclustered = redis_config["clustered"] if isclustered: startup_nodes = [ { - "host": self._config.get("redis", "host"), - "port": int(self._config.get("redis", "port")), + "host": redis_config["host"], + "port": int(redis_config["port"]), } ] self._r = RedisCluster( @@ -80,9 +173,9 @@ async def _initialize(self): else: self._r = Redis( db=db, - host=self._config.get("redis", "host"), - port=int(self._config.get("redis", "port")), - password=self._config.get("redis", "password"), + host=redis_config["host"], + port=int(redis_config["port"]), + password=redis_config.get("password"), ) else: raise FQException("Unknown redis conn_type: %s" % redis_connection_type) @@ -107,36 +200,9 @@ async def _validate_redis_connection(self): if result is False: raise FQException("Failed to connect to Redis: ping returned False") - def _load_config(self): - """Read the configuration file and load it into memory.""" - if not os.path.isfile(self.config_path): - raise FQException("Config file not found: %s" % self.config_path) - - self._config = configparser.ConfigParser() - read_files = self._config.read(self.config_path) - - if not read_files: - raise FQException("Unable to read config file: %s" % self.config_path) - - if not self._config.has_section("redis") or not self._config.has_section( - "fq" - ): - raise FQException( - "Config file missing required sections: redis, fq (path: %s)" - % self.config_path - ) - def redis_client(self): return self._r - def reload_config(self, config_path=None): - """Reload the configuration from the new config file if provided - else reload the current config file. - """ - if config_path: - self.config_path = config_path - self._load_config() - def _load_lua_scripts(self): """Loads all lua scripts required by FQ.""" # load lua scripts diff --git a/tests/config.py b/tests/config.py new file mode 100644 index 0000000..617030c --- /dev/null +++ b/tests/config.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- + +from copy import deepcopy + + +TEST_CONFIG = { + "fq": { + "job_expire_interval": 5000, + "job_requeue_interval": 5000, + "default_job_requeue_limit": -1, + }, + "redis": { + "db": 0, + "key_prefix": "test_fq", + "conn_type": "tcp_sock", + "unix_socket_path": "/tmp/redis.sock", + "port": 6379, + "host": "127.0.0.1", + "clustered": False, + "password": "", + }, +} + + +def build_test_config(**section_overrides): + config = deepcopy(TEST_CONFIG) + for section_name, overrides in section_overrides.items(): + config.setdefault(section_name, {}) + config[section_name].update(overrides) + return config diff --git a/tests/test.conf b/tests/test.conf deleted file mode 100644 index b8fd6eb..0000000 --- a/tests/test.conf +++ /dev/null @@ -1,17 +0,0 @@ -[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq -conn_type : tcp_sock -; or unix_sock -;; unix connection settings -unix_socket_path : /tmp/redis.sock -;; tcp connection settings -port : 6379 -host : 127.0.0.1 -clustered : false -password : diff --git a/tests/test_edge_cases.py b/tests/test_edge_cases.py index 87c8721..b84a51d 100644 --- a/tests/test_edge_cases.py +++ b/tests/test_edge_cases.py @@ -2,14 +2,13 @@ # Copyright (c) 2025 Flowdacity Development Team. See LICENSE.txt for details. -import os -import tempfile import unittest from unittest.mock import patch from fq import FQ from fq.utils import is_valid_identifier from fq.exceptions import BadArgumentException, FQException +from tests.config import build_test_config class FakeCluster: @@ -113,8 +112,7 @@ async def delete(self, key): class TestEdgeCases(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): - cwd = os.path.dirname(os.path.realpath(__file__)) - self.config_path = os.path.join(cwd, "test.conf") + self.config = build_test_config() self.fq_instance = None async def asyncTearDown(self): @@ -131,51 +129,53 @@ async def asyncTearDown(self): pass self.fq_instance = None - def test_missing_config_file_raises(self): - with self.assertRaisesRegex(FQException, "Config file not found"): + def test_invalid_config_type_raises(self): + with self.assertRaisesRegex(FQException, "Config must be a mapping"): FQ("/tmp/does-not-exist.conf") async def test_initialize_fails_fast_on_bad_redis(self): with patch("fq.queue.Redis", FakeRedisConnectionFailure): - fq = FQ(self.config_path) + fq = FQ(self.config) with self.assertRaisesRegex(FQException, "Failed to connect to Redis"): await fq.initialize() async def test_cluster_initialization(self): """Covers clustered Redis path (queue.py lines 69-75, 104-106).""" - with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f: - f.write( - """[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq_cluster -conn_type : tcp_sock -host : 127.0.0.1 -port : 6379 -clustered : true -password : -""" - ) - config_path = f.name - - try: - with patch("fq.queue.RedisCluster", FakeCluster): - fq = FQ(config_path) - await fq._initialize() - self.assertIsInstance(fq.redis_client(), FakeCluster) - await fq.close() - finally: - os.unlink(config_path) + config = build_test_config( + redis={"key_prefix": "test_fq_cluster", "clustered": True} + ) + with patch("fq.queue.RedisCluster", FakeCluster): + fq = FQ(config) + await fq.initialize() + self.assertIsInstance(fq.redis_client(), FakeCluster) + await fq.close() + + def test_clustered_config_must_be_boolean(self): + config = build_test_config(redis={"clustered": "true"}) + with self.assertRaisesRegex( + FQException, "Invalid config: redis.clustered must be a boolean" + ): + FQ(config) + + def test_missing_required_config_key_raises_with_path(self): + config = build_test_config() + del config["redis"]["key_prefix"] + with self.assertRaisesRegex(FQException, "Missing config: redis.key_prefix"): + FQ(config) + + def test_invalid_config_value_raises_with_path(self): + config = build_test_config(fq={"job_expire_interval": "5000"}) + with self.assertRaisesRegex( + FQException, + "Invalid config: fq.job_expire_interval must be a positive integer", + ): + FQ(config) async def test_dequeue_payload_none(self): """Covers dequeue branch where payload is None (queue.py line 212).""" - fq = FQ(self.config_path) + fq = FQ(self.config) self.fq_instance = fq - await fq._initialize() + await fq.initialize() fake_dequeue = FakeLuaDequeue() fq._lua_dequeue = fake_dequeue result = await fq.dequeue() @@ -184,24 +184,24 @@ async def test_dequeue_payload_none(self): async def test_clear_queue_delete_only(self): """Covers clear_queue else branch (queue.py lines 499, 502).""" - fq = FQ(self.config_path) + fq = FQ(self.config) self.fq_instance = fq - await fq._initialize() + await fq.initialize() await fq._r.flushdb() response = await fq.clear_queue(queue_type="noqueue", queue_id="missing") self.assertEqual(response["status"], "Failure") async def test_close_fallback_paths(self): """Covers close() fallback paths (queue.py lines 528-549).""" - fq = FQ(self.config_path) + fq = FQ(self.config) fq._r = FakeRedisForClose() await fq.close() self.assertIsNone(fq._r) async def test_deep_status_calls_set(self): """Covers deep_status (queue.py line 420).""" - fq = FQ(self.config_path) - fq._key_prefix = fq._config.get("redis", "key_prefix") + fq = FQ(self.config) + fq._key_prefix = fq.config["redis"]["key_prefix"] fq._r = FakeRedisForDeepStatus() await fq.deep_status() self.assertEqual( @@ -215,39 +215,10 @@ def test_is_valid_identifier_non_string(self): self.assertFalse(is_valid_identifier(None)) self.assertFalse(is_valid_identifier(["a"])) - async def test_reload_config_with_new_path(self): - """Covers reload_config branch (queue.py lines 104-106).""" - with tempfile.NamedTemporaryFile(mode="w", suffix=".conf", delete=False) as f: - f.write( - """[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : new_prefix -conn_type : tcp_sock -port : 6379 -host : 127.0.0.1 -clustered : false -password : -""" - ) - new_config = f.name - - try: - fq = FQ(self.config_path) - fq.reload_config(new_config) - self.assertEqual(fq.config_path, new_config) - self.assertEqual(fq._config.get("redis", "key_prefix"), "new_prefix") - finally: - os.unlink(new_config) - async def test_clear_queue_purge_all_with_mixed_job_ids(self): """Covers purge_all loop branches (queue.py lines 463-468, 474-479).""" - fq = FQ(self.config_path) - fq._key_prefix = fq._config.get("redis", "key_prefix") + fq = FQ(self.config) + fq._key_prefix = fq.config["redis"]["key_prefix"] fq._r = FakeRedisForClear() response = await fq.clear_queue("qt", "qid", purge_all=True) self.assertEqual(response["status"], "Success") @@ -255,7 +226,7 @@ async def test_clear_queue_purge_all_with_mixed_job_ids(self): async def test_get_queue_length_invalid_params(self): """Covers validation branches (queue.py lines 499, 502).""" - fq = FQ(self.config_path) + fq = FQ(self.config) with self.assertRaises(BadArgumentException): await fq.get_queue_length("bad type", "qid") with self.assertRaises(BadArgumentException): @@ -263,9 +234,9 @@ async def test_get_queue_length_invalid_params(self): async def test_deep_status_real_redis(self): """Covers deep_status with real redis (queue.py line 420).""" - fq = FQ(self.config_path) + fq = FQ(self.config) self.fq_instance = fq - await fq._initialize() + await fq.initialize() result = await fq.deep_status() self.assertTrue(result) diff --git a/tests/test_func.py b/tests/test_func.py index 0116167..a2035d6 100644 --- a/tests/test_func.py +++ b/tests/test_func.py @@ -1,17 +1,16 @@ # -*- coding: utf-8 -*- # Copyright (c) 2014 Plivo Team. See LICENSE.txt for details. -import os import uuid import time import math import asyncio import unittest import msgpack -import tempfile from unittest.mock import AsyncMock, MagicMock from fq import FQ from fq.exceptions import FQException from fq.utils import generate_epoch, deserialize_payload +from tests.config import build_test_config @@ -23,11 +22,9 @@ class FQTestCase(unittest.IsolatedAsyncioTestCase): """ async def asyncSetUp(self): - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") # test config - self.queue = FQ(config_path) + self.queue = FQ(build_test_config()) # flush all the keys in the test db before starting test - await self.queue._initialize() + await self.queue.initialize() await self.queue._r.flushdb() # test specific values self._test_queue_id = "johndoe" @@ -1734,9 +1731,7 @@ async def test_deep_status(self): async def test_initialize_public_method(self): """Test the public initialize() method.""" - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") - fq = FQ(config_path) + fq = FQ(build_test_config()) # Public initialize() should work await fq.initialize() @@ -1797,10 +1792,8 @@ async def test_redis_client_getter(self): async def test_close_properly_closes_connection(self): """Test close() method properly closes Redis connection.""" - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") - fq = FQ(config_path) - await fq._initialize() + fq = FQ(build_test_config()) + await fq.initialize() self.assertIsNotNone(fq._r) await fq.close() @@ -1808,79 +1801,55 @@ async def test_close_properly_closes_connection(self): async def test_close_with_none_client(self): """Test close() when redis client is None.""" - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") - fq = FQ(config_path) + fq = FQ(build_test_config()) # Don't initialize, so _r is None await fq.close() # Should not crash self.assertIsNone(fq._r) async def test_initialize_unix_socket_connection(self): """Test initialization with Unix socket connection - tests line 59.""" - # Create a temporary config with unix_sock - with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f: - f.write("""[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq_unix -conn_type : unix_sock -unix_socket_path : /tmp/redis_nonexistent.sock -""") - config_path = f.name - - try: - # Create a mock Redis class to capture initialization parameters - mock_redis_instance = MagicMock() - mock_redis_instance.ping = AsyncMock(return_value=True) - mock_redis_instance.register_script = MagicMock(return_value=MagicMock()) - mock_redis_instance.aclose = AsyncMock() - - redis_init_kwargs = {} - - def mock_redis_constructor(**kwargs): - redis_init_kwargs.update(kwargs) - return mock_redis_instance - - # Patch Redis to intercept the initialization - with unittest.mock.patch('fq.queue.Redis', side_effect=mock_redis_constructor): - fq = FQ(config_path) - await fq._initialize() - - # Verify that Redis was initialized with unix_socket_path - self.assertIn('unix_socket_path', redis_init_kwargs) - self.assertEqual(redis_init_kwargs['unix_socket_path'], '/tmp/redis_nonexistent.sock') - self.assertEqual(int(redis_init_kwargs['db']), 0) - - await fq.close() - finally: - os.unlink(config_path) + config = build_test_config( + redis={ + "key_prefix": "test_fq_unix", + "conn_type": "unix_sock", + "unix_socket_path": "/tmp/redis_nonexistent.sock", + } + ) + + # Create a mock Redis class to capture initialization parameters + mock_redis_instance = MagicMock() + mock_redis_instance.ping = AsyncMock(return_value=True) + mock_redis_instance.register_script = MagicMock(return_value=MagicMock()) + mock_redis_instance.aclose = AsyncMock() + + redis_init_kwargs = {} + + def mock_redis_constructor(**kwargs): + redis_init_kwargs.update(kwargs) + return mock_redis_instance + + # Patch Redis to intercept the initialization + with unittest.mock.patch("fq.queue.Redis", side_effect=mock_redis_constructor): + fq = FQ(config) + await fq.initialize() + + # Verify that Redis was initialized with unix_socket_path + self.assertIn("unix_socket_path", redis_init_kwargs) + self.assertEqual( + redis_init_kwargs["unix_socket_path"], "/tmp/redis_nonexistent.sock" + ) + self.assertEqual(int(redis_init_kwargs["db"]), 0) + + await fq.close() async def test_initialize_unknown_connection_type(self): - """Test initialization with invalid connection type raises error - tests line 88.""" - with tempfile.NamedTemporaryFile(mode='w', suffix='.conf', delete=False) as f: - f.write("""[fq] -job_expire_interval : 5000 -job_requeue_interval : 5000 -default_job_requeue_limit : -1 - -[redis] -db : 0 -key_prefix : test_fq -conn_type : invalid_type -""") - config_path = f.name - - try: - fq = FQ(config_path) - # This tests line 88 - unknown conn_type - with self.assertRaisesRegex(FQException, "Unknown redis conn_type"): - await fq._initialize() - finally: - os.unlink(config_path) + """Test constructor validation with invalid connection type.""" + config = build_test_config(redis={"conn_type": "invalid_type"}) + with self.assertRaisesRegex( + FQException, + "Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'", + ): + FQ(config) async def test_clear_queue_with_purge_all_and_string_job_uuid(self): """Test clear_queue with purge_all=True handles string job UUIDs - tests lines 464, 468.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 840996d..e94a2f0 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,21 +1,19 @@ # -*- coding: utf-8 -*- # Copyright (c) 2014 Plivo Team. See LICENSE.txt for details. -import os import unittest from datetime import date from fq import FQ from fq.exceptions import BadArgumentException +from tests.config import build_test_config class FQTest(unittest.IsolatedAsyncioTestCase): """The FQTest contains test cases which validate the FQ interface.""" async def asyncSetUp(self): - cwd = os.path.dirname(os.path.realpath(__file__)) - config_path = os.path.join(cwd, "test.conf") # test config - self.queue = FQ(config_path) - await self.queue._initialize() + self.queue = FQ(build_test_config()) + await self.queue.initialize() self.valid_queue_type = "5m5_qu-eue" self.invalid_queue_type_1 = "s!ms_queue" @@ -63,7 +61,7 @@ async def asyncSetUp(self): async def asyncTearDown(self): # flush redis at the end and close connection await self.queue._r.flushdb() - await self.queue._r.aclose() + await self.queue.close() # ---------- enqueue ----------