Network Control

Synced-store's test harness gives you full control over network timing, so you can test race conditions, message reordering, offline behavior, and multi-client convergence deterministically, with no real timers or network connections.

Controlling the Network

By default, createTestHarness auto-flushes all network messages so tests don't need to think about timing. Set autoFlush: false to take manual control:

typescript
import { test, expect } from "bun:test";
import { createTestHarness } from "@synced-store/client/test-utils";

const harness = createTestHarness({ mutators, autoFlush: false });
const { client, transport } = harness.createClient();

Once auto-flush is disabled, messages queue up on both the outbound (client-to-server) and inbound (server-to-client) sides. You decide when each message gets delivered.

Disabling and Enabling Auto-Flush Per Direction

You can toggle auto-flush independently for each direction:

typescript
// Stop outbound messages (client → server)
transport.outbound.disableAutoFlush();

// Stop inbound messages (server → client)
transport.inbound.disableAutoFlush();

// Re-enable when done
transport.outbound.enableAutoFlush();
transport.inbound.enableAutoFlush();
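
As a mental model only, you can picture each direction as a FIFO queue with an auto-flush switch. The sketch below is a hypothetical stand-in: `ManualQueue` is not part of Synced-store, and draining the queue when auto-flush is re-enabled is an assumption of this model rather than documented behavior.

```typescript
// Hypothetical model of one transport direction; not Synced-store's implementation.
class ManualQueue<T> {
  readonly queue: T[] = [];
  private autoFlush = true;
  private deliver: (message: T) => void;

  constructor(deliver: (message: T) => void) {
    this.deliver = deliver;
  }

  disableAutoFlush(): void {
    this.autoFlush = false;
  }

  enableAutoFlush(): void {
    this.autoFlush = true;
    this.flushAll(); // assumption: messages queued while paused are drained
  }

  send(message: T): void {
    this.queue.push(message);
    if (this.autoFlush) this.flushAll();
  }

  // Deliver the oldest queued message; returns false when the queue is empty.
  flushNext(): boolean {
    const message = this.queue.shift();
    if (message === undefined) return false;
    this.deliver(message);
    return true;
  }

  flushAll(): void {
    while (this.flushNext()) {
      // keep delivering until the queue is empty
    }
  }
}

const delivered: string[] = [];
const outbound = new ManualQueue<string>((m) => delivered.push(m));

outbound.disableAutoFlush();
outbound.send("push#1");
outbound.send("push#2");
// Nothing has been delivered yet; both messages sit in outbound.queue.

outbound.flushNext();
// delivered is now ["push#1"]; "push#2" is still queued.
```

The point of the model is that a paused direction never loses messages. It only defers them until the test asks for delivery.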

Flushing Messages

| Method | Effect |
| --- | --- |
| transport.outbound.flushNext() | Send the next queued outbound message |
| transport.inbound.flushOne() | Deliver the next inbound message |
| transport.inbound.flushAll() | Deliver all queued inbound messages |
| transport.step() | One full round trip (flush outbound, then inbound) |
| transport.flushUntil(promise) | Keep flushing until a promise resolves |

Basic Example

typescript
test("manual network control", async () => {
  const harness = createTestHarness({ mutators, autoFlush: false });
  const { client, transport } = harness.createClient();

  transport.outbound.disableAutoFlush();
  transport.inbound.disableAutoFlush();

  // Mutation is queued locally but not sent
  await client.mutate.setValue({ key: "test", value: "data" });
  expect(transport.outbound.queue.length).toBe(1);

  // Manually flush the request to the server
  await transport.outbound.flushNext();

  // Deliver the server's response(s) back to the client
  await transport.inbound.flushAll();
});

TIP

Use transport.flushUntil(client.waitForServerData()) to bring a client through its initial pull handshake before starting your test scenario. This is the standard way to "initialize" a client in manual-flush tests.
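
To make the idea of "flush until a promise resolves" concrete, here is a minimal sketch of such a loop over a toy queue. The free function `flushUntil`, the `deliverOne` callback, and the string messages are all hypothetical; the real `transport.flushUntil` may be implemented quite differently.

```typescript
// Hypothetical sketch of a "flush until settled" loop; not Synced-store's code.
async function flushUntil<T>(
  flushOne: () => boolean, // delivers one queued message; false if the queue was empty
  target: Promise<T>,
): Promise<T> {
  let settled = false;
  const markSettled = () => {
    settled = true;
  };
  target.then(markSettled, markSettled);

  while (true) {
    // Yield once so promise callbacks triggered by the previous delivery can run.
    // One microtask is enough for this toy; deeper promise chains would need more.
    await Promise.resolve();
    if (settled) return target;
    if (!flushOne()) {
      throw new Error("queue drained but the awaited promise is still pending");
    }
  }
}

// Toy inbound queue: the "pull_response" message is what resolves serverData.
const inbound: string[] = ["poke", "pull_response", "poke"];
let resolveServerData: () => void = () => {};
const serverData = new Promise<void>((resolve) => {
  resolveServerData = () => resolve();
});

const deliverOne = (): boolean => {
  const message = inbound.shift();
  if (message === undefined) return false;
  if (message === "pull_response") resolveServerData();
  return true;
};

const done = flushUntil(deliverOne, serverData);
// Once `done` resolves, the trailing "poke" is still queued: the loop stops
// as soon as the awaited promise settles, not when the queue is empty.
```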

Testing Optimistic Updates

Mutations are applied to the local store immediately, before any network round trip. This lets you verify that the UI would update instantly:

typescript
test("optimistic updates visible immediately", async () => {
  const harness = createTestHarness({ mutators, autoFlush: false });
  const { client, transport } = harness.createClient();
  transport.outbound.disableAutoFlush();

  await client.mutate.setValue({ key: "test", value: "optimistic" });

  // Value is available locally even though nothing has been sent
  const value = await client.query((tx) => tx.table("main").get("test"));
  expect(value).toBe("optimistic");
});
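
If it helps to see why the read succeeds before anything is sent, the following sketch models a store that layers pending mutations on top of confirmed state. `OptimisticStore` and its methods are hypothetical illustrations of the general technique, not Synced-store internals.

```typescript
// Hypothetical model of optimistic application; not Synced-store internals.
type Mutation = { key: string; value: string };

class OptimisticStore {
  private confirmed = new Map<string, string>();
  private pending: Mutation[] = [];

  // Record the mutation locally; nothing is sent anywhere.
  mutate(key: string, value: string): void {
    this.pending.push({ key, value });
  }

  // Reads see confirmed state with pending mutations layered on top.
  get(key: string): string | undefined {
    let value = this.confirmed.get(key);
    for (const m of this.pending) {
      if (m.key === key) value = m.value;
    }
    return value;
  }

  // Called when the server acknowledges the oldest pending mutation.
  confirmNext(): void {
    const m = this.pending.shift();
    if (m) this.confirmed.set(m.key, m.value);
  }

  getPendingCount(): number {
    return this.pending.length;
  }
}

const store = new OptimisticStore();
store.mutate("test", "optimistic");
// store.get("test") already returns "optimistic" while one mutation is pending.
```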

Testing Race Conditions

Real networks can deliver messages out of order. The test harness lets you reproduce these scenarios deterministically.

Poke Arrives Before Push Response

A common race condition: the server broadcasts a poke (containing the result of our own mutation) before the client receives the push response acknowledging that mutation. Use flushMatching to deliver messages in a specific order:

typescript
import { hasInboundType } from "../test-utils/protocol-helpers";

test("poke arrives before push response", async () => {
  const harness = createTestHarness({ mutators, autoFlush: false });
  const { client, transport } = harness.createClient();
  await transport.flushUntil(client.waitForServerData());

  transport.outbound.disableAutoFlush();
  transport.inbound.disableAutoFlush();

  const { confirmed } = await client.mutate.setValue({
    key: "race",
    value: "value",
  });

  // Send the push request to the server
  await transport.outbound.flushNext();

  // Deliver the poke BEFORE the push response
  await transport.inbound.flushMatching(hasInboundType("poke"));
  await transport.inbound.flushMatching(hasInboundType("push_response"));

  await confirmed;
  const value = await client.query((tx) => tx.table("main").get("race"));
  expect(value).toBe("value");
  expect(client.getPendingCount()).toBe(0);
});

TIP

flushMatching delivers the first queued message that matches the predicate, even if it is not at the head of the queue, and leaves all other messages in place. This is the key tool for testing message reordering.
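
The behavior described in this tip can be sketched over a plain array. The standalone `flushMatching` function and the `Inbound` message shape below are hypothetical and exist only to illustrate the selection logic.

```typescript
// Hypothetical helper mirroring the behavior described above.
function flushMatching<T>(
  queue: T[],
  predicate: (message: T) => boolean,
  deliver: (message: T) => void,
): boolean {
  const index = queue.findIndex(predicate);
  if (index === -1) return false; // nothing matched; queue untouched
  const [message] = queue.splice(index, 1);
  deliver(message);
  return true;
}

type Inbound = { type: "push_response" | "poke"; id: number };

const inboundQueue: Inbound[] = [
  { type: "push_response", id: 1 },
  { type: "poke", id: 2 },
  { type: "poke", id: 3 },
];
const deliveredOrder: number[] = [];

flushMatching(
  inboundQueue,
  (m) => m.type === "poke",
  (m) => deliveredOrder.push(m.id),
);
// deliveredOrder is [2]; the queue still holds ids 1 and 3, in that order.
```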

Concurrent Mutations from Multiple Clients

Test that two clients mutating simultaneously converge to the correct state:

typescript
test("concurrent mutations from multiple clients", async () => {
  const harness = createTestHarness({ mutators, autoFlush: false });
  const { client: client1, transport: t1 } = harness.createClient();
  const { client: client2, transport: t2 } = harness.createClient();

  // Initialize both clients
  await Promise.all([
    t1.flushUntil(client1.waitForServerData()),
    t2.flushUntil(client2.waitForServerData()),
  ]);

  // Both clients mutate before either flushes
  await client1.mutate.increment({ key: "counter", amount: 1 });
  await client2.mutate.increment({ key: "counter", amount: 1 });

  // Step all clients through their network queues
  await harness.networkManager.stepAll();

  const value = await client1.query((tx) => tx.table("main").get("counter"));
  expect(value).toBe(2);
});
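
One common way for sync engines to converge is to treat the server as authoritative and have each client show server state plus its own unconfirmed mutations. The sketch below models that idea with pure functions. It is a general illustration and an assumption about how Synced-store behaves, and every name in it is hypothetical.

```typescript
// Hypothetical model of server-authoritative replay; not Synced-store internals.
type Increment = { clientId: string; key: string; amount: number };
type State = Record<string, number>;

function applyIncrement(state: State, m: Increment): State {
  return { ...state, [m.key]: (state[m.key] ?? 0) + m.amount };
}

// The server applies pushes in arrival order.
function serverApply(state: State, pushes: Increment[]): State {
  return pushes.reduce(applyIncrement, state);
}

// A client's view is the server state plus its own unconfirmed mutations.
function clientView(server: State, pending: Increment[]): State {
  return pending.reduce(applyIncrement, server);
}

const m1: Increment = { clientId: "c1", key: "counter", amount: 1 };
const m2: Increment = { clientId: "c2", key: "counter", amount: 1 };

// Before any flush, each client sees only its own optimistic increment.
const before1 = clientView({}, [m1]); // { counter: 1 }
const before2 = clientView({}, [m2]); // { counter: 1 }

// After both pushes are processed and acknowledged, the pending lists are empty.
const server = serverApply({}, [m1, m2]);
const after1 = clientView(server, []); // { counter: 2 }
const after2 = clientView(server, []); // { counter: 2 }
```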

Simulating Network Failures

Disconnect and Reconnect

Use simulateDisconnect to drop the connection and simulateReconnect to restore it. Pending mutations are preserved and retried on reconnect:

typescript
test("push response lost - mutation retries on reconnect", async () => {
  const harness = createTestHarness({ mutators, autoFlush: false });
  const { client, transport } = harness.createClient();
  await transport.flushUntil(client.waitForServerData());

  transport.outbound.disableAutoFlush();
  transport.inbound.disableAutoFlush();

  await client.mutate.setValue({ key: "test", value: "original" });

  // Server receives and processes the push...
  await transport.outbound.flushNext();

  // ...but the response is lost due to a network failure
  transport.simulateDisconnect({ rejectPending: true });
  expect(client.getPendingCount()).toBe(1);

  // Client reconnects and retries the unconfirmed mutation
  transport.simulateReconnect();
  transport.enableAutoFlush();
  await client.push();
  await transport.step();

  expect(client.getPendingCount()).toBe(0);
});

TIP

Pass { rejectPending: true } to simulateDisconnect to reject all in-flight requests, simulating a hard network failure. Without it, pending requests stay unresolved (simulating a hung connection).
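
The difference between the two modes can be illustrated with plain promises. `FakeConnection` below is a hypothetical model, not the harness transport: a hard failure rejects every in-flight request, while a hung connection leaves them unsettled.

```typescript
// Hypothetical model of in-flight request handling on disconnect.
type InFlight = { reject: (reason: Error) => void };

class FakeConnection {
  private inFlight: InFlight[] = [];
  connected = true;

  // Start a request that stays in flight until the connection settles it.
  request(): Promise<string> {
    return new Promise<string>((_resolve, reject) => {
      this.inFlight.push({ reject });
    });
  }

  simulateDisconnect(options: { rejectPending?: boolean } = {}): void {
    this.connected = false;
    if (options.rejectPending) {
      for (const r of this.inFlight) r.reject(new Error("disconnected"));
      this.inFlight = [];
    }
    // Without rejectPending, in-flight promises are simply left unsettled.
  }

  get inFlightCount(): number {
    return this.inFlight.length;
  }
}

// Hard failure: the caller observes a rejection and can retry.
const hard = new FakeConnection();
const hardOutcome = hard.request().then(
  () => "resolved",
  () => "rejected",
);
hard.simulateDisconnect({ rejectPending: true });

// Hung connection: the request never settles on its own.
const hung = new FakeConnection();
const hungRequest = hung.request();
hung.simulateDisconnect();
```

In a real test the hung case is the one to watch: awaiting `hungRequest` directly would stall the test, so assertions should target observable state such as the pending count.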

Offline Mutations with Late Sync

Test a client that goes offline, makes mutations, and then syncs with another client that joined while it was disconnected. Use blockReconnect: true to keep the client offline until you explicitly reconnect:

typescript
test("offline mutations sync after reconnect", async () => {
  const harness = createTestHarness({ mutators, autoFlush: false });
  const { client: client1, transport: t1 } = harness.createClient();
  await t1.flushUntil(client1.waitForServerData());

  // Client 1 goes offline
  t1.simulateDisconnect({ rejectPending: true, blockReconnect: true });

  // Client 1 makes mutations while offline
  await client1.mutate.setValue({ key: "offline", value: "from-client1" });
  expect(client1.getPendingCount()).toBe(1);

  // Meanwhile, client 2 joins and makes its own mutations
  const { client: client2, transport: t2 } = harness.createClient();
  await t2.flushUntil(client2.waitForServerData());
  await client2.mutate.setValue({ key: "online", value: "from-client2" });
  await t2.step();

  // Client 1 comes back online
  t1.simulateReconnect();
  t1.enableAutoFlush();
  await client1.push();
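  await t1.step();
  // Step every client so client 2 also receives client 1's changes
  await harness.networkManager.stepAll();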

  // Both clients converge
  const offlineVal = await client2.query((tx) =>
    tx.table("main").get("offline"),
  );
  const onlineVal = await client1.query((tx) =>
    tx.table("main").get("online"),
  );
  expect(offlineVal).toBe("from-client1");
  expect(onlineVal).toBe("from-client2");
});

Protocol-Aware Queue Introspection

When you need to inspect or filter queued messages by their protocol type (e.g., poke, push_response, pull_response), use the protocol helpers:

typescript
import { hasInboundType } from "../test-utils/protocol-helpers";

// Check if a specific message type is queued
const hasPoke = transport.inbound.queue.some(hasInboundType("poke"));

// Flush only messages matching a predicate
await transport.inbound.flushMatching(hasInboundType("poke"));

// Remove a message from the queue without delivering it
transport.inbound.remove(hasInboundType("push_response"));

This is particularly useful for race condition tests where you need to deliver messages in a specific order, or drop specific messages to simulate partial network failures.
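
For readers curious what such a predicate helper might look like, here is a minimal sketch. The `InboundMessage` shape, this version of `hasInboundType`, and the `removeFirst` function are hypothetical; the real protocol-helpers module may define them differently.

```typescript
// Hypothetical message shape and helpers; not the real protocol-helpers module.
type InboundType = "poke" | "push_response" | "pull_response";
type InboundMessage = { type: InboundType; payload?: unknown };

// Returns a predicate usable with Array.prototype.some, find, or findIndex.
function hasInboundType(type: InboundType) {
  return (message: InboundMessage): boolean => message.type === type;
}

// Remove the first matching message without delivering it.
function removeFirst(
  queue: InboundMessage[],
  predicate: (message: InboundMessage) => boolean,
): InboundMessage | undefined {
  const index = queue.findIndex(predicate);
  return index === -1 ? undefined : queue.splice(index, 1)[0];
}

const pendingInbound: InboundMessage[] = [
  { type: "push_response" },
  { type: "poke" },
];

const hasPoke = pendingInbound.some(hasInboundType("poke")); // true
const dropped = removeFirst(pendingInbound, hasInboundType("push_response"));
// pendingInbound now holds only the poke; the push response was never delivered.
```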

API Quick Reference

| Method or option | Description |
| --- | --- |
| autoFlush: false | Disable automatic message delivery for the harness |
| transport.outbound.disableAutoFlush() | Pause outbound (client-to-server) messages |
| transport.outbound.enableAutoFlush() | Resume outbound messages |
| transport.outbound.flushNext() | Send one queued outbound message |
| transport.inbound.disableAutoFlush() | Pause inbound (server-to-client) messages |
| transport.inbound.enableAutoFlush() | Resume inbound messages |
| transport.inbound.flushOne() | Deliver one inbound message |
| transport.inbound.flushAll() | Deliver all inbound messages |
| transport.inbound.flushMatching(fn) | Deliver the first message matching a predicate |
| transport.inbound.remove(fn) | Remove a message without delivering it |
| transport.step() | Process one full round trip |
| transport.flushUntil(promise) | Flush both directions until a promise resolves |
| transport.enableAutoFlush() | Re-enable automatic delivery for both directions |
| transport.simulateDisconnect(opts) | Simulate network failure |
| transport.simulateReconnect() | Reconnect after simulated failure |
| harness.networkManager.stepAll() | Step all clients' networks simultaneously |
| client.getPendingCount() | Count unconfirmed local mutations |
| client.push() | Trigger a push of pending mutations |
| client.waitForServerData() | Promise that resolves after initial pull |