Skip to content

Commit 65e17de

Browse files
fix(retention-job): add chunking strategy for cleanup (#4305)
* fix(retention-job): add chunking strategy for cleanup * change stats to be per-job, not per-chunk
1 parent 79ff5d8 commit 65e17de

4 files changed

Lines changed: 276 additions & 245 deletions

File tree

apps/sim/background/cleanup-logs.ts

Lines changed: 43 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger'
44
import { task } from '@trigger.dev/sdk'
55
import { and, inArray, lt } from 'drizzle-orm'
66
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
7+
import {
8+
batchDeleteByWorkspaceAndTimestamp,
9+
chunkedBatchDelete,
10+
type TableCleanupResult,
11+
} from '@/lib/cleanup/batch-delete'
712
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
813
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
914
import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
1015

1116
const logger = createLogger('CleanupLogs')
1217

13-
const BATCH_SIZE = 2000
14-
const MAX_BATCHES_PER_TIER = 10
15-
16-
interface TierResults {
17-
total: number
18-
deleted: number
19-
deleteFailed: number
18+
interface FileDeleteStats {
2019
filesTotal: number
2120
filesDeleted: number
2221
filesDeleteFailed: number
2322
}
2423

25-
function emptyTierResults(): TierResults {
26-
return {
27-
total: 0,
28-
deleted: 0,
29-
deleteFailed: 0,
30-
filesTotal: 0,
31-
filesDeleted: 0,
32-
filesDeleteFailed: 0,
33-
}
34-
}
35-
36-
async function deleteExecutionFiles(files: unknown, results: TierResults): Promise<void> {
24+
async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
3725
if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return
3826

3927
const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string)
40-
results.filesTotal += keys.length
28+
stats.filesTotal += keys.length
4129

4230
await Promise.all(
4331
keys.map(async (key) => {
4432
try {
4533
await StorageService.deleteFile({ key, context: 'execution' })
4634
await deleteFileMetadata(key)
47-
results.filesDeleted++
35+
stats.filesDeleted++
4836
} catch (fileError) {
49-
results.filesDeleteFailed++
37+
stats.filesDeleteFailed++
5038
logger.error(`Failed to delete file ${key}:`, { fileError })
5139
}
5240
})
5341
)
5442
}
5543

56-
async function cleanupTier(
57-
workspaceIds: string[],
58-
retentionDate: Date,
59-
label: string
60-
): Promise<TierResults> {
61-
const results = emptyTierResults()
62-
if (workspaceIds.length === 0) return results
63-
64-
let batchesProcessed = 0
65-
let hasMore = true
66-
67-
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
68-
const batch = await db
69-
.select({
70-
id: workflowExecutionLogs.id,
71-
files: workflowExecutionLogs.files,
72-
})
73-
.from(workflowExecutionLogs)
74-
.where(
75-
and(
76-
inArray(workflowExecutionLogs.workspaceId, workspaceIds),
77-
lt(workflowExecutionLogs.startedAt, retentionDate)
78-
)
79-
)
80-
.limit(BATCH_SIZE)
81-
82-
results.total += batch.length
83-
84-
if (batch.length === 0) {
85-
hasMore = false
86-
break
87-
}
88-
89-
for (const log of batch) {
90-
await deleteExecutionFiles(log.files, results)
91-
}
92-
93-
const logIds = batch.map((log) => log.id)
94-
try {
95-
const deleted = await db
96-
.delete(workflowExecutionLogs)
97-
.where(inArray(workflowExecutionLogs.id, logIds))
98-
.returning({ id: workflowExecutionLogs.id })
99-
100-
results.deleted += deleted.length
101-
} catch (deleteError) {
102-
results.deleteFailed += logIds.length
103-
logger.error(`Batch delete failed for ${label}:`, { deleteError })
104-
}
105-
106-
batchesProcessed++
107-
hasMore = batch.length === BATCH_SIZE
108-
109-
logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`)
110-
}
111-
112-
return results
113-
}
114-
115-
interface JobLogCleanupResults {
116-
deleted: number
117-
deleteFailed: number
118-
}
119-
120-
async function cleanupJobExecutionLogsTier(
44+
async function cleanupWorkflowExecutionLogs(
12145
workspaceIds: string[],
12246
retentionDate: Date,
12347
label: string
124-
): Promise<JobLogCleanupResults> {
125-
const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 }
126-
if (workspaceIds.length === 0) return results
127-
128-
let batchesProcessed = 0
129-
let hasMore = true
130-
131-
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
132-
const batch = await db
133-
.select({ id: jobExecutionLogs.id })
134-
.from(jobExecutionLogs)
135-
.where(
136-
and(
137-
inArray(jobExecutionLogs.workspaceId, workspaceIds),
138-
lt(jobExecutionLogs.startedAt, retentionDate)
48+
): Promise<TableCleanupResult & FileDeleteStats> {
49+
const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }
50+
51+
const dbStats = await chunkedBatchDelete({
52+
tableDef: workflowExecutionLogs,
53+
workspaceIds,
54+
tableName: `${label}/workflow_execution_logs`,
55+
selectChunk: (chunkIds, limit) =>
56+
db
57+
.select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
58+
.from(workflowExecutionLogs)
59+
.where(
60+
and(
61+
inArray(workflowExecutionLogs.workspaceId, chunkIds),
62+
lt(workflowExecutionLogs.startedAt, retentionDate)
63+
)
13964
)
140-
)
141-
.limit(BATCH_SIZE)
142-
143-
if (batch.length === 0) {
144-
hasMore = false
145-
break
146-
}
65+
.limit(limit),
66+
onBatch: async (rows) => {
67+
for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
68+
},
69+
})
14770

148-
const logIds = batch.map((log) => log.id)
149-
try {
150-
const deleted = await db
151-
.delete(jobExecutionLogs)
152-
.where(inArray(jobExecutionLogs.id, logIds))
153-
.returning({ id: jobExecutionLogs.id })
154-
155-
results.deleted += deleted.length
156-
} catch (deleteError) {
157-
results.deleteFailed += logIds.length
158-
logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError })
159-
}
160-
161-
batchesProcessed++
162-
hasMore = batch.length === BATCH_SIZE
163-
164-
logger.info(
165-
`[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed`
166-
)
167-
}
168-
169-
return results
71+
return { ...dbStats, ...fileStats }
17072
}
17173

17274
export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
@@ -190,15 +92,19 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
19092
`[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
19193
)
19294

193-
const results = await cleanupTier(workspaceIds, retentionDate, label)
95+
const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
19496
logger.info(
195-
`[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates`
97+
`[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
19698
)
19799

198-
const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label)
199-
logger.info(
200-
`[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed`
201-
)
100+
await batchDeleteByWorkspaceAndTimestamp({
101+
tableDef: jobExecutionLogs,
102+
workspaceIdCol: jobExecutionLogs.workspaceId,
103+
timestampCol: jobExecutionLogs.startedAt,
104+
workspaceIds,
105+
retentionDate,
106+
tableName: `${label}/job_execution_logs`,
107+
})
202108

203109
// Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
204110
if (payload.plan === 'free') {

apps/sim/background/cleanup-soft-deletes.ts

Lines changed: 48 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm'
1818
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
1919
import {
2020
batchDeleteByWorkspaceAndTimestamp,
21-
DEFAULT_BATCH_SIZE,
22-
DEFAULT_MAX_BATCHES_PER_TABLE,
2321
deleteRowsById,
22+
selectRowsByIdChunks,
2423
} from '@/lib/cleanup/batch-delete'
2524
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
2625
import type { StorageContext } from '@/lib/uploads'
@@ -44,35 +43,37 @@ async function selectExpiredWorkspaceFiles(
4443
workspaceIds: string[],
4544
retentionDate: Date
4645
): Promise<WorkspaceFileScope> {
47-
const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE
48-
4946
const [legacyRows, multiContextRows] = await Promise.all([
50-
db
51-
.select({ id: workspaceFile.id, key: workspaceFile.key })
52-
.from(workspaceFile)
53-
.where(
54-
and(
55-
inArray(workspaceFile.workspaceId, workspaceIds),
56-
isNotNull(workspaceFile.deletedAt),
57-
lt(workspaceFile.deletedAt, retentionDate)
47+
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
48+
db
49+
.select({ id: workspaceFile.id, key: workspaceFile.key })
50+
.from(workspaceFile)
51+
.where(
52+
and(
53+
inArray(workspaceFile.workspaceId, chunkIds),
54+
isNotNull(workspaceFile.deletedAt),
55+
lt(workspaceFile.deletedAt, retentionDate)
56+
)
5857
)
59-
)
60-
.limit(limit),
61-
db
62-
.select({
63-
id: workspaceFiles.id,
64-
key: workspaceFiles.key,
65-
context: workspaceFiles.context,
66-
})
67-
.from(workspaceFiles)
68-
.where(
69-
and(
70-
inArray(workspaceFiles.workspaceId, workspaceIds),
71-
isNotNull(workspaceFiles.deletedAt),
72-
lt(workspaceFiles.deletedAt, retentionDate)
58+
.limit(chunkLimit)
59+
),
60+
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
61+
db
62+
.select({
63+
id: workspaceFiles.id,
64+
key: workspaceFiles.key,
65+
context: workspaceFiles.context,
66+
})
67+
.from(workspaceFiles)
68+
.where(
69+
and(
70+
inArray(workspaceFiles.workspaceId, chunkIds),
71+
isNotNull(workspaceFiles.deletedAt),
72+
lt(workspaceFiles.deletedAt, retentionDate)
73+
)
7374
)
74-
)
75-
.limit(limit),
75+
.limit(chunkLimit)
76+
),
7677
])
7778

7879
return {
@@ -182,29 +183,33 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
182183
// (chats + S3) AND the DB deletes below — selecting twice could return
183184
// different subsets above the LIMIT cap and orphan or prematurely purge data.
184185
const [doomedWorkflows, fileScope] = await Promise.all([
185-
db
186-
.select({ id: workflow.id })
187-
.from(workflow)
188-
.where(
189-
and(
190-
inArray(workflow.workspaceId, workspaceIds),
191-
isNotNull(workflow.archivedAt),
192-
lt(workflow.archivedAt, retentionDate)
186+
selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
187+
db
188+
.select({ id: workflow.id })
189+
.from(workflow)
190+
.where(
191+
and(
192+
inArray(workflow.workspaceId, chunkIds),
193+
isNotNull(workflow.archivedAt),
194+
lt(workflow.archivedAt, retentionDate)
195+
)
193196
)
194-
)
195-
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE),
197+
.limit(chunkLimit)
198+
),
196199
selectExpiredWorkspaceFiles(workspaceIds, retentionDate),
197200
])
198201

199202
const doomedWorkflowIds = doomedWorkflows.map((w) => w.id)
200203
let chatCleanup: { execute: () => Promise<void> } | null = null
201204

202205
if (doomedWorkflowIds.length > 0) {
203-
const doomedChats = await db
204-
.select({ id: copilotChats.id })
205-
.from(copilotChats)
206-
.where(inArray(copilotChats.workflowId, doomedWorkflowIds))
207-
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
206+
const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) =>
207+
db
208+
.select({ id: copilotChats.id })
209+
.from(copilotChats)
210+
.where(inArray(copilotChats.workflowId, chunkIds))
211+
.limit(chunkLimit)
212+
)
208213

209214
const doomedChatIds = doomedChats.map((c) => c.id)
210215
if (doomedChatIds.length > 0) {

0 commit comments

Comments (0)