diff --git a/packages/backend/src/config.ts b/packages/backend/src/config.ts index 712ee5442..046d9fe1b 100644 --- a/packages/backend/src/config.ts +++ b/packages/backend/src/config.ts @@ -51,9 +51,9 @@ type Source = { redisForSystemQueue?: RedisOptionsSource; redisForEndedPollNotificationQueue?: RedisOptionsSource; redisForDeliverQueues?: Array; - redisForInboxQueue?: RedisOptionsSource; + redisForInboxQueues?: Array; redisForDbQueue?: RedisOptionsSource; - redisForRelationshipQueue?: RedisOptionsSource; + redisForRelationshipQueues?: Array; redisForObjectStorageQueue?: RedisOptionsSource; redisForWebhookDeliverQueue?: RedisOptionsSource; redisForTimelines?: RedisOptionsSource; @@ -221,9 +221,9 @@ export type Config = { redisForSystemQueue: RedisOptions & RedisOptionsSource; redisForEndedPollNotificationQueue: RedisOptions & RedisOptionsSource; redisForDeliverQueues: Array; - redisForInboxQueue: RedisOptions & RedisOptionsSource; + redisForInboxQueues: Array; redisForDbQueue: RedisOptions & RedisOptionsSource; - redisForRelationshipQueue: RedisOptions & RedisOptionsSource; + redisForRelationshipQueues: Array; redisForObjectStorageQueue: RedisOptions & RedisOptionsSource; redisForWebhookDeliverQueue: RedisOptions & RedisOptionsSource; redisForTimelines: RedisOptions & RedisOptionsSource; @@ -297,9 +297,9 @@ export function loadConfig(): Config { redisForSystemQueue: config.redisForSystemQueue ? convertRedisOptions(config.redisForSystemQueue, host) : redisForJobQueue, redisForEndedPollNotificationQueue: config.redisForEndedPollNotificationQueue ? convertRedisOptions(config.redisForEndedPollNotificationQueue, host) : redisForJobQueue, redisForDeliverQueues: config.redisForDeliverQueues ? config.redisForDeliverQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue], - redisForInboxQueue: config.redisForInboxQueue ? convertRedisOptions(config.redisForInboxQueue, host) : redisForJobQueue, + redisForInboxQueues: config.redisForInboxQueues ? config.redisForInboxQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue], redisForDbQueue: config.redisForDbQueue ? convertRedisOptions(config.redisForDbQueue, host) : redisForJobQueue, - redisForRelationshipQueue: config.redisForRelationshipQueue ? convertRedisOptions(config.redisForRelationshipQueue, host) : redisForJobQueue, + redisForRelationshipQueues: config.redisForRelationshipQueues ? config.redisForRelationshipQueues.map(config => convertRedisOptions(config, host)) : [redisForJobQueue], redisForObjectStorageQueue: config.redisForObjectStorageQueue ? convertRedisOptions(config.redisForObjectStorageQueue, host) : redisForJobQueue, redisForWebhookDeliverQueue: config.redisForWebhookDeliverQueue ? convertRedisOptions(config.redisForWebhookDeliverQueue, host) : redisForJobQueue, redisForTimelines: config.redisForTimelines ? convertRedisOptions(config.redisForTimelines, host) : redis, diff --git a/packages/backend/src/core/QueueModule.ts b/packages/backend/src/core/QueueModule.ts index 85ea2f415..d89fff81e 100644 --- a/packages/backend/src/core/QueueModule.ts +++ b/packages/backend/src/core/QueueModule.ts @@ -16,9 +16,9 @@ import type { DeliverJobData, InboxJobData, EndedPollNotificationJobData, Webhoo export type SystemQueue = Bull.Queue>; export type EndedPollNotificationQueue = Bull.Queue; export type DeliverQueue = Queues; -export type InboxQueue = Bull.Queue; +export type InboxQueue = Queues; export type DbQueue = Bull.Queue; -export type RelationshipQueue = Bull.Queue; +export type RelationshipQueue = Queues; export type ObjectStorageQueue = Bull.Queue; export type WebhookDeliverQueue = Bull.Queue; @@ -42,7 +42,7 @@ const $deliver: Provider = { const $inbox: Provider = { provide: 'queue:inbox', - useFactory: (config: Config) => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(config.redisForInboxQueue, config.bullmqQueueOptions, QUEUE.INBOX)), + useFactory: (config: Config) => new Queues(config.redisForInboxQueues.map(queueConfig => new Bull.Queue(QUEUE.INBOX, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.INBOX)))), inject: [DI.config], }; @@ -54,7 +54,7 @@ const $db: Provider = { const $relationship: Provider = { provide: 'queue:relationship', - useFactory: (config: Config) => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(config.redisForRelationshipQueue, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)), + useFactory: (config: Config) => new Queues(config.redisForRelationshipQueues.map(queueConfig => new Bull.Queue(QUEUE.RELATIONSHIP, baseQueueOptions(queueConfig, config.bullmqQueueOptions, QUEUE.RELATIONSHIP)))), inject: [DI.config], }; diff --git a/packages/backend/src/queue/QueueProcessorService.ts b/packages/backend/src/queue/QueueProcessorService.ts index e1b369ab8..ac571db25 100644 --- a/packages/backend/src/queue/QueueProcessorService.ts +++ b/packages/backend/src/queue/QueueProcessorService.ts @@ -76,9 +76,9 @@ export class QueueProcessorService implements OnApplicationShutdown { private systemQueueWorker: Bull.Worker; private dbQueueWorker: Bull.Worker; private deliverQueueWorkers: Bull.Worker[]; - private inboxQueueWorker: Bull.Worker; + private inboxQueueWorkers: Bull.Worker[]; private webhookDeliverQueueWorker: Bull.Worker; - private relationshipQueueWorker: Bull.Worker; + private relationshipQueueWorkers: Bull.Worker[]; private objectStorageQueueWorker: Bull.Worker; private endedPollNotificationQueueWorker: Bull.Worker; @@ -234,27 +234,31 @@ export class QueueProcessorService implements OnApplicationShutdown { //#endregion //#region inbox - this.inboxQueueWorker = new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { - ...baseWorkerOptions(this.config.redisForInboxQueue, this.config.bullmqWorkerOptions, QUEUE.INBOX), - autorun: false, - concurrency: this.config.inboxJobConcurrency ?? 16, - limiter: { - max: this.config.inboxJobPerSec ?? 32, - duration: 1000, - }, - settings: { - backoffStrategy: httpRelatedBackoff, - }, + this.inboxQueueWorkers = this.config.redisForInboxQueues + .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) + .map(config => new Bull.Worker(QUEUE.INBOX, (job) => this.inboxProcessorService.process(job), { + ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.INBOX), + autorun: false, + concurrency: this.config.inboxJobConcurrency ?? 16, + limiter: { + max: this.config.inboxJobPerSec ?? 32, + duration: 1000, + }, + settings: { + backoffStrategy: httpRelatedBackoff, + }, + })); + + this.inboxQueueWorkers.forEach((worker, index) => { + const inboxLogger = this.logger.createSubLogger(`inbox-${index}`); + + worker + .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) + .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) + .on('failed', (job, err) => inboxLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, error: renderError(err) })) + .on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { error: renderError(err) })) + .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); }); - - const inboxLogger = this.logger.createSubLogger('inbox'); - - this.inboxQueueWorker - .on('active', (job) => inboxLogger.debug(`active ${getJobInfo(job, true)}`)) - .on('completed', (job, result) => inboxLogger.debug(`completed(${result}) ${getJobInfo(job, true)}`)) - .on('failed', (job, err) => inboxLogger.warn(`failed(${err.stack}) ${getJobInfo(job)} activity=${job ? (job.data.activity ? job.data.activity.id : 'none') : '-'}`, { job, error: renderError(err) })) - .on('error', (err: Error) => inboxLogger.error(`error ${err.stack}`, { error: renderError(err) })) - .on('stalled', (jobId) => inboxLogger.warn(`stalled id=${jobId}`)); //#endregion //#region webhook deliver @@ -282,32 +286,36 @@ export class QueueProcessorService implements OnApplicationShutdown { //#endregion //#region relationship - this.relationshipQueueWorker = new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { - switch (job.name) { - case 'follow': return this.relationshipProcessorService.processFollow(job); - case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); - case 'block': return this.relationshipProcessorService.processBlock(job); - case 'unblock': return this.relationshipProcessorService.processUnblock(job); - default: throw new Error(`unrecognized job type ${job.name} for relationship`); - } - }, { - ...baseWorkerOptions(this.config.redisForRelationshipQueue, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP), - autorun: false, - concurrency: this.config.relationshipJobConcurrency ?? 16, - limiter: { - max: this.config.relationshipJobPerSec ?? 64, - duration: 1000, - }, + this.relationshipQueueWorkers = this.config.redisForRelationshipQueues + .filter((_, index) => process.env.QUEUE_WORKER_INDEX == null || index === Number.parseInt(process.env.QUEUE_WORKER_INDEX, 10)) + .map(config => new Bull.Worker(QUEUE.RELATIONSHIP, (job) => { + switch (job.name) { + case 'follow': return this.relationshipProcessorService.processFollow(job); + case 'unfollow': return this.relationshipProcessorService.processUnfollow(job); + case 'block': return this.relationshipProcessorService.processBlock(job); + case 'unblock': return this.relationshipProcessorService.processUnblock(job); + default: throw new Error(`unrecognized job type ${job.name} for relationship`); + } + }, { + ...baseWorkerOptions(config, this.config.bullmqWorkerOptions, QUEUE.RELATIONSHIP), + autorun: false, + concurrency: this.config.relationshipJobConcurrency ?? 16, + limiter: { + max: this.config.relationshipJobPerSec ?? 64, + duration: 1000, + }, + })); + + this.relationshipQueueWorkers.forEach((worker, index) => { + const relationshipLogger = this.logger.createSubLogger(`relationship-${index}`); + + worker + .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) + .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) + .on('failed', (job, err) => relationshipLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, error: renderError(err) })) + .on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { error: renderError(err) })) + .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); }); - - const relationshipLogger = this.logger.createSubLogger('relationship'); - - this.relationshipQueueWorker - .on('active', (job) => relationshipLogger.debug(`active id=${job.id}`)) - .on('completed', (job, result) => relationshipLogger.debug(`completed(${result}) id=${job.id}`)) - .on('failed', (job, err) => relationshipLogger.warn(`failed(${err.stack}) id=${job ? job.id : '-'}`, { job, error: renderError(err) })) - .on('error', (err: Error) => relationshipLogger.error(`error ${err.stack}`, { error: renderError(err) })) - .on('stalled', (jobId) => relationshipLogger.warn(`stalled id=${jobId}`)); //#endregion //#region object storage @@ -347,9 +355,9 @@ export class QueueProcessorService implements OnApplicationShutdown { this.systemQueueWorker.run(), this.dbQueueWorker.run(), ...this.deliverQueueWorkers.map(worker => worker.run()), - this.inboxQueueWorker.run(), + this.inboxQueueWorkers.map(worker => worker.run()), this.webhookDeliverQueueWorker.run(), - this.relationshipQueueWorker.run(), + this.relationshipQueueWorkers.map(worker => worker.run()), this.objectStorageQueueWorker.run(), this.endedPollNotificationQueueWorker.run(), ]); @@ -361,9 +369,9 @@ export class QueueProcessorService implements OnApplicationShutdown { this.systemQueueWorker.close(), this.dbQueueWorker.close(), ...this.deliverQueueWorkers.map(worker => worker.close()), - this.inboxQueueWorker.close(), + this.inboxQueueWorkers.map(worker => worker.close()), this.webhookDeliverQueueWorker.close(), - this.relationshipQueueWorker.close(), + this.relationshipQueueWorkers.map(worker => worker.close()), this.objectStorageQueueWorker.close(), this.endedPollNotificationQueueWorker.close(), ]); diff --git a/packages/backend/src/server/web/ClientServerService.ts b/packages/backend/src/server/web/ClientServerService.ts index 7e5fe612f..f6a57d8fe 100644 --- a/packages/backend/src/server/web/ClientServerService.ts +++ b/packages/backend/src/server/web/ClientServerService.ts @@ -245,13 +245,13 @@ export class ClientServerService { queues: [ this.systemQueue, this.endedPollNotificationQueue, - this.inboxQueue, this.dbQueue, - this.relationshipQueue, this.objectStorageQueue, this.webhookDeliverQueue, ].map(q => new BullMQAdapter(q)) - .concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))), + .concat(this.deliverQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))) + .concat(this.inboxQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))) + .concat(this.relationshipQueue.queues.map((q, index) => new BullMQAdapter(q, { prefix: `${index}-` }))), serverAdapter, });