"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); exports.removeJob = void 0; const content = `--[[ Remove a job from all the queues it may be in as well as all its data. In order to be able to remove a job, it cannot be active. Input: KEYS[1] queue prefix KEYS[2] meta key ARGV[1] jobId ARGV[2] remove children Events: 'removed' ]] local rcall = redis.call -- Includes --[[ Functions to destructure job key. Just a bit of warning, these functions may be a bit slow and affect performance significantly. ]] local getJobIdFromKey = function (jobKey) return string.match(jobKey, ".*:(.*)") end local getJobKeyPrefix = function (jobKey, jobId) return string.sub(jobKey, 0, #jobKey - #jobId) end --[[ Function to get max events value or set by default 10000. ]] local function getOrSetMaxEvents(metaKey) local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents") if not maxEvents then maxEvents = 10000 rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents) end return maxEvents end --[[ Function to recursively check if there are no locks on the jobs to be removed. returns: boolean ]] local function isLocked( prefix, jobId, removeChildren) local jobKey = prefix .. jobId; -- Check if this job is locked local lockKey = jobKey .. ':lock' local lock = rcall("GET", lockKey) if not lock then if removeChildren == "1" then local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies") if (#dependencies > 0) then for i, childJobKey in ipairs(dependencies) do -- We need to get the jobId for this job. local childJobId = getJobIdFromKey(childJobKey) local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId) local result = isLocked( childJobPrefix, childJobId, removeChildren ) if result then return true end end end end return false end return true end --[[ Function to remove deduplication key. ]] local function removeDeduplicationKey(prefixKey, jobKey) local deduplicationId = rcall("HGET", jobKey, "deid") if deduplicationId then local deduplicationKey = prefixKey .. "de:" .. deduplicationId rcall("DEL", deduplicationKey) end end --[[ Function to remove from any state. returns: prev state ]] local function removeJobFromAnyState( prefix, jobId) -- We start with the ZSCORE checks, since they have O(1) complexity if rcall("ZSCORE", prefix .. "completed", jobId) then rcall("ZREM", prefix .. "completed", jobId) return "completed" elseif rcall("ZSCORE", prefix .. "waiting-children", jobId) then rcall("ZREM", prefix .. "waiting-children", jobId) return "waiting-children" elseif rcall("ZSCORE", prefix .. "delayed", jobId) then rcall("ZREM", prefix .. "delayed", jobId) return "delayed" elseif rcall("ZSCORE", prefix .. "failed", jobId) then rcall("ZREM", prefix .. "failed", jobId) return "failed" elseif rcall("ZSCORE", prefix .. "prioritized", jobId) then rcall("ZREM", prefix .. "prioritized", jobId) return "prioritized" -- We remove only 1 element from the list, since we assume they are not added multiple times elseif rcall("LREM", prefix .. "wait", 1, jobId) == 1 then return "wait" elseif rcall("LREM", prefix .. "paused", 1, jobId) == 1 then return "paused" elseif rcall("LREM", prefix .. "active", 1, jobId) == 1 then return "active" end return "unknown" end --[[ Function to remove job keys. ]] local function removeJobKeys(jobKey) return rcall("DEL", jobKey, jobKey .. ':logs', jobKey .. ':dependencies', jobKey .. ':processed', jobKey .. ':failed') end --[[ Check if this job has a parent. If so we will just remove it from the parent child list, but if it is the last child we should move the parent to "wait/paused" which requires code from "moveToFinished" ]] -- Includes --[[ Function to add job in target list and add marker if needed. ]] -- Includes --[[ Add marker if needed when a job is available. ]] local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) if not isPausedOrMaxed then rcall("ZADD", markerKey, 0, "0") end end local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) rcall(pushCmd, targetKey, jobId) addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end --[[ Function to check for the meta.paused key to decide if we are paused or not (since an empty list and !EXISTS are not really the same). ]] local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey) local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency") if queueAttributes[1] then return pausedKey, true else if queueAttributes[2] then local activeCount = rcall("LLEN", activeKey) if activeCount >= tonumber(queueAttributes[2]) then return waitKey, true else return waitKey, false end end end return waitKey, false end local function moveParentToWait(parentPrefix, parentId, emitEvent) local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active", parentPrefix .. "wait", parentPrefix .. "paused") addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) if emitEvent then local parentEventStream = parentPrefix .. "events" rcall("XADD", parentEventStream, "*", "event", "waiting", "jobId", parentId, "prev", "waiting-children") end end local function removeParentDependencyKey(jobKey, hard, parentKey, baseKey, debounceId) if parentKey then local parentDependenciesKey = parentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) if result > 0 then local pendingDependencies = rcall("SCARD", parentDependenciesKey) if pendingDependencies == 0 then local parentId = getJobIdFromKey(parentKey) local parentPrefix = getJobKeyPrefix(parentKey, parentId) local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId) if numRemovedElements == 1 then if hard then -- remove parent in same queue if parentPrefix == baseKey then removeParentDependencyKey(parentKey, hard, nil, baseKey, nil) removeJobKeys(parentKey) if debounceId then rcall("DEL", parentPrefix .. "de:" .. debounceId) end else moveParentToWait(parentPrefix, parentId) end else moveParentToWait(parentPrefix, parentId, true) end end end return true end else local parentAttributes = rcall("HMGET", jobKey, "parentKey", "deid") local missedParentKey = parentAttributes[1] if( (type(missedParentKey) == "string") and missedParentKey ~= "" and (rcall("EXISTS", missedParentKey) == 1)) then local parentDependenciesKey = missedParentKey .. ":dependencies" local result = rcall("SREM", parentDependenciesKey, jobKey) if result > 0 then local pendingDependencies = rcall("SCARD", parentDependenciesKey) if pendingDependencies == 0 then local parentId = getJobIdFromKey(missedParentKey) local parentPrefix = getJobKeyPrefix(missedParentKey, parentId) local numRemovedElements = rcall("ZREM", parentPrefix .. "waiting-children", parentId) if numRemovedElements == 1 then if hard then if parentPrefix == baseKey then removeParentDependencyKey(missedParentKey, hard, nil, baseKey, nil) removeJobKeys(missedParentKey) if parentAttributes[2] then rcall("DEL", parentPrefix .. "de:" .. parentAttributes[2]) end else moveParentToWait(parentPrefix, parentId) end else moveParentToWait(parentPrefix, parentId, true) end end end return true end end end return false end local function removeJob(prefix, jobId, parentKey, removeChildren) local jobKey = prefix .. jobId; removeParentDependencyKey(jobKey, false, parentKey, nil) if removeChildren == "1" then -- Check if this job has children -- If so, we are going to try to remove the children recursively in deep first way because -- if some job is locked we must exit with and error. -- local countProcessed = rcall("HLEN", jobKey .. ":processed") local processed = rcall("HGETALL", jobKey .. ":processed") if (#processed > 0) then for i = 1, #processed, 2 do local childJobId = getJobIdFromKey(processed[i]) local childJobPrefix = getJobKeyPrefix(processed[i], childJobId) removeJob(childJobPrefix, childJobId, jobKey, removeChildren) end end local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies") if (#dependencies > 0) then for i, childJobKey in ipairs(dependencies) do -- We need to get the jobId for this job. local childJobId = getJobIdFromKey(childJobKey) local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId) removeJob(childJobPrefix, childJobId, jobKey, removeChildren) end end local failed = rcall("HGETALL", jobKey .. ":failed") if (#failed > 0) then for i = 1, #failed, 2 do local childJobId = getJobIdFromKey(failed[i]) local childJobPrefix = getJobKeyPrefix(failed[i], childJobId) removeJob(childJobPrefix, childJobId, jobKey, removeChildren) end end end local prev = removeJobFromAnyState(prefix, jobId) removeDeduplicationKey(prefix, jobKey) if removeJobKeys(jobKey) > 0 then local maxEvents = getOrSetMaxEvents(KEYS[2]) rcall("XADD", prefix .. "events", "MAXLEN", "~", maxEvents, "*", "event", "removed", "jobId", jobId, "prev", prev) end end local prefix = KEYS[1] local jobId = ARGV[1] local shouldRemoveChildren = ARGV[2] local jobKey = prefix .. jobId -- Check if the job belongs to a job scheduler and it is in delayed state. if rcall("ZSCORE", prefix .. "delayed", jobId) and rcall("HGET", jobKey, "rjk") then return -8 -- Return error code as the job is part of a job scheduler and is in delayed state. end if not isLocked(prefix, jobId, shouldRemoveChildren) then removeJob(prefix, jobId, nil, shouldRemoveChildren) return 1 end return 0 `; exports.removeJob = { name: 'removeJob', content, keys: 2, }; //# sourceMappingURL=removeJob-2.js.map