From e3ca17f0a864dc1e730ae650e211baace2a7432e Mon Sep 17 00:00:00 2001 From: Iwan Birrer Date: Sun, 17 May 2026 18:16:18 +0200 Subject: [PATCH 1/2] fix(electric-db-collection): use node environment for e2e tests The e2e tests hung indefinitely in beforeAll on Node 24. The jsdom test environment provides its own AbortController/AbortSignal, but fetch is Node's native undici fetch. The Electric ShapeStream creates an AbortController (jsdom) and passes its signal to fetch (undici); Node 24's undici strictly rejects the foreign signal with a TypeError. The Electric client's backoff wrapper treats that error as retryable and retries forever, so preload() never resolves and beforeAll times out. The e2e tests exercise Postgres + Electric data sync and use no DOM APIs, so switch the environment to `node`, where AbortController and fetch are both Node-native and compatible. CI did not catch this because it pins Node 20, whose undici accepted the jsdom signal via a looser duck-typed check. --- packages/electric-db-collection/vitest.e2e.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/electric-db-collection/vitest.e2e.config.ts b/packages/electric-db-collection/vitest.e2e.config.ts index 903f46de5..cf7a2ebea 100644 --- a/packages/electric-db-collection/vitest.e2e.config.ts +++ b/packages/electric-db-collection/vitest.e2e.config.ts @@ -6,6 +6,6 @@ export default defineConfig({ globalSetup: `../db-collection-e2e/support/global-setup.ts`, fileParallelism: false, // Critical for shared database testTimeout: 30000, - environment: `jsdom`, + environment: `node`, }, }) From 43d0237b7c48ad66e4c798efa8274f3f48fb8dea Mon Sep 17 00:00:00 2001 From: Iwan Birrer Date: Sun, 17 May 2026 18:57:26 +0200 Subject: [PATCH 2/2] fix(db): load nested includes/join children eagerly when parent keys are static MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In `progressive` sync mode an Electric collection serves a 
fast-path snapshot for any loadSubset that arrives before its first `up-to-date` message, then loads the rest in the background. A collection queried directly got this fast path; the same collection fetched as a nested `toArray` child — or joined to a filtered parent — did not. Its rows only appeared once the entire child collection finished syncing. The root cause is ordering. Includes and joins lazy-load the child/joined collection by tapping the parent pipeline and issuing the child loadSubset only once the parent has produced rows. That deferred call resolves after the child's full background sync has completed, so its snapshot is discarded as stale and the query never sees the intermediate fast-path result. Fix: when the parent query statically constrains the correlation/join field to known keys (e.g. `.where(eq(parent.id, 5))`), push an equality (`in`) predicate on those keys onto the child so it loads its subset eagerly — in parallel with the parent — through the normal subscription path, instead of via the deferred tap. When the keys are not statically known, the existing lazy-loading behaviour is kept. - extractEqualityKeys() in expression-helpers.ts reads a fixed key set from a WHERE expression (`eq` / `in`, possibly nested inside `and`). - compiler/index.ts applies it to `toArray` includes children. - compiler/joins.ts applies it to inner/left join sources (the only join types where filtering the joined side is sound). Tests: - packages/db/tests/query/progressive-includes-fastpath.test.ts Deterministic unit test: the nested child loadSubset now runs eagerly, inside the fast-path window, even while the parent collection is unsynced. - packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts End-to-end against a real Electric server: the direct query, the nested toArray query and the nested join query all surface their 40-row target subset after only 40/4540 rows have loaded. 
--- packages/db/src/query/compiler/index.ts | 119 +++++--- packages/db/src/query/compiler/joins.ts | 77 ++++- packages/db/src/query/expression-helpers.ts | 75 +++++ .../progressive-includes-fastpath.test.ts | 175 +++++++++++ .../e2e/progressive-nested.e2e.test.ts | 289 ++++++++++++++++++ 5 files changed, 690 insertions(+), 45 deletions(-) create mode 100644 packages/db/tests/query/progressive-includes-fastpath.test.ts create mode 100644 packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index f4cff548d..397ca042d 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -25,6 +25,7 @@ import { } from '../ir.js' import { ensureIndexForField } from '../../indexes/auto-index.js' import { inArray } from '../builder/functions.js' +import { extractEqualityKeys } from '../expression-helpers.js' import { compileExpression, toBooleanPredicate } from './evaluators.js' import { processJoins } from './joins.js' import { containsAggregate, processGroupBy } from './group-by.js' @@ -38,6 +39,7 @@ import type { IncludesMaterialization, QueryIR, QueryRef, + Where, } from '../ir.js' import type { LazyCollectionCallbacks } from './joins.js' import type { Collection } from '../../collection/index.js' @@ -396,67 +398,98 @@ export function compileQuery( childFromCollection, ) + // When the parent query statically constrains the correlation field to + // known keys (e.g. `.where(eq(parent.id, 5))`), the child subset can be + // loaded eagerly — in parallel with the parent — instead of being + // deferred behind the parent pipeline. Deferring it makes the child + // loadSubset miss a progressive collection's fast-path window. + const parentCorrelationAlias = subquery.correlationField.path[0] + const staticCorrelationKeys = parentCorrelationAlias + ? 
extractEqualityKeys( + sourceWhereClauses.get(parentCorrelationAlias), + subquery.correlationField.path, + ) + : null + // Non-empty key set => load the child eagerly; otherwise lazy-load it. + const eagerChildKeys = + staticCorrelationKeys && staticCorrelationKeys.length > 0 + ? staticCorrelationKeys + : null + if (followRefResult) { const followRefCollection = followRefResult.collection const fieldPath = followRefResult.path const fieldName = fieldPath[0] - // 1. Mark child source as lazy so CollectionSubscriber skips initial full load - lazySources.add(childCorrelationAlias) - - // 2. Ensure an index on the correlation field for efficient lookups + // Ensure an index on the correlation field for efficient lookups if (fieldName) { ensureIndexForField(fieldName, fieldPath, followRefCollection) } - // 3. Tap parent keys to intercept correlation values and request - // matching child rows on-demand via the child's subscription - parentKeys = parentKeys.pipe( - tap((data: any) => { - const resolvedAlias = - aliasRemapping[childCorrelationAlias] || childCorrelationAlias - const lazySourceSubscription = subscriptions[resolvedAlias] + // When the keys aren't statically known, fall back to lazy loading: + // mark the child source lazy so its CollectionSubscriber skips the + // initial full load, and tap the parent keys to request matching + // child rows on-demand once the parent pipeline produces them. 
+ if (!eagerChildKeys) { + lazySources.add(childCorrelationAlias) + + parentKeys = parentKeys.pipe( + tap((data: any) => { + const resolvedAlias = + aliasRemapping[childCorrelationAlias] || childCorrelationAlias + const lazySourceSubscription = subscriptions[resolvedAlias] + + if (!lazySourceSubscription) { + return + } - if (!lazySourceSubscription) { - return - } + if (lazySourceSubscription.hasLoadedInitialState()) { + return + } - if (lazySourceSubscription.hasLoadedInitialState()) { - return - } + const joinKeys = [ + ...new Set( + data + .getInner() + .map( + ([[correlationValue]]: any) => + correlationValue as unknown, + ) + .filter((joinKey: unknown) => joinKey != null), + ), + ] + + if (joinKeys.length === 0) { + return + } - const joinKeys = [ - ...new Set( - data - .getInner() - .map( - ([[correlationValue]]: any) => correlationValue as unknown, - ) - .filter((key: unknown) => key != null), - ), - ] - - if (joinKeys.length === 0) { - return - } + const lazyJoinRef = new PropRef(fieldPath) + lazySourceSubscription.requestSnapshot({ + where: inArray(lazyJoinRef, joinKeys), + }) + }), + ) + } + } - const lazyJoinRef = new PropRef(fieldPath) - lazySourceSubscription.requestSnapshot({ - where: inArray(lazyJoinRef, joinKeys), - }) - }), + // Extra WHERE clauses appended to the child query: parent-referencing + // filters (applied post-join) and, when the parent keys are statically + // known, an eager correlation predicate so the child loads its subset + // immediately via the normal (non-lazy) subscription path. + const extraChildWhere: Array = [] + if (subquery.parentFilters && subquery.parentFilters.length > 0) { + extraChildWhere.push(...subquery.parentFilters) + } + if (eagerChildKeys) { + extraChildWhere.push( + inArray(subquery.childCorrelationField, eagerChildKeys), ) } - - // If parent filters exist, append them to the child query's WHERE const childQuery = - subquery.parentFilters && subquery.parentFilters.length > 0 + extraChildWhere.length > 0 ? 
{ ...subquery.query, - where: [ - ...(subquery.query.where || []), - ...subquery.parentFilters, - ], + where: [...(subquery.query.where || []), ...extraChildWhere], } : subquery.query diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 58db91257..ddfe46f35 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -14,7 +14,8 @@ import { import { normalizeValue } from '../../utils/comparison.js' import { ensureIndexForField } from '../../indexes/auto-index.js' import { PropRef, followRef } from '../ir.js' -import { inArray } from '../builder/functions.js' +import { and, inArray } from '../builder/functions.js' +import { extractEqualityKeys } from '../expression-helpers.js' import { compileExpression } from './evaluators.js' import type { CompileQueryFn } from './index.js' import type { OrderByOptimizationInfo } from './order-by.js' @@ -221,7 +222,35 @@ function processJoin( throw new UnsupportedJoinTypeError(joinClause.type) } - if (activeSource) { + // When the query statically constrains the main side of the join condition + // to known keys, filter the joined side to the same keys so it loads only + // the matching subset eagerly. This avoids deferring it behind a + // lazy-loading tap, which would miss a progressive collection's fast-path + // window. Sound only for inner/left joins (see deriveStaticJoinFilter). + const canFilterJoinedSide = + joinClause.type === `inner` || joinClause.type === `left` + const staticJoinFilter = canFilterJoinedSide + ? deriveStaticJoinFilter( + mainExpr, + joinedExpr, + joinedSource, + isCollectionRef, + sourceWhereClauses, + ) + : null + if (staticJoinFilter) { + const existing = sourceWhereClauses.get(staticJoinFilter.alias) + sourceWhereClauses.set( + staticJoinFilter.alias, + existing + ? 
and(existing, staticJoinFilter.filter) + : staticJoinFilter.filter, + ) + } + + // Skip lazy loading when the joined side is already statically filtered — + // it now loads its subset eagerly via the normal subscription path. + if (activeSource && !staticJoinFilter) { // If the lazy collection comes from a subquery that has a limit and/or an offset clause // then we need to deoptimize the join because we don't know which rows are in the result set // since we simply lookup matching keys in the index but the index contains all rows @@ -660,3 +689,47 @@ function getActiveAndLazySources( return { activeSource: undefined, lazySource: undefined } } } + +/** + * When the query statically constrains the main side of an inner/left join to + * a known set of keys (e.g. `.where(eq(group.id, 5))`), derives an equality + * filter for the joined side of the join condition. + * + * Applying that filter lets the joined collection load only the matching + * subset eagerly, instead of being deferred behind a lazy-loading tap (which + * misses a progressive collection's fast-path window) or loaded in full as the + * join's "active" side. + * + * Only sound for `inner` and `left` joins, where a joined row is needed solely + * when it matches a main row — `right`/`full` joins keep unmatched joined rows. + * Only plain collection join sources are filtered; subquery sources keep the + * existing lazy-loading behaviour. + * + * @returns The joined alias and the derived predicate, or `null` when the main + * side is not statically constrained. 
+ */ +function deriveStaticJoinFilter( + mainExpr: BasicExpression, + joinedExpr: BasicExpression, + joinedSource: string, + joinedSideIsCollection: boolean, + sourceWhereClauses: Map>, +): { alias: string; filter: BasicExpression } | null { + if ( + !joinedSideIsCollection || + mainExpr.type !== `ref` || + joinedExpr.type !== `ref` + ) { + return null + } + + const mainKeys = extractEqualityKeys( + sourceWhereClauses.get(mainExpr.path[0]!), + mainExpr.path, + ) + if (mainKeys && mainKeys.length > 0) { + return { alias: joinedSource, filter: inArray(joinedExpr, mainKeys) } + } + + return null +} diff --git a/packages/db/src/query/expression-helpers.ts b/packages/db/src/query/expression-helpers.ts index 9b79159e0..419c1d01f 100644 --- a/packages/db/src/query/expression-helpers.ts +++ b/packages/db/src/query/expression-helpers.ts @@ -520,3 +520,78 @@ export function parseLoadSubsetOptions( limit: options.limit, } } + +/** + * Determines whether a WHERE expression constrains the field at `fieldPath` to + * a fixed, statically-known set of literal values. + * + * Recognises a top-level `eq` or `in` comparison against the field — possibly + * nested inside `and` — and returns the literal value set. Returns `null` when + * the field is not statically constrained (for example it is compared against + * another column, or the predicate uses `or`). + * + * Used to push correlation predicates eagerly onto lazily-loaded join/includes + * children: when the parent side is filtered to known keys, the child subset + * can be loaded immediately instead of waiting for the parent query to resolve. + * + * @param where - The WHERE expression to inspect + * @param fieldPath - The full path of the field (alias included, e.g. `['user', 'id']`) + * @returns The literal value set, or `null` when not statically constrained + * + * @example + * ```typescript + * // eq(user.id, 5) -> [5] + * // in(user.id, [1, 2, 3]) -> [1, 2, 3] + * // and(eq(user.id, 5), ...) -> [5] + * // or(...) 
/ eq(user.id, x) -> null + * ``` + */ +export function extractEqualityKeys( + where: BasicExpression | undefined, + fieldPath: ReadonlyArray, +): Array | null { + if (!where || where.type !== `func`) { + return null + } + + const matchesField = (expr: BasicExpression): boolean => + expr.type === `ref` && + expr.path.length === fieldPath.length && + expr.path.every((segment, index) => segment === fieldPath[index]) + + const { name, args } = where + + if (name === `and`) { + for (const arg of args) { + const keys = extractEqualityKeys( + arg as BasicExpression, + fieldPath, + ) + if (keys) { + return keys + } + } + return null + } + + if (name === `eq` && args.length === 2) { + const [lhs, rhs] = args as [BasicExpression, BasicExpression] + if (matchesField(lhs) && rhs.type === `val`) { + return [rhs.value] + } + if (matchesField(rhs) && lhs.type === `val`) { + return [lhs.value] + } + return null + } + + if (name === `in` && args.length === 2) { + const [lhs, rhs] = args as [BasicExpression, BasicExpression] + if (matchesField(lhs) && rhs.type === `val` && Array.isArray(rhs.value)) { + return [...rhs.value] + } + return null + } + + return null +} diff --git a/packages/db/tests/query/progressive-includes-fastpath.test.ts b/packages/db/tests/query/progressive-includes-fastpath.test.ts new file mode 100644 index 000000000..fea9004cc --- /dev/null +++ b/packages/db/tests/query/progressive-includes-fastpath.test.ts @@ -0,0 +1,175 @@ +import { describe, expect, it, vi } from 'vitest' +import { createLiveQueryCollection, eq, toArray } from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { extractSimpleComparisons } from '../../src/query/expression-helpers.js' +import { flushPromises } from '../utils.js' +import type { LoadSubsetOptions } from '../../src/types.js' + +/** + * Regression test for a progressive-sync-mode bug with nested `toArray` + * subqueries. 
+ * + * In `progressive` sync mode an Electric collection serves a fast-path + * `fetchSnapshot` for any `loadSubset` call that arrives BEFORE its first + * `up-to-date` message (`isBufferingInitialSync()` in electric.ts). After that + * the window is closed and the data is only available once the full background + * sync finishes. + * + * - Direct query (`q.from({post}).where(eq(post.userId, X))`): + * the collection subscriber issues `loadSubset` immediately, inside the + * window -> fast path works. + * + * - Nested query (`post` fetched via `toArray` inside a user's `select`): + * the BUG was that includes lazy-loading deferred the child `loadSubset` + * until the parent `users` query produced rows. The fix: when the parent + * query statically constrains the correlation field (`eq(user.id, X)`), + * the child subset is loaded eagerly — in parallel with the parent — so it + * still lands inside the fast-path window. + * + * This is the deterministic proof. The fast-path window is modelled with a + * plain `@tanstack/db` collection whose `loadSubset` records whether it was + * called while the window was open — the deferral it exposes is real + * `@tanstack/db` behaviour, only the window is simulated. The same scenario + * against a real Electric server is verified end-to-end by + * packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts. + */ + +type User = { id: number; name: string } +type Post = { id: number; userId: number; title: string } + +const users: Array = [ + { id: 1, name: `U1` }, + { id: 2, name: `U2` }, + { id: 3, name: `U3` }, +] + +let seq = 0 + +/** + * A collection whose initial sync is gated: nothing is committed and the + * collection is not `markReady()` until `release()` is called. Mirrors a + * parent collection that takes a moment to sync. 
+ */ +function makeGatedUsers() { + let release: () => void = () => {} + const gate = new Promise((resolve) => { + release = resolve + }) + const collection = createCollection({ + id: `fastpath-users-${seq++}`, + getKey: (u) => u.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + void (async () => { + await gate + begin() + for (const u of users) write({ type: `insert`, value: u }) + commit() + markReady() + })() + }, + }, + }) + return { collection, release } +} + +/** + * Models an Electric collection in `progressive` sync mode. + * + * `windowOpen` mirrors `isBufferingInitialSync()`: the progressive fast-path + * (`fetchSnapshot`) only runs while it is `true`. `closeWindow()` simulates the + * first `up-to-date` message arriving, after which `loadSubset` calls can no + * longer be served from a snapshot. + */ +function makeProgressivePosts() { + let windowOpen = true + const fastPathLoads: Array = [] + const lateLoads: Array = [] + + const collection = createCollection({ + id: `fastpath-posts-${seq++}`, + getKey: (p) => p.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + return { + loadSubset: vi.fn((opts: LoadSubsetOptions) => { + ;(windowOpen ? 
fastPathLoads : lateLoads).push(opts) + return Promise.resolve() + }), + } + }, + }, + }) + + return { + collection, + fastPathLoads, + lateLoads, + closeWindow: () => { + windowOpen = false + }, + } +} + +describe(`progressive sync — nested subquery fast-path`, () => { + it(`direct query: posts loadSubset runs INSIDE the fast-path window`, async () => { + const { collection: posts, fastPathLoads, lateLoads } = + makeProgressivePosts() + + const query = createLiveQueryCollection((q) => + q.from({ post: posts }).where(({ post }) => eq(post.userId, 2)), + ) + const preloadPromise = query.preload() + await flushPromises() + + // The collection subscriber issues loadSubset immediately — before the + // progressive collection's buffering window could close. + expect(fastPathLoads.length).toBeGreaterThan(0) + expect(lateLoads.length).toBe(0) + + await preloadPromise + }) + + it(`nested toArray child: posts loadSubset runs eagerly, INSIDE the fast-path window`, async () => { + // The parent `users` collection is gated and never released, so its data + // never arrives. Before the fix the child `posts` loadSubset was deferred + // behind the parent and would never run; with the fix it runs eagerly + // because the parent filters the correlation field to a known key. + const { collection: gatedUsers } = makeGatedUsers() + const { collection: posts, fastPathLoads, lateLoads } = + makeProgressivePosts() + + const query = createLiveQueryCollection((q) => + q + .from({ user: gatedUsers }) + .where(({ user }) => eq(user.id, 2)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + posts: toArray( + q + .from({ post: posts }) + .where(({ post }) => eq(post.userId, user.id)), + ), + })), + ) + // Not awaited: preload never resolves while the parent stays gated. + void query.preload() + await flushPromises() + + // The child loadSubset ran eagerly — inside the fast-path window — even + // though the parent `users` collection has not synced. 
+ expect(fastPathLoads.length).toBeGreaterThan(0) + expect(lateLoads.length).toBe(0) + + // It was scoped to the parent's statically-known correlation key. + const lastLoad = fastPathLoads[fastPathLoads.length - 1]! + expect(extractSimpleComparisons(lastLoad.where)).toEqual([ + { field: [`userId`], operator: `in`, value: [2] }, + ]) + }) +}) diff --git a/packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts b/packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts new file mode 100644 index 000000000..d51dca385 --- /dev/null +++ b/packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts @@ -0,0 +1,289 @@ +/** + * Progressive sync mode — nested subquery & join fast-path (end-to-end) + * + * Regression test against a real Electric server. + * + * In `progressive` sync mode an Electric collection loads a fast-path snapshot + * of the rows a query needs FIRST, then the rest of the collection in the + * background. This works when a collection is queried directly: + * + * q.from({ post: posts }).where(({ post }) => eq(post.userId, X)) + * + * It previously did NOT work when the same `posts` were fetched as a nested + * `toArray` child of a user, or joined to a filtered user — the fast-path was + * missed and the rows only became available once the WHOLE `posts` collection + * had finished its background sync. + * + * The fix: when the parent/main side is filtered to statically-known keys, the + * correlation predicate is pushed onto the child/joined collection so it loads + * its subset eagerly, exactly like the direct query. 
+ * + * The test seeds a large `posts` table so the background sync is visibly slow, + * then watches how much of the collection had to load before each query could + * show its (small) target subset: + * + * - subset visible while posts.size is still small -> fast-path used + * - subset visible only once posts.size == full -> fast-path missed + * + * The deterministic counterpart lives in + * packages/db/tests/query/progressive-includes-fastpath.test.ts. + */ + +import { randomUUID } from 'node:crypto' +import { afterAll, beforeAll, describe, expect, inject, it } from 'vitest' +import { + createCollection, + createLiveQueryCollection, + eq, + toArray, +} from '@tanstack/db' +import { electricCollectionOptions } from '../src/electric' +import { makePgClient } from '../../db-collection-e2e/support/global-setup' +import { sleep, waitFor } from '../../db-collection-e2e/src/utils/helpers' +import type { Row } from '@electric-sql/client' +import type { Client } from 'pg' + +const OTHER_USERS = 3 +const POSTS_PER_OTHER_USER = 1500 +const TARGET_POSTS = 40 +const TOTAL_POSTS = OTHER_USERS * POSTS_PER_OTHER_USER + TARGET_POSTS + +type User = { id: string; name: string } +type Post = { id: string; userId: string; title: string } + +describe(`Progressive mode — nested subquery & join fast-path (e2e)`, () => { + let dbClient: Client + let baseUrl: string + let testSchema: string + let usersTable: string + let postsTable: string + let targetUserId: string + + const shapeUrl = () => `${baseUrl}/v1/shape` + + beforeAll(async () => { + baseUrl = inject(`baseUrl`) + testSchema = inject(`testSchema`) + + const testId = Date.now().toString(16) + usersTable = `"users_pn_${testId}"` + postsTable = `"posts_pn_${testId}"` + + dbClient = makePgClient({ options: `-csearch_path=${testSchema}` }) + await dbClient.connect() + await dbClient.query(`SET search_path TO ${testSchema}`) + + await dbClient.query(` + CREATE TABLE ${usersTable} (id UUID PRIMARY KEY, name TEXT NOT NULL) + `) + await 
dbClient.query(` + CREATE TABLE ${postsTable} ( + id UUID PRIMARY KEY, + "userId" UUID NOT NULL, + title TEXT NOT NULL + ) + `) + + targetUserId = randomUUID() + await dbClient.query( + `INSERT INTO ${usersTable} (id, name) VALUES ($1, $2)`, + [targetUserId, `Target User`], + ) + // Small target subset. + await dbClient.query( + `INSERT INTO ${postsTable} (id, "userId", title) + SELECT gen_random_uuid(), $1, 'Target Post ' || g + FROM generate_series(1, ${TARGET_POSTS}) g`, + [targetUserId], + ) + // Large remainder so the background full sync is visibly slow. + for (let u = 0; u < OTHER_USERS; u++) { + const otherId = randomUUID() + await dbClient.query( + `INSERT INTO ${usersTable} (id, name) VALUES ($1, $2)`, + [otherId, `Other User ${u}`], + ) + await dbClient.query( + `INSERT INTO ${postsTable} (id, "userId", title) + SELECT gen_random_uuid(), $1, 'Post ' || g + FROM generate_series(1, ${POSTS_PER_OTHER_USER}) g`, + [otherId], + ) + } + + // Ensure Electric's replication slot has caught up with the seed data. 
+ const verify = createCollection( + electricCollectionOptions({ + id: `pn-verify-${testId}`, + shapeOptions: { + url: shapeUrl(), + params: { table: `${testSchema}.${postsTable}` }, + }, + syncMode: `eager`, + getKey: (item) => item.id, + startSync: true, + }), + ) + await verify.preload() + await waitFor(() => verify.size >= TOTAL_POSTS, { + timeout: 60000, + interval: 250, + message: `Electric did not replicate seed posts (got ${verify.size}/${TOTAL_POSTS})`, + }) + await verify.cleanup() + }, 120000) + + afterAll(async () => { + await dbClient.query(`DROP TABLE IF EXISTS ${postsTable}`) + await dbClient.query(`DROP TABLE IF EXISTS ${usersTable}`) + await dbClient.end() + }) + + function makeProgressive & { id: string }>( + id: string, + table: string, + ) { + return createCollection( + electricCollectionOptions({ + id, + shapeOptions: { + url: shapeUrl(), + params: { table: `${testSchema}.${table}` }, + }, + syncMode: `progressive`, + getKey: (item) => item.id, + startSync: true, + }), + ) + } + + /** + * Polls until `getSubsetSize()` reaches `TARGET_POSTS` and records how much + * of the `posts` collection had loaded at that moment. A size well below the + * full collection means the fast-path snapshot served the query; a size + * equal to the full collection means the query had to wait for the whole + * background sync. 
+ */ + async function postsLoadedWhenSubsetReady( + posts: { readonly size: number }, + getSubsetSize: () => number, + ): Promise { + const deadline = Date.now() + 60000 + while (Date.now() < deadline) { + if (getSubsetSize() >= TARGET_POSTS) { + return posts.size + } + await sleep(5) + } + throw new Error( + `subset never reached ${TARGET_POSTS} (got ${getSubsetSize()}, posts.size=${posts.size})`, + ) + } + + it(`direct query loads the target subset via the fast-path snapshot`, async () => { + const posts = makeProgressive( + `pn-posts-direct-${Date.now().toString(16)}`, + postsTable, + ) + + const query = createLiveQueryCollection((q) => + q + .from({ post: posts }) + .where(({ post }) => eq(post.userId, targetUserId)), + ) + void query.preload() + + const postsLoaded = await postsLoadedWhenSubsetReady( + posts, + () => query.size, + ) + console.log( + `[DIRECT] posts loaded when subset ready: ${postsLoaded}/${TOTAL_POSTS}`, + ) + + await waitFor(() => posts.status === `ready`, { timeout: 60000 }) + await query.cleanup() + await posts.cleanup() + + // Fast path: the query saw its 40 rows long before the full background + // sync of ~4500 rows finished. + expect(postsLoaded).toBeLessThan(TOTAL_POSTS) + }, 90000) + + it(`nested toArray query loads the target subset via the fast-path snapshot`, async () => { + const suffix = Date.now().toString(16) + const users = makeProgressive(`pn-users-tarray-${suffix}`, usersTable) + const posts = makeProgressive(`pn-posts-tarray-${suffix}`, postsTable) + + const query = createLiveQueryCollection((q) => + q + .from({ user: users }) + .where(({ user }) => eq(user.id, targetUserId)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + posts: toArray( + q + .from({ post: posts }) + .where(({ post }) => eq(post.userId, user.id)), + ), + })), + ) + void query.preload() + + const nestedCount = () => { + const row = Array.from(query.values())[0] as + | { posts?: Array } + | undefined + return row?.posts?.length ?? 
0 + } + + const postsLoaded = await postsLoadedWhenSubsetReady(posts, nestedCount) + console.log( + `[NESTED toArray] posts loaded when subset ready: ${postsLoaded}/${TOTAL_POSTS}`, + ) + + await waitFor(() => posts.status === `ready`, { timeout: 60000 }) + await query.cleanup() + await users.cleanup() + await posts.cleanup() + + // The nested subset arrives via the fast-path snapshot, well before the + // full background sync of the posts collection completes. + expect(postsLoaded).toBeLessThan(TOTAL_POSTS) + }, 90000) + + it(`nested join query loads the target subset via the fast-path snapshot`, async () => { + const suffix = Date.now().toString(16) + const users = makeProgressive(`pn-users-join-${suffix}`, usersTable) + const posts = makeProgressive(`pn-posts-join-${suffix}`, postsTable) + + const query = createLiveQueryCollection((q) => + q + .from({ user: users }) + .where(({ user }) => eq(user.id, targetUserId)) + .innerJoin({ post: posts }, ({ user, post }) => + eq(post.userId, user.id), + ) + .select(({ post }) => ({ ...post })), + ) + void query.preload() + + const postsLoaded = await postsLoadedWhenSubsetReady( + posts, + () => query.size, + ) + console.log( + `[NESTED join] posts loaded when subset ready: ${postsLoaded}/${TOTAL_POSTS}`, + ) + + await waitFor(() => posts.status === `ready`, { timeout: 60000 }) + await query.cleanup() + await users.cleanup() + await posts.cleanup() + + // The joined subset arrives via the fast-path snapshot, well before the + // full background sync of the posts collection completes. + expect(postsLoaded).toBeLessThan(TOTAL_POSTS) + }, 90000) +})