--[[
  Moves a job from the wait state to the active state.

  Arguments:
    keyPrefix       prefix that, concatenated with jobId, forms the job hash key
    rateLimiterKey  key holding the rate-limiter counter
    eventStreamKey  stream key that receives the "active" event
    jobId           id of the job being activated
    processedOn     value stored in the job's "processedOn" field
    maxJobs         when truthy, the rate-limiter counter is incremented
    opts            table read for the following entries:
      opts.token             lock token; the string "0" means "take no lock"
      opts.lockDuration      lock time-to-live, passed to SET ... PX
      opts.limiter.duration  limiter window, passed to PEXPIRE
      opts.name              optional worker name stored in the "pb" field

  Returns:
    { <HGETALL reply for the job key>, jobId, 0, 0 }
]]
local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
    jobId, processedOn, maxJobs, opts)
  local jobHashKey = keyPrefix .. jobId

  -- Rate limiting: bump the counter, and start the expiry window only when
  -- this call is the one that moved the counter to 1.
  if maxJobs then
    local hits = tonumber(rcall("INCR", rateLimiterKey))
    if hits == 1 then
      local limiter = opts.limiter
      local rawDuration = limiter and limiter.duration
      -- NOTE(review): rawDuration is expected to be a number here; math.abs
      -- raises if opts.limiter or its duration is missing -- confirm callers.
      local windowMs = math.floor(math.abs(rawDuration))
      rcall("PEXPIRE", rateLimiterKey, windowMs)
    end
  end

  -- Take the job lock unless the caller passed the "0" token.
  local lockToken = opts.token
  if lockToken ~= "0" then
    rcall("SET", jobHashKey .. ':lock', lockToken, "PX", opts.lockDuration)
  end

  -- Extra hash fields: "pb" ("processedBy") carries the worker name, if any.
  local workerName = opts.name
  local extraFields = workerName and { "pb", workerName } or {}

  -- Publish the waiting -> active transition on the event stream.
  rcall("XADD", eventStreamKey, "*",
    "event", "active",
    "jobId", jobId,
    "prev", "waiting")

  -- Record the processing timestamp (plus optional fields), then bump "ats".
  rcall("HMSET", jobHashKey, "processedOn", processedOn, unpack(extraFields))
  rcall("HINCRBY", jobHashKey, "ats", 1)

  -- Fetch the full job hash to hand back to the caller.
  local jobData = rcall("HGETALL", jobHashKey)
  return { jobData, jobId, 0, 0 }
end