diff --git a/packages/db/src/query/compiler/index.ts b/packages/db/src/query/compiler/index.ts index f4cff548d..397ca042d 100644 --- a/packages/db/src/query/compiler/index.ts +++ b/packages/db/src/query/compiler/index.ts @@ -25,6 +25,7 @@ import { } from '../ir.js' import { ensureIndexForField } from '../../indexes/auto-index.js' import { inArray } from '../builder/functions.js' +import { extractEqualityKeys } from '../expression-helpers.js' import { compileExpression, toBooleanPredicate } from './evaluators.js' import { processJoins } from './joins.js' import { containsAggregate, processGroupBy } from './group-by.js' @@ -38,6 +39,7 @@ import type { IncludesMaterialization, QueryIR, QueryRef, + Where, } from '../ir.js' import type { LazyCollectionCallbacks } from './joins.js' import type { Collection } from '../../collection/index.js' @@ -396,67 +398,98 @@ export function compileQuery( childFromCollection, ) + // When the parent query statically constrains the correlation field to + // known keys (e.g. `.where(eq(parent.id, 5))`), the child subset can be + // loaded eagerly — in parallel with the parent — instead of being + // deferred behind the parent pipeline. Deferring it makes the child + // loadSubset miss a progressive collection's fast-path window. + const parentCorrelationAlias = subquery.correlationField.path[0] + const staticCorrelationKeys = parentCorrelationAlias + ? extractEqualityKeys( + sourceWhereClauses.get(parentCorrelationAlias), + subquery.correlationField.path, + ) + : null + // Non-empty key set => load the child eagerly; otherwise lazy-load it. + const eagerChildKeys = + staticCorrelationKeys && staticCorrelationKeys.length > 0 + ? staticCorrelationKeys + : null + if (followRefResult) { const followRefCollection = followRefResult.collection const fieldPath = followRefResult.path const fieldName = fieldPath[0] - // 1. Mark child source as lazy so CollectionSubscriber skips initial full load - lazySources.add(childCorrelationAlias) - - // 2. Ensure an index on the correlation field for efficient lookups + // Ensure an index on the correlation field for efficient lookups if (fieldName) { ensureIndexForField(fieldName, fieldPath, followRefCollection) } - // 3. Tap parent keys to intercept correlation values and request - // matching child rows on-demand via the child's subscription - parentKeys = parentKeys.pipe( - tap((data: any) => { - const resolvedAlias = - aliasRemapping[childCorrelationAlias] || childCorrelationAlias - const lazySourceSubscription = subscriptions[resolvedAlias] + // When the keys aren't statically known, fall back to lazy loading: + // mark the child source lazy so its CollectionSubscriber skips the + // initial full load, and tap the parent keys to request matching + // child rows on-demand once the parent pipeline produces them. + if (!eagerChildKeys) { + lazySources.add(childCorrelationAlias) + + parentKeys = parentKeys.pipe( + tap((data: any) => { + const resolvedAlias = + aliasRemapping[childCorrelationAlias] || childCorrelationAlias + const lazySourceSubscription = subscriptions[resolvedAlias] + + if (!lazySourceSubscription) { + return + } - if (!lazySourceSubscription) { - return - } + if (lazySourceSubscription.hasLoadedInitialState()) { + return + } - if (lazySourceSubscription.hasLoadedInitialState()) { - return - } + const joinKeys = [ + ...new Set( + data + .getInner() + .map( + ([[correlationValue]]: any) => + correlationValue as unknown, + ) + .filter((joinKey: unknown) => joinKey != null), + ), + ] + + if (joinKeys.length === 0) { + return + } - const joinKeys = [ - ...new Set( - data - .getInner() - .map( - ([[correlationValue]]: any) => correlationValue as unknown, - ) - .filter((key: unknown) => key != null), - ), - ] - - if (joinKeys.length === 0) { - return - } + const lazyJoinRef = new PropRef(fieldPath) + lazySourceSubscription.requestSnapshot({ + where: inArray(lazyJoinRef, joinKeys), + }) + }), + ) + } + } - const lazyJoinRef = new PropRef(fieldPath) - lazySourceSubscription.requestSnapshot({ - where: inArray(lazyJoinRef, joinKeys), - }) - }), + // Extra WHERE clauses appended to the child query: parent-referencing + // filters (applied post-join) and, when the parent keys are statically + // known, an eager correlation predicate so the child loads its subset + // immediately via the normal (non-lazy) subscription path. + const extraChildWhere: Array = [] + if (subquery.parentFilters && subquery.parentFilters.length > 0) { + extraChildWhere.push(...subquery.parentFilters) + } + if (eagerChildKeys) { + extraChildWhere.push( + inArray(subquery.childCorrelationField, eagerChildKeys), ) } - - // If parent filters exist, append them to the child query's WHERE const childQuery = - subquery.parentFilters && subquery.parentFilters.length > 0 + extraChildWhere.length > 0 ? { ...subquery.query, - where: [ - ...(subquery.query.where || []), - ...subquery.parentFilters, - ], + where: [...(subquery.query.where || []), ...extraChildWhere], } : subquery.query diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 58db91257..ddfe46f35 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -14,7 +14,8 @@ import { import { normalizeValue } from '../../utils/comparison.js' import { ensureIndexForField } from '../../indexes/auto-index.js' import { PropRef, followRef } from '../ir.js' -import { inArray } from '../builder/functions.js' +import { and, inArray } from '../builder/functions.js' +import { extractEqualityKeys } from '../expression-helpers.js' import { compileExpression } from './evaluators.js' import type { CompileQueryFn } from './index.js' import type { OrderByOptimizationInfo } from './order-by.js' @@ -221,7 +222,35 @@ function processJoin( throw new UnsupportedJoinTypeError(joinClause.type) } - if (activeSource) { + // When the query statically constrains the main side of the join condition + // to known keys, filter the joined side to the same keys so it loads only + // the matching subset eagerly. This avoids deferring it behind a + // lazy-loading tap, which would miss a progressive collection's fast-path + // window. Sound only for inner/left joins (see deriveStaticJoinFilter). + const canFilterJoinedSide = + joinClause.type === `inner` || joinClause.type === `left` + const staticJoinFilter = canFilterJoinedSide + ? deriveStaticJoinFilter( + mainExpr, + joinedExpr, + joinedSource, + isCollectionRef, + sourceWhereClauses, + ) + : null + if (staticJoinFilter) { + const existing = sourceWhereClauses.get(staticJoinFilter.alias) + sourceWhereClauses.set( + staticJoinFilter.alias, + existing + ? and(existing, staticJoinFilter.filter) + : staticJoinFilter.filter, + ) + } + + // Skip lazy loading when the joined side is already statically filtered — + // it now loads its subset eagerly via the normal subscription path. + if (activeSource && !staticJoinFilter) { // If the lazy collection comes from a subquery that has a limit and/or an offset clause // then we need to deoptimize the join because we don't know which rows are in the result set // since we simply lookup matching keys in the index but the index contains all rows @@ -660,3 +689,47 @@ function getActiveAndLazySources( return { activeSource: undefined, lazySource: undefined } } } + +/** + * When the query statically constrains the main side of an inner/left join to + * a known set of keys (e.g. `.where(eq(group.id, 5))`), derives an equality + * filter for the joined side of the join condition. + * + * Applying that filter lets the joined collection load only the matching + * subset eagerly, instead of being deferred behind a lazy-loading tap (which + * misses a progressive collection's fast-path window) or loaded in full as the + * join's "active" side. + * + * Only sound for `inner` and `left` joins, where a joined row is needed solely + * when it matches a main row — `right`/`full` joins keep unmatched joined rows. + * Only plain collection join sources are filtered; subquery sources keep the + * existing lazy-loading behaviour. + * + * @returns The joined alias and the derived predicate, or `null` when the main + * side is not statically constrained. + */ +function deriveStaticJoinFilter( + mainExpr: BasicExpression, + joinedExpr: BasicExpression, + joinedSource: string, + joinedSideIsCollection: boolean, + sourceWhereClauses: Map>, +): { alias: string; filter: BasicExpression } | null { + if ( + !joinedSideIsCollection || + mainExpr.type !== `ref` || + joinedExpr.type !== `ref` + ) { + return null + } + + const mainKeys = extractEqualityKeys( + sourceWhereClauses.get(mainExpr.path[0]!), + mainExpr.path, + ) + if (mainKeys && mainKeys.length > 0) { + return { alias: joinedSource, filter: inArray(joinedExpr, mainKeys) } + } + + return null +} diff --git a/packages/db/src/query/expression-helpers.ts b/packages/db/src/query/expression-helpers.ts index 9b79159e0..419c1d01f 100644 --- a/packages/db/src/query/expression-helpers.ts +++ b/packages/db/src/query/expression-helpers.ts @@ -520,3 +520,78 @@ export function parseLoadSubsetOptions( limit: options.limit, } } + +/** + * Determines whether a WHERE expression constrains the field at `fieldPath` to + * a fixed, statically-known set of literal values. + * + * Recognises a top-level `eq` or `in` comparison against the field — possibly + * nested inside `and` — and returns the literal value set. Returns `null` when + * the field is not statically constrained (for example it is compared against + * another column, or the predicate uses `or`). + * + * Used to push correlation predicates eagerly onto lazily-loaded join/includes + * children: when the parent side is filtered to known keys, the child subset + * can be loaded immediately instead of waiting for the parent query to resolve. + * + * @param where - The WHERE expression to inspect + * @param fieldPath - The full path of the field (alias included, e.g. `['user', 'id']`) + * @returns The literal value set, or `null` when not statically constrained + * + * @example + * ```typescript + * // eq(user.id, 5) -> [5] + * // in(user.id, [1, 2, 3]) -> [1, 2, 3] + * // and(eq(user.id, 5), ...) -> [5] + * // or(...) / eq(user.id, x) -> null + * ``` + */ +export function extractEqualityKeys( + where: BasicExpression | undefined, + fieldPath: ReadonlyArray, +): Array | null { + if (!where || where.type !== `func`) { + return null + } + + const matchesField = (expr: BasicExpression): boolean => + expr.type === `ref` && + expr.path.length === fieldPath.length && + expr.path.every((segment, index) => segment === fieldPath[index]) + + const { name, args } = where + + if (name === `and`) { + for (const arg of args) { + const keys = extractEqualityKeys( + arg as BasicExpression, + fieldPath, + ) + if (keys) { + return keys + } + } + return null + } + + if (name === `eq` && args.length === 2) { + const [lhs, rhs] = args as [BasicExpression, BasicExpression] + if (matchesField(lhs) && rhs.type === `val`) { + return [rhs.value] + } + if (matchesField(rhs) && lhs.type === `val`) { + return [lhs.value] + } + return null + } + + if (name === `in` && args.length === 2) { + const [lhs, rhs] = args as [BasicExpression, BasicExpression] + if (matchesField(lhs) && rhs.type === `val` && Array.isArray(rhs.value)) { + return [...rhs.value] + } + return null + } + + return null +} diff --git a/packages/db/tests/query/progressive-includes-fastpath.test.ts b/packages/db/tests/query/progressive-includes-fastpath.test.ts new file mode 100644 index 000000000..fea9004cc --- /dev/null +++ b/packages/db/tests/query/progressive-includes-fastpath.test.ts @@ -0,0 +1,175 @@ +import { describe, expect, it, vi } from 'vitest' +import { createLiveQueryCollection, eq, toArray } from '../../src/query/index.js' +import { createCollection } from '../../src/collection/index.js' +import { extractSimpleComparisons } from '../../src/query/expression-helpers.js' +import { flushPromises } from '../utils.js' +import type { LoadSubsetOptions } from '../../src/types.js' + +/** + * Regression test for a progressive-sync-mode bug with nested `toArray` + * subqueries. + * + * In `progressive` sync mode an Electric collection serves a fast-path + * `fetchSnapshot` for any `loadSubset` call that arrives BEFORE its first + * `up-to-date` message (`isBufferingInitialSync()` in electric.ts). After that + * the window is closed and the data is only available once the full background + * sync finishes. + * + * - Direct query (`q.from({post}).where(eq(post.userId, X))`): + * the collection subscriber issues `loadSubset` immediately, inside the + * window -> fast path works. + * + * - Nested query (`post` fetched via `toArray` inside a user's `select`): + * the BUG was that includes lazy-loading deferred the child `loadSubset` + * until the parent `users` query produced rows. The fix: when the parent + * query statically constrains the correlation field (`eq(user.id, X)`), + * the child subset is loaded eagerly — in parallel with the parent — so it + * still lands inside the fast-path window. + * + * This is the deterministic proof. The fast-path window is modelled with a + * plain `@tanstack/db` collection whose `loadSubset` records whether it was + * called while the window was open — the deferral it exposes is real + * `@tanstack/db` behaviour, only the window is simulated. The same scenario + * against a real Electric server is verified end-to-end by + * packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts. + */ + +type User = { id: number; name: string } +type Post = { id: number; userId: number; title: string } + +const users: Array = [ + { id: 1, name: `U1` }, + { id: 2, name: `U2` }, + { id: 3, name: `U3` }, +] + +let seq = 0 + +/** + * A collection whose initial sync is gated: nothing is committed and the + * collection is not `markReady()` until `release()` is called. Mirrors a + * parent collection that takes a moment to sync. + */ +function makeGatedUsers() { + let release: () => void = () => {} + const gate = new Promise((resolve) => { + release = resolve + }) + const collection = createCollection({ + id: `fastpath-users-${seq++}`, + getKey: (u) => u.id, + sync: { + sync: ({ begin, write, commit, markReady }) => { + void (async () => { + await gate + begin() + for (const u of users) write({ type: `insert`, value: u }) + commit() + markReady() + })() + }, + }, + }) + return { collection, release } +} + +/** + * Models an Electric collection in `progressive` sync mode. + * + * `windowOpen` mirrors `isBufferingInitialSync()`: the progressive fast-path + * (`fetchSnapshot`) only runs while it is `true`. `closeWindow()` simulates the + * first `up-to-date` message arriving, after which `loadSubset` calls can no + * longer be served from a snapshot. + */ +function makeProgressivePosts() { + let windowOpen = true + const fastPathLoads: Array = [] + const lateLoads: Array = [] + + const collection = createCollection({ + id: `fastpath-posts-${seq++}`, + getKey: (p) => p.id, + syncMode: `on-demand`, + sync: { + sync: ({ begin, commit, markReady }) => { + begin() + commit() + markReady() + return { + loadSubset: vi.fn((opts: LoadSubsetOptions) => { + ;(windowOpen ? fastPathLoads : lateLoads).push(opts) + return Promise.resolve() + }), + } + }, + }, + }) + + return { + collection, + fastPathLoads, + lateLoads, + closeWindow: () => { + windowOpen = false + }, + } +} + +describe(`progressive sync — nested subquery fast-path`, () => { + it(`direct query: posts loadSubset runs INSIDE the fast-path window`, async () => { + const { collection: posts, fastPathLoads, lateLoads } = + makeProgressivePosts() + + const query = createLiveQueryCollection((q) => + q.from({ post: posts }).where(({ post }) => eq(post.userId, 2)), + ) + const preloadPromise = query.preload() + await flushPromises() + + // The collection subscriber issues loadSubset immediately — before the + // progressive collection's buffering window could close. + expect(fastPathLoads.length).toBeGreaterThan(0) + expect(lateLoads.length).toBe(0) + + await preloadPromise + }) + + it(`nested toArray child: posts loadSubset runs eagerly, INSIDE the fast-path window`, async () => { + // The parent `users` collection is gated and never released, so its data + // never arrives. Before the fix the child `posts` loadSubset was deferred + // behind the parent and would never run; with the fix it runs eagerly + // because the parent filters the correlation field to a known key. + const { collection: gatedUsers } = makeGatedUsers() + const { collection: posts, fastPathLoads, lateLoads } = + makeProgressivePosts() + + const query = createLiveQueryCollection((q) => + q + .from({ user: gatedUsers }) + .where(({ user }) => eq(user.id, 2)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + posts: toArray( + q + .from({ post: posts }) + .where(({ post }) => eq(post.userId, user.id)), + ), + })), + ) + // Not awaited: preload never resolves while the parent stays gated. + void query.preload() + await flushPromises() + + // The child loadSubset ran eagerly — inside the fast-path window — even + // though the parent `users` collection has not synced. + expect(fastPathLoads.length).toBeGreaterThan(0) + expect(lateLoads.length).toBe(0) + + // It was scoped to the parent's statically-known correlation key. + const lastLoad = fastPathLoads[fastPathLoads.length - 1]! + expect(extractSimpleComparisons(lastLoad.where)).toEqual([ + { field: [`userId`], operator: `in`, value: [2] }, + ]) + }) +}) diff --git a/packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts b/packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts new file mode 100644 index 000000000..d51dca385 --- /dev/null +++ b/packages/electric-db-collection/e2e/progressive-nested.e2e.test.ts @@ -0,0 +1,289 @@ +/** + * Progressive sync mode — nested subquery & join fast-path (end-to-end) + * + * Regression test against a real Electric server. + * + * In `progressive` sync mode an Electric collection loads a fast-path snapshot + * of the rows a query needs FIRST, then the rest of the collection in the + * background. This works when a collection is queried directly: + * + * q.from({ post: posts }).where(({ post }) => eq(post.userId, X)) + * + * It previously did NOT work when the same `posts` were fetched as a nested + * `toArray` child of a user, or joined to a filtered user — the fast-path was + * missed and the rows only became available once the WHOLE `posts` collection + * had finished its background sync. + * + * The fix: when the parent/main side is filtered to statically-known keys, the + * correlation predicate is pushed onto the child/joined collection so it loads + * its subset eagerly, exactly like the direct query. + * + * The test seeds a large `posts` table so the background sync is visibly slow, + * then watches how much of the collection had to load before each query could + * show its (small) target subset: + * + * - subset visible while posts.size is still small -> fast-path used + * - subset visible only once posts.size == full -> fast-path missed + * + * The deterministic counterpart lives in + * packages/db/tests/query/progressive-includes-fastpath.test.ts. + */ + +import { randomUUID } from 'node:crypto' +import { afterAll, beforeAll, describe, expect, inject, it } from 'vitest' +import { + createCollection, + createLiveQueryCollection, + eq, + toArray, +} from '@tanstack/db' +import { electricCollectionOptions } from '../src/electric' +import { makePgClient } from '../../db-collection-e2e/support/global-setup' +import { sleep, waitFor } from '../../db-collection-e2e/src/utils/helpers' +import type { Row } from '@electric-sql/client' +import type { Client } from 'pg' + +const OTHER_USERS = 3 +const POSTS_PER_OTHER_USER = 1500 +const TARGET_POSTS = 40 +const TOTAL_POSTS = OTHER_USERS * POSTS_PER_OTHER_USER + TARGET_POSTS + +type User = { id: string; name: string } +type Post = { id: string; userId: string; title: string } + +describe(`Progressive mode — nested subquery & join fast-path (e2e)`, () => { + let dbClient: Client + let baseUrl: string + let testSchema: string + let usersTable: string + let postsTable: string + let targetUserId: string + + const shapeUrl = () => `${baseUrl}/v1/shape` + + beforeAll(async () => { + baseUrl = inject(`baseUrl`) + testSchema = inject(`testSchema`) + + const testId = Date.now().toString(16) + usersTable = `"users_pn_${testId}"` + postsTable = `"posts_pn_${testId}"` + + dbClient = makePgClient({ options: `-csearch_path=${testSchema}` }) + await dbClient.connect() + await dbClient.query(`SET search_path TO ${testSchema}`) + + await dbClient.query(` + CREATE TABLE ${usersTable} (id UUID PRIMARY KEY, name TEXT NOT NULL) + `) + await dbClient.query(` + CREATE TABLE ${postsTable} ( + id UUID PRIMARY KEY, + "userId" UUID NOT NULL, + title TEXT NOT NULL + ) + `) + + targetUserId = randomUUID() + await dbClient.query( + `INSERT INTO ${usersTable} (id, name) VALUES ($1, $2)`, + [targetUserId, `Target User`], + ) + // Small target subset. + await dbClient.query( + `INSERT INTO ${postsTable} (id, "userId", title) + SELECT gen_random_uuid(), $1, 'Target Post ' || g + FROM generate_series(1, ${TARGET_POSTS}) g`, + [targetUserId], + ) + // Large remainder so the background full sync is visibly slow. + for (let u = 0; u < OTHER_USERS; u++) { + const otherId = randomUUID() + await dbClient.query( + `INSERT INTO ${usersTable} (id, name) VALUES ($1, $2)`, + [otherId, `Other User ${u}`], + ) + await dbClient.query( + `INSERT INTO ${postsTable} (id, "userId", title) + SELECT gen_random_uuid(), $1, 'Post ' || g + FROM generate_series(1, ${POSTS_PER_OTHER_USER}) g`, + [otherId], + ) + } + + // Ensure Electric's replication slot has caught up with the seed data. + const verify = createCollection( + electricCollectionOptions({ + id: `pn-verify-${testId}`, + shapeOptions: { + url: shapeUrl(), + params: { table: `${testSchema}.${postsTable}` }, + }, + syncMode: `eager`, + getKey: (item) => item.id, + startSync: true, + }), + ) + await verify.preload() + await waitFor(() => verify.size >= TOTAL_POSTS, { + timeout: 60000, + interval: 250, + message: `Electric did not replicate seed posts (got ${verify.size}/${TOTAL_POSTS})`, + }) + await verify.cleanup() + }, 120000) + + afterAll(async () => { + await dbClient.query(`DROP TABLE IF EXISTS ${postsTable}`) + await dbClient.query(`DROP TABLE IF EXISTS ${usersTable}`) + await dbClient.end() + }) + + function makeProgressive & { id: string }>( + id: string, + table: string, + ) { + return createCollection( + electricCollectionOptions({ + id, + shapeOptions: { + url: shapeUrl(), + params: { table: `${testSchema}.${table}` }, + }, + syncMode: `progressive`, + getKey: (item) => item.id, + startSync: true, + }), + ) + } + + /** + * Polls until `getSubsetSize()` reaches `TARGET_POSTS` and records how much + * of the `posts` collection had loaded at that moment. A size well below the + * full collection means the fast-path snapshot served the query; a size + * equal to the full collection means the query had to wait for the whole + * background sync. + */ + async function postsLoadedWhenSubsetReady( + posts: { readonly size: number }, + getSubsetSize: () => number, + ): Promise { + const deadline = Date.now() + 60000 + while (Date.now() < deadline) { + if (getSubsetSize() >= TARGET_POSTS) { + return posts.size + } + await sleep(5) + } + throw new Error( + `subset never reached ${TARGET_POSTS} (got ${getSubsetSize()}, posts.size=${posts.size})`, + ) + } + + it(`direct query loads the target subset via the fast-path snapshot`, async () => { + const posts = makeProgressive( + `pn-posts-direct-${Date.now().toString(16)}`, + postsTable, + ) + + const query = createLiveQueryCollection((q) => + q + .from({ post: posts }) + .where(({ post }) => eq(post.userId, targetUserId)), + ) + void query.preload() + + const postsLoaded = await postsLoadedWhenSubsetReady( + posts, + () => query.size, + ) + console.log( + `[DIRECT] posts loaded when subset ready: ${postsLoaded}/${TOTAL_POSTS}`, + ) + + await waitFor(() => posts.status === `ready`, { timeout: 60000 }) + await query.cleanup() + await posts.cleanup() + + // Fast path: the query saw its 40 rows long before the full background + // sync of ~4500 rows finished. + expect(postsLoaded).toBeLessThan(TOTAL_POSTS) + }, 90000) + + it(`nested toArray query loads the target subset via the fast-path snapshot`, async () => { + const suffix = Date.now().toString(16) + const users = makeProgressive(`pn-users-tarray-${suffix}`, usersTable) + const posts = makeProgressive(`pn-posts-tarray-${suffix}`, postsTable) + + const query = createLiveQueryCollection((q) => + q + .from({ user: users }) + .where(({ user }) => eq(user.id, targetUserId)) + .select(({ user }) => ({ + id: user.id, + name: user.name, + posts: toArray( + q + .from({ post: posts }) + .where(({ post }) => eq(post.userId, user.id)), + ), + })), + ) + void query.preload() + + const nestedCount = () => { + const row = Array.from(query.values())[0] as + | { posts?: Array } + | undefined + return row?.posts?.length ?? 0 + } + + const postsLoaded = await postsLoadedWhenSubsetReady(posts, nestedCount) + console.log( + `[NESTED toArray] posts loaded when subset ready: ${postsLoaded}/${TOTAL_POSTS}`, + ) + + await waitFor(() => posts.status === `ready`, { timeout: 60000 }) + await query.cleanup() + await users.cleanup() + await posts.cleanup() + + // The nested subset arrives via the fast-path snapshot, well before the + // full background sync of the posts collection completes. + expect(postsLoaded).toBeLessThan(TOTAL_POSTS) + }, 90000) + + it(`nested join query loads the target subset via the fast-path snapshot`, async () => { + const suffix = Date.now().toString(16) + const users = makeProgressive(`pn-users-join-${suffix}`, usersTable) + const posts = makeProgressive(`pn-posts-join-${suffix}`, postsTable) + + const query = createLiveQueryCollection((q) => + q + .from({ user: users }) + .where(({ user }) => eq(user.id, targetUserId)) + .innerJoin({ post: posts }, ({ user, post }) => + eq(post.userId, user.id), + ) + .select(({ post }) => ({ ...post })), + ) + void query.preload() + + const postsLoaded = await postsLoadedWhenSubsetReady( + posts, + () => query.size, + ) + console.log( + `[NESTED join] posts loaded when subset ready: ${postsLoaded}/${TOTAL_POSTS}`, + ) + + await waitFor(() => posts.status === `ready`, { timeout: 60000 }) + await query.cleanup() + await users.cleanup() + await posts.cleanup() + + // The joined subset arrives via the fast-path snapshot, well before the + // full background sync of the posts collection completes. + expect(postsLoaded).toBeLessThan(TOTAL_POSTS) + }, 90000) +}) diff --git a/packages/electric-db-collection/vitest.e2e.config.ts b/packages/electric-db-collection/vitest.e2e.config.ts index 903f46de5..cf7a2ebea 100644 --- a/packages/electric-db-collection/vitest.e2e.config.ts +++ b/packages/electric-db-collection/vitest.e2e.config.ts @@ -6,6 +6,6 @@ export default defineConfig({ globalSetup: `../db-collection-e2e/support/global-setup.ts`, fileParallelism: false, // Critical for shared database testTimeout: 30000, - environment: `jsdom`, + environment: `node`, }, })