diff --git a/.server-changes/run-queue-sweeper-stale-entry.md b/.server-changes/run-queue-sweeper-stale-entry.md new file mode 100644 index 00000000000..04fe688399b --- /dev/null +++ b/.server-changes/run-queue-sweeper-stale-entry.md @@ -0,0 +1,9 @@ +--- +area: webapp +type: fix +--- + +Concurrency sweeper now removes the message from the worker queue list +when acking marked runs, eliminating stale `messageKey` tombstones that +produced "Failed to dequeue message from worker queue" errors when +consumed by a later BLPOP. diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index de0df73ad05..74f08471bb9 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -2847,7 +2847,7 @@ export class RunQueue { await this.acknowledgeMessage(run.orgId, run.messageId, { skipDequeueProcessing: true, - removeFromWorkerQueue: false, + removeFromWorkerQueue: true, }); } diff --git a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts index 739a6bb190b..07c5890d046 100644 --- a/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/concurrencySweeper.test.ts @@ -175,4 +175,93 @@ describe("RunQueue Concurrency Sweeper", () => { } } ); + + // When the sweeper acks a run whose messageKey value is still sitting on the worker + // queue list (e.g. fast-path enqueued, never BLPOP'd), it must remove the entry from + // the list as well as deleting the message body. Otherwise the list keeps a stale + // tombstone — the next BLPOP returns the messageKey, GET returns nil, and the dequeue + // path logs "Failed to dequeue message from worker queue". + redisTest( + "should clear the worker queue list when sweeper acks a fast-path-enqueued run", + async ({ redisContainer }) => { + let enableConcurrencySweeper = false; + + const queue = new RunQueue({ + ...testOptions, + logLevel: "debug", + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + workerOptions: { + pollIntervalMs: 100, + immediatePollIntervalMs: 100, + }, + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + concurrencySweeper: { + scanSchedule: "* * * * * *", + scanJitterInMs: 5, + processMarkedSchedule: "* * * * * *", + processMarkedJitterInMs: 5, + callback: async (runIds) => { + if (!enableConcurrencySweeper) { + return []; + } + return [{ id: messageDev.runId, orgId: "o1234" }]; + }, + }, + }); + + try { + await queue.updateEnvConcurrencyLimits(authenticatedEnvDev); + + // Fast-path enqueue: SET messageKey, RPUSH messageKeyValue onto worker queue list, + // SADD runId into currentConcurrency. The message is on the list waiting to be popped. + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + workerQueue: authenticatedEnvDev.id, + enableFastPath: true, + }); + + // Pre-conditions: list has the entry, run is "in-flight" per operational concurrency, + // body exists. Fast-path bumps the operational currentConcurrency (SADD) but not + // currentDequeued — the displayed concurrency is bumped only when a worker BLPOPs. + expect(await queue.peekAllOnWorkerQueue(authenticatedEnvDev.id)).toHaveLength(1); + expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(1); + expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeDefined(); + + // Sweeper now considers the run completed (test callback returns it), so + // processMarkedRun acks with removeFromWorkerQueue: true. + enableConcurrencySweeper = true; + await setTimeout(5_000); + + // Sweeper has run: operational concurrency released, message body deleted, AND + // the messageKey value has been LREM'd from the worker queue list. Without the + // LREM, the list would still contain the messageKey, and the next BLPOP would + // pop the tombstone and emit "Failed to dequeue message from worker queue". + expect(await queue.operationalCurrentConcurrencyOfEnvironment(authenticatedEnvDev)).toBe(0); + expect(await queue.readMessage(messageDev.orgId, messageDev.runId)).toBeUndefined(); + expect(await queue.peekAllOnWorkerQueue(authenticatedEnvDev.id)).toEqual([]); + + // A subsequent blocking dequeue finds nothing — no real message and no tombstone. + const dequeued = await queue.dequeueMessageFromWorkerQueue( + "test_consumer", + authenticatedEnvDev.id, + { blockingPop: true, blockingPopTimeoutSeconds: 2 } + ); + expect(dequeued).toBeUndefined(); + } finally { + await queue.quit(); + } + } + ); });