diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py index b63b190fe..64b2415e5 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/bridge_client.py @@ -1,12 +1,14 @@ -"""JSON-RPC 2.0 over stdio client for the MemOS bridge. +"""JSON-RPC 2.0 client for the MemOS bridge. -Spawns ``node bridge.cts --agent=hermes`` as a subprocess and communicates -via line-delimited JSON messages on its stdin/stdout. Responses are -matched by ``id``. Notifications (events + logs) are forwarded to -registered callbacks on a reader thread. +Two transport modes: +- **TCP** (preferred): connects to an existing daemon bridge via + ``host:port``. Hermes CLI exits without disrupting the daemon's + session — episodes finalize properly. +- **stdio** (fallback): spawns ``node bridge.cts --agent=hermes`` as a + subprocess and communicates via line-delimited JSON on stdin/stdout. -The client is *blocking* by design — callers wanting async behaviour -should wrap requests in a thread pool. +Responses are matched by ``id``. Notifications (events + logs) are +forwarded to registered callbacks on a reader thread. Thread-safe. """ from __future__ import annotations @@ -15,6 +17,7 @@ import json import logging import os +import socket as _socket import shutil import subprocess import threading @@ -22,6 +25,8 @@ from pathlib import Path from typing import TYPE_CHECKING, Any +from daemon_manager import kill_existing_bridge, register_bridge + if TYPE_CHECKING: from collections.abc import Callable @@ -29,6 +34,9 @@ logger = logging.getLogger(__name__) +DEFAULT_TCP_HOST = "127.0.0.1" +DEFAULT_TCP_PORT = 18911 + class BridgeError(RuntimeError): """Raised when the bridge returns a JSON-RPC error object.""" @@ -40,22 +48,56 @@ def __init__(self, code: str, message: str, data: Any = None) -> None: self.data = data +class _SocketTransport: + """TCP socket wrapper with line-delimited JSON read/write.""" + + def __init__(self, host: str, port: int) -> None: + self._sock = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) + self._sock.settimeout(15.0) + self._sock.connect((host, port)) + self._sock.settimeout(None) + self._rfile = self._sock.makefile("r", buffering=1, encoding="utf-8") + + def write_line(self, text: str) -> None: + payload = text if text.endswith("\n") else text + "\n" + self._sock.sendall(payload.encode("utf-8")) + + def read_line(self) -> str | None: + line = self._rfile.readline() + return line if line else None + + def close(self) -> None: + try: + self._sock.shutdown(_socket.SHUT_RDWR) + except OSError: + pass + try: + self._rfile.close() + except Exception: + pass + self._sock.close() + + class MemosBridgeClient: - """Client wrapping a line-delimited JSON-RPC 2.0 stdio bridge. + """Client wrapping a line-delimited JSON-RPC 2.0 bridge. + + By default attempts TCP connection to an existing daemon bridge at + ``127.0.0.1:18911``. On failure falls back to spawning a stdio + subprocess — transparent to callers. Usage: >>> client = MemosBridgeClient() >>> client.request("core.health", {}) {'ok': True, 'version': '...'} >>> client.close() - - Thread-safe: per-request locking ensures concurrent callers don't - interleave writes. """ def __init__( self, *, + prefer_tcp: bool = True, + tcp_host: str = DEFAULT_TCP_HOST, + tcp_port: int = DEFAULT_TCP_PORT, bridge_path: str | None = None, node_binary: str | None = None, agent: str = "hermes", @@ -67,12 +109,36 @@ def __init__( self._events: list[Callable[[dict[str, Any]], None]] = [] self._logs: list[Callable[[dict[str, Any]], None]] = [] self._closed = False + self._transport: _SocketTransport | None = None + + # ── TCP mode ───────────────────────────────────────────────── + if prefer_tcp: + try: + self._transport = _SocketTransport(tcp_host, tcp_port) + self._reader = threading.Thread( + target=self._read_loop_tcp, + daemon=True, + name="memos-bridge-tcp-reader", + ) + self._reader.start() + logger.info( + "MemosBridgeClient: connected via TCP (%s:%d)", + tcp_host, tcp_port, + ) + return + except (ConnectionRefusedError, OSError) as exc: + logger.info( + "MemosBridgeClient: TCP connect failed (%s), falling back to stdio", + exc, + ) + # ── stdio mode ─────────────────────────────────────────────── node = node_binary or shutil.which("node") or "node" script = bridge_path or str( Path(__file__).resolve().parent.parent.parent.parent / "bridge.cts" ) env = {**os.environ, **(extra_env or {})} + kill_existing_bridge() self._proc = subprocess.Popen( [node, "--experimental-strip-types", script, f"--agent={agent}"], stdin=subprocess.PIPE, @@ -82,8 +148,9 @@ def __init__( bufsize=1, env=env, ) + register_bridge(self._proc) self._reader = threading.Thread( - target=self._read_loop, + target=self._read_loop_stdio, daemon=True, name="memos-bridge-reader", ) @@ -95,7 +162,7 @@ def __init__( ) self._stderr_reader.start() - # ─── Public API ── + # ─── Public API ────────────────────────────────────────────────── def request( self, @@ -116,12 +183,7 @@ def request( {"jsonrpc": "2.0", "id": rpc_id, "method": method, "params": params}, ensure_ascii=False, ) - try: - self._proc.stdin.write(payload + "\n") - self._proc.stdin.flush() - except (BrokenPipeError, OSError) as err: - self._pending.pop(rpc_id, None) - raise BridgeError("transport_closed", str(err)) from err + self._write_or_raise(payload + "\n") if not waiter.wait(timeout=timeout): with self._lock: @@ -142,9 +204,8 @@ def notify(self, method: str, params: Any = None) -> None: with self._lock: payload = json.dumps({"jsonrpc": "2.0", "method": method, "params": params}) try: - self._proc.stdin.write(payload + "\n") - self._proc.stdin.flush() - except (BrokenPipeError, OSError): + self._write_text(payload + "\n") + except (BrokenPipeError, OSError, ConnectionError): pass def on_event(self, cb: Callable[[dict[str, Any]], None]) -> None: @@ -157,13 +218,17 @@ def close(self) -> None: if self._closed: return self._closed = True - with contextlib.suppress(Exception): - self._proc.stdin.close() - try: - self._proc.wait(timeout=5.0) - except subprocess.TimeoutExpired: - self._proc.kill() - # unblock any pending waiters + if self._transport is not None: + self._transport.close() + self._transport = None + else: + with contextlib.suppress(Exception): + self._proc.stdin.close() + try: + self._proc.wait(timeout=5.0) + except subprocess.TimeoutExpired: + self._proc.kill() + register_bridge(None) with self._lock: for entry in list(self._pending.values()): entry["error"] = { @@ -174,36 +239,58 @@ def close(self) -> None: entry["event"].set() self._pending.clear() - # ─── Internals ── + # ─── Internals: write helpers ──────────────────────────────────── - def _read_loop(self) -> None: + def _write_or_raise(self, text: str) -> None: + if self._transport is not None: + try: + self._transport.write_line(text) + except (BrokenPipeError, OSError, ConnectionError) as err: + raise BridgeError("transport_closed", str(err)) from err + else: + assert self._proc.stdin is not None + try: + self._proc.stdin.write(text) + self._proc.stdin.flush() + except (BrokenPipeError, OSError) as err: + raise BridgeError("transport_closed", str(err)) from err + + def _write_text(self, text: str) -> None: + try: + if self._transport is not None: + self._transport.write_line(text) + else: + assert self._proc.stdin is not None + self._proc.stdin.write(text) + self._proc.stdin.flush() + except (BrokenPipeError, OSError, ConnectionError): + pass + + # ─── Internals: read loops ─────────────────────────────────────── + + def _read_loop_tcp(self) -> None: + transport = self._transport + if transport is None: + return + while not self._closed: + try: + line = transport.read_line() + except (OSError, ConnectionError): + if not self._closed: + logger.error("bridge_client: TCP read error, reader exiting") + break + if line is None: + if not self._closed: + logger.warning("bridge_client: TCP connection closed by peer") + break + self._dispatch(line) + + def _read_loop_stdio(self) -> None: assert self._proc.stdout is not None for line in self._proc.stdout: - line = line.strip() - if not line: - continue - try: - msg = json.loads(line) - except json.JSONDecodeError: - logger.debug("bridge: malformed line: %r", line[:120]) - continue - if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): - self._resolve(msg) - continue - if msg.get("method") == "events.notify": - for cb in list(self._events): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("event listener threw", exc_info=True) - continue - if msg.get("method") == "logs.forward": - for cb in list(self._logs): - try: - cb(msg.get("params") or {}) - except Exception: - logger.debug("log listener threw", exc_info=True) - continue + if self._closed: + break + self._dispatch(line) def _stderr_loop(self) -> None: assert self._proc.stderr is not None @@ -212,6 +299,35 @@ def _stderr_loop(self) -> None: if line: logger.debug("bridge.stderr: %s", line) + # ─── Common dispatch ───────────────────────────────────────────── + + def _dispatch(self, line: str) -> None: + line = line.strip() + if not line: + return + try: + msg = json.loads(line) + except json.JSONDecodeError: + logger.debug("bridge: malformed line: %r", line[:120]) + return + if "id" in msg and msg["id"] is not None and ("result" in msg or "error" in msg): + self._resolve(msg) + return + if msg.get("method") == "events.notify": + for cb in list(self._events): + try: + cb(msg.get("params") or {}) + except Exception: + logger.debug("event listener threw", exc_info=True) + return + if msg.get("method") == "logs.forward": + for cb in list(self._logs): + try: + cb(msg.get("params") or {}) + except Exception: + logger.debug("log listener threw", exc_info=True) + return + def _resolve(self, msg: dict[str, Any]) -> None: rpc_id = msg.get("id") if not isinstance(rpc_id, int): diff --git a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py index 62810cc5b..136a0d845 100644 --- a/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py +++ b/apps/memos-local-plugin/adapters/hermes/memos_provider/daemon_manager.py @@ -5,6 +5,8 @@ - Probe Node.js availability so ``MemTensorProvider.is_available`` can answer cheaply at plugin-startup time. - Graceful shutdown helpers invoked from ``MemTensorProvider.shutdown``. +- PID file management to prevent duplicate bridge processes across + Hermes session restarts. This file intentionally has **no runtime dependency** on the client; the provider instantiates its own client. Keeping these concerns split means @@ -17,9 +19,12 @@ from __future__ import annotations import logging +import os import shutil +import signal import subprocess import threading +import time from pathlib import Path @@ -28,6 +33,54 @@ _lock = threading.Lock() _bridge_ok: bool | None = None +_ACTIVE_BRIDGE_PROC: subprocess.Popen | None = None + + +# ─── PID file helpers ──────────────────────────────────────────────────── + + +def _pid_path() -> Path: + """Path to the singleton PID file under the runtime daemon directory. + + Respects ``MEMOS_HOME`` when set (``~/.hermes/memos-plugin`` by + convention), falling back to the plugin source tree only when the env + var is absent for compatibility with development installs. + """ + memos_home = os.environ.get("MEMOS_HOME") + if memos_home: + return Path(memos_home) / "daemon" / "bridge.pid" + return Path(__file__).resolve().parent.parent.parent.parent / "data" / "bridge.pid" + + +def _read_pid() -> int | None: + try: + return int(_pid_path().read_text().strip()) + except (FileNotFoundError, ValueError): + return None + + +def _write_pid(pid: int) -> None: + pid_path = _pid_path() + pid_path.parent.mkdir(parents=True, exist_ok=True) + pid_path.write_text(str(pid)) + + +def _clean_pid() -> None: + _pid_path().unlink(missing_ok=True) + + +def _pid_alive(pid: int) -> bool: + try: + os.kill(pid, 0) + return True + except PermissionError: + # Process exists but is owned by another user — still alive. + return True + except OSError: + return False + + +# ─── Bridge lifecycle ──────────────────────────────────────────────────── def _bridge_script() -> Path: @@ -69,8 +122,109 @@ def ensure_bridge_running(*, probe_only: bool = False) -> bool: return True +def _is_bridge_process(pid: int) -> bool: + """Return True when *pid* looks like a bridge process. + + Checks the process command line for ``bridge.cts`` to avoid killing an + unrelated process that happened to recycle a stale PID. + """ + try: + if os.name == "nt": + import ctypes + + import ctypes.wintypes + + kernel32 = ctypes.windll.kernel32 + handle = kernel32.OpenProcess(0x0400 | 0x0010, False, pid) + if not handle: + return False + try: + exe_path = (ctypes.c_wchar * 260)() + size = ctypes.wintypes.DWORD(260) + if kernel32.K32GetProcessImageFileNameW(handle, exe_path, size): + return "bridge" in str(exe_path.value).lower() + finally: + kernel32.CloseHandle(handle) + return False + # Unix: prefer /proc//cmdline; fall back to ps(1) on macOS / BSD. + try: + cmdline = Path(f"/proc/{pid}/cmdline").read_bytes() + return b"bridge.cts" in cmdline + except FileNotFoundError: + import subprocess + try: + result = subprocess.run( + ["ps", "-p", str(pid), "-o", "command="], + capture_output=True, + text=True, + timeout=5, + ) + return result.returncode == 0 and "bridge.cts" in result.stdout + except Exception: + return False + except Exception: + # If we can't validate, err on the side of safety — skip kill. + return False + + +def kill_existing_bridge() -> None: + """Kill any previously-running bridge process recorded in the PID file. + + Called **before** spawning a new bridge to guarantee at-most-one + instance. Validates that the PID belongs to a bridge process before + sending any signal to avoid killing an unrelated process when the + PID file is stale. + """ + pid = _read_pid() + if pid is not None and _pid_alive(pid): + if not _is_bridge_process(pid): + logger.warning( + "MemOS: PID %d is alive but does not appear to be a bridge " + "process — refusing to kill. Removing stale PID file.", + pid, + ) + else: + logger.info("MemOS: killing stale bridge (pid=%d)", pid) + try: + os.kill(pid, signal.SIGTERM) + for _ in range(25): # wait up to 2.5 s + if not _pid_alive(pid): + break + time.sleep(0.1) + else: + os.kill(pid, signal.SIGKILL) + except (OSError, ProcessLookupError): + pass + _clean_pid() + + +def register_bridge(proc: subprocess.Popen | None) -> None: + """Record the current running bridge process. + + Pass ``None`` (e.g. on close) to clear the registration and PID file. + """ + global _ACTIVE_BRIDGE_PROC + _ACTIVE_BRIDGE_PROC = proc + if proc is not None: + _write_pid(proc.pid) + else: + _clean_pid() + + def shutdown_bridge() -> None: - """Best-effort cleanup; each client owns its own subprocess.""" - global _bridge_ok + """Gracefully shut down the tracked bridge subprocess and clean PID file.""" + global _bridge_ok, _ACTIVE_BRIDGE_PROC with _lock: _bridge_ok = None + if _ACTIVE_BRIDGE_PROC is not None: + try: + _ACTIVE_BRIDGE_PROC.terminate() + _ACTIVE_BRIDGE_PROC.wait(timeout=5.0) + logger.info("MemOS: bridge terminated (pid=%d)", _ACTIVE_BRIDGE_PROC.pid) + except subprocess.TimeoutExpired: + _ACTIVE_BRIDGE_PROC.kill() + logger.warning("MemOS: bridge killed after timeout (pid=%d)", _ACTIVE_BRIDGE_PROC.pid) + except Exception: + pass + _ACTIVE_BRIDGE_PROC = None + _clean_pid() diff --git a/apps/memos-local-plugin/bridge.cts b/apps/memos-local-plugin/bridge.cts index 62750c3f8..2e8ab4999 100644 --- a/apps/memos-local-plugin/bridge.cts +++ b/apps/memos-local-plugin/bridge.cts @@ -15,6 +15,7 @@ */ // eslint-disable-next-line @typescript-eslint/no-require-imports const path = require("node:path") as typeof import("node:path"); +const pkgVersion: string = (require(path.resolve(__dirname, "package.json")) as { version: string }).version; interface BridgeArgs { daemon: boolean; @@ -44,6 +45,9 @@ async function main(): Promise { const { startStdioServer, waitForShutdown } = (await import( pathToEsmUrl(path.resolve(__dirname, "bridge/stdio.ts")) )) as typeof import("./bridge/stdio.js"); + const { startTcpServer } = (await import( + pathToEsmUrl(path.resolve(__dirname, "bridge/tcp.ts")) + )) as typeof import("./bridge/tcp.js"); const { startHttpServer } = (await import( pathToEsmUrl(path.resolve(__dirname, "server/index.ts")) )) as typeof import("./server/index.js"); @@ -53,12 +57,44 @@ async function main(): Promise { const { core, config, home } = await bootstrapMemoryCoreFull({ agent: args.agent, - pkgVersion: "2.0.0-alpha.1", + pkgVersion, }); await core.init(); - // Default transport: stdio. Daemon + TCP support arrives in V1.1. - const stdio = startStdioServer({ core }); + // In daemon mode stdin is typically /dev/null — starting the stdio + // server would subscribe to events/logs and buffer writes to a pipe + // that nobody drains, wasting memory. Skip it. + const stdio = args.daemon ? null : startStdioServer({ core }); + let tcpServer: Awaited> | null = null; + if (args.tcpPort !== undefined && !args.daemon) { + process.stderr.write( + "bridge: ignoring --tcp because TCP mode requires --daemon\n", + ); + } else if (args.tcpPort !== undefined) { + if (!Number.isFinite(args.tcpPort) || args.tcpPort < 1 || args.tcpPort > 65535) { + process.stderr.write( + `bridge: invalid --tcp port value: ${String(args.tcpPort)} (must be 1–65535)\n`, + ); + } else { + try { + tcpServer = startTcpServer({ + core, + host: "127.0.0.1", + port: args.tcpPort, + }); + await tcpServer.ready; // throws on EADDRINUSE etc. + process.stderr.write(`bridge: tcp → ${tcpServer.url}\n`); + } catch (err) { + process.stderr.write( + `bridge: tcp server failed to start: ${(err as Error).message}\n`, + ); + if (tcpServer) { + await tcpServer.close().catch(() => {}); + tcpServer = null; + } + } + } + } // Boot a viewer too — hermes needs its own HTTP surface for the // Memory Viewer, and it discovers the openclaw hub (if any) so @@ -93,30 +129,55 @@ async function main(): Promise { `bridge: viewer failed to start: ${(err as Error).message}\n`, ); } + let shuttingDown = false; const shutdown = async (sig: string) => { + if (shuttingDown) return; + shuttingDown = true; process.stderr.write(`bridge: received ${sig}, shutting down\n`); + try { + if (tcpServer) await tcpServer.close(); + } catch { + /* best-effort */ + } try { if (viewer) await viewer.close(); } catch { /* best-effort */ } - await waitForShutdown(core, stdio); + if (stdio) { + await waitForShutdown(core, stdio); + } else { + try { + await core.shutdown(); + } catch { + /* swallow */ + } + } process.exit(0); }; process.on("SIGINT", () => void shutdown("SIGINT")); process.on("SIGTERM", () => void shutdown("SIGTERM")); - // Keep the process alive until stdin ends. - await stdio.done; - try { - if (viewer) await viewer.close(); - } catch { - /* best-effort */ + // In daemon mode, keep alive until TCP server stops (stdin is /dev/null). + // In stdio mode, run until the calling process closes stdin. + if (args.daemon && tcpServer) { + await tcpServer.done; + await shutdown("daemon_done"); + } else if (stdio) { + await stdio.done; + try { + if (viewer) await viewer.close(); + } catch { + /* best-effort */ + } + await core.shutdown(); + process.exit(0); + } else { + // Daemon mode without TCP — wait forever (kept alive by event loop). + await new Promise(() => {}); } - await core.shutdown(); - process.exit(0); } async function tryHubRegister(opts: { @@ -127,7 +188,7 @@ async function tryHubRegister(opts: { const body = JSON.stringify({ agent: opts.selfAgent, port: opts.selfPort, - version: "2.0.0-alpha.1", + version: pkgVersion, }); for (let i = 0; i < 6; i++) { try { diff --git a/apps/memos-local-plugin/bridge/tcp.ts b/apps/memos-local-plugin/bridge/tcp.ts new file mode 100644 index 000000000..f2df7d834 --- /dev/null +++ b/apps/memos-local-plugin/bridge/tcp.ts @@ -0,0 +1,214 @@ +/** + * Line-delimited JSON-RPC over TCP. + * + * A TCP server that accepts connections from remote clients (e.g. the + * Hermes Python provider) and dispatches JSON-RPC 2.0 messages through + * the same `Dispatcher` used by the stdio server. + * + * Each connected client gets its own read/write loop. Notifications + * (events + logs) are broadcast to all connected clients. + */ + +import { createServer, type Socket } from "node:net"; +import { + JSONRPC_PARSE_ERROR, + JSONRPC_INVALID_REQUEST, + RPC_METHODS, + rpcCodeForError, + type JsonRpcFailure, + type JsonRpcRequest, + type JsonRpcSuccess, +} from "../agent-contract/jsonrpc.js"; +import type { MemoryCore } from "../agent-contract/memory-core.js"; +import { MemosError } from "../agent-contract/errors.js"; +import { errorCodeOf, makeDispatcher } from "./methods.js"; + +// ─── Types ────────────────────────────────────────────────────────────────── + +export interface TcpServerOptions { + core: MemoryCore; + host: string; + port: number; +} + +export interface TcpServerHandle { + readonly url: string; + /** Resolves once the server is actually listening, rejects on error. */ + ready: Promise; + close: () => Promise; + done: Promise; +} + +// ─── Server ───────────────────────────────────────────────────────────────── + +export function startTcpServer(options: TcpServerOptions): TcpServerHandle { + const { core, host, port } = options; + const dispatch = makeDispatcher(core); + + const clients = new Set(); + let closed = false; + let doneResolve: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + // Subscribe to events + logs and broadcast to all connected clients. + const eventsUnsub = core.subscribeEvents((e) => { + broadcast({ jsonrpc: "2.0", method: RPC_METHODS.EVENTS_NOTIFY, params: e }); + }); + const logsUnsub = core.subscribeLogs((r) => { + broadcast({ jsonrpc: "2.0", method: RPC_METHODS.LOGS_FORWARD, params: r }); + }); + + function broadcast(obj: unknown): void { + const payload = JSON.stringify(obj) + "\n"; + for (const sock of clients) { + try { + sock.write(payload); + } catch { + /* best-effort per client */ + } + } + } + + function errorResponse( + id: JsonRpcRequest["id"] | null, + code: number, + message: string, + data?: unknown, + ): JsonRpcFailure { + return { + jsonrpc: "2.0", + id: id ?? null, + error: { code, message, data: data as any }, + }; + } + + function writeLine(sock: Socket, obj: unknown): void { + try { + sock.write(JSON.stringify(obj) + "\n"); + } catch { + /* ignore */ + } + } + + async function handleLine(sock: Socket, line: string): Promise { + const trimmed = line.trim(); + if (trimmed.length === 0) return; + + let msg: JsonRpcRequest | null = null; + try { + msg = JSON.parse(trimmed) as JsonRpcRequest; + } catch (err) { + writeLine( + sock, + errorResponse(null, JSONRPC_PARSE_ERROR, "invalid JSON", { + text: err instanceof Error ? err.message : String(err), + }), + ); + return; + } + + if (!msg || typeof msg !== "object" || msg.jsonrpc !== "2.0" || typeof msg.method !== "string") { + writeLine(sock, errorResponse(msg?.id ?? null, JSONRPC_INVALID_REQUEST, "not JSON-RPC 2.0")); + return; + } + + try { + const result = await dispatch(msg.method, msg.params); + if (msg.id !== undefined && msg.id !== null) { + const ok: JsonRpcSuccess = { jsonrpc: "2.0", id: msg.id, result }; + writeLine(sock, ok); + } + } catch (err) { + const code = rpcCodeForError(errorCodeOf(err)); + const mErr = + err instanceof MemosError + ? err + : new MemosError("internal", err instanceof Error ? err.message : String(err)); + writeLine(sock, errorResponse(msg.id ?? null, code, mErr.message, mErr.toJSON())); + process.stderr.write(`bridge.tcp.dispatch.err ${msg.method}: ${mErr.message}\n`); + } + } + + // ─── Server ─────────────────────────────────────────────────────────────── + + const server = createServer((sock: Socket) => { + clients.add(sock); + process.stderr.write( + `bridge.tcp: client connected (${sock.remoteAddress ?? "unknown"}:${sock.remotePort ?? "?"})\n`, + ); + + let buffer = ""; + sock.setEncoding("utf8"); + + sock.on("data", (chunk: string) => { + buffer += chunk; + let nl = buffer.indexOf("\n"); + while (nl >= 0) { + const line = buffer.slice(0, nl); + buffer = buffer.slice(nl + 1); + void handleLine(sock, line); + nl = buffer.indexOf("\n"); + } + }); + + sock.on("close", () => { + clients.delete(sock); + process.stderr.write( + `bridge.tcp: client disconnected (${sock.remoteAddress ?? "unknown"}:${sock.remotePort ?? "?"})\n`, + ); + }); + + sock.on("error", (err) => { + process.stderr.write(`bridge.tcp: socket error: ${err.message}\n`); + clients.delete(sock); + if (!sock.destroyed) { + sock.destroy(); + } + }); + }); + + // Wrap listen in a promise so callers can catch EADDRINUSE etc. + let isListening = false; + const listenPromise = new Promise((resolve, reject) => { + server.on("error", (err) => { + if (!isListening) { + reject(err); + return; + } + process.stderr.write(`bridge.tcp: server error: ${err.message}\n`); + }); + server.listen(port, host, () => { + isListening = true; + process.stderr.write(`bridge.tcp: listening on ${host}:${port}\n`); + resolve(); + }); + }); + + return { + get url() { + return `tcp://${host}:${port}`; + }, + ready: listenPromise, + async close() { + if (closed) return; + closed = true; + eventsUnsub(); + logsUnsub(); + for (const sock of clients) { + sock.end(); + sock.destroy(); + } + clients.clear(); + await new Promise((resolve, reject) => { + server.close((err) => { + if (err) reject(err); + else resolve(); + }); + }); + doneResolve(); + }, + done: donePromise, + }; +} diff --git a/apps/memos-local-plugin/install.sh b/apps/memos-local-plugin/install.sh index 8e9aa5d0c..3087d1019 100755 --- a/apps/memos-local-plugin/install.sh +++ b/apps/memos-local-plugin/install.sh @@ -526,26 +526,14 @@ install_hermes() { [[ -n "${python_bin}" && -x "${python_bin}" ]] || die "Cannot locate Python for Hermes." success "Hermes Python: ${python_bin}" - # plugins/memory discovery. - local plugin_dir="" - plugin_dir="$("${python_bin}" -c " -from pathlib import Path -try: - import plugins.memory as pm - print(Path(pm.__file__).parent) -except Exception: - pass -" 2>/dev/null || true)" - if [[ -z "${plugin_dir}" || ! -d "${plugin_dir}" ]]; then - for d in "${HOME}/.hermes/hermes-agent/plugins/memory"; do - [[ -d "${d}" && -f "${d}/__init__.py" ]] && { plugin_dir="${d}"; break; } - done - fi - [[ -n "${plugin_dir}" && -d "${plugin_dir}" ]] || die "plugins/memory not found" - success "Hermes plugins/memory: ${plugin_dir}" - - # Symlink memtensor provider. - local target="${plugin_dir}/memtensor" + # Locate Hermes user plugins directory. + # Use $HERMES_HOME/plugins// (outside git repo) so hermes update + # never deletes the symlink. We create the dir if it doesn't exist. + local hermes_plugins="${HOME}/.hermes/plugins" + mkdir -p "${hermes_plugins}" + + # Symlink memtensor provider into Hermes user plugins. + local target="${hermes_plugins}/memtensor" if [[ -L "${target}" ]]; then rm "${target}" elif [[ -e "${target}" ]]; then rm -rf "${target}" fi