Strata Sync

@stratasync/transport-graphql

GraphQL and REST transport adapter for sync communication, delta subscriptions, and Yjs collaborative editing in Strata Sync.

Implements TransportAdapter with NDJSON bootstrap streaming, REST/GraphQL mutations, WebSocket delta subscriptions, and Yjs collaborative editing, all with built-in retry and exponential backoff.

What it provides

  • createGraphQLTransport: factory function that creates a configured TransportAdapter instance
  • GraphQLTransportAdapter: the class implementing the full TransportAdapter interface
  • WebSocketManager: manages WebSocket connections for delta subscriptions with automatic reconnection
  • YjsTransportAdapter: bridges Yjs protocol messages over the WebSocket connection
  • Bootstrap streams: createBootstrapStream and createBatchLoadStream for NDJSON streaming
  • Delta fetching: fetchDeltas and fetchAllDeltas for REST-based catch-up
  • Mutation sending: sendMutations (GraphQL) and sendRestMutations (REST)
  • Retry utilities: retryWithBackoff, calculateBackoff, fetchWithTimeout, error classification helpers

Installation

npm install @stratasync/transport-graphql

@stratasync/transport-graphql has no peer dependencies. It depends on @stratasync/core and @stratasync/y-doc as workspace dependencies.

Quick start

import { createSyncClient } from "@stratasync/client";
import { createGraphQLTransport } from "@stratasync/transport-graphql";

const transport = createGraphQLTransport({
  endpoint: "https://api.example.com/graphql",
  syncEndpoint: "https://api.example.com/sync",
  wsEndpoint: "wss://api.example.com/sync/ws",
  auth: {
    getAccessToken: () => getTokenFromStore(),
    refreshToken: () => refreshAccessToken(),
  },
});

const client = createSyncClient({
  transport,
  // ... storage, reactivity
});

TransportAdapter interface

| Method | Signature | Description |
| --- | --- | --- |
| bootstrap | (options: BootstrapOptions) => AsyncGenerator<ModelRow, BootstrapMetadata> | Streams initial data via NDJSON. |
| batchLoad | (options: BatchLoadOptions) => AsyncIterable<ModelRow> | Batch loads specific model instances. |
| mutate | (batch: TransactionBatch) => Promise<MutateResult> | Sends a batch of mutations. |
| subscribe | (options: SubscribeOptions) => DeltaSubscription | Subscribes to real-time delta updates via WebSocket. |
| fetchDeltas | (after: number, limit?: number) => Promise<DeltaPacket> | Fetches deltas via REST for catch-up. |
| getConnectionState | () => ConnectionState | Returns the WebSocket connection state. |
| onConnectionStateChange | (callback) => () => void | Subscribes to connection state changes. |
| close | () => Promise<void> | Closes the WebSocket connection. |

Configuration

TransportOptions

interface TransportOptions {
  /** GraphQL endpoint URL */
  endpoint: string;
  /** Base REST sync endpoint (such as https://api.example.com/sync) */
  syncEndpoint: string;
  /** WebSocket endpoint for subscriptions */
  wsEndpoint: string;
  /** Authentication provider */
  auth: AuthProvider;
  /** GraphQL mutation builder (optional -- uses REST mutations if omitted) */
  mutationBuilder?: GraphQLMutationBuilder;
  /** Request timeout in milliseconds */
  timeout?: number;
  /** Retry configuration */
  retry?: RetryConfig;
  /** Custom HTTP headers */
  headers?: Record<string, string>;
  /** Custom WebSocket implementation (for non-browser environments) */
  webSocketFactory?: typeof WebSocket;
}

AuthProvider

The transport calls the auth provider before each HTTP request and WebSocket subscription:

interface AuthProvider {
  /** Gets the current access token */
  getAccessToken(): Promise<string | null>;
  /** Refreshes the access token (optional) */
  refreshToken?(): Promise<string | null>;
  /** Called when auth fails (optional) */
  onAuthError?(error: Error): void;
}
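
How a transport might consume an AuthProvider can be sketched as follows. The refresh-once-on-401 flow, the authorizedRequest helper, and the injected send function are assumptions made for illustration; the page does not document the package's actual request pipeline.

```typescript
// Hypothetical sketch: not the library's real request pipeline.
interface AuthProvider {
  getAccessToken(): Promise<string | null>;
  refreshToken?(): Promise<string | null>;
  onAuthError?(error: Error): void;
}

// Stand-in for an HTTP call: takes a token, resolves with a status code.
type Send = (token: string | null) => Promise<{ status: number }>;

async function authorizedRequest(
  auth: AuthProvider,
  send: Send,
): Promise<{ status: number }> {
  // Ask the provider for a token before the request goes out.
  const token = await auth.getAccessToken();
  const first = await send(token);
  if (first.status !== 401) return first;

  // On 401, try a single refresh if the provider supports it.
  if (auth.refreshToken) {
    const refreshed = await auth.refreshToken();
    if (refreshed !== null) {
      const second = await send(refreshed);
      if (second.status !== 401) return second;
    }
  }

  // Still unauthorized: notify the provider and surface the failure.
  const error = new Error("Unauthorized");
  auth.onAuthError?.(error);
  throw error;
}
```

Injecting send keeps the sketch independent of fetch, so the same flow can be exercised with a fake in tests.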

RetryConfig

Controls retry for HTTP requests and WebSocket reconnection:

interface RetryConfig {
  /** Maximum number of retries */
  maxRetries: number;
  /** Base delay in milliseconds */
  baseDelay: number;
  /** Maximum delay in milliseconds */
  maxDelay: number;
  /** Jitter factor (0-1) for randomizing delays */
  jitter?: number;
}

Default configuration:

const DEFAULT_RETRY_CONFIG: RetryConfig = {
  maxRetries: 3,
  baseDelay: 1000,
  maxDelay: 30_000,
  jitter: 0.2,
};
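
To make the fields concrete, here is a minimal sketch of how a delay could be derived from a RetryConfig. The doubling formula, the symmetric jitter, and the backoffDelay name are assumptions; the real calculateBackoff may compute delays differently.

```typescript
interface RetryConfig {
  maxRetries: number;
  baseDelay: number;
  maxDelay: number;
  jitter?: number;
}

// Hypothetical formula: doubles the delay per attempt, caps it at maxDelay,
// then spreads it by +/- jitter. Not taken from the package source.
function backoffDelay(
  attempt: number,
  config: RetryConfig,
  random: () => number = Math.random,
): number {
  const exponential = config.baseDelay * 2 ** attempt;
  const capped = Math.min(exponential, config.maxDelay);
  const jitter = config.jitter ?? 0;
  // random() in [0, 1) maps to a factor in [1 - jitter, 1 + jitter).
  const factor = 1 + jitter * (random() * 2 - 1);
  return Math.round(capped * factor);
}

const sketchConfig: RetryConfig = {
  maxRetries: 3,
  baseDelay: 1000,
  maxDelay: 30_000,
  jitter: 0.2,
};

// With random fixed at 0.5 the jitter factor is exactly 1.
const delays = [0, 1, 2, 5].map((attempt) =>
  backoffDelay(attempt, sketchConfig, () => 0.5),
);
console.log(delays); // [1000, 2000, 4000, 30000]
```

In this sketch jitter is applied after the cap, so a jittered delay can exceed maxDelay by up to the jitter fraction.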

Bootstrap streaming

Streams initial data as NDJSON from the server:

const stream = transport.bootstrap({
  type: "full",
  schemaHash: "abc123",
  syncGroups: ["workspace-1"],
});

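// for await...of discards the generator's return value,
// so iterate manually to keep the metadata.
let result = await stream.next();
while (!result.done) {
  // result.value: { modelName: string, data: Record<string, unknown> }
  await storage.put(result.value.modelName, result.value.data);
  result = await stream.next();
}

// Once done is true, value holds the generator's return value
const metadata = result.value; // BootstrapMetadata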
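
NDJSON carries one JSON value per line, and network chunks can split a line in the middle. The following sketch shows one way to buffer chunks into complete records. NdjsonBuffer is a hypothetical helper, not part of the package, and the row shape in the sample data is assumed from the comment in the example above.

```typescript
// Hypothetical helper: turns arbitrary text chunks into parsed NDJSON
// records, one JSON value per line.
class NdjsonBuffer {
  private pending = "";

  // Accepts a chunk and returns every record completed by it.
  push(chunk: string): unknown[] {
    this.pending += chunk;
    const lines = this.pending.split("\n");
    // The last element is an incomplete line (or ""), so keep it for later.
    this.pending = lines.pop() ?? "";
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line));
  }

  // Call once the stream ends to parse a final line with no trailing newline.
  flush(): unknown[] {
    const rest = this.pending.trim();
    this.pending = "";
    return rest === "" ? [] : [JSON.parse(rest)];
  }
}

const ndjson = new NdjsonBuffer();
const parsedRows = [
  // The second record is split across two chunks.
  ...ndjson.push('{"modelName":"Task","data":{"id":"t1"}}\n{"modelName":"Com'),
  ...ndjson.push('ment","data":{"id":"c1"}}\n'),
  ...ndjson.flush(),
];
console.log(parsedRows.length); // 2
```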

batchLoad fetches specific model instances:

const stream = transport.batchLoad({
  firstSyncId: 12345,
  requests: [
    { modelName: "Task", groupId: "team-abc" },
    { modelName: "Comment", indexedKey: "taskId", keyValue: "task-123" },
  ],
});

for await (const row of stream) {
  await storage.put(row.modelName, row.data);
}

Mutations

Mutations are sent in one of two modes: REST (the default) or GraphQL.

REST mutations (default)

Without a mutationBuilder, mutations are sent as JSON to {syncEndpoint}/mutate.

GraphQL mutations

With a mutationBuilder, mutations are sent as a GraphQL document. The builder receives each transaction and returns a GraphQLMutationSpec.

import type {
  GraphQLMutationBuilder,
  GraphQLMutationSpec,
} from "@stratasync/transport-graphql";

const mutationBuilder: GraphQLMutationBuilder = (transaction, index) => {
  const spec: GraphQLMutationSpec = {
    mutation: `taskUpdate(input: $input${index}) { syncId success }`,
    variables: { [`input${index}`]: transaction.payload },
    variableTypes: { [`input${index}`]: "TaskUpdateInput!" },
  };
  return spec;
};

const transport = createGraphQLTransport({
  mutationBuilder,
  // ...
});

GraphQLMutationSpec has three properties:

| Property | Type | Description |
| --- | --- | --- |
| mutation | string | GraphQL field invocation (such as taskUpdate(input: $input0) { syncId }). |
| variables | Record<string, unknown> | Variables used by the mutation. |
| variableTypes | Record<string, string> | GraphQL variable type definitions. |
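
The three properties map onto the parts of a GraphQL operation: variableTypes supply the variable declarations, mutation supplies a field in the selection set, and variables supply the values. A minimal sketch of how several specs could be merged into one document follows. The buildMutationDocument function, the SyncBatch operation name, and the m0, m1 aliases are assumptions; the transport's actual document layout is not documented here.

```typescript
interface GraphQLMutationSpec {
  mutation: string;
  variables: Record<string, unknown>;
  variableTypes: Record<string, string>;
}

// Hypothetical assembly step: the operation name and aliases are
// illustrative, and the real transport may build its document differently.
function buildMutationDocument(specs: GraphQLMutationSpec[]): {
  query: string;
  variables: Record<string, unknown>;
} {
  const declarations: string[] = [];
  const fields: string[] = [];
  const variables: Record<string, unknown> = {};

  specs.forEach((spec, index) => {
    for (const [name, type] of Object.entries(spec.variableTypes)) {
      declarations.push(`$${name}: ${type}`);
    }
    Object.assign(variables, spec.variables);
    // Alias each field so repeated mutations of the same name do not collide.
    fields.push(`m${index}: ${spec.mutation}`);
  });

  const header = declarations.length > 0 ? `(${declarations.join(", ")})` : "";
  const query = `mutation SyncBatch${header} {\n  ${fields.join("\n  ")}\n}`;
  return { query, variables };
}

const batchDocument = buildMutationDocument([
  {
    mutation: "taskUpdate(input: $input0) { syncId success }",
    variables: { input0: { id: "t1", title: "First" } },
    variableTypes: { input0: "TaskUpdateInput!" },
  },
  {
    mutation: "taskUpdate(input: $input1) { syncId success }",
    variables: { input1: { id: "t2", title: "Second" } },
    variableTypes: { input1: "TaskUpdateInput!" },
  },
]);
console.log(batchDocument.query);
```

This is why the builder in the example above suffixes variable names with the transaction index: every variable in a single operation needs a unique name.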

WebSocket subscriptions

Real-time delta streaming over WebSocket:

const subscription = transport.subscribe({
  afterSyncId: 12345,
  groups: ["workspace-1"],
});

for await (const packet of subscription) {
  // packet: DeltaPacket { actions: SyncAction[], lastSyncId: number, hasMore: boolean }
  for (const action of packet.actions) {
    await applyAction(action);
  }
}

// Clean up
subscription.unsubscribe();

WebSocketManager

Handles connection lifecycle internally:

  • Reconnects with exponential backoff
  • Re-subscribes after reconnection
  • Queues messages while disconnected, flushes on reconnect
  • Routes Yjs messages to YjsTransportAdapter
  • Supports custom WebSocket via webSocketFactory
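
The queue-then-flush behavior in the list above can be sketched as a small buffer in front of the socket. OutboundQueue and its method names are hypothetical; WebSocketManager's internals are not shown on this page.

```typescript
// Hypothetical sketch of queue-then-flush behavior.
class OutboundQueue<T> {
  private queue: T[] = [];
  private connected = false;
  private deliver: (message: T) => void;

  constructor(deliver: (message: T) => void) {
    this.deliver = deliver;
  }

  // Sends immediately when connected, otherwise buffers the message.
  send(message: T): void {
    if (this.connected) {
      this.deliver(message);
    } else {
      this.queue.push(message);
    }
  }

  // Called on every connection change; flushes the backlog on (re)connect.
  setConnected(connected: boolean): void {
    this.connected = connected;
    if (!connected) return;
    const backlog = this.queue;
    this.queue = [];
    // Deliver in arrival order.
    for (const message of backlog) this.deliver(message);
  }
}

const delivered: string[] = [];
const outbound = new OutboundQueue<string>((message) => {
  delivered.push(message);
});

outbound.send("a"); // buffered: not connected yet
outbound.send("b"); // buffered
outbound.setConnected(true); // flushes "a", then "b"
outbound.send("c"); // delivered immediately
console.log(delivered); // ["a", "b", "c"]
```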

Connection states

| State | Description |
| --- | --- |
| "disconnected" | No active WebSocket connection. |
| "connecting" | Initial connection in progress. |
| "reconnecting" | Reconnecting after a disconnect. |
| "connected" | WebSocket is open and ready. |
| "error" | Connection failed after all retry attempts. |

Delta fetching

Fetch deltas via REST for catch-up. Use fetchDeltas for a single page, or fetchAllDeltas for automatic pagination:

import { fetchAllDeltas } from "@stratasync/transport-graphql";

// Single page
const packet = await transport.fetchDeltas(lastSyncId, 1000);

// Paginated generator
for await (const action of fetchAllDeltas({
  afterSyncId: lastSyncId,
  auth,
  batchSize: 1000,
  syncEndpoint,
})) {
  await applyAction(action);
}
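
Automatic pagination amounts to following lastSyncId until hasMore is false. A minimal sketch of that loop is shown here. The paginateDeltas name and the injected fetchPage function are assumptions, SyncAction is reduced to a placeholder shape, and the real fetchAllDeltas may behave differently.

```typescript
// Placeholder action shape; the real SyncAction type is not shown here.
interface SyncAction {
  syncId: number;
}
interface DeltaPacket {
  actions: SyncAction[];
  lastSyncId: number;
  hasMore: boolean;
}

// Stand-in for a single-page fetch such as transport.fetchDeltas.
type FetchPage = (after: number, limit: number) => Promise<DeltaPacket>;

// Hypothetical pagination loop: yields actions page by page.
async function* paginateDeltas(
  fetchPage: FetchPage,
  afterSyncId: number,
  batchSize: number,
): AsyncGenerator<SyncAction, void> {
  let cursor = afterSyncId;
  while (true) {
    const packet = await fetchPage(cursor, batchSize);
    for (const action of packet.actions) yield action;

    const advanced = packet.lastSyncId > cursor;
    cursor = packet.lastSyncId;
    // Stop when the server reports no more pages, or when the cursor
    // failed to advance (which would otherwise loop forever).
    if (!packet.hasMore || !advanced) return;
  }
}
```

The cursor check is a defensive choice in this sketch: a server that keeps returning hasMore with an unchanged lastSyncId would otherwise never let the loop finish.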

Yjs transport

Bridges Yjs protocol messages over the existing WebSocket, so no separate connection is needed.

const yjsTransport = transport.getYjsTransport();

// Use with YjsDocumentManager and YjsPresenceManager
presenceManager.setTransport(yjsTransport);
documentManager.setTransport(yjsTransport);

The YjsTransportAdapter implements YjsTransport from @stratasync/y-doc:

class YjsTransportAdapter implements YjsTransport {
  send(message: ClientMessage): void;
  onMessage(callback: (message: ServerMessage) => void): () => void;
  handleIncoming(message: ServerMessage): void;
  isConnected(): boolean;
}
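
The four methods suggest a fan-out pattern: handleIncoming receives messages routed from the socket and dispatches them to every onMessage listener. A self-contained sketch of that pattern follows. SketchYjsTransport, the constructor arguments, and the placeholder message shapes are assumptions; the real ClientMessage and ServerMessage types come from @stratasync/y-doc.

```typescript
// Placeholder message shapes, not the real @stratasync/y-doc types.
type ClientMessage = { type: string };
type ServerMessage = { type: string };

// Hypothetical adapter with the same four methods as the class above.
class SketchYjsTransport {
  private listeners = new Set<(message: ServerMessage) => void>();
  private sendRaw: (message: ClientMessage) => void;
  private connected: () => boolean;

  constructor(
    sendRaw: (message: ClientMessage) => void,
    connected: () => boolean,
  ) {
    this.sendRaw = sendRaw;
    this.connected = connected;
  }

  // Outgoing: hand the message to the shared socket.
  send(message: ClientMessage): void {
    this.sendRaw(message);
  }

  // Registers a listener and returns a function that removes it.
  onMessage(callback: (message: ServerMessage) => void): () => void {
    this.listeners.add(callback);
    return () => {
      this.listeners.delete(callback);
    };
  }

  // Incoming: called by the socket owner for each Yjs message.
  handleIncoming(message: ServerMessage): void {
    this.listeners.forEach((listener) => listener(message));
  }

  isConnected(): boolean {
    return this.connected();
  }
}

const received: string[] = [];
const sketchTransport = new SketchYjsTransport(() => {}, () => true);
const stopListening = sketchTransport.onMessage((m) => {
  received.push(m.type);
});
sketchTransport.handleIncoming({ type: "sync" });
stopListening();
sketchTransport.handleIncoming({ type: "ignored" });
console.log(received); // ["sync"]
```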

Retry utilities

| Function | Description |
| --- | --- |
| retryWithBackoff(fn, config, shouldRetry) | Retries an async function with exponential backoff. |
| calculateBackoff(attempt, config) | Computes the delay for a given retry attempt with jitter. |
| fetchWithTimeout(url, options, timeoutMs) | Wraps fetch() with an AbortController timeout. |
| isNetworkError(error) | Checks for network-level errors (TypeError, ECONNREFUSED, and others). |
| isTimeoutError(error) | Checks for timeout/abort errors. |
| isRetryableError(error) | Returns true for network errors, timeouts, 5xx, and 429 status codes. |
| isAuthError(error) | Returns true for 401/403 or Unauthorized/Forbidden errors. |
| parseSyncId(value) | Validates a string-encoded sync ID without losing precision. |
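
Two of these rules can be illustrated without the package. The sketch below classifies HTTP status codes the way the table describes and validates a string-encoded sync ID by comparing digit strings, so the value is never converted before it is known to fit. classifyStatus and parseSyncIdSketch are hypothetical names, and the real helpers take error objects and may return different types.

```typescript
type ErrorClass = "retryable" | "auth" | "other";

// Hypothetical classifier mirroring the table: 429 and 5xx are worth
// retrying, 401/403 are auth failures.
function classifyStatus(status: number): ErrorClass {
  if (status === 401 || status === 403) return "auth";
  if (status === 429 || (status >= 500 && status <= 599)) return "retryable";
  return "other";
}

const MAX_SAFE = String(Number.MAX_SAFE_INTEGER); // "9007199254740991"

// Hypothetical sync ID check: returns the number when the string is all
// digits and fits in a safe integer, otherwise null.
function parseSyncIdSketch(value: string): number | null {
  if (!/^\d+$/.test(value)) return null;
  // Drop leading zeros but keep a lone "0".
  const digits = value.replace(/^0+(?=\d)/, "");
  // Equal-length digit strings compare correctly as plain strings.
  const fits =
    digits.length < MAX_SAFE.length ||
    (digits.length === MAX_SAFE.length && digits <= MAX_SAFE);
  return fits ? Number(digits) : null;
}

console.log(classifyStatus(503)); // "retryable"
console.log(parseSyncIdSketch("12345")); // 12345
console.log(parseSyncIdSketch("9007199254740993")); // null
```

Converting first and checking afterwards would not work for the last case: Number("9007199254740993") silently rounds to a neighboring value.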

Architecture role

Connects the sync client to the server.

sync-core (defines TransactionBatch, DeltaPacket, and more)
  ^-- sync-y-doc (defines YjsTransport interface)
        ^-- sync-transport-graphql (implements TransportAdapter + YjsTransport)
              ^-- sync-client (uses adapter for all server communication)

The same interface can be implemented with other protocols (gRPC, custom WebSocket).