feat: integrate BatchWorkpool with workflow step execution #200
sethconvex wants to merge 13 commits into `main`
Conversation
Route step.runAction() through BatchWorkpool when the action is batch-registered. Pass `batch` option to WorkflowManager, which threads it through to StepExecutor. Adds startBatchSteps mutation that creates journal entries without enqueueing to workpool, returning an onComplete handle for the client to use with batch.enqueueByHandle(). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a demo workflow that simulates a complex IO-constrained pipeline (like LLM generation) to compare regular workpool vs batched workpool. The pipeline generates an outline, 200 parallel sections, and a summary, demonstrating how BatchWorkpool processes all 200 sections concurrently inside a single executor action vs requiring separate action slots. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Store outline, sections, and summary progressively in the llmSimulations table as each workflow step completes. Set maxParallelism to 200 for batched workflow so all sections dispatch in a single tick. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

Adds BatchWorkpool support end-to-end: client APIs and wiring, StepExecutor batch-group logic and tests, backend journal/schema and pool handlers for batch results and polling, an example LLM simulation using batched actions, README docs, and a package/dev-dep update to include the workpool package.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Client Code
    participant WFM as WorkflowManager
    participant SE as StepExecutor
    participant BW as BatchWorkpool
    participant Journal as Journal/Backend
    participant Worker as Batch Worker
    Client->>WFM: define/start workflow (with batch)
    WFM->>Journal: workflowMutation(..., batch)
    Journal->>SE: instantiate StepExecutor(..., batch)
    SE->>SE: split steps into batchGroup vs regular
    par Batch Path
        SE->>Journal: startBatchSteps / startBatchGroupStep
        Journal-->>SE: {entries, onCompleteHandle}
        SE->>BW: enqueue batch tasks (per-item)
        BW->>Worker: execute tasks
        Worker->>Journal: onCompleteBatchGroupItem / onCompleteBatchStep
        Journal->>SE: poller/_checkBatchCompletion signals completion
    and Regular Path
        SE->>Journal: startSteps (regular enqueue)
        Journal->>WFM: schedule executor actions
        WFM->>SE: regular worker run results
    end
    SE->>Journal: pipelineCompleted(...)
    Journal->>Client: workflow completion persisted / mutations applied
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 1 failed (1 warning)
The devDependency was pointing to a local file path that doesn't exist in CI. Restore to the published 0.3.1 version. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@package.json`:
- Line 69: Replace the local file reference for the dependency
"@convex-dev/workpool" in package.json (the entry currently set to
"file:../workpool") with the corresponding published semver (e.g., "0.3.1" or
the released version that provides the BatchWorkpool API) so CI and external
consumers can resolve the package; update the dependency value in package.json
and ensure package-lock or yarn lock is regenerated if present.
In `@src/client/step.ts`:
- Around line 251-259: The current direct type assertion on the result of
this.ctx.runMutation(this.component.journal.startBatchSteps, ...) is flagged by
static analysis; change the cast to go through unknown first so the Convex
mutation return type is safely coerced — i.e., take the result of
this.ctx.runMutation(...) and cast it as unknown and then to { entries:
JournalEntry[]; onCompleteHandle: string } to populate entries and
onCompleteHandle. Ensure you reference the callsite this.ctx.runMutation and the
component this.component.journal.startBatchSteps when making the change.
- Around line 262-286: Replace the O(n²) for...of loop that calls
entries.indexOf(entry) with an index-based loop so you iterate entries and
messages in parallel; use a standard for (let i = 0; i < entries.length; i++)
pattern and replace occurrences of entries.indexOf(entry) with i, obtain target
via messages[i].target, pass entry as entries[i], and use messages[i].retry when
calling this.batch!.enqueueByHandle; keep the existing logic that resolves
handlerName with
this.batch!.resolveHandlerName(safeFunctionName(target.function)) and the
onComplete context (stepId: entries[i]._id, generationNumber:
this.generationNumber, workpoolOptions: this.workpoolOptions).
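The indexed-loop refactor described above can be sketched in isolation. The `Entry` and `Message` shapes below are simplified stand-ins for the real `JournalEntry` and message types, not the actual code:

```typescript
// Hypothetical shapes standing in for JournalEntry / batch messages.
type Entry = { _id: string };
type Message = { target: string; retry: boolean };

// O(n^2): entries.indexOf(entry) rescans the array on every iteration.
function pairSlow(entries: Entry[], messages: Message[]): [Entry, Message][] {
  const out: [Entry, Message][] = [];
  for (const entry of entries) {
    out.push([entry, messages[entries.indexOf(entry)]]);
  }
  return out;
}

// O(n): iterate both arrays in lockstep by index, as the fix suggests.
function pairFast(entries: Entry[], messages: Message[]): [Entry, Message][] {
  const out: [Entry, Message][] = [];
  for (let i = 0; i < entries.length; i++) {
    out.push([entries[i], messages[i]]);
  }
  return out;
}

const entries = [{ _id: "a" }, { _id: "b" }];
const messages = [
  { target: "f1", retry: true },
  { target: "f2", retry: false },
];
// Both produce identical pairings; only the complexity differs.
console.log(JSON.stringify(pairFast(entries, messages)) === JSON.stringify(pairSlow(entries, messages))); // true
```

For a few steps the difference is negligible, but batch groups here can contain hundreds of entries, where the rescan cost compounds.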
In `@src/component/journal.ts`:
- Around line 219-236: startBatchSteps currently accepts retry and
schedulerOptions in its args schema but never persists or forwards them; update
the handler in startBatchSteps to extract retry and schedulerOptions from each
step and include them on the created journal entries (the same place where step
details are written to the journal) so downstream consumers get these values, or
if schedulerOptions should be unsupported for batch steps remove
schedulerOptions from the args validator and also remove retry if it is not
meant to be stored; reference startBatchSteps and the journal-entry creation
logic (where individual steps are turned into journal records) and ensure the
fields are carried through or removed consistently from the v.array step schema.
🧹 Nitpick comments (3)
src/component/journal.ts (1)

213-287: Consider extracting shared setup logic with `startSteps`. The first ~15 lines of the handler (validation, `getWorkflow`, `getDefaultLogger`, the `runResult` check, the `maxEntry` query, the `stepNumberBase` calculation) are nearly identical to `startSteps`. Extracting a shared helper would reduce duplication and ensure both paths stay in sync.

src/client/step.ts (1)

188-194: Redundant guard on `batchIndices.length > 0`. Line 188 checks `if (batchIndices.length > 0)`, but the enclosing code already verified this on line 180 (early-returning if `batchIndices.length === 0`). The guard is harmless but unnecessary.

example/convex/llmSimulation.ts (1)

164-190: `args.context` is untyped; consider narrowing or documenting. Line 171 casts `args.context` to `Id<"llmSimulations">` from `v.any()`. This is a common pattern with onComplete callbacks but could silently fail if the caller passes the wrong context. For a demo this is acceptable, but a brief comment noting the expected shape would improve clarity.
```ts
export const startBatchSteps = mutation({
  args: {
    workflowId: v.string(),
    generationNumber: v.number(),
    steps: v.array(
      v.object({
        step,
        retry: v.optional(v.union(v.boolean(), vRetryBehavior)),
        schedulerOptions: v.optional(
          v.union(
            v.object({ runAt: v.optional(v.number()) }),
            v.object({ runAfter: v.optional(v.number()) }),
          ),
        ),
      }),
    ),
    workpoolOptions: v.optional(workpoolOptions),
  },
```
`retry` and `schedulerOptions` are accepted but unused in `startBatchSteps`.

The step args schema includes `retry` and `schedulerOptions` fields (lines 226-232), but the handler never reads them; it only creates journal entries and returns. While `retry` is later used client-side in `step.ts` when calling `batch.enqueueByHandle`, `schedulerOptions` appears to be silently dropped for batch steps entirely.

If `schedulerOptions` is intentionally unsupported for batch steps, consider either removing it from the args validator or documenting the limitation.
The component returns workflowId as string but JournalEntry expects Id<"workflows">. Cast through unknown to satisfy TypeScript. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use direct batch action references instead of internal.* API refs (batch exports aren't included in generated API types). Cast through any for version-mismatch between RegisteredAction and FunctionReference. Use v.any() for result validator to avoid VObject type incompatibility. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The lockfile had a symlink resolution to ../workpool which doesn't exist in CI. Regenerate to resolve from the npm registry instead. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/client/step.ts`:
- Around line 262-287: The loop silently skips when a handler cannot be resolved
which can leave journaled steps stuck; instead, when
this.batch!.resolveHandlerName(...) returns falsy (i.e., handlerName is null)
you should treat it as an error: throw an Error (including identifying info such
as entry._id and safeFunctionName(target.function) or the target) rather than
continue, so the failure surfaces and the step can be retried/failed; keep the
existing enqueueByHandle path for valid handlerName and only use continue for
non-function targets or if isRegistered truly indicates the message should be
ignored.
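The throw-on-unresolved behavior described above can be sketched as follows. The handler registry, names, and return convention are illustrative stand-ins, not the actual BatchWorkpool API:

```typescript
// Sketch only: registry and names are hypothetical.
type Target = { kind: "function" | "mutation"; fn: string };

const registeredHandlers = new Map<string, string>([
  ["sections.generate", "handle-1"],
]);

function resolveHandlerName(fnName: string): string | null {
  return registeredHandlers.get(fnName) ?? null;
}

// Returns the resolved handle (which would then go to enqueueByHandle),
// null for targets that are simply not batch-eligible, and throws when a
// handler that should exist cannot be resolved.
function enqueueBatchItem(entryId: string, target: Target): string | null {
  if (target.kind !== "function") {
    return null; // non-function targets may legitimately be skipped
  }
  const handlerName = resolveHandlerName(target.fn);
  if (handlerName === null) {
    // Silently continuing here would leave the journaled step stuck
    // in-progress forever, so surface the failure instead.
    throw new Error(`Step ${entryId}: no batch handler registered for ${target.fn}`);
  }
  return handlerName;
}

console.log(enqueueBatchItem("step1", { kind: "function", fn: "sections.generate" })); // handle-1
```

The key distinction the review draws is between "this message was never batch-eligible" (skip is fine) and "this message should have a handler but doesn't" (an invariant violation that must fail loudly).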
In `@src/client/workflowMutation.ts`:
- Around line 23-27: The import and usages of BatchWorkpool (in symbols like
BatchWorkpool in src/client/workflowMutation.ts and related files) are invalid
for `@convex-dev/workpool`@0.3.1; either upgrade the dependency to a version that
exports BatchWorkpool or refactor to use the current Workpool API (replace
BatchWorkpool type references with Workpool and change batch operations to call
the existing methods such as Workpool.enqueueActionBatch / Workpool.enqueueBatch
equivalents), updating call sites in workflowMutation.ts, step.ts, index.ts and
example/convex/llmSimulation.ts to match the Workpool method signatures and
return types.
🧹 Nitpick comments (1)
src/client/step.ts (1)

188-194: Redundant guard: `batchIndices.length > 0` is always true here. Line 180 already returns early when `batchIndices.length === 0`, so this condition on line 188 is always true.

Proposed simplification:

```diff
-    if (batchIndices.length > 0) {
-      const batchMessages = batchIndices.map((i) => messages[i]);
-      const batchEntries = await this._startStepsBatch(batchMessages);
-      for (let i = 0; i < batchIndices.length; i++) {
-        allEntries.push({ index: batchIndices[i], entry: batchEntries[i] });
-      }
+    const batchMessages = batchIndices.map((i) => messages[i]);
+    const batchEntries = await this._startStepsBatch(batchMessages);
+    for (let i = 0; i < batchIndices.length; i++) {
+      allEntries.push({ index: batchIndices[i], entry: batchEntries[i] });
     }
```
The published @convex-dev/workpool@0.3.1 doesn't include BatchWorkpool. Use the pkg-pr-new preview from PR #167 which has the batch execution feature needed by this branch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@cursor please review this PR
Findings (ordered by severity):
Open questions / assumptions:
- Fix step-number ordering when batch and regular steps are interleaved in a `Promise.all`. Previously batch steps were always processed first, causing step numbers to not match message order; on resume, this led to args-mismatch errors. Now contiguous groups are processed in original order so step numbers match the handler's deterministic replay order.
- Replace silent `continue` with `throw` in `_startStepsBatch` when `target.kind` is not "function" or handler resolution fails. These are assertion failures that would leave steps stuck in-progress forever.
- Add BatchWorkpool documentation to the README covering setup, action registration, WorkflowManager integration, and usage in workflows.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
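The contiguous-group fix in this commit can be sketched as a partitioning step. The types and helper name below are illustrative, not the actual StepExecutor code:

```typescript
// Split messages into contiguous runs of same batch-eligibility, preserving
// original order so assigned step numbers match replay order.
type Msg = { name: string; batchEligible: boolean };

function contiguousGroups(messages: Msg[]): { batch: boolean; items: Msg[] }[] {
  const groups: { batch: boolean; items: Msg[] }[] = [];
  for (const msg of messages) {
    const last = groups[groups.length - 1];
    if (last && last.batch === msg.batchEligible) {
      last.items.push(msg);
    } else {
      groups.push({ batch: msg.batchEligible, items: [msg] });
    }
  }
  return groups;
}

const msgs: Msg[] = [
  { name: "outline", batchEligible: false },
  { name: "s1", batchEligible: true },
  { name: "s2", batchEligible: true },
  { name: "summary", batchEligible: false },
];
// Three groups, [outline], [s1, s2], [summary], processed in order, so step
// numbers 0..3 match the handler's deterministic replay order.
console.log(contiguousGroups(msgs).map((g) => g.items.length));
```

Processing groups in message order (rather than all batch steps first) is what keeps step numbering deterministic across the original run and replay.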
Tests verify:
- Messages route through the regular workpool when no batch is configured
- Unregistered actions go through the regular workpool
- Batch-eligible actions route through BatchWorkpool
- Interleaved batch/regular steps preserve original message order
- Contiguous groups are processed sequentially with correct step numbers
- Mutations are never batch-eligible

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
EXPERIMENTAL / NEEDS REVIEW: this change was implemented reactively during benchmarking and needs more thorough design review before merging.

## Problem

Batch workflows with N parallel items created N individual step documents. For 1000 items this meant 1000 step doc inserts, 1000 completion patches, and loading 1000+ docs on replay, yielding ~190s total when the actual work takes <1s.

| Sections | Before (batched) | After (batched) | Speedup |
|----------|------------------|-----------------|---------|
| 100      | 38.7s            | 37.1s           | 1.0x    |
| 250      | 74.2s            | 37.9s           | 2.0x    |
| 1000     | 190.7s           | 41.8s           | 4.6x    |

For reference, 1000 sections without any batching takes 529.3s.

## Approach

Replace N step docs with 1 "batchGroup" step + N lightweight result docs in a separate `batchResults` table.

### Schema (src/component/schema.ts)

- New `batchGroup` variant in the step union (kind, count, common fields)
- New `batchResults` table indexed by (batchStepId, index)

### Journal (src/component/journal.ts)

- `startBatchGroupStep`: creates 1 step doc, schedules the completion poller, returns the entry + an onComplete handle
- `loadBatchResults`: query to load results by batchStepId for replay
- Guard in `startSteps` to reject the batchGroup kind

### Pool (src/component/pool.ts)

- `onCompleteBatchGroupItem`: write-only insert into batchResults (no read-modify-write, no OCC contention)
- `_checkBatchCompletion`: updated to handle batchGroup steps by counting batchResults via `.take(count)` for early termination

### StepExecutor (src/client/step.ts)

- `_startStepsBatch()`: calls startBatchGroupStep (1 doc) instead of startBatchSteps (N docs), enqueues N tasks to BatchWorkpool
- `run()`: handles batchGroup replay by loading results and resolving N messages from the channel in index order
- New `_completeBatchItem()` helper

### Channel capacity fix (src/client/workflowMutation.ts)

- Increased channel capacity to 10,000 when batch is configured (it was limited to maxParallelism=100, forcing 10 re-runs for 1000 items; this was the dominant performance bottleneck)

### Workflow operations (src/component/workflow.ts)

- cleanup(): deletes associated batchResults before the step doc
- cancel(): handles the batchGroup kind (skips workpool cancel)
- publicStep(): maps batchGroup to kind "function" in the public API

## Backward compatibility

- Old startBatchSteps/onCompleteBatchStep mutations remain for in-flight workflows
- No migration needed: old workflows keep N step docs, new ones use batchGroup

## Known concerns

- Channel capacity of 10,000 is a hardcoded magic number
- The polling completion checker runs every 200ms; adaptive backoff may be wanted for very large batches
- batchResults are not counted toward the journal size limit and could grow unbounded for very large batches
- Error handling for partial batchGroup failures relies on standard Promise.all rejection behavior
- The interaction between batchGroup steps and workflow cancellation needs more edge-case testing

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Bug fixes found by black-box testing:

1. `getGenerationState` miscounted batchGroup entries: it used `journalEntries.length` (entry count) instead of total message count, so a batchGroup(100) counted as 1, causing `latest=true` during replay. Fixed with a cached `remainingMessageCount` (O(1) hot path).
2. Replay crashed with a TypeError on incomplete batchResults: accessing `results[i]` when `results.length < count` gave "Cannot read properties of undefined". Added count validation before iterating.
3. Extra batchResults were silently ignored: there was no detection of data corruption when `results.length > count`. Now throws on mismatch.
4. Non-contiguous result indices silently assigned wrong results: indices [0,2,4] sorted by position gave msg1 index-2's result. Added contiguous index validation (`results[i].index === i`).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
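The count and contiguity checks described in items 2-4 can be sketched as one standalone validation pass. Types and error messages are illustrative:

```typescript
// Validate a batchGroup's results before replay: exactly `count` results,
// with indices forming the contiguous range 0..count-1.
type BatchResult = { index: number; value: unknown };

function validateBatchResults(results: BatchResult[], count: number): BatchResult[] {
  if (results.length < count) {
    // Item 2: fail explicitly instead of crashing on results[i] === undefined.
    throw new Error(`batchGroup incomplete: ${results.length}/${count} results`);
  }
  if (results.length > count) {
    // Item 3: extra results indicate data corruption; do not ignore them.
    throw new Error(`batchGroup corrupt: ${results.length} results for count ${count}`);
  }
  const sorted = [...results].sort((a, b) => a.index - b.index);
  for (let i = 0; i < count; i++) {
    if (sorted[i].index !== i) {
      // Item 4: indices like [0, 2, 4] would silently pair messages
      // with the wrong results if assigned by sorted position.
      throw new Error(`batchGroup non-contiguous: expected index ${i}, got ${sorted[i].index}`);
    }
  }
  return sorted;
}

// "a", "b" in index order regardless of arrival order.
console.log(validateBatchResults([{ index: 1, value: "b" }, { index: 0, value: "a" }], 2).map((r) => r.value));
```

Running all three checks before handing results to waiting messages turns silent misassignment into an explicit, retryable failure.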
Tests exercising real Convex mutations via convex-test:
- `startBatchGroupStep`: creates the step doc, correct stepNumber, schedules the poller, rejects on a completed workflow
- `onCompleteBatchGroupItem`: inserts results, handles all result kinds (success/failed/canceled), ignores invalid IDs, no OCC contention
- `_checkBatchCompletion`: marks complete when all results are in, re-polls on incomplete, stops on generationNumber change or workflow completion
- `cleanup`: deletes batchResults alongside step docs, handles mixed regular + batchGroup workflows
- cancel mid-batchGroup: bumps generationNumber, late results don't crash, poller stops after cancel
- `loadBatchResults`: sorted by index, empty for non-batch steps

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@src/client/step.test.ts`:
- Around line 987-1001: Tests are accessing msg2.resolve.mock but resolve is not
typed as a Jest mock; cast msg2.resolve to the mock type before inspecting .mock
(e.g., use (msg2.resolve as jest.Mock) when checking .mock.calls length and when
reading the first call value) in the test around executor.run()/msg2 handling so
the TypeScript type error is resolved; update both spots where .mock is accessed
to use the cast and keep existing checks (calls length and resolvedValue
extraction) intact.
- Around line 945-957: The test accesses .mock on msg1.resolve but StepRequest
types it as a plain function, causing a type error; update the test around
executor.run() to either guard with jest.isMockFunction(msg1.resolve) before
inspecting .mock.calls or cast msg1.resolve to a Jest mock (e.g., msg1.resolve
as jest.Mock) when checking .mock.calls; modify the conditional that currently
reads "if (msg1.resolve.mock.calls.length > 0)" to use the guard or cast so
TypeScript knows .mock exists, leaving the rest of the assertions
(expect(...).not.toHaveBeenCalledWith("two")) unchanged and keeping references
to msg1.resolve and executor.run.
In `@src/client/step.ts`:
- Line 323: The code is bypassing TypeScript safety by reading maxWorkers from
(this.batch as any).options; instead, add an explicit typed parameter (e.g.,
maxWorkers: number | undefined) to the StepExecutor constructor (and
corresponding factory/instantiation site) and use that value inside StepExecutor
instead of accessing (this.batch as any).options?.maxWorkers; alternatively, if
you can import the BatchWorkpool type from `@convex-dev/workpool`, narrow
this.batch to the correct BatchWorkpool type and read options.maxWorkers with
proper typing—update the constructor/signature where StepExecutor is created and
remove the as any access in src/client/step.ts.
In `@src/component/pool.ts`:
- Around line 220-247: The onCompleteBatchStep handler must guard against stale
callbacks and surface failures: update onCompleteBatchStep to accept a
generationNumber arg and fetch the workflow doc (via the step's workflowId or
ctx.db as used by onCompleteHandler) to compare generationNumber against
workflow.generation and return early if stale; when ctx.db.normalizeId("steps",
stepId) yields no id, when ctx.db.get(normalizedStepId) returns no entry, or
when the step is already not inProgress, log a warning via the same logger used
by onCompleteHandler and record the failure to the onCompleteFailures store
(same pattern/symbols used in onCompleteHandler), and only proceed to
ctx.db.patch to clear inProgress/completedAt/runResult when the generation
matches and the entry is valid.
- Around line 270-348: The _checkBatchCompletion poller currently retries
indefinitely; add a max-poll safeguard using the existing _pollCount and
_firstPollAt values in the internalMutation handler (function
_checkBatchCompletion) so it stops after a configured limit (e.g.,
MAX_POLL_COUNT = 300 for ~10 minutes at 200ms) or when elapsed time exceeds a
timeout (e.g., MAX_POLL_MS = 10*60*1000), log an error via getDefaultLogger when
the guard trips, mark the workflow/run as failed (set workflow.step/runResult
appropriately) or clear inProgress and set a failure runResult, and stop
scheduling further runAfter calls (i.e., do not call
internal.pool._checkBatchCompletion again); ensure checks occur before
re-enqueueing via enqueueWorkflow and preserve the existing
pollCount/_firstPollAt updates when rescheduling.
- Around line 249-268: onCompleteBatchGroupItem can insert duplicate
batchResults for the same (batchStepId, index); add a uniqueness guard before
inserting to prevent duplicates and the resulting completion/race and replay
validation failures. Specifically, in the handler for onCompleteBatchGroupItem
(use batchStepId/index and normalizedId already present) check the batchResults
table for an existing document with that compound key (e.g., ctx.db.findOne or a
query by batchStepId and index) and only call ctx.db.insert("batchResults", ...)
if none exists, or alternatively perform an upsert/atomic insert that enforces
uniqueness; reference the insert call in onCompleteBatchGroupItem and ensure
this change prevents duplicates that break _checkBatchCompletion and
loadBatchResults in step.ts.
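The max-poll safeguard suggested for `_checkBatchCompletion` above can be sketched as pure decision logic. `MAX_POLL_COUNT` and `MAX_POLL_MS` are assumed example values, not constants from the codebase:

```typescript
// Guard against unbounded polling: stop after a poll-count or elapsed-time cap.
const MAX_POLL_COUNT = 300; // ~10 minutes at a 200 ms interval
const MAX_POLL_MS = 10 * 60 * 1000;

type PollDecision = "reschedule" | "fail";

function nextPollAction(pollCount: number, firstPollAt: number, now: number): PollDecision {
  if (pollCount >= MAX_POLL_COUNT || now - firstPollAt >= MAX_POLL_MS) {
    // Stop scheduling further runAfter calls and mark the run failed
    // instead of polling forever.
    return "fail";
  }
  return "reschedule";
}

console.log(nextPollAction(10, 0, 2000)); // reschedule
console.log(nextPollAction(300, 0, 2000)); // fail
```

Checking the guard before re-enqueueing keeps the existing `_pollCount`/`_firstPollAt` bookkeeping as the only state the mutation needs.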
In `@src/component/workflow.ts`:
- Around line 155-163: The mapping currently forces a function variant with
workId: undefined! which breaks the WorkflowStep contract; update the
WorkflowStep discriminated union to include a dedicated "batchGroup" variant
(e.g., { kind: "batchGroup"; /* fields as needed */ }) and change the mapper
that checks step.step.kind === "batchGroup" to return that "batchGroup" variant
instead of casting to { kind: "function", workId: undefined! }; also update any
switch/case or consumers that pattern-match on WorkflowStep (e.g., where
WorkflowStep is handled) to handle the new "batchGroup" discriminator so workId
remains required on the "function" variant and no non-null assertions are
needed.
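A minimal sketch of the dedicated discriminated-union variant suggested above. Field names are illustrative, not the actual `WorkflowStep` type:

```typescript
// With a dedicated "batchGroup" variant, workId stays required on the
// "function" variant and no non-null assertions are needed.
type WorkflowStep =
  | { kind: "function"; workId: string; name: string }
  | { kind: "batchGroup"; count: number; name: string };

function describeStep(step: WorkflowStep): string {
  switch (step.kind) {
    case "function":
      // TypeScript narrows here, so step.workId is a plain string.
      return `function ${step.name} (work ${step.workId})`;
    case "batchGroup":
      return `batchGroup ${step.name} (${step.count} items)`;
  }
}

console.log(describeStep({ kind: "batchGroup", count: 200, name: "sections" })); // batchGroup sections (200 items)
```

Consumers that switch on `kind` then get an exhaustiveness check from the compiler when the new variant is added, instead of a runtime `workId: undefined!` surprise.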
🧹 Nitpick comments (6)
README.md (1)
370-410: Documentation examples use `as any` casts that may confuse users. Lines 372, 405, and 408 use `as any` casts in the code examples. While this may be necessary due to current type limitations, it undermines the "just works" developer experience the docs are aiming for. Consider either:
- Improving the types so casts aren't needed, or
- Adding a brief comment in the example explaining why the cast is required.
src/component/journal.ts (3)
62-77: Performance logging uses `globalThis.console` instead of the structured logger. Line 74 bypasses the configured log level by writing directly to `globalThis.console.info`. If this is intentional for always-on performance telemetry, a brief comment clarifying the choice would help. Otherwise, consider using the structured logger for consistency.

358-372: All batchGroup steps share the hardcoded name "batchGroup". Line 364 sets `name: "batchGroup"` for every batch group step. If a workflow has multiple `Promise.all` blocks producing separate batch groups, they'll all appear identically in logs and the step list. Consider accepting an optional name from the caller or deriving one from the batch actions.

231-320: Remove `startBatchSteps`; it's dead code no longer called from the client. The client-side `_startStepsBatch` calls `startBatchGroupStep` instead (line 309 of `src/client/step.ts`). `startBatchSteps` only appears in test mocks and the generated component file, with no production calls. The `retry` and `schedulerOptions` parameters defined in its args schema are never used in the handler, further confirming this code path is vestigial.

src/client/step.ts (1)
354-371: Direct access to `this.batch!.component.batch.enqueueBatch` couples to an internal API. Line 367 reaches into `BatchWorkpool`'s internal component reference to call `enqueueBatch` directly. This is another coupling to the library's internal structure. Ideally, `BatchWorkpool` would expose a public `enqueueBatch` method, similar to how `enqueueByHandle` is used on line 355.

src/component/pool.ts (1)

323-335: Consider exponential backoff for the polling interval. A fixed 200 ms interval generates significant read load for large batches (each poll reads up to `count` documents from `batchResults` plus the `inProgress` index). For a 200-item batch taking ~30 s, that's ~150 polls. Starting at 200 ms and backing off (e.g., doubling up to a cap of ~2 s) would reduce load substantially while adding only minimal latency to completion detection.
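The suggested backoff can be sketched as a small pure function; the 200 ms base and 2 s cap are the reviewer's example numbers, not values from the codebase:

```typescript
// Exponential backoff for the completion poller: double the delay each
// poll, capped so completion detection never lags by more than ~2 s.
function pollDelayMs(pollCount: number, baseMs = 200, capMs = 2000): number {
  return Math.min(baseMs * 2 ** pollCount, capMs);
}

console.log([0, 1, 2, 3, 4, 10].map((n) => pollDelayMs(n))); // 200, 400, 800, 1600, 2000, 2000
```

The scheduler would pass the existing `_pollCount` into this function when computing the next `runAfter` delay, so no new state is needed.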
```ts
try {
  const result = await executor.run();
  // If it doesn't throw, at least verify results are correct
  // msg0 -> index 0 result
  expect(msg0.resolve).toHaveBeenCalledWith("zero");
  // msg1 should NOT get index 2's result
  if (msg1.resolve.mock.calls.length > 0) {
    // If it resolved, it should not be "two" (that's for index 2)
    expect(msg1.resolve).not.toHaveBeenCalledWith("two");
  }
} catch {
  // Throwing is acceptable — missing results is an error condition
}
```
Pipeline failure: `.mock` property access on non-mock-typed `resolve`.

`msg1.resolve` is typed as `(result: unknown) => void` per the `StepRequest` type, so accessing `.mock.calls` is a type error; this is what's failing CI. Cast through the mock type:
🐛 Proposed fix

```diff
   try {
     const result = await executor.run();
     // If it doesn't throw, at least verify results are correct
     // msg0 -> index 0 result
     expect(msg0.resolve).toHaveBeenCalledWith("zero");
     // msg1 should NOT get index 2's result
-    if (msg1.resolve.mock.calls.length > 0) {
+    if ((msg1.resolve as ReturnType<typeof vi.fn>).mock.calls.length > 0) {
       // If it resolved, it should not be "two" (that's for index 2)
       expect(msg1.resolve).not.toHaveBeenCalledWith("two");
     }
   } catch {
     // Throwing is acceptable — missing results is an error condition
   }
```
🧰 Tools

🪛 GitHub Actions: Test and lint
[error] 951-951: TS2339: Property 'mock' does not exist on type '(result: unknown) => void'.

🪛 GitHub Check: Test and lint
[failure] 951-951: Property 'mock' does not exist on type '(result: unknown) => void'.
```ts
try {
  await executor.run();
  // msg2 should have gotten index 2's result, but there is no index 2.
  // If it resolved at all, something is wrong.
  if (msg2.resolve.mock.calls.length > 0) {
    // What did it resolve with? If "one" then index matching is broken.
    const resolvedValue = msg2.resolve.mock.calls[0][0];
    // There IS no result for index 2 — any resolution is suspect.
    // This test documents the behavior even if it doesn't crash.
    expect(resolvedValue).toBeDefined();
  }
} catch {
  // Acceptable
}
});
```
Same pipeline failure: `.mock` access needs a type cast.

Same issue as above; lines 991 and 993 access `.mock` on the non-mock-typed `resolve`.
🐛 Proposed fix

```diff
   try {
     await executor.run();
     // msg2 should have gotten index 2's result, but there is no index 2.
     // If it resolved at all, something is wrong.
-    if (msg2.resolve.mock.calls.length > 0) {
+    if ((msg2.resolve as ReturnType<typeof vi.fn>).mock.calls.length > 0) {
       // What did it resolve with? If "one" then index matching is broken.
-      const resolvedValue = msg2.resolve.mock.calls[0][0];
+      const resolvedValue = (msg2.resolve as ReturnType<typeof vi.fn>).mock.calls[0][0];
       // There IS no result for index 2 — any resolution is suspect.
       // This test documents the behavior even if it doesn't crash.
       expect(resolvedValue).toBeDefined();
     }
   } catch {
     // Acceptable
   }
```
🧰 Tools
🪛 GitHub Check: Test and lint
[failure] 993-993:
Property 'mock' does not exist on type '(result: unknown) => void'.
[failure] 991-991:
Property 'mock' does not exist on type '(result: unknown) => void'.
🤖 Prompt for AI Agents
In `@src/client/step.test.ts` around lines 987 - 1001, the tests access
msg2.resolve.mock, but resolve is typed as a plain function rather than a Vitest
mock. Cast msg2.resolve to the mock type before inspecting .mock (e.g.,
(msg2.resolve as ReturnType<typeof vi.fn>)) both when checking .mock.calls
length and when reading the first call value, so the TypeScript error is
resolved. Update both spots where .mock is accessed and keep the existing checks
(calls length and resolvedValue extraction) intact.
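The cast-based fix can be reproduced outside Vitest. The sketch below hand-rolls a `vi.fn`-style mock (`MockFn` and `mockFn` are illustrative names, not Vitest APIs) to show why `.mock` is invisible on a value typed as a plain function, and why the cast recovers it:

```typescript
// Hand-rolled stand-in for vi.fn: a callable that records its calls.
type MockFn<A extends unknown[]> = ((...args: A) => void) & {
  mock: { calls: A[] };
};

function mockFn<A extends unknown[]>(): MockFn<A> {
  const calls: A[] = [];
  const fn = ((...args: A) => {
    calls.push(args);
  }) as MockFn<A>;
  fn.mock = { calls };
  return fn;
}

// Declared as a plain function type, so `.mock` does not type-check:
const msg2: { resolve: (result: unknown) => void } = {
  resolve: mockFn<[unknown]>(),
};
// msg2.resolve.mock // error: Property 'mock' does not exist on type '(result: unknown) => void'
const typed = msg2.resolve as MockFn<[unknown]>; // the cast recovers the mock surface
msg2.resolve("one");
console.log(typed.mock.calls.length); // 1
console.log(typed.mock.calls[0][0]); // "one"
```

The runtime value is the same either way; only the static type changes, which is why the cast is safe here.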
this._checkJournalSize([entry]);

// Build all batch tasks upfront.
const maxWorkers = (this.batch as any).options?.maxWorkers ?? 10;
Accessing maxWorkers via as any bypasses type safety.
While maxWorkers is a documented public configuration option on BatchWorkpool (seen in the README and examples), accessing it via (this.batch as any).options?.maxWorkers circumvents the type system. This is fragile—if the underlying library changes its structure, the error will surface at runtime rather than compile time.
Consider extracting maxWorkers as an explicit parameter passed to StepExecutor, or if the @convex-dev/workpool package allows, requesting that it expose this property through proper TypeScript types to enable type-safe access.
🤖 Prompt for AI Agents
In `@src/client/step.ts` at line 323, the code bypasses TypeScript safety by
reading maxWorkers from (this.batch as any).options. Instead, add an explicit
typed parameter (e.g., maxWorkers: number | undefined) to the StepExecutor
constructor and its instantiation site, and use that value inside StepExecutor.
Alternatively, if the BatchWorkpool type can be imported from
@convex-dev/workpool, narrow this.batch to it and read options.maxWorkers with
proper typing. Either way, remove the as any access in src/client/step.ts.
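One way to follow that suggestion can be sketched with hypothetical names (`StepExecutorSketch`, `BatchLike` are illustrative, not the real classes): thread `maxWorkers` through the constructor so the computation type-checks without `as any`:

```typescript
// Only the surface the executor actually needs from the batch workpool.
interface BatchLike {
  enqueue(task: unknown): Promise<void>;
}

class StepExecutorSketch {
  constructor(
    private batch: BatchLike | undefined,
    private maxWorkers: number = 10, // same default as the `?? 10` fallback
  ) {}

  // A typed computation that previously required the untyped property access.
  workerCount(taskCount: number): number {
    if (!this.batch) return 0; // no batch workpool configured
    return Math.min(this.maxWorkers, taskCount);
  }
}

const fakeBatch: BatchLike = { enqueue: async () => {} };
console.log(new StepExecutorSketch(fakeBatch, 4).workerCount(200)); // 4
console.log(new StepExecutorSketch(fakeBatch).workerCount(3)); // 3
```

If the underlying library later renames its option, this version fails at compile time at the call site rather than silently reading `undefined` at runtime.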
// Lightweight onComplete for batch steps. Only patches the step doc with the
// result — no shared counter, no index check, no workflow re-enqueue.
// Re-enqueue is handled by _checkBatchCompletion (polling).
export const onCompleteBatchStep = internalMutation({
  args: {
    workId: vWorkIdValidator,
    result: vResultValidator,
    context: v.any(),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const { stepId } = args.context;
    const normalizedStepId = ctx.db.normalizeId("steps", stepId);
    if (!normalizedStepId) return;

    // Patch step result directly — read step to spread, then patch.
    const entry = await ctx.db.get(normalizedStepId);
    if (!entry) return;
    await ctx.db.patch(normalizedStepId, {
      step: {
        ...entry.step,
        inProgress: false,
        completedAt: Date.now(),
        runResult: args.result,
      },
    });
  },
});
Missing generation-number guard and error observability.
The regular onCompleteHandler (lines 129–136) validates generationNumber to discard stale callbacks from a previous workflow generation. This handler skips that check entirely — a late-arriving callback from a cancelled/retried generation could overwrite a step that has been reset for the new run.
Additionally, invalid/missing stepId or a missing entry is silently swallowed with no logging or write to onCompleteFailures, unlike the established pattern in onCompleteHandler (lines 104–110). This will make batch-step issues very hard to diagnose in production.
At minimum, consider:
- Passing and checking `generationNumber` against the workflow doc.
- Logging a warning when the entry is missing or already completed.
Suggested hardening
handler: async (ctx, args) => {
- const { stepId } = args.context;
+ const console = await getDefaultLogger(ctx);
+ const { stepId, generationNumber } = args.context;
const normalizedStepId = ctx.db.normalizeId("steps", stepId);
- if (!normalizedStepId) return;
+ if (!normalizedStepId) {
+ console.error("onCompleteBatchStep: invalid stepId", stepId);
+ return;
+ }
- // Patch step result directly — read step to spread, then patch.
const entry = await ctx.db.get(normalizedStepId);
- if (!entry) return;
+ if (!entry) {
+ console.error("onCompleteBatchStep: step not found", stepId);
+ return;
+ }
+ const workflow = await ctx.db.get(entry.workflowId);
+ if (workflow && generationNumber !== undefined &&
+ workflow.generationNumber !== generationNumber) {
+ console.error("onCompleteBatchStep: stale generation", {
+ expected: workflow.generationNumber,
+ got: generationNumber,
+ });
+ return;
+ }
await ctx.db.patch(normalizedStepId, {
🤖 Prompt for AI Agents
In `@src/component/pool.ts` around lines 220 - 247, the onCompleteBatchStep
handler must guard against stale callbacks and surface failures. Accept a
generationNumber in the context, fetch the workflow doc via the step's
workflowId (as onCompleteHandler does), compare it against
workflow.generationNumber, and return early if stale. When
ctx.db.normalizeId("steps", stepId) yields no id, ctx.db.get(normalizedStepId)
returns no entry, or the step is already not inProgress, log a warning via the
same logger used by onCompleteHandler and record the failure to the
onCompleteFailures store (same pattern as onCompleteHandler). Only patch
inProgress/completedAt/runResult when the generation matches and the entry is
valid.
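The guard itself is small. Below is a minimal in-memory sketch of rejecting stale-generation callbacks; the types and the `completeBatchStep` function are illustrative stand-ins, not the component's real API:

```typescript
// Illustrative shapes: a workflow doc with a generation counter and a step doc.
interface WorkflowDoc { generationNumber: number }
interface StepDoc { inProgress: boolean; runResult?: unknown }

function completeBatchStep(
  workflow: WorkflowDoc,
  step: StepDoc,
  callbackGeneration: number,
  result: unknown,
): boolean {
  // A late callback from a cancelled/retried generation must not overwrite
  // a step that has been reset for the new run.
  if (callbackGeneration !== workflow.generationNumber) {
    console.warn("completeBatchStep: stale generation, dropping result");
    return false;
  }
  step.inProgress = false;
  step.runResult = result;
  return true;
}

const wf: WorkflowDoc = { generationNumber: 2 };
const step: StepDoc = { inProgress: true };
console.log(completeBatchStep(wf, step, 1, "old")); // false (stale, ignored)
console.log(step.runResult); // undefined
console.log(completeBatchStep(wf, step, 2, "new")); // true
console.log(step.runResult); // "new"
```

The key design point is that the generation number travels with the callback context, so the check needs no extra coordination beyond one read of the workflow doc.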
// Lightweight onComplete for batchGroup items. Write-only: inserts a result doc
// into batchResults — no read-modify-write, no OCC contention.
export const onCompleteBatchGroupItem = internalMutation({
  args: {
    workId: vWorkIdValidator,
    result: vResultValidator,
    context: v.any(),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const { batchStepId, index } = args.context;
    const normalizedId = ctx.db.normalizeId("steps", batchStepId);
    if (!normalizedId) return;
    await ctx.db.insert("batchResults", {
      batchStepId: normalizedId,
      index,
      result: args.result,
    });
  },
});
For reference, the result-validation logic in `src/client/step.ts`:

// Load all individual results from the batchResults table.
const results = (await this.ctx.runQuery(
  this.component.journal.loadBatchResults,
  { batchStepId: entry._id },
)) as unknown as Array<{
  index: number;
  result: { kind: string; returnValue?: unknown; error?: string };
}>;
results.sort((a, b) => a.index - b.index);
// Validate results count and index integrity.
if (results.length !== entry.step.count) {
  throw new Error(
    `Batch result count mismatch for ${entry._id}: ` +
      `expected ${entry.step.count} results but got ${results.length}`,
  );
}
for (let i = 0; i < results.length; i++) {
  if (results[i].index !== i) {
    throw new Error(
      `Batch result index mismatch for ${entry._id}: ` +
        `expected index ${i} at position ${i} but got index ${results[i].index}`,
    );
  }
}
// Resolve the first message (already dequeued).
this._completeBatchItem(message, results[0]);
Duplicate result insertions are possible and will break batch completion.
If a workpool retries a batch item (e.g., transient failure + retry policy), onCompleteBatchGroupItem will insert a second batchResults doc for the same (batchStepId, index). This creates two problems:
- Completion detection race: `_checkBatchCompletion` uses `take(count)` to check if all items are done. If duplicates exist for lower indices, it may mark the step complete when higher indices are actually still missing.
- Replay validation failure: When `loadBatchResults` collects all docs (including duplicates), the client-side validation in `step.ts` will fail with "Batch result index mismatch" when it detects the out-of-order indices caused by duplicates.
Consider adding a uniqueness guard — for example, query batchResults by (batchStepId, index) before inserting, or use a unique compound field.
🤖 Prompt for AI Agents
In `@src/component/pool.ts` around lines 249 - 268, onCompleteBatchGroupItem
can insert duplicate batchResults docs for the same (batchStepId, index). Add a
uniqueness guard before inserting: query batchResults by the compound index
(e.g., .withIndex("batchStep", (q) => q.eq("batchStepId",
normalizedId).eq("index", index)).first()) and only call
ctx.db.insert("batchResults", ...) if no document exists. This prevents the
duplicates that break _checkBatchCompletion's take(count) check and the replay
validation in step.ts.
// Polling completion checker for batch steps. Scheduled from startBatchSteps
// and startBatchGroupStep. For batchGroup steps, counts batchResults docs.
// For legacy batch steps, checks the inProgress index.
export const _checkBatchCompletion = internalMutation({
  args: {
    workflowId: v.id("workflows"),
    generationNumber: v.number(),
    workpoolOptions: v.optional(workpoolOptions),
    _pollCount: v.optional(v.number()),
    _firstPollAt: v.optional(v.number()),
  },
  returns: v.null(),
  handler: async (ctx, args) => {
    const pollCount = (args._pollCount ?? 0) + 1;
    const firstPollAt = args._firstPollAt ?? Date.now();

    // Find all in-progress steps for this workflow.
    const inProgressSteps = await ctx.db
      .query("steps")
      .withIndex("inProgress", (q) =>
        q.eq("step.inProgress", true).eq("workflowId", args.workflowId),
      )
      .collect();

    let anyRemaining = false;
    for (const step of inProgressSteps) {
      if (step.step.kind === "batchGroup") {
        // Count batchResults for this batchGroup step.
        // Use take(count) to stop reading early when not all results are in.
        const results = await ctx.db
          .query("batchResults")
          .withIndex("batchStep", (q) => q.eq("batchStepId", step._id))
          .take(step.step.count);
        if (results.length >= step.step.count) {
          // All items complete — mark the batchGroup step as done.
          await ctx.db.patch(step._id, {
            step: {
              ...step.step,
              inProgress: false,
              completedAt: Date.now(),
              runResult: { kind: "success", returnValue: null },
            },
          });
          // There may be other in-progress steps; continue checking.
        } else {
          anyRemaining = true;
        }
      } else {
        // Legacy batch step or other in-progress step — still waiting.
        anyRemaining = true;
      }
    }

    if (anyRemaining) {
      // Not done yet — poll again in 200ms.
      await ctx.scheduler.runAfter(
        200,
        internal.pool._checkBatchCompletion,
        {
          ...args,
          _pollCount: pollCount,
          _firstPollAt: firstPollAt,
        },
      );
      return;
    }
    // All batch steps done — re-enqueue the workflow.
    const elapsed = ((Date.now() - firstPollAt) / 1000).toFixed(1);
    const console = await getDefaultLogger(ctx);
    console.info(
      `[PERF] Batch completion detected after ${pollCount} polls (${elapsed}s)`,
    );
    const workflow = await getWorkflow(ctx, args.workflowId, null);
    if (workflow.runResult !== undefined) return;
    if (workflow.generationNumber !== args.generationNumber) return;
    const workpool = await getWorkpool(ctx, args.workpoolOptions);
    await enqueueWorkflow(ctx, workflow, workpool);
  },
});
Unbounded polling — no timeout or max-poll safeguard.
_pollCount and _firstPollAt are tracked but only used for a log message. If a batch item's action silently fails (no callback, no retry) or a bug leaves a step permanently inProgress, this poller will schedule a new mutation every 200 ms forever, creating unbounded load on the database and scheduler.
Add a timeout (e.g., 10 minutes) or a maximum poll count, after which the poller logs an error and marks the workflow as failed.
Sketch: add a timeout guard
handler: async (ctx, args) => {
const pollCount = (args._pollCount ?? 0) + 1;
const firstPollAt = args._firstPollAt ?? Date.now();
+ const MAX_POLL_DURATION_MS = 10 * 60 * 1000; // 10 minutes
+ if (Date.now() - firstPollAt > MAX_POLL_DURATION_MS) {
+ const console = await getDefaultLogger(ctx);
+ console.error(
+ `[BATCH] Polling timed out for workflow ${args.workflowId} after ${pollCount} polls`,
+ );
+ // Optionally fail the workflow here.
+ return;
+ }
🤖 Prompt for AI Agents
In `@src/component/pool.ts` around lines 270 - 348, The _checkBatchCompletion
poller currently retries indefinitely; add a max-poll safeguard using the
existing _pollCount and _firstPollAt values in the internalMutation handler
(function _checkBatchCompletion) so it stops after a configured limit (e.g.,
MAX_POLL_COUNT = 300 for ~10 minutes at 200ms) or when elapsed time exceeds a
timeout (e.g., MAX_POLL_MS = 10*60*1000), log an error via getDefaultLogger when
the guard trips, mark the workflow/run as failed (set workflow.step/runResult
appropriately) or clear inProgress and set a failure runResult, and stop
scheduling further runAfter calls (i.e., do not call
internal.pool._checkBatchCompletion again); ensure checks occur before
re-enqueueing via enqueueWorkflow and preserve the existing
pollCount/_firstPollAt updates when rescheduling.
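The guard the comment above sketches can be isolated as a pure function, which also makes it unit-testable. A minimal sketch, assuming a 200 ms poll interval; `pollGuard`, `MAX_POLL_COUNT`, and `MAX_POLL_MS` are hypothetical names, not part of this PR:

```typescript
// Hypothetical bounded-polling guard: a pure helper the mutation could call
// before rescheduling itself. The limits are assumptions (~10 minutes at a
// 200 ms interval), not values from the PR.
type PollDecision = "continue" | "timed-out";

const MAX_POLL_COUNT = 300;
const MAX_POLL_MS = 10 * 60 * 1000;

function pollGuard(
  pollCount: number,
  firstPollAt: number,
  now: number,
): PollDecision {
  if (pollCount > MAX_POLL_COUNT) return "timed-out";
  if (now - firstPollAt > MAX_POLL_MS) return "timed-out";
  return "continue"; // safe to schedule another poll
}
```

When the guard returns `timed-out`, the handler would log an error and fail the workflow instead of scheduling another poll.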
: step.step.kind === "batchGroup"
  ? {
      kind: "function" as const,
      workId: undefined!,
    }
  : {
      kind: "function",
      workId: step.step.workId!,
    }),
🧩 Analysis chain
🏁 Script executed:
# Find the WorkflowStep type definition
rg -n "type WorkflowStep|interface WorkflowStep" --type ts -A 15 src/

Repository: get-convex/workflow
Length of output: 2474
🏁 Script executed:
# Look at the code in context around lines 155-163 in workflow.ts
sed -n '140,170p' src/component/workflow.ts

Repository: get-convex/workflow
Length of output: 898
🏁 Script executed:
# Find usages of publicStep to see how consumers use it
rg -n "publicStep" --type ts -B 2 -A 2 src/

Repository: get-convex/workflow
Length of output: 618
🏁 Script executed:
# Check for batchGroup kind definition
rg -n "batchGroup" --type ts -C 3 src/

Repository: get-convex/workflow
Length of output: 38896
workId: undefined! bypasses type safety in the public API.
The WorkflowStep type requires workId: WorkId for kind: "function" steps, but batchGroup steps are mapped to this variant with workId: undefined!. This non-null assertion silences the type checker while producing a runtime value that violates the contract. Consumers expecting a valid workId will get undefined instead.
Add a dedicated "batchGroup" variant to the WorkflowStep type, or make workId optional on the "function" variant.
🤖 Prompt for AI Agents
In `@src/component/workflow.ts` around lines 155 - 163, The mapping currently
forces a function variant with workId: undefined! which breaks the WorkflowStep
contract; update the WorkflowStep discriminated union to include a dedicated
"batchGroup" variant (e.g., { kind: "batchGroup"; /* fields as needed */ }) and
change the mapper that checks step.step.kind === "batchGroup" to return that
"batchGroup" variant instead of casting to { kind: "function", workId:
undefined! }; also update any switch/case or consumers that pattern-match on
WorkflowStep (e.g., where WorkflowStep is handled) to handle the new
"batchGroup" discriminator so workId remains required on the "function" variant
and no non-null assertions are needed.
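The suggested union shape can be sketched independently of the real types. Everything here beyond the `kind` discriminator and `workId` is an illustrative assumption, not the actual schema:

```typescript
// Sketch of a WorkflowStep union with a dedicated "batchGroup" variant, so
// the "function" variant keeps a required workId and no `undefined!` cast is
// needed. Field names beyond kind/workId are illustrative assumptions.
type WorkId = string;

type WorkflowStep =
  | { kind: "function"; workId: WorkId } // workId stays required here
  | { kind: "batchGroup"; count: number }; // no fake workId at all

function describeStep(step: WorkflowStep): string {
  switch (step.kind) {
    case "function":
      return `function:${step.workId}`; // narrowed: workId is a WorkId
    case "batchGroup":
      return `batchGroup:${step.count}`; // narrowed: count is available
  }
}
```

With the exhaustive switch, consumers can never observe an `undefined` `workId` on a `"function"` step.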
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/component/batchGroup.test.ts`:
- Around line 237-241: The test call to
t.mutation(internal.pool.onCompleteBatchGroupItem, ...) uses batchStepId:
"invalid_id" which will fail TypeScript because batchStepId expects Id<"steps">;
change the literal to include a type assertion (e.g., "invalid_id" as
Id<"steps"> or as any) so the test compiles while still exercising the
invalid-ID path, updating the argument object in the invocation of
onCompleteBatchGroupItem accordingly.
- Around line 104-105: The call passes vi.advanceTimersByTime (signature
(ms:number)=>...) into t.finishAllScheduledFunctions which expects a zero-arg
function; fix by passing a zero-arg wrapper that calls the timer helper with an
explicit ms (e.g. () => vi.advanceTimersByTime(0)) or replace with a zero-arg
timer runner like () => vi.runAllTimers(); update the call site using the
functions vi.advanceTimersByTime and t.finishAllScheduledFunctions accordingly.
- Around line 50-53: The test accesses batchGroup-specific properties on
result.entry.step without narrowing the discriminated union, causing a TS error;
update the test to narrow result.entry.step to the "batchGroup" variant before
asserting count/inProgress/name (e.g., use a type guard or switch on step.kind,
or create a local const like const step = result.entry.step as Extract<typeof
result.entry.step, { kind: "batchGroup" }>) and then perform the expects against
that narrowed step variable (reference: result.entry.step in the
batchGroup.test.ts assertions).
🧹 Nitpick comments (1)
src/component/batchGroup.test.ts (1)
32-38: Repeated `beforeEach`/`afterEach` fake-timer setup across all describe blocks. The same `vi.useFakeTimers()`/`vi.useRealTimers()` pattern is duplicated in every `describe`. Consider hoisting it to a top-level `beforeEach`/`afterEach`, or wrapping all describes in an outer `describe` block, to reduce boilerplate.

Also applies to: 127-133, 245-251, 370-376, 498-504, 589-595
expect(result.entry.step.kind).toBe("batchGroup");
expect(result.entry.step.count).toBe(10);
expect(result.entry.step.inProgress).toBe(true);
expect(result.entry.step.name).toBe("batchGroup");
Type error: count does not exist on the step union type.
Static analysis confirms Property 'count' does not exist on type ... at line 51. The step field is a discriminated union; you need to narrow the type before accessing batchGroup-specific properties.
Proposed fix
+ expect(result.entry.step.kind).toBe("batchGroup");
+ // Narrow the type for batchGroup-specific assertions
+ if (result.entry.step.kind !== "batchGroup") throw new Error("unexpected");
expect(result.entry.step.count).toBe(10);
expect(result.entry.step.inProgress).toBe(true);
- expect(result.entry.step.name).toBe("batchGroup");
+ expect(result.entry.step.name).toBe("batchGroup");

Alternatively, cast once:
const step = result.entry.step as Extract<typeof result.entry.step, { kind: "batchGroup" }>;
expect(step.kind).toBe("batchGroup");
expect(step.count).toBe(10);

🧰 Tools
🪛 GitHub Check: Test and lint
[failure] 51-51:
Property 'count' does not exist on type '{ kind?: "function" | undefined; runResult?: { kind: "success"; returnValue: any; } | { error: string; kind: "failed"; } | { kind: "canceled"; } | undefined; workId?: WorkId | undefined; ... 7 more ...; argsSize: number; } | { ...; } | { ...; } | { ...; }'.
🤖 Prompt for AI Agents
In `@src/component/batchGroup.test.ts` around lines 50 - 53, The test accesses
batchGroup-specific properties on result.entry.step without narrowing the
discriminated union, causing a TS error; update the test to narrow
result.entry.step to the "batchGroup" variant before asserting
count/inProgress/name (e.g., use a type guard or switch on step.kind, or create
a local const like const step = result.entry.step as Extract<typeof
result.entry.step, { kind: "batchGroup" }>) and then perform the expects against
that narrowed step variable (reference: result.entry.step in the
batchGroup.test.ts assertions).
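The `Extract`-based narrowing can be wrapped in a small throwing helper, which keeps each test to one line. A sketch with a simplified stand-in for the real journal step union (the union shape and `asBatchGroup` are assumptions, not code from this PR):

```typescript
// Minimal stand-in for the journal step union, demonstrating the
// Extract-based narrowing the review suggests.
type Step =
  | { kind: "function"; workId: string }
  | { kind: "batchGroup"; count: number; inProgress: boolean };

type BatchGroupStep = Extract<Step, { kind: "batchGroup" }>;

function asBatchGroup(step: Step): BatchGroupStep {
  if (step.kind !== "batchGroup") {
    throw new Error(`unexpected step kind: ${step.kind}`);
  }
  return step; // narrowed: count and inProgress are visible to the checker
}
```

Unlike an `as` cast, the runtime check also turns a wrong `kind` into an immediate, readable test failure.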
const timer = vi.advanceTimersByTime;
await t.finishAllScheduledFunctions(timer);
Type mismatch: vi.advanceTimersByTime signature doesn't match () => void.
vi.advanceTimersByTime is (ms: number) => VitestUtils, but finishAllScheduledFunctions expects () => void. This fails compilation as confirmed by CI.
Proposed fix
- const timer = vi.advanceTimersByTime;
- await t.finishAllScheduledFunctions(timer);
+ await t.finishAllScheduledFunctions(() => vi.advanceTimersByTime(1));

Or if the intent is to run all pending timers:
- const timer = vi.advanceTimersByTime;
- await t.finishAllScheduledFunctions(timer);
+ await t.finishAllScheduledFunctions(vi.runAllTimers.bind(vi));

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- const timer = vi.advanceTimersByTime;
- await t.finishAllScheduledFunctions(timer);
+ await t.finishAllScheduledFunctions(() => vi.advanceTimersByTime(1));
🧰 Tools
🪛 GitHub Check: Test and lint
[failure] 105-105:
Argument of type '(ms: number) => VitestUtils' is not assignable to parameter of type '() => void'.
🤖 Prompt for AI Agents
In `@src/component/batchGroup.test.ts` around lines 104 - 105, The call passes
vi.advanceTimersByTime (signature (ms:number)=>...) into
t.finishAllScheduledFunctions which expects a zero-arg function; fix by passing
a zero-arg wrapper that calls the timer helper with an explicit ms (e.g. () =>
vi.advanceTimersByTime(0)) or replace with a zero-arg timer runner like () =>
vi.runAllTimers(); update the call site using the functions
vi.advanceTimersByTime and t.finishAllScheduledFunctions accordingly.
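The arity rule behind this CI error is easy to reproduce in isolation. In the sketch below, `finishAll` and `advanceBy` are simplified stand-ins for the test-helper APIs, not the real ones:

```typescript
// A callback slot typed () => void is always invoked with zero arguments,
// so TypeScript rejects passing a (ms: number) => void function into it.
let total = 0;
const advanceBy = (ms: number) => {
  total += ms; // called with no args, ms would be undefined -> total = NaN
};

function finishAll(advance: () => void): number {
  let ticks = 0;
  for (let i = 0; i < 3; i++) {
    advance(); // always invoked with zero arguments
    ticks++;
  }
  return ticks;
}

// finishAll(advanceBy) fails to compile: (ms: number) => void is not
// assignable to () => void, which is exactly the CI error above.
const ticks = finishAll(() => advanceBy(1)); // zero-arg wrapper fixes it
```

The wrapper also pins down the advance amount explicitly instead of leaving it to an accidentally-undefined parameter.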
await t.mutation(internal.pool.onCompleteBatchGroupItem, {
  workId: "w0" as any,
  result: { kind: "success", returnValue: "ok" },
  context: { batchStepId: "invalid_id", index: 0 },
});
"invalid_id" likely needs a type assertion to compile.
batchStepId presumably expects Id<"steps">. Unlike workId (which has as any), this string literal will fail type-checking. Add a cast to keep the test compiling while still exercising the invalid-ID path.
Proposed fix
- context: { batchStepId: "invalid_id", index: 0 },
+ context: { batchStepId: "invalid_id" as Id<"steps">, index: 0 },

📝 Committable suggestion
await t.mutation(internal.pool.onCompleteBatchGroupItem, {
  workId: "w0" as any,
  result: { kind: "success", returnValue: "ok" },
- context: { batchStepId: "invalid_id", index: 0 },
+ context: { batchStepId: "invalid_id" as Id<"steps">, index: 0 },
});
🤖 Prompt for AI Agents
In `@src/component/batchGroup.test.ts` around lines 237 - 241, The test call to
t.mutation(internal.pool.onCompleteBatchGroupItem, ...) uses batchStepId:
"invalid_id" which will fail TypeScript because batchStepId expects Id<"steps">;
change the literal to include a type assertion (e.g., "invalid_id" as
Id<"steps"> or as any) so the test compiles while still exercising the
invalid-ID path, updating the argument object in the invocation of
onCompleteBatchGroupItem accordingly.


