From 227c85c2cfaf0001287632fc14aae060a674fe5d Mon Sep 17 00:00:00 2001 From: riku6460 <17585784+riku6460@users.noreply.github.com> Date: Fri, 18 Oct 2024 08:40:15 +0900 Subject: [PATCH] =?UTF-8?q?enhance(queue):=20deliver=20queue=20=E3=82=92?= =?UTF-8?q?=E8=A4=87=E6=95=B0=E5=80=8B=E8=A8=AD=E5=AE=9A=E3=81=A7=E3=81=8D?= =?UTF-8?q?=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=99=E3=82=8B=20(Missk?= =?UTF-8?q?eyIO#745)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/backend/src/config.ts | 6 +- packages/backend/src/core/QueueModule.ts | 5 +- packages/backend/src/misc/queues.ts | 69 +++++++++++++++++++ .../src/queue/QueueProcessorService.ts | 50 +++++++------- .../src/server/web/ClientServerService.ts | 4 +- packages/frontend/vite.config.local-dev.ts | 1 + 6 files changed, 105 insertions(+), 30 deletions(-) create mode 100644 packages/backend/src/misc/queues.ts diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index 918fa6021..712ee5442 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -50,7 +50,7 @@ type Source = { redisForJobQueue?: RedisOptionsSource; redisForSystemQueue?: RedisOptionsSource; redisForEndedPollNotificationQueue?: RedisOptionsSource; - redisForDeliverQueue?: RedisOptionsSource; + redisForDeliverQueues?: Array; redisForInboxQueue?: RedisOptionsSource; redisForDbQueue?: RedisOptionsSource; redisForRelationshipQueue?: RedisOptionsSource; @@ -220,7 +220,7 @@ export type Config = { redisForPubsub: RedisOptions & RedisOptionsSource; redisForSystemQueue: RedisOptions & RedisOptionsSource; redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource; - redisForDeliverQueue: RedisOptions & RedisOptionsSource; + redisForDeliverQueues: Array; redisForInboxQueue: RedisOptions & RedisOptionsSource; redisForDbQueue: RedisOptions & RedisOptionsSource; redisForRelationshipQueue: RedisOptions & RedisOptionsSource; @@ -296,7 +296,7 @@ export function loadConfig(): Config { redisForPubsub: config.redisForPubsub ? convertRedisOptions(config.redisForPubsub, host) : redis, redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue, redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue, - redisForDeliverQueue: config.redisForDeliverQueue ? convertRedisOptions(config.redisForDeliverQueue, host) : redisForJobQueue, + redisForDeliverQueues: config.redisForDeliverQueues ? config.redisForDeliverQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue], redisForInboxQueue: config.redisForInboxQueue ? convertRedisOptions(config.redisForInboxQueue, host) : redisForJobQueue, redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue, redisForRelationshipQueue: config.redisForRelationshipQueue ? convertRedisOptions(config.redisForRelationshipQueue, host) : redisForJobQueue, diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 1537deca6..85ea2f415 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -9,12 +9,13 @@ import { DI } from '@/di-symbols.js'; import type { Config } from '@/config.js'; import { QUEUE, baseQueueOptions } from '@/queue/const.js'; import { allSettled } from '@/misc/promise-tracker.js'; +import { Queues } from '@/misc/queues.js'; import type { Provider } from '@nestjs/common'; import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, WebhookDeliverJobData, RelationshipJobData } from '../queue/types.js'; export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; -export type DeliverQueue = Bull.Queue; +export type DeliverQueue = Queues; export type InboxQueue = Bull.Queue; export type DbQueue = Bull.Queue; export type RelationshipQueue = Bull.Queue; @@ -35,7 +36,7 @@ const $endedPollNotification: Provider = { const $deliver: Provider = { provide: 'queue:deliver', - useFactory: (config: Config) => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(config.redisForDeliverQueue, config.bullmqQueueOptions, QUEUE.DELIVER)), + useFactory: (config: Config) => new Queues(config.redisForDeliverQueues.map(queueConfig => new Bull.Queue(QUEUE.DELIVER, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.DELIVER)))), inject: [DI.config], }; diff --git a/packages/backend/src/misc/queues.ts b/packages/backend/src/misc/queues.ts new file mode 100644 index 000000000..6a6fb9ba6 --- /dev/null +++ b/packages/backend/src/misc/queues.ts @@ -0,0 +1,69 @@ +import { EventEmitter } from 'node:events'; +import * as Bull from 'bullmq'; + +export class Queues { + public readonly queues: ReadonlyArray>; + + constructor(queues: Bull.Queue[]) { + if (queues.length === 0) { + throw new Error('queues cannot be empty.'); + } + this.queues = queues; + } + + getRandomQueue(): Bull.Queue { + return this.queues[Math.floor(Math.random() * this.queues.length)]; + } + + add(name: NameType, data: DataType, opts?: Bull.JobsOptions): Promise> { + return this.getRandomQueue().add(name, data, opts); + } + + async addBulk(jobs: { name: NameType; data: DataType; opts?: Bull.BulkJobOptions }[]): Promise[]> { + return (await Promise.allSettled(jobs.map(job => this.add(job.name, job.data, job.opts)))) + .filter((value): value is PromiseFulfilledResult> => value.status === 'fulfilled') + .flatMap(value => value.value); + } + + async close(): Promise { + await Promise.allSettled(this.queues.map(queue => queue.close())); + } + + async getDelayed(start?: number, end?: number): Promise[]> { + return (await Promise.allSettled(this.queues.map(queue => queue.getDelayed(start, end)))) + .filter((value): value is PromiseFulfilledResult[]> => value.status === 'fulfilled') + .flatMap(value => value.value); + } + + async getJobCounts(...types: Bull.JobType[]): Promise<{ [p: string]: number }> { + return (await Promise.allSettled(this.queues.map(queue => queue.getJobCounts(...types)))) + .filter((value): value is PromiseFulfilledResult> => value.status === 'fulfilled') + .reduce((previousValue, currentValue) => { + for (const key in currentValue.value) { + previousValue[key] = (previousValue[key] || 0) + currentValue.value[key]; + } + return previousValue; + }, {} as Record); + } + + once>(event: U, listener: Bull.QueueListener[U]): void { + const e = new EventEmitter(); + e.once(event, listener); + + const listener1 = (...args: any[]) => e.emit(event, ...args); + this.queues.forEach(queue => queue.once(event, listener1)); + e.once(event, () => this.queues.forEach(queue => queue.off(event, listener1))); + } + + async clean(grace: number, limit: number, type?: 'completed' | 'wait' | 'active' | 'paused' | 'prioritized' | 'delayed' | 'failed'): Promise { + return (await Promise.allSettled(this.queues.map(queue => queue.clean(grace, limit, type)))) + .filter((value): value is PromiseFulfilledResult => value.status === 'fulfilled') + .flatMap(value => value.value); + } + + async getJobs(types?: Bull.JobType[] | Bull.JobType, start?: number, end?: number, asc?: boolean): Promise[]> { + return (await Promise.allSettled(this.queues.map(queue => queue.getJobs(types, start, end, asc)))) + .filter((value): value is PromiseFulfilledResult[]> => value.status === 'fulfilled') + .flatMap(value => value.value); + } +} diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index 738ba9e25..e1b369ab8 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -75,7 +75,7 @@ export class QueueProcessorService implements OnApplicationShutdown { private logger: Logger; private systemQueueWorker: Bull.Worker; private dbQueueWorker: Bull.Worker; - private deliverQueueWorker: Bull.Worker; + private deliverQueueWorkers: Bull.Worker[]; private inboxQueueWorker: Bull.Worker; private webhookDeliverQueueWorker: Bull.Worker; private relationshipQueueWorker: Bull.Worker; @@ -206,27 +206,31 @@ export class QueueProcessorService implements OnApplicationShutdown { //#endregion //#region deliver - this.deliverQueueWorker = new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { - ...baseWorkerOptions(this.config.redisForDeliverQueue, this.config.bullmqWorkerOptions, QUEUE.DELIVER), - autorun: false, - concurrency: this.config.deliverJobConcurrency ?? 128, - limiter: { - max: this.config.deliverJobPerSec ?? 128, - duration: 1000, - }, - settings: { - backoffStrategy: httpRelatedBackoff, - }, + this.deliverQueueWorkers = this.config.redisForDeliverQueues + .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) + .map(config => new Bull.Worker(QUEUE.DELIVER, (job) => this.deliverProcessorService.process(job), { + ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.DELIVER), + autorun: false, + concurrency: this.config.deliverJobConcurrency ?? 128, + limiter: { + max: this.config.deliverJobPerSec ?? 128, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + })); + + this.deliverQueueWorkers.forEach((worker, index) => { + const deliverLogger = this.logger.createSubLogger(`deliver-${index}`); + + worker + .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) + .on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) + .on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { error: renderError(err) })) + .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); }); - - const deliverLogger = this.logger.createSubLogger('deliver'); - - this.deliverQueueWorker - .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => deliverLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} to=${job ? job.data.to : '-'}`)) - .on('error', (err: Error) => deliverLogger.error(`error ${err.stack}`, { error: renderError(err) })) - .on('stalled', (jobId) => deliverLogger.warn(`stalled id=${jobId}`)); //#endregion //#region inbox @@ -342,7 +346,7 @@ export class QueueProcessorService implements OnApplicationShutdown { await Promise.all([ this.systemQueueWorker.run(), this.dbQueueWorker.run(), - this.deliverQueueWorker.run(), + ...this.deliverQueueWorkers.map(worker => worker.run()), this.inboxQueueWorker.run(), this.webhookDeliverQueueWorker.run(), this.relationshipQueueWorker.run(), @@ -356,7 +360,7 @@ export class QueueProcessorService implements OnApplicationShutdown { await Promise.all([ this.systemQueueWorker.close(), this.dbQueueWorker.close(), - this.deliverQueueWorker.close(), + ...this.deliverQueueWorkers.map(worker => worker.close()), this.inboxQueueWorker.close(), this.webhookDeliverQueueWorker.close(), this.relationshipQueueWorker.close(), diff --git a/packages/backend/src/server/web/ClientServerService.ts b/packages/backend/src/server/web/ClientServerService.ts index 027fe75dd..a0b0d6038 100644 --- a/packages/backend/src/server/web/ClientServerService.ts +++ b/packages/backend/src/server/web/ClientServerService.ts @@ -235,12 +235,12 @@ export class ClientServerService { queues: [ this.systemQueue, this.endedPollNotificationQueue, - this.deliverQueue, this.inboxQueue, this.dbQueue, this.objectStorageQueue, this.webhookDeliverQueue, - ].map(q => new BullMQAdapter(q)), + ].map(q => new BullMQAdapter(q)) + .concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))), serverAdapter, }); diff --git a/packages/frontend/vite.config.local-dev.ts b/packages/frontend/vite.config.local-dev.ts index 589e7c3dc..04e821083 100644 --- a/packages/frontend/vite.config.local-dev.ts +++ b/packages/frontend/vite.config.local-dev.ts @@ -53,6 +53,7 @@ const devConfig = { '/cli': httpUrl, '/inbox': httpUrl, '/emoji/': httpUrl, + '/queue': httpUrl, '/notes': { target: httpUrl, headers: {