import { __rest } from "tslib"; import { parseExpression } from 'cron-parser'; import { Job } from './job'; import { QueueBase } from './queue-base'; export class JobScheduler extends QueueBase { constructor(name, opts, Connection) { super(name, opts, Connection); this.repeatStrategy = (opts.settings && opts.settings.repeatStrategy) || defaultRepeatStrategy; } async upsertJobScheduler(jobSchedulerId, repeatOpts, jobName, jobData, opts, { override }) { const { every, pattern } = repeatOpts; if (pattern && every) { throw new Error('Both .pattern and .every options are defined for this repeatable job'); } if (repeatOpts.immediately && repeatOpts.startDate) { throw new Error('Both .immediately and .startDate options are defined for this repeatable job'); } // Check if we reached the limit of the repeatable job's iterations const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1; if (typeof repeatOpts.limit !== 'undefined' && iterationCount > repeatOpts.limit) { return; } // Check if we reached the end date of the repeatable job let now = Date.now(); const { endDate } = repeatOpts; if (!(typeof endDate === undefined) && now > new Date(endDate).getTime()) { return; } const prevMillis = opts.prevMillis || 0; // Check if we have a start date for the repeatable job const { startDate, immediately } = repeatOpts, filteredRepeatOpts = __rest(repeatOpts, ["startDate", "immediately"]); if (startDate) { const startMillis = new Date(startDate).getTime(); now = startMillis > now ? startMillis : now; } let nextMillis; if (every) { nextMillis = prevMillis + every; if (nextMillis < now) { nextMillis = now; } } else if (pattern) { now = prevMillis < now ? now : prevMillis; nextMillis = await this.repeatStrategy(now, repeatOpts, jobName); } const multi = (await this.client).multi(); if (nextMillis) { if (override) { await this.scripts.addJobScheduler(multi, jobSchedulerId, nextMillis, { name: jobName, endDate: endDate ? new Date(endDate).getTime() : undefined, tz: repeatOpts.tz, pattern, every, }); } else { await this.scripts.updateJobSchedulerNextMillis(multi, jobSchedulerId, nextMillis); } const job = this.createNextJob(multi, jobName, nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts }), jobData, iterationCount); const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] // Check if there are any errors const erroredResult = results.find(result => result[0]); if (erroredResult) { throw new Error(`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`); } // Get last result with the job id const lastResult = results.pop(); job.id = lastResult[1]; return job; } } createNextJob(client, name, nextMillis, jobSchedulerId, opts, data, currentCount) { // // Generate unique job id for this iteration. // const jobId = this.getSchedulerNextJobId({ jobSchedulerId, nextMillis, }); const now = Date.now(); const delay = nextMillis - now; const mergedOpts = Object.assign(Object.assign({}, opts), { jobId, delay: delay < 0 ? 0 : delay, timestamp: now, prevMillis: nextMillis, repeatJobKey: jobSchedulerId }); mergedOpts.repeat = Object.assign(Object.assign({}, opts.repeat), { count: currentCount }); const job = new Job(this, name, data, mergedOpts, jobId); job.addJob(client); return job; } async removeJobScheduler(jobSchedulerId) { return this.scripts.removeJobScheduler(jobSchedulerId); } async getSchedulerData(client, key, next) { const jobData = await client.hgetall(this.toKey('repeat:' + key)); if (jobData) { return { key, name: jobData.name, endDate: parseInt(jobData.endDate) || null, tz: jobData.tz || null, pattern: jobData.pattern || null, every: jobData.every || null, next, }; } return this.keyToData(key, next); } keyToData(key, next) { const data = key.split(':'); const pattern = data.slice(4).join(':') || null; return { key, name: data[0], id: data[1] || null, endDate: parseInt(data[2]) || null, tz: data[3] || null, pattern, next, }; } async getJobScheduler(id) { const client = await this.client; const jobData = await client.hgetall(this.toKey('repeat:' + id)); if (jobData) { return { key: id, name: jobData.name, endDate: parseInt(jobData.endDate) || null, tz: jobData.tz || null, pattern: jobData.pattern || null, every: jobData.every || null, }; } } async getJobSchedulers(start = 0, end = -1, asc = false) { const client = await this.client; const jobSchedulersKey = this.keys.repeat; const result = asc ? await client.zrange(jobSchedulersKey, start, end, 'WITHSCORES') : await client.zrevrange(jobSchedulersKey, start, end, 'WITHSCORES'); const jobs = []; for (let i = 0; i < result.length; i += 2) { jobs.push(this.getSchedulerData(client, result[i], parseInt(result[i + 1]))); } return Promise.all(jobs); } async getSchedulersCount(client, prefix, queueName) { return client.zcard(`${prefix}:${queueName}:repeat`); } getSchedulerNextJobId({ nextMillis, jobSchedulerId, }) { return `repeat:${jobSchedulerId}:${nextMillis}`; } } export const defaultRepeatStrategy = (millis, opts) => { const { pattern } = opts; const currentDate = new Date(millis); const interval = parseExpression(pattern, Object.assign(Object.assign({}, opts), { currentDate })); try { if (opts.immediately) { return new Date().getTime(); } else { return interval.next().getTime(); } } catch (e) { // Ignore error } }; //# sourceMappingURL=job-scheduler.js.map