Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/memos-local-plugin/agent-contract/jsonrpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ export const RPC_METHODS = {
SESSION_CLOSE: "session.close",
EPISODE_OPEN: "episode.open",
EPISODE_CLOSE: "episode.close",
EPISODE_DELETE: "episode.delete",
EPISODE_DELETE_BULK: "episode.delete_bulk",

// ── pipeline (per turn) ──
TURN_START: "turn.start",
Expand Down
4 changes: 4 additions & 0 deletions apps/memos-local-plugin/agent-contract/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ export interface MemoryCore {
userMessage?: string;
}): Promise<EpisodeId>;
closeEpisode(episodeId: EpisodeId): Promise<void>;
/** Hard-delete a closed episode by id (idempotent). Rejects if the episode is still open. */
deleteEpisode(id: string): Promise<{ deleted: boolean }>;
/** Bulk delete closed episodes — returns how many rows were actually removed. */
deleteEpisodes(ids: readonly string[]): Promise<{ deleted: number }>;

// ── pipeline (per turn) ──
/** Called *before* the agent acts. Returns the context to inject. */
Expand Down
23 changes: 23 additions & 0 deletions apps/memos-local-plugin/bridge/methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,29 @@ export function makeDispatcher(
await core.closeEpisode(requireString(p, "episodeId", method) as EpisodeId);
return { ok: true };
}
case RPC_METHODS.EPISODE_DELETE: {
const p = asRecord(params, method);
return await core.deleteEpisode(requireString(p, "episodeId", method) as EpisodeId);
}
case RPC_METHODS.EPISODE_DELETE_BULK: {
const p = asRecord(params, method);
const ids = p.ids;
if (!Array.isArray(ids) || ids.length === 0) {
throw new MemosError(
"invalid_argument",
"ids must be a non-empty array",
);
}
for (const id of ids) {
if (typeof id !== "string" || id.trim().length === 0) {
throw new MemosError(
"invalid_argument",
"each element in ids must be a non-empty string",
);
}
}
return await core.deleteEpisodes(ids as EpisodeId[]);
}

// ── turn lifecycle ──
case RPC_METHODS.TURN_START: {
Expand Down
65 changes: 63 additions & 2 deletions apps/memos-local-plugin/core/pipeline/memory-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,25 @@ export function createMemoryCore(
// to `closed` + sets `closeReason: "abandoned"` without touching
// trace_ids_json.
try {
const orphans = handle.repos.episodes.list({ status: "open", limit: 500 });
const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 500 });
// Batch-fetch sessions to avoid N+1 lookups per episode.
const sessionIds = new Set(openEpisodes.map((ep) => ep.sessionId));
const sessionById = new Map<string, ReturnType<typeof handle.repos.sessions.getById>>();
for (const sid of sessionIds) {
sessionById.set(sid, handle.repos.sessions.getById(sid));
}
// Only treat an open episode as an orphan if its session has been
// explicitly closed (meta.closedAt is set) or no longer exists.
// Otherwise the session might reconnect — leave it alone.
const orphans = openEpisodes.filter((ep) => {
const session = sessionById.get(ep.sessionId);
if (!session) return true;
if (session.meta?.closedAt != null) return true;
return false;
});
if (orphans.length > 0) {
log.info("init.orphan_episodes.close", { count: orphans.length });
const skipped = openEpisodes.length - orphans.length;
log.info("init.orphan_episodes.close", { count: orphans.length, skipped });
const endedAt = Date.now();
for (const ep of orphans) {
try {
Expand Down Expand Up @@ -632,6 +648,49 @@ export function createMemoryCore(
handle.sessionManager.finalizeEpisode(episodeId);
}

function assertEpisodeDeletable(episodeId: EpisodeId): void {
const snap = handle.sessionManager.getEpisode(episodeId);
if (snap?.status === "open") {
throw new MemosError(
"conflict",
`cannot delete open episode: ${episodeId}`,
);
}
if (!snap && handle.repos.episodes.getById(episodeId)?.status === "open") {
throw new MemosError(
"conflict",
`cannot delete open episode: ${episodeId} (open in DB)`,
);
}
}

function deleteClosedEpisode(episodeId: EpisodeId): boolean {
const existing = handle.repos.episodes.getById(episodeId);
assertEpisodeDeletable(episodeId);
const deleted = handle.repos.episodes.deleteById(episodeId);
if (existing && !deleted) {
throw new MemosError(
"internal",
`failed to delete closed episode: ${episodeId}`,
);
}
return deleted;
}

async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> {
ensureLive();
return { deleted: deleteClosedEpisode(episodeId) };
}

async function deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }> {
ensureLive();
let deleted = 0;
for (const id of ids) {
if (deleteClosedEpisode(id)) deleted++;
}
return { deleted };
}

// ─── Pipeline (per turn) ──
async function onTurnStart(
turn: Parameters<MemoryCore["onTurnStart"]>[0],
Expand Down Expand Up @@ -1947,6 +2006,8 @@ export function createMemoryCore(
closeSession,
openEpisode,
closeEpisode,
deleteEpisode,
deleteEpisodes,
onTurnStart,
onTurnEnd,
submitFeedback,
Expand Down
Loading