180 changes: 43 additions & 137 deletions apps/sim/background/cleanup-logs.ts
@@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger'
 import { task } from '@trigger.dev/sdk'
 import { and, inArray, lt } from 'drizzle-orm'
 import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
+import {
+  batchDeleteByWorkspaceAndTimestamp,
+  chunkedBatchDelete,
+  type TableCleanupResult,
+} from '@/lib/cleanup/batch-delete'
 import { snapshotService } from '@/lib/logs/execution/snapshot/service'
 import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
 import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
 
 const logger = createLogger('CleanupLogs')
 
-const BATCH_SIZE = 2000
-const MAX_BATCHES_PER_TIER = 10
-
-interface TierResults {
-  total: number
-  deleted: number
-  deleteFailed: number
+interface FileDeleteStats {
   filesTotal: number
   filesDeleted: number
   filesDeleteFailed: number
 }
 
-function emptyTierResults(): TierResults {
-  return {
-    total: 0,
-    deleted: 0,
-    deleteFailed: 0,
-    filesTotal: 0,
-    filesDeleted: 0,
-    filesDeleteFailed: 0,
-  }
-}
-
-async function deleteExecutionFiles(files: unknown, results: TierResults): Promise<void> {
+async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
   if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return
 
   const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string)
-  results.filesTotal += keys.length
+  stats.filesTotal += keys.length
 
   await Promise.all(
     keys.map(async (key) => {
      try {
         await StorageService.deleteFile({ key, context: 'execution' })
         await deleteFileMetadata(key)
-        results.filesDeleted++
+        stats.filesDeleted++
       } catch (fileError) {
-        results.filesDeleteFailed++
+        stats.filesDeleteFailed++
         logger.error(`Failed to delete file ${key}:`, { fileError })
       }
     })
   )
 }
 
-async function cleanupTier(
-  workspaceIds: string[],
-  retentionDate: Date,
-  label: string
-): Promise<TierResults> {
-  const results = emptyTierResults()
-  if (workspaceIds.length === 0) return results
-
-  let batchesProcessed = 0
-  let hasMore = true
-
-  while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
-    const batch = await db
-      .select({
-        id: workflowExecutionLogs.id,
-        files: workflowExecutionLogs.files,
-      })
-      .from(workflowExecutionLogs)
-      .where(
-        and(
-          inArray(workflowExecutionLogs.workspaceId, workspaceIds),
-          lt(workflowExecutionLogs.startedAt, retentionDate)
-        )
-      )
-      .limit(BATCH_SIZE)
-
-    results.total += batch.length
-
-    if (batch.length === 0) {
-      hasMore = false
-      break
-    }
-
-    for (const log of batch) {
-      await deleteExecutionFiles(log.files, results)
-    }
-
-    const logIds = batch.map((log) => log.id)
-    try {
-      const deleted = await db
-        .delete(workflowExecutionLogs)
-        .where(inArray(workflowExecutionLogs.id, logIds))
-        .returning({ id: workflowExecutionLogs.id })
-
-      results.deleted += deleted.length
-    } catch (deleteError) {
-      results.deleteFailed += logIds.length
-      logger.error(`Batch delete failed for ${label}:`, { deleteError })
-    }
-
-    batchesProcessed++
-    hasMore = batch.length === BATCH_SIZE
-
-    logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`)
-  }
-
-  return results
-}
-
-interface JobLogCleanupResults {
-  deleted: number
-  deleteFailed: number
-}
-
-async function cleanupJobExecutionLogsTier(
+async function cleanupWorkflowExecutionLogs(
   workspaceIds: string[],
   retentionDate: Date,
   label: string
-): Promise<JobLogCleanupResults> {
-  const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 }
-  if (workspaceIds.length === 0) return results
-
-  let batchesProcessed = 0
-  let hasMore = true
-
-  while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
-    const batch = await db
-      .select({ id: jobExecutionLogs.id })
-      .from(jobExecutionLogs)
-      .where(
-        and(
-          inArray(jobExecutionLogs.workspaceId, workspaceIds),
-          lt(jobExecutionLogs.startedAt, retentionDate)
-        )
-      )
-      .limit(BATCH_SIZE)
-
-    if (batch.length === 0) {
-      hasMore = false
-      break
-    }
-
-    const logIds = batch.map((log) => log.id)
-    try {
-      const deleted = await db
-        .delete(jobExecutionLogs)
-        .where(inArray(jobExecutionLogs.id, logIds))
-        .returning({ id: jobExecutionLogs.id })
-
-      results.deleted += deleted.length
-    } catch (deleteError) {
-      results.deleteFailed += logIds.length
-      logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError })
-    }
-
-    batchesProcessed++
-    hasMore = batch.length === BATCH_SIZE
-
-    logger.info(
-      `[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed`
-    )
-  }
-
-  return results
+): Promise<TableCleanupResult & FileDeleteStats> {
+  const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }
+
+  const dbStats = await chunkedBatchDelete({
+    tableDef: workflowExecutionLogs,
+    workspaceIds,
+    tableName: `${label}/workflow_execution_logs`,
+    selectChunk: (chunkIds, limit) =>
+      db
+        .select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
+        .from(workflowExecutionLogs)
+        .where(
+          and(
+            inArray(workflowExecutionLogs.workspaceId, chunkIds),
+            lt(workflowExecutionLogs.startedAt, retentionDate)
+          )
+        )
+        .limit(limit),
+    onBatch: async (rows) => {
+      for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
+    },
+  })
+
+  return { ...dbStats, ...fileStats }
 }
 
 export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
@@ -190,15 +92,19 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
     `[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
   )
 
-  const results = await cleanupTier(workspaceIds, retentionDate, label)
+  const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
   logger.info(
-    `[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates`
+    `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
   )
 
-  const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label)
-  logger.info(
-    `[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed`
-  )
+  await batchDeleteByWorkspaceAndTimestamp({
+    tableDef: jobExecutionLogs,
+    workspaceIdCol: jobExecutionLogs.workspaceId,
+    timestampCol: jobExecutionLogs.startedAt,
+    workspaceIds,
+    retentionDate,
+    tableName: `${label}/job_execution_logs`,
+  })
 
   // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
   if (payload.plan === 'free') {
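The implementations of the helpers this PR switches to (`chunkedBatchDelete`, `batchDeleteByWorkspaceAndTimestamp`, `TableCleanupResult`) live in `@/lib/cleanup/batch-delete` and are not part of the diff. As a reading aid, here is a minimal sketch of what `chunkedBatchDelete` might look like, reconstructed only from the call sites above: the constants, the `TableCleanupResult` fields, and the control flow are assumptions, and where the real helper takes a drizzle `tableDef` and builds the delete itself, this dependency-free sketch takes an injected `deleteByIds` callback instead.

```ts
// Hypothetical sketch of chunkedBatchDelete, inferred from its call site in
// cleanup-logs.ts; this is not the actual code in '@/lib/cleanup/batch-delete'.

// Fields assumed from the old TierResults shape that the refactor replaced.
export interface TableCleanupResult {
  total: number
  deleted: number
  deleteFailed: number
}

interface ChunkedBatchDeleteOptions<Row extends { id: string }> {
  workspaceIds: string[]
  tableName: string // used for log prefixes only
  // Caller-supplied query: given a chunk of workspace ids and a row limit,
  // return the next page of candidate rows.
  selectChunk: (chunkIds: string[], limit: number) => Promise<Row[]>
  // Optional pre-delete hook (e.g. releasing cloud-stored files).
  onBatch?: (rows: Row[]) => Promise<void>
  // Stand-in for db.delete(tableDef); returns the number of rows deleted.
  deleteByIds: (ids: string[]) => Promise<number>
}

// Assumed paging constants; the library's real defaults are not shown in the PR.
const BATCH_SIZE = 2000
const MAX_BATCHES_PER_TABLE = 10
const ID_CHUNK_SIZE = 500

export async function chunkedBatchDelete<Row extends { id: string }>(
  opts: ChunkedBatchDeleteOptions<Row>
): Promise<TableCleanupResult> {
  const result: TableCleanupResult = { total: 0, deleted: 0, deleteFailed: 0 }

  // Bound the IN (...) list by walking the workspace ids in fixed-size chunks.
  for (let i = 0; i < opts.workspaceIds.length; i += ID_CHUNK_SIZE) {
    const chunkIds = opts.workspaceIds.slice(i, i + ID_CHUNK_SIZE)

    for (let batch = 0; batch < MAX_BATCHES_PER_TABLE; batch++) {
      const rows = await opts.selectChunk(chunkIds, BATCH_SIZE)
      result.total += rows.length
      if (rows.length === 0) break

      // Let the caller release side resources before the rows go away.
      if (opts.onBatch) await opts.onBatch(rows)

      try {
        result.deleted += await opts.deleteByIds(rows.map((r) => r.id))
      } catch (deleteError) {
        result.deleteFailed += rows.length
        console.error(`Batch delete failed for ${opts.tableName}:`, deleteError)
      }

      // A short page means this chunk is exhausted.
      if (rows.length < BATCH_SIZE) break
    }
  }

  return result
}
```

Under the same assumptions, `batchDeleteByWorkspaceAndTimestamp` would plausibly be a thin wrapper over this loop for the common case in this file: given `workspaceIdCol` and `timestampCol`, it can derive `selectChunk` itself instead of taking a hand-written query.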
91 changes: 48 additions & 43 deletions apps/sim/background/cleanup-soft-deletes.ts
@@ -18,9 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm'
 import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
 import {
   batchDeleteByWorkspaceAndTimestamp,
-  DEFAULT_BATCH_SIZE,
-  DEFAULT_MAX_BATCHES_PER_TABLE,
   deleteRowsById,
+  selectRowsByIdChunks,
 } from '@/lib/cleanup/batch-delete'
 import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
 import type { StorageContext } from '@/lib/uploads'
@@ -44,35 +43,37 @@ async function selectExpiredWorkspaceFiles(
   workspaceIds: string[],
   retentionDate: Date
 ): Promise<WorkspaceFileScope> {
-  const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE
-
   const [legacyRows, multiContextRows] = await Promise.all([
-    db
-      .select({ id: workspaceFile.id, key: workspaceFile.key })
-      .from(workspaceFile)
-      .where(
-        and(
-          inArray(workspaceFile.workspaceId, workspaceIds),
-          isNotNull(workspaceFile.deletedAt),
-          lt(workspaceFile.deletedAt, retentionDate)
-        )
-      )
-      .limit(limit),
-    db
-      .select({
-        id: workspaceFiles.id,
-        key: workspaceFiles.key,
-        context: workspaceFiles.context,
-      })
-      .from(workspaceFiles)
-      .where(
-        and(
-          inArray(workspaceFiles.workspaceId, workspaceIds),
-          isNotNull(workspaceFiles.deletedAt),
-          lt(workspaceFiles.deletedAt, retentionDate)
-        )
-      )
-      .limit(limit),
+    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
+      db
+        .select({ id: workspaceFile.id, key: workspaceFile.key })
+        .from(workspaceFile)
+        .where(
+          and(
+            inArray(workspaceFile.workspaceId, chunkIds),
+            isNotNull(workspaceFile.deletedAt),
+            lt(workspaceFile.deletedAt, retentionDate)
+          )
+        )
+        .limit(chunkLimit)
+    ),
+    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
+      db
+        .select({
+          id: workspaceFiles.id,
+          key: workspaceFiles.key,
+          context: workspaceFiles.context,
+        })
+        .from(workspaceFiles)
+        .where(
+          and(
+            inArray(workspaceFiles.workspaceId, chunkIds),
+            isNotNull(workspaceFiles.deletedAt),
+            lt(workspaceFiles.deletedAt, retentionDate)
+          )
+        )
+        .limit(chunkLimit)
+    ),
   ])
 
   return {
@@ -182,29 +183,33 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
   // (chats + S3) AND the DB deletes below — selecting twice could return
   // different subsets above the LIMIT cap and orphan or prematurely purge data.
   const [doomedWorkflows, fileScope] = await Promise.all([
-    db
-      .select({ id: workflow.id })
-      .from(workflow)
-      .where(
-        and(
-          inArray(workflow.workspaceId, workspaceIds),
-          isNotNull(workflow.archivedAt),
-          lt(workflow.archivedAt, retentionDate)
-        )
-      )
-      .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE),
+    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
+      db
+        .select({ id: workflow.id })
+        .from(workflow)
+        .where(
+          and(
+            inArray(workflow.workspaceId, chunkIds),
+            isNotNull(workflow.archivedAt),
+            lt(workflow.archivedAt, retentionDate)
+          )
+        )
+        .limit(chunkLimit)
+    ),
     selectExpiredWorkspaceFiles(workspaceIds, retentionDate),
   ])
 
   const doomedWorkflowIds = doomedWorkflows.map((w) => w.id)
   let chatCleanup: { execute: () => Promise<void> } | null = null
 
   if (doomedWorkflowIds.length > 0) {
-    const doomedChats = await db
-      .select({ id: copilotChats.id })
-      .from(copilotChats)
-      .where(inArray(copilotChats.workflowId, doomedWorkflowIds))
-      .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
+    const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) =>
+      db
+        .select({ id: copilotChats.id })
+        .from(copilotChats)
+        .where(inArray(copilotChats.workflowId, chunkIds))
+        .limit(chunkLimit)
+    )
 
     const doomedChatIds = doomedChats.map((c) => c.id)
     if (doomedChatIds.length > 0) {
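As with the first file, `selectRowsByIdChunks` is imported from `@/lib/cleanup/batch-delete` and its source is not shown in this PR. A minimal sketch consistent with the call sites above follows; the defaults and chunk size are assumptions. Note that drizzle select builders are thenable, so returning the builder directly from the callback (as the call sites do) satisfies a `Promise<Row[]>` return type.

```ts
// Hypothetical sketch of selectRowsByIdChunks, inferred from how this diff
// calls it; names and constants are assumptions, not the library's source.

const DEFAULT_BATCH_SIZE = 2000 // assumed default
const DEFAULT_MAX_BATCHES_PER_TABLE = 10 // assumed default
const ID_CHUNK_SIZE = 500 // assumed bound on the inArray(...) list

export async function selectRowsByIdChunks<Row>(
  ids: string[],
  selectChunk: (chunkIds: string[], chunkLimit: number) => Promise<Row[]>
): Promise<Row[]> {
  // Same overall cap the old single `.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)` enforced.
  const cap = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE
  const rows: Row[] = []

  for (let i = 0; i < ids.length && rows.length < cap; i += ID_CHUNK_SIZE) {
    const chunkIds = ids.slice(i, i + ID_CHUNK_SIZE)
    // Give each chunk only the remaining budget so the total stays capped.
    rows.push(...(await selectChunk(chunkIds, cap - rows.length)))
  }

  return rows
}
```

The net effect is the same row cap as before, but with every IN-list bounded to a fixed chunk size, which appears to be the point of the refactor.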