/// import { URL } from 'url'; import { AbortController } from 'node-abort-controller'; import { GetNextJobOptions, IoredisListener, JobJsonRaw, Processor, RedisClient, WorkerOptions } from '../interfaces'; import { QueueBase } from './queue-base'; import { Repeat } from './repeat'; import { Job } from './job'; import { RedisConnection } from './redis-connection'; import { JobScheduler } from './job-scheduler'; export interface WorkerListener extends IoredisListener { /** * Listen to 'active' event. * * This event is triggered when a job enters the 'active' state. */ active: (job: Job, prev: string) => void; /** * Listen to 'closing' event. * * This event is triggered when the worker is closed. */ closed: () => void; /** * Listen to 'closing' event. * * This event is triggered when the worker is closing. */ closing: (msg: string) => void; /** * Listen to 'completed' event. * * This event is triggered when a job has successfully completed. */ completed: (job: Job, result: ResultType, prev: string) => void; /** * Listen to 'drained' event. * * This event is triggered when the queue has drained the waiting list. * Note that there could still be delayed jobs waiting their timers to expire * and this event will still be triggered as long as the waiting list has emptied. */ drained: () => void; /** * Listen to 'error' event. * * This event is triggered when an error is throw. */ error: (failedReason: Error) => void; /** * Listen to 'failed' event. * * This event is triggered when a job has thrown an exception. * Note: job parameter could be received as undefined when an stalled job * reaches the stalled limit and it is deleted by the removeOnFail option. */ failed: (job: Job | undefined, error: Error, prev: string) => void; /** * Listen to 'paused' event. * * This event is triggered when the queue is paused. */ paused: () => void; /** * Listen to 'progress' event. * * This event is triggered when a job updates it progress, i.e. the * Job##updateProgress() method is called. This is useful to notify * progress or any other data from within a processor to the rest of the * world. */ progress: (job: Job, progress: number | object) => void; /** * Listen to 'ready' event. * * This event is triggered when blockingConnection is ready. */ ready: () => void; /** * Listen to 'resumed' event. * * This event is triggered when the queue is resumed. */ resumed: () => void; /** * Listen to 'stalled' event. * * This event is triggered when a job has stalled and * has been moved back to the wait list. */ stalled: (jobId: string, prev: string) => void; } /** * * This class represents a worker that is able to process jobs from the queue. * As soon as the class is instantiated and a connection to Redis is established * it will start processing jobs. * */ export declare class Worker extends QueueBase { readonly opts: WorkerOptions; readonly id: string; private abortDelayController; private asyncFifoQueue; private blockingConnection; private blockUntil; private _concurrency; private childPool; private drained; private extendLocksTimer; private limitUntil; private resumeWorker; private stalledCheckStopper?; private waiting; private _repeat; private _jobScheduler; protected paused: Promise; protected processFn: Processor; protected running: boolean; static RateLimitError(): Error; constructor(name: string, processor?: string | URL | null | Processor, opts?: WorkerOptions, Connection?: typeof RedisConnection); emit>(event: U, ...args: Parameters[U]>): boolean; off>(eventName: U, listener: WorkerListener[U]): this; on>(event: U, listener: WorkerListener[U]): this; once>(event: U, listener: WorkerListener[U]): this; protected callProcessJob(job: Job, token: string): Promise; protected createJob(data: JobJsonRaw, jobId: string): Job; /** * * Waits until the worker is ready to start processing jobs. * In general only useful when writing tests. * */ waitUntilReady(): Promise; set concurrency(concurrency: number); get concurrency(): number; get repeat(): Promise; get jobScheduler(): Promise; run(): Promise; /** * Returns a promise that resolves to the next job in queue. * @param token - worker token to be assigned to retrieved job * @returns a Job or undefined if no job was available in the queue. */ getNextJob(token: string, { block }?: GetNextJobOptions): Promise>; private _getNextJob; /** * Overrides the rate limit to be active for the next jobs. * * @param expireTimeMs - expire time in ms of this rate limit. */ rateLimit(expireTimeMs: number): Promise; get minimumBlockTimeout(): number; protected moveToActive(client: RedisClient, token: string, name?: string): Promise>; private waitForJob; protected getBlockTimeout(blockUntil: number): number; protected getLimitUntil(limitUntil: number): number; /** * * This function is exposed only for testing purposes. */ delay(milliseconds?: number, abortController?: AbortController): Promise; private updateDelays; protected nextJobFromJobData(jobData?: JobJsonRaw, jobId?: string, token?: string): Promise>; processJob(job: Job, token: string, fetchNextCallback: () => boolean, jobsInProgress: Set<{ job: Job; ts: number; }>): Promise>; /** * * Pauses the processing of this queue only for this worker. */ pause(doNotWaitActive?: boolean): Promise; /** * * Resumes processing of this worker (if paused). */ resume(): void; /** * * Checks if worker is paused. * * @returns true if worker is paused, false otherwise. */ isPaused(): boolean; /** * * Checks if worker is currently running. * * @returns true if worker is running, false otherwise. */ isRunning(): boolean; /** * * Closes the worker and related redis connections. * * This method waits for current jobs to finalize before returning. * * @param force - Use force boolean parameter if you do not want to wait for * current jobs to be processed. When using telemetry, be mindful that it can * interfere with the proper closure of spans, potentially preventing them from being exported. * * @returns Promise that resolves when the worker has been closed. */ close(force?: boolean): Promise; /** * * Manually starts the stalled checker. * The check will run once as soon as this method is called, and * then every opts.stalledInterval milliseconds until the worker is closed. * Note: Normally you do not need to call this method, since the stalled checker * is automatically started when the worker starts processing jobs after * calling run. However if you want to process the jobs manually you need * to call this method to start the stalled checker. * * @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs} */ startStalledCheckTimer(): Promise; private stalledChecker; private startLockExtenderTimer; /** * Returns a promise that resolves when active jobs are cleared * * @returns */ private whenCurrentJobsFinished; private retryIfFailed; protected extendLocks(jobs: Job[]): Promise; private moveStalledJobsToWait; private notifyFailedJobs; private moveLimitedBackToWait; }