"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.drain = void 0; const content = `--[[ Drains the queue, removes all jobs that are waiting or delayed, but not active, completed or failed Input: KEYS[1] 'wait', KEYS[2] 'paused' KEYS[3] 'delayed' KEYS[4] 'prioritized' KEYS[5] 'jobschedulers' (repeat) ARGV[1] queue key prefix ]] local rcall = redis.call local queueBaseKey = ARGV[1] --[[ Functions to remove jobs. ]] -- Includes --[[ Functions to remove jobs. ]] -- Includes --[[ Function to remove job. ]] -- Includes --[[ Function to remove deduplication key. ]] local function removeDeduplicationKey(prefixKey, jobKey) local deduplicationId = rcall("HGET", jobKey, "deid") if deduplicationId then local deduplicationKey = prefixKey .. "de:" .. deduplicationId rcall("DEL", deduplicationKey) end end --[[ Function to remove job keys. ]] local function removeJobKeys(jobKey) return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies', jobKey .. ':processed', jobKey .. ':failed') end --[[ Check if this job has a parent. If so we will just remove it from the parent child list, but if it is the last child we should move the parent to "wait/paused" which requires code from "moveToFinished" ]] -- Includes --[[ Function to add job in target list and add marker if needed. ]] -- Includes --[[ Add marker if needed when a job is available. ]] local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) if not isPausedOrMaxed then rcall("ZADD", markerKey, 0, "0") end end local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) rcall(pushCmd, targetKey, jobId) addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end --[[ Functions to destructure job key. Just a bit of warning, these functions may be a bit slow and affect performance significantly. ]] local getJobIdFromKey = function (jobKey) return string.match(jobKey, ".*:(.*)") end local getJobKeyPrefix = function (jobKey, jobId) return string.sub(jobKey, 0, #jobKey - #jobId) end --[[ Function to check for the meta.paused key to decide if we are paused or not (since an empty list and !EXISTS are not really the same). ]] local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey) local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency") if queueAttributes[1] then return pausedKey, true else if queueAttributes[2] then local activeCount = rcall("LLEN", activeKey) if activeCount >= tonumber(queueAttributes[2]) then return waitKey, true else return waitKey, false end end end return waitKey, false end local function moveParentToWait(parentPrefix, parentId, emitEvent) local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active", parentPrefix .. "wait", parentPrefix .. "paused") addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) if emitEvent then local parentEventStream = parentPrefix .. "events" rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children") end end local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId) if parentKey then local parentDependenciesKey = parentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) if result > 0 then local pendingDependencies = rcall("SCARD", parentDependenciesKey) if pendingDependencies == 0 then local parentId = getJobIdFromKey(parentKey) local parentPrefix = getJobKeyPrefix(parentKey, parentId) local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId) if numRemovedElements == 1 then if hard then -- remove parent in same queue if parentPrefix == baseKey then removeParentDependencyKey(parentKey, hard, nil, baseKey, nil) removeJobKeys(parentKey) if debounceId then rcall("DEL", parentPrefix .. "de:" .. debounceId) end else moveParentToWait(parentPrefix, parentId) end else moveParentToWait(parentPrefix, parentId, true) end end end return true end else local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid") local missedParentKey = parentAttributes[1] if( (type(missedParentKey) == "string") and missedParentKey ~= "" and (rcall("EXISTS", missedParentKey) == 1)) then local parentDependenciesKey = missedParentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) if result > 0 then local pendingDependencies = rcall("SCARD", parentDependenciesKey) if pendingDependencies == 0 then local parentId = getJobIdFromKey(missedParentKey) local parentPrefix = getJobKeyPrefix(missedParentKey, parentId) local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId) if numRemovedElements == 1 then if hard then if parentPrefix == baseKey then removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil) removeJobKeys(missedParentKey) if parentAttributes[2] then rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2]) end else moveParentToWait(parentPrefix, parentId) end else moveParentToWait(parentPrefix, parentId, true) end end end return true end end end return false end local function removeJob(jobId, hard, baseKey, shouldRemoveDeduplicationKey) local jobKey = baseKey .. jobId removeParentDependencyKey(jobKey, hard, nil, baseKey) if shouldRemoveDeduplicationKey then removeDeduplicationKey(baseKey, jobKey) end removeJobKeys(jobKey) end local function removeJobs(keys, hard, baseKey, max) for i, key in ipairs(keys) do removeJob(key, hard, baseKey, true --[[remove debounce key]]) end return max - #keys end local function getListItems(keyName, max) return rcall('LRANGE', keyName, 0, max - 1) end local function removeListJobs(keyName, hard, baseKey, max) local jobs = getListItems(keyName, max) local count = removeJobs(jobs, hard, baseKey, max) rcall("LTRIM", keyName, #jobs, -1) return count end -- Includes --[[ Function to loop in batches. Just a bit of warning, some commands as ZREM could receive a maximum of 7000 parameters per call. ]] local function batches(n, batchSize) local i = 0 return function() local from = i * batchSize + 1 i = i + 1 if (from <= n) then local to = math.min(from + batchSize - 1, n) return from, to end end end --[[ Function to get ZSet items. ]] local function getZSetItems(keyName, max) return rcall('ZRANGE', keyName, 0, max - 1) end local function removeZSetJobs(keyName, hard, baseKey, max, jobsToIgnore) local jobs = getZSetItems(keyName, max) -- filter out jobs to ignore if jobsToIgnore then local filteredJobs = {} for i = 1, #jobs do if not jobsToIgnore[jobs[i]] then table.insert(filteredJobs, jobs[i]) end end jobs = filteredJobs end local count = removeJobs(jobs, hard, baseKey, max) if(#jobs > 0) then for from, to in batches(#jobs, 7000) do rcall("ZREM", keyName, unpack(jobs, from, to)) end end return count end removeListJobs(KEYS[1], true, queueBaseKey, 0) -- wait removeListJobs(KEYS[2], true, queueBaseKey, 0) -- paused if KEYS[3] ~= "" then -- We must not remove delayed jobs if they are associated to a job scheduler. local scheduledJobs = {} local jobSchedulers = rcall("ZRANGE", KEYS[5], 0, -1, "WITHSCORES") -- For every job scheduler, get the current delayed job id. for i = 1, #jobSchedulers, 2 do local jobSchedulerId = jobSchedulers[i] local jobSchedulerMillis = jobSchedulers[i + 1] local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. jobSchedulerMillis scheduledJobs[delayedJobId] = true end removeZSetJobs(KEYS[3], true, queueBaseKey, 0, scheduledJobs) -- delayed end removeZSetJobs(KEYS[4], true, queueBaseKey, 0) -- prioritized `; exports.drain = { name: 'drain', content, keys: 5, }; //# sourceMappingURL=drain-5.js.map