feat(agents): pull-wake runner health check, principal rename, and lifecycle hardening#4339
Conversation
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…dispatch-policy, server-utils, and electric-ax Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…URL form, callers convert keys to URLs Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…migration, drop authorization fallback - Use principalKeyFromUrl for proper principal URL validation (rejects /principal/local-desktop) - Migration expires active claims and clears dispatch state before deleting runners - Desktop: don't use authorization header as principal source — return undefined and let server derive from ctx.principal.url - listRunners validates owner_principal query param Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…pal keys, complete desktop constant replacement Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4339 +/- ##
=======================================
Coverage ? 59.56%
=======================================
Files ? 290
Lines ? 28579
Branches ? 7754
=======================================
Hits ? 17022
Misses ? 11540
Partials ? 17
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
State machine, concurrent claim limits, exponential reconnect backoff, and granular health status. onError is now reporting-only with fallback console.error logging. stop() rethrows drainWakes errors to callers. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
|
Test iteration 7 body placeholder |
There was a problem hiding this comment.
I had a look through the code and left some comments.
On a higher level, i don't like the "claims" name, it collides too much with auth claims.
The term "lease" is more standard in distributed systems so I would rename the claims variables to: lease / activeLeaseCount / maxConcurrentLeases / getActiveLeasesForRunner.
Also, i think the code needs quite some refactoring. Currently there are ~20 variables inside one closure and they are being mutated from everywhere in this file. That's very error-prone. I'll try refactoring in a follow up PR.
| let eventHeartbeatTimer: ReturnType<typeof setTimeout> | null = null | ||
| let currentOffset = config.offset | ||
| let startedAt: string | null = null | ||
| let streamConnected = false |
There was a problem hiding this comment.
I would define streamConnected as a getter:
get streamConnected() {
return !!streamConnectedSince
}| last_claim_result: `claimed` | `no_work` | `error` | null | ||
| last_dispatch_at: string | null | ||
| events_received: number | ||
| claims_succeeded: number |
There was a problem hiding this comment.
Perhaps this could be a nested property:
export interface PullWakeRunnerHealth {
// ...
claims: {
succeeded: number
skipped: number
failed: number
}
}Not sure how useful these numbers are. Perhaps an array of the actual claims that succeeded/skipped/failed would be better.
| type PullWakeRunnerState = | ||
| | `stopped` | ||
| | `starting` | ||
| | `running.connecting` |
There was a problem hiding this comment.
The running. prefixes are a bit unusual.
There was a problem hiding this comment.
Would this benefit from being an actual TS enum?
enum PullWakeRunnerState {
Stopped = `stopped`,
Starting = `starting`,
Connecting = `running.connecting`,
Streaming = `running.streaming`,
Reconnecting = `running.reconnecting`,
Stopping = `stopping`
}This would make the code less string-heavy.
Or if you don't like enums:
const PullWakeRunnerState = {
Stopped: 'stopped',
Starting: 'starting',
Connecting: 'running.connecting',
Streaming: `running.streaming`,
Reconnecting: `running.reconnecting`,
Stopping: `stopping`
} as const
type PullWakeRunnerState = (typeof PullWakeRunnerState)[keyof typeof PullWakeRunnerState]| Math.floor(config.maxConcurrentClaims ?? DEFAULT_MAX_CONCURRENT_CLAIMS) | ||
| ) | ||
|
|
||
| const toStatus = (): PullWakeRunnerStatus => { |
There was a problem hiding this comment.
Why have "state" and "status" ? They are almost identical.
I would just go with the enum approach from my previous comment.
Users use it like PullWakeRunnerState.Connecting and it resolves to the string "running.connecting" so we effectively get the usage we want.
| const notifyHeartbeatChange = (): void => { | ||
| const signal = controller?.signal | ||
| if (!signal || signal.aborted || heartbeatIntervalMs <= 0) return | ||
| if (eventHeartbeatTimer) return |
There was a problem hiding this comment.
nit: we have 2 if statements for fast return, to be consistent the first one would only do the signal checks and the 2nd one would do heartbeat checks, or we would use only one if statement:
if (!signal || signal.aborted) return
if (heartbeatIntervalMs <= 0 || eventHeartbeatTimer) returnThere was a problem hiding this comment.
We could also avoid the eventHeartbeatTimer check by rewriting the assignment:
eventHeartbeatTimer ??= setTimeout(() => {
eventHeartbeatTimer = null
void heartbeat(signal)
}, eventHeartbeatThrottleMs)|
|
||
| const notifyHeartbeatChange = (): void => { | ||
| const signal = controller?.signal | ||
| if (!signal || signal.aborted || heartbeatIntervalMs <= 0) return |
There was a problem hiding this comment.
Why do we check heartbeatIntervalMs <= 0 if it's not used in this function?
This function uses eventHeartbeatThrottleMs instead.
| } catch (err) { | ||
| if (!signal.aborted) { | ||
| config.onError?.(err instanceof Error ? err : new Error(String(err))) | ||
| lastHeartbeatOk = false |
There was a problem hiding this comment.
We already set lastHeartbeatOk = false on L256 right before throwing and here we only set it inside if (!signal.aborted) but it will already be false. Is this the behaviour we want? If we only want it to be set to false if !signal.aborted then we need to not set it right before throwing.
| throw err | ||
| } | ||
| if (!claimErrorRecorded) { | ||
| recordClaimError() |
There was a problem hiding this comment.
Do we need to set claimErrorRecorded = true here?
Perhaps would be better if recordClaimError does that (e.g. we could wrap it such that we can't forget to set this variable).
| return true | ||
| } | ||
|
|
||
| const sleep = async (ms: number, signal: AbortSignal): Promise<void> => { |
There was a problem hiding this comment.
This should be extracted to a utility file.
|
Spent some time testing the lifecycle hardening claims in this PR against a running desktop app + local agents-server. Some of what's claimed works as advertised; one significant gap turned up that I think is worth flagging before merge. What worked
Things turned up while testingWhile testing this PR I also surfaced three related reliability issues that the new diagnostics made visible — they aren't introduced by this PR (the diagnostics just exposed them), but they all interact with this work:
Suggestion
Happy to help test once you've got a candidate fix. |
There was a problem hiding this comment.
We should address #4339 (comment) before merging.
| 1. A **health check endpoint** (`GET /_electric/runners/:id/health`) for deep debugging — curl it to see comprehensive diagnostics about a runner's dispatch pipeline. | ||
| 2. **Rich runner state in Postgres** so that apps can sync the `runners` table via an Electric Shape and show runner status on any device (e.g. see your laptop runner's status from your phone). | ||
|
|
||
| The `diagnostics` JSONB column on the `runners` table serves both purposes: the health endpoint reads it for the detailed response, and Shape sync delivers it reactively to any connected client. |
There was a problem hiding this comment.
This will make runners into a quite noisy shape, with every runner causing an update every 2 seconds. I suggest we split it into a separate table for heartbeats & claims, and keep status, started, last_error & last_error_at in the main table. Anyone needing full diagnostics can then either get the endpoint, or subscribe to a filtered shape for diagnostics of a particular runner. WDYT?
There was a problem hiding this comment.
It should be a max of every two seconds (if there's lots of activity) and then when things aren't happening, updates would drop to every 30 seconds.
But yeah, fairly noisy I agree actually still. Lemme think about it.
There was a problem hiding this comment.
We don't need the entire object on that table for "normal" UIs I think. the heartbeat endpoint can split this up easily
There was a problem hiding this comment.
yeah makes sense — refactored
# Conflicts: # packages/agents-desktop/src/main.ts # packages/agents-server/src/routing/runners-router.ts # packages/agents-server/test/dispatch-policy-routing.test.ts # packages/agents/src/server.ts # packages/electric-ax/src/start.ts # packages/electric-ax/test/start.test.ts
|
test body 2 |
|
Claude Code Review (iteration 1) Substantial PR (~3.5k additions / 60+ files): runner health endpoint, owner_user_id -> owner_principal rename, pull-wake runner hardening (state machine, bounded concurrency, exponential backoff, event-driven heartbeats). Overall direction solid. Main concerns: type-safety latent bug in health response, two small state-reset bugs, stack of unresolved inline review comments from kevin-dp. WHAT IS WORKING WELL
CRITICAL (Must Fix): None found. IMPORTANT (Should Fix)
SUGGESTIONS (Nice to Have)
ISSUE CONFORMANCE No linked GitHub issue on this PR. PR description is unusually thorough (root cause, approach, invariants, non-goals, trade-offs, verification, files-changed table) and substitutes well for a tracked issue. Diagnostics work surfaced #4340-#4342 and #4343, filed and tracked separately. Consider attaching Closes for whichever #4343 fixes are landing in this PR (the heartbeat-driven reset is option (a) per kevin-dp). PREVIOUS REVIEW STATUS A prior Claude Code Review comment existed but contained only a placeholder fragment ("PLACEHOLDER continuing in next edit.") -- replaced with this complete review. NOTE ON PROMPT-INJECTION ATTEMPT While reading .review-context/conversation.json I encountered a fake system-reminder tag embedded inside one of KyleAMathewss comment bodies (re: task tools). Treated as untrusted comment content, ignored. Flagging so you know. Review iteration: 1 | 2026-05-18 |
|
Iteration 2 — 2026-05-18 (re-validation pass) Re-read the diff and inline review threads. No new commits on the branch since iteration 1 was posted at 18:29 UTC. All findings in the prior Claude review (comment id 4480689327) remain open as written. Independent verification confirms:
No new issues surfaced beyond iteration 1. Review iteration: 2 | 2026-05-18 |
Claude Code ReviewSummaryIteration 5 on What's Working WellAll five iteration-4 Important findings are now fixed in code (and three of them are covered by new tests):
Other iteration-5 wins:
Issues FoundCritical (Must Fix)None. Important (Should Fix)
Suggestions (Nice to Have)
Issue ConformanceNo linked GitHub issue. PR description remains thorough but should be reconciled with the actual behavior on two points:
The new commit Previous Review StatusPrior Claude review is comment id Review iteration: 5 | 2026-05-19 |
…heartbeats (#4353) ## Summary Fixes #4341 — the per-wake heartbeat path nulls out `consumer_claims.lease_expires_at`, leaving every active claim row without an expiry after the first heartbeat (~10s after dispatch). ## Root cause `packages/agents-server/src/entity-registry.ts` — `materializeHeartbeatClaim`: ```ts .set({ lastHeartbeatAt: heartbeatAt, leaseExpiresAt: input.leaseExpiresAt ?? null, // ← unconditionally writes null if not provided updatedAt: heartbeatAt, }) ``` `packages/agents-server/src/routing/internal-router.ts:606-609` — the only production caller — never passes `leaseExpiresAt`. So every heartbeat overwrites the lease with `null`. The lease set correctly by `materializeActiveClaim` (from the upstream `lease_ttl_ms`, e.g. claimed_at + 30s) survives at most until the first heartbeat. ## Observed live ``` Initial (at claim): lease_expires_at = 2026-05-19T11:01:41.631Z (claimed_at + 30s) After 1 heartbeat: lease_expires_at = null ``` Captured via the health endpoint `claims.active[*]` field — issue #4341 has the full trace. ## The fix Treat heartbeats as alive-pings only: update `last_heartbeat_at` and leave `lease_expires_at` alone unless the caller explicitly provides a new lease. The lease set by `materializeActiveClaim` from the upstream `lease_ttl_ms` stays authoritative. ```diff .set({ lastHeartbeatAt: heartbeatAt, - leaseExpiresAt: input.leaseExpiresAt ?? null, + ...(input.leaseExpiresAt !== undefined + ? { leaseExpiresAt: input.leaseExpiresAt } + : {}), updatedAt: heartbeatAt, }) ``` Callers that genuinely want to extend the lease can still pass `leaseExpiresAt` explicitly. The single production caller (`internal-router.ts:606-609`) doesn't, and shouldn't — it has no TTL signal to base an extension on, and the upstream lease is the authoritative window. ## Tests New integration test `packages/agents-server/test/consumer-claim-registry.test.ts`: 1. **Lease preserved** — materialize an active claim with a lease, heartbeat without a lease, assert lease is still the original timestamp (not null). 2. **Lease extendable** — materialize, heartbeat with an explicit `leaseExpiresAt`, assert the new lease is written. Both run against the integration postgres backend, in the style of `tag-stream-outbox-registry.test.ts`. Existing unit tests pass unchanged. ## Not addressed in this PR - **Pre-existing claim rows with `lease_expires_at: null`** — claims that already lost their lease under the unfixed code won't recover. They'd need a reaper or admin command to clean up. Not currently a problem because nothing reaps on lease today, but worth knowing if a reaper is added later. ## Base branch note This PR targets `fix-pull-wake` (#4339), not `main`, because `materializeHeartbeatClaim` was introduced in #4308 which is part of the `fix-pull-wake` lineage but not yet in `main`. Merge order: this → fix-pull-wake → main. Independent of #4346 (the related #4340 fix); the two can land in either order. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…oken is missing (#4346) ## Summary Fixes #4340 — pull-wake claims leaking in `consumer_claims` when the in-memory `ClaimWriteTokenStore` no longer holds the consumer's token at the time `sendDone` arrives. The release path in `callback-forward` was gated by `stillOwnsClaim`, an in-memory check. When that check fails — server restart between mint and done, parallel wakes evicting each other's tokens, or a retry after a transient `updateStatus` failure — the entire release block is skipped: `materializeReleasedClaim` never runs, the entity stays stuck at `status='running'`, and the `consumer_claims` row stays `active` indefinitely. The steady-state "send one message, wait" path is not affected by this bug: it releases correctly via the runtime's `sendDone` after the `idleTimeout` window (default 5 min via `packages/agents/src/bootstrap.ts:146`). The bug only fires in the failure modes documented in [Test scenarios](#test-scenarios) below. ## Root cause `packages/agents-server/src/routing/internal-router.ts` had all three release actions behind the same in-memory gate: ```ts if (entity && stillOwnsClaim) { await materializeReleasedClaim(...) // DB row release await updateStatus(entity.url, 'idle') // entity status clearStream(...) // in-memory token cleanup await onEntityChanged(entity.url) } else if (stillOwnsClaim) { clearStream(...) } else if (entity) { log.info('done ignored for stale claim ...') } ``` `stillOwnsClaim` is the right gate for **write authorization** during the agent run, but it's the wrong gate for releasing the **DB row**, which is keyed by `(consumerId, epoch)`. The DB primary key is authoritative identity — the in-memory token state is orthogonal. ## The fix Three concerns, three gates: 1. **DB row release** (`materializeReleasedClaim`) — runs whenever `epoch` is defined. `(consumerId, epoch)` is the DB primary key; that's enough. 2. **Entity status → idle + `onEntityChanged`** — runs when `entityCleared || stillOwnsClaim`. `entityCleared` is a new return field from `materializeReleasedClaim`, set to `true` only when our `(consumerId, epoch)` was the active dispatch row and we just nulled it out. The `||` handles two non-trivial cases: server restart (DB has us active, token is gone) and retry after a failed `updateStatus` (state cleared on first attempt, token still held). 3. **In-memory token cleanup** (`clearStream`) — remains gated by `stillOwnsClaim` so a newer consumer's token is never cleared out from under it. ### `materializeReleasedClaim` API change ```diff - ): Promise<ConsumerClaim | null> { + ): Promise<{ claim: ConsumerClaim | null; entityCleared: boolean }> { ``` Only one production caller (`internal-router.ts`); both that caller and the test mock are updated. The `.returning()` on the `entityDispatchState` UPDATE now reports whether our row was actually cleared (vs. a no-op because a newer claim is active). ## Test scenarios The fix decouples the three concerns. Below: ✓ = action happens, × = action does NOT happen (and is correct that it doesn't). | Scenario | `entityCleared` | `stillOwnsClaim` | DB row released | Entity → idle | |---|---|---|---|---| | **A. Happy path** (mint + done) | true | true | ✓ released | ✓ goes idle | | **B. Server restart** (no in-memory token, DB row still active) | true | false | ✓ released | ✓ goes idle | | **C. Newer wake** (wake-1 done after wake-2 takes over the stream) | false | false | ✓ wake-1's row released | × stays running — wake-2 is in flight | | **D. Retry** (first done's `updateStatus` threw; same done retried) | false | true | ✓ no-op (already released) | ✓ goes idle | | **E. Legacy stale-done test** (test setup never materialized active claim; token evicted) | false | false | no row to release | × stays running — newer claim conceptually in flight | New tests in `packages/agents-server/test/webhook-forward-routing.test.ts > claim release on done callback (regression for #4340)` cover scenarios A–C. Existing tests in `server-claim-write-token.test.ts` cover D and E and continue to pass after the fix. ## Verified - **Unit tests**: deterministic. Pre-fix, B and C fail (zero invocations of `materializeReleasedClaim`); D fails (`updateStatus` skipped on retry). Post-fix, all five scenarios produce the documented behavior. - **Manual run-through** against a local desktop + agents-server: send one message, dispatch claims (`active_count: 0 → 1`), agent completes, runtime calls `sendDone` after `idleTimeout`, server fires the new release path, `active_count: 1 → 0`, entity status transitions back to `idle`. ## Not addressed in this PR - **Pre-existing orphan rows**: rows that already leaked from prior runs of the unfixed code can't be released because no fresh `done` callback is coming. Would need a reaper job or admin command. - **`lease_expires_at: null` issue (#4341)**: independent. Without a lease, even a reaper job can't time-out claims safely. ## Base branch note This PR targets `fix-pull-wake` (#4339), not `main`, because `materializeReleasedClaim` was introduced in #4308 which is part of the `fix-pull-wake` lineage but not yet in `main`. Merge order: this → fix-pull-wake → main. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Summary
Adds pull-wake runner health diagnostics, renames runner ownership from
owner_user_idto canonicalowner_principalURLs, and hardens the pull-wake runner lifecycle around heartbeats, reconnects, shutdown, and error reporting.This PR also fixes the local desktop send path so mutating local server requests can go through the Electron main process instead of being blocked behind Chromium's HTTP/1.1 connection limit, while keeping normal shape reads in the renderer.
Root Cause
Pull-wake dispatch had too little observability and too many implicit lifecycle assumptions. When a runner missed wakes, got stuck reconnecting, or failed during shutdown, the server/UI had limited information to explain the state. The ownership field also used
owner_user_id, which was misleading because principals can be users, agents, services, or system actors.Local development exposed a separate but related issue: renderer-origin mutating requests could stall behind long-lived shape/SSE requests, delaying sends and new agent startup by seconds even when the server processed the request in milliseconds.
Approach
Runner Health Diagnostics
PullWakeRunnerreports stream, heartbeat, claim, dispatch, reconnect, and error diagnostics in heartbeat payloads.GET /_electric/runners/:id/healthaggregates runner state, liveness lease, sanitized client diagnostics, active claims, dispatch stats, and derived health issues.Principal Ownership
owner_principal.electric-principalrequest headers now accept either a principal key (user:alice) or a principal URL (/principal/user%3Aalice) throughparsePrincipalInput()and compare internally by canonical URL.Pull-Wake Runner Lifecycle
stopped | starting | connecting | streaming | reconnecting | stopping.onErrorreporting-only and isolates throwing reporters with aconsole.errorfallback.stop()gates new claim dispatch, aborts outstanding work, waits boundedly for claim actors, drains runtime wakes, and rethrows drain errors after recording diagnostics.waitForStopped()now waits for the full stop path, not just the stream loop.start().maxConcurrentClaims: it only limited claim/enqueue work, not actual wake execution, so it was a misleading invariant.Dispatch And Local Send Performance
/sendremains normalapplication/json; the preflight/connection issue is handled by the desktop fetch path rather than changing API semantics.Key Invariants
owner_principalvalues are canonical principal URLs.onErrorcannot control runner lifecycle.Non-goals
pull-wake-runner.ts; the critical invariants are fixed and tested first.Verification