44
55import asyncio
66import os
7- import configparser
7+ from collections . abc import Mapping
88
99from redis .asyncio import Redis
1010from redis .asyncio .cluster import RedisCluster
@@ -25,51 +25,144 @@ class FQ(object):
2525 """The FQ object is the core of this queue.
2626 FQ does the following.
2727
28- 1. Accepts a configuration file .
28+ 1. Accepts structured configuration.
2929 2. Initializes the queue.
3030 3. Exposes functions to interact with the queue.
3131 """
3232
33- def __init__ (self , config_path ):
33+ def __init__ (self , config ):
3434 """Construct a FQ object by doing the following.
35- 1. Read the configuration path .
36- 2. Load the config.
35+ 1. Store the queue configuration .
36+ 2. Validate the config shape .
3737 """
38- self .config_path = config_path
39- self ._load_config ()
4038 self ._r = None # redis client placeholder
39+ if not isinstance (config , Mapping ):
40+ raise FQException ("Config must be a mapping with redis and fq sections" )
41+
42+ normalized = {}
43+ for section_name , section_values in config .items ():
44+ if not isinstance (section_values , Mapping ):
45+ raise FQException (
46+ "Config section '%s' must be a mapping" % section_name
47+ )
48+
49+ normalized [str (section_name )] = {
50+ str (option ): value for option , value in section_values .items ()
51+ }
52+
53+ if "redis" not in normalized or "fq" not in normalized :
54+ raise FQException ("Config missing required sections: redis, fq" )
55+
56+ redis_config = normalized ["redis" ]
57+ fq_config = normalized ["fq" ]
58+
59+ if "key_prefix" not in redis_config :
60+ raise FQException ("Missing config: redis.key_prefix" )
61+ if not isinstance (redis_config ["key_prefix" ], str ) or not redis_config [
62+ "key_prefix"
63+ ]:
64+ raise FQException (
65+ "Invalid config: redis.key_prefix must be a non-empty string"
66+ )
67+
68+ if "conn_type" not in redis_config :
69+ raise FQException ("Missing config: redis.conn_type" )
70+ if redis_config ["conn_type" ] not in {"tcp_sock" , "unix_sock" }:
71+ raise FQException (
72+ "Invalid config: redis.conn_type must be 'tcp_sock' or 'unix_sock'"
73+ )
74+
75+ if "db" not in redis_config :
76+ raise FQException ("Missing config: redis.db" )
77+ if isinstance (redis_config ["db" ], bool ) or not isinstance (
78+ redis_config ["db" ], int
79+ ):
80+ raise FQException ("Invalid config: redis.db must be an integer" )
81+
82+ if "job_expire_interval" not in fq_config :
83+ raise FQException ("Missing config: fq.job_expire_interval" )
84+ if not is_valid_interval (fq_config ["job_expire_interval" ]):
85+ raise FQException (
86+ "Invalid config: fq.job_expire_interval must be a positive integer"
87+ )
88+
89+ if "job_requeue_interval" not in fq_config :
90+ raise FQException ("Missing config: fq.job_requeue_interval" )
91+ if not is_valid_interval (fq_config ["job_requeue_interval" ]):
92+ raise FQException (
93+ "Invalid config: fq.job_requeue_interval must be a positive integer"
94+ )
95+
96+ if "default_job_requeue_limit" not in fq_config :
97+ raise FQException ("Missing config: fq.default_job_requeue_limit" )
98+ if not is_valid_requeue_limit (fq_config ["default_job_requeue_limit" ]):
99+ raise FQException (
100+ "Invalid config: fq.default_job_requeue_limit must be an integer >= -1"
101+ )
102+
103+ if redis_config ["conn_type" ] == "unix_sock" :
104+ if "unix_socket_path" not in redis_config :
105+ raise FQException ("Missing config: redis.unix_socket_path" )
106+ if not isinstance (redis_config ["unix_socket_path" ], str ) or not redis_config [
107+ "unix_socket_path"
108+ ]:
109+ raise FQException (
110+ "Invalid config: redis.unix_socket_path must be a non-empty string"
111+ )
112+
113+ if redis_config ["conn_type" ] == "tcp_sock" :
114+ if "host" not in redis_config :
115+ raise FQException ("Missing config: redis.host" )
116+ if not isinstance (redis_config ["host" ], str ) or not redis_config ["host" ]:
117+ raise FQException (
118+ "Invalid config: redis.host must be a non-empty string"
119+ )
120+
121+ if "port" not in redis_config :
122+ raise FQException ("Missing config: redis.port" )
123+ if isinstance (redis_config ["port" ], bool ) or not isinstance (
124+ redis_config ["port" ], int
125+ ):
126+ raise FQException ("Invalid config: redis.port must be an integer" )
127+
128+ if "clustered" in redis_config and not isinstance (
129+ redis_config ["clustered" ], bool
130+ ):
131+ raise FQException ("Invalid config: redis.clustered must be a boolean" )
132+
133+ if "password" in redis_config and redis_config ["password" ] is not None :
134+ if not isinstance (redis_config ["password" ], str ):
135+ raise FQException ("Invalid config: redis.password must be a string" )
136+
137+ self .config = normalized
41138
42139 async def initialize (self ):
43140 """Async initializer to set up redis and lua scripts."""
44- await self ._initialize ()
45-
46- async def _initialize (self ):
47- """Read the FQ configuration and set up redis + Lua scripts."""
141+ fq_config = self .config ["fq" ]
142+ redis_config = self .config ["redis" ]
48143
49- self ._key_prefix = self ._config .get ("redis" , "key_prefix" )
50- self ._job_expire_interval = int (self ._config .get ("fq" , "job_expire_interval" ))
51- self ._default_job_requeue_limit = int (
52- self ._config .get ("fq" , "default_job_requeue_limit" )
53- )
144+ self ._key_prefix = redis_config ["key_prefix" ]
145+ self ._job_expire_interval = int (fq_config ["job_expire_interval" ])
146+ self ._default_job_requeue_limit = int (fq_config ["default_job_requeue_limit" ])
54147
55- redis_connection_type = self . _config . get ( "redis" , " conn_type")
56- db = self . _config . get ( "redis" , " db")
148+ redis_connection_type = redis_config [ " conn_type"]
149+ db = redis_config [ " db"]
57150
58151 if redis_connection_type == "unix_sock" :
59152 self ._r = Redis (
60153 db = db ,
61- unix_socket_path = self . _config . get ( "redis" , " unix_socket_path") ,
154+ unix_socket_path = redis_config [ " unix_socket_path"] ,
62155 )
63156 elif redis_connection_type == "tcp_sock" :
64157 isclustered = False
65- if self . _config . has_option ( "redis" , " clustered") :
66- isclustered = self . _config . getboolean ( "redis" , " clustered")
158+ if " clustered" in redis_config :
159+ isclustered = redis_config [ " clustered"]
67160
68161 if isclustered :
69162 startup_nodes = [
70163 {
71- "host" : self . _config . get ( "redis" , " host") ,
72- "port" : int (self . _config . get ( "redis" , " port") ),
164+ "host" : redis_config [ " host"] ,
165+ "port" : int (redis_config [ " port"] ),
73166 }
74167 ]
75168 self ._r = RedisCluster (
@@ -80,9 +173,9 @@ async def _initialize(self):
80173 else :
81174 self ._r = Redis (
82175 db = db ,
83- host = self . _config . get ( "redis" , " host") ,
84- port = int (self . _config . get ( "redis" , " port") ),
85- password = self . _config . get ("redis" , "password" ),
176+ host = redis_config [ " host"] ,
177+ port = int (redis_config [ " port"] ),
178+ password = redis_config . get ("password" ),
86179 )
87180 else :
88181 raise FQException ("Unknown redis conn_type: %s" % redis_connection_type )
@@ -107,36 +200,9 @@ async def _validate_redis_connection(self):
107200 if result is False :
108201 raise FQException ("Failed to connect to Redis: ping returned False" )
109202
110- def _load_config (self ):
111- """Read the configuration file and load it into memory."""
112- if not os .path .isfile (self .config_path ):
113- raise FQException ("Config file not found: %s" % self .config_path )
114-
115- self ._config = configparser .ConfigParser ()
116- read_files = self ._config .read (self .config_path )
117-
118- if not read_files :
119- raise FQException ("Unable to read config file: %s" % self .config_path )
120-
121- if not self ._config .has_section ("redis" ) or not self ._config .has_section (
122- "fq"
123- ):
124- raise FQException (
125- "Config file missing required sections: redis, fq (path: %s)"
126- % self .config_path
127- )
128-
129203 def redis_client (self ):
130204 return self ._r
131205
132- def reload_config (self , config_path = None ):
133- """Reload the configuration from the new config file if provided
134- else reload the current config file.
135- """
136- if config_path :
137- self .config_path = config_path
138- self ._load_config ()
139-
140206 def _load_lua_scripts (self ):
141207 """Loads all lua scripts required by FQ."""
142208 # load lua scripts
0 commit comments