AI agents: fetch the documentation index at llms.txt. Markdown versions are available by appending .md to any page URL, including this page's markdown.
@stratasync/transport-graphql
GraphQL and REST transport adapter for sync communication, delta subscriptions, and Yjs collaborative editing in Strata Sync.
Implements TransportAdapter with NDJSON bootstrap streaming, REST/GraphQL mutations, WebSocket delta subscriptions, and Yjs collaborative editing: all with built-in retry and exponential backoff.
What it provides
createGraphQLTransport: factory function that creates a configuredTransportAdapterinstanceGraphQLTransportAdapter: the class implementing the fullTransportAdapterinterfaceWebSocketManager: manages WebSocket connections for delta subscriptions with automatic reconnectionYjsTransportAdapter: bridges Yjs protocol messages over the WebSocket connection- Bootstrap streams:
createBootstrapStreamandcreateBatchLoadStreamfor NDJSON streaming - Delta fetching:
fetchDeltasandfetchAllDeltasfor REST-based catch-up - Mutation sending:
sendMutations(GraphQL) andsendRestMutations(REST) - Retry utilities:
retryWithBackoff,calculateBackoff,fetchWithTimeout, error classification helpers
Installation
npm install @stratasync/transport-graphql@stratasync/transport-graphql has no peer dependencies. It depends on @stratasync/core and @stratasync/y-doc as workspace dependencies.
Quick start
import { createSyncClient } from "@stratasync/client";
import { createGraphQLTransport } from "@stratasync/transport-graphql";
const transport = createGraphQLTransport({
endpoint: "https://api.example.com/graphql",
syncEndpoint: "https://api.example.com/sync",
wsEndpoint: "wss://api.example.com/sync/ws",
auth: {
getAccessToken: () => getTokenFromStore(),
refreshToken: () => refreshAccessToken(),
},
});
const client = createSyncClient({
transport,
// ... storage, reactivity
});TransportAdapter interface
| Method | Signature | Description |
|---|---|---|
bootstrap | (options: BootstrapOptions) => AsyncGenerator<ModelRow, BootstrapMetadata> | Streams initial data via NDJSON. |
batchLoad | (options: BatchLoadOptions) => AsyncIterable<ModelRow> | Batch loads specific model instances. |
mutate | (batch: TransactionBatch) => Promise<MutateResult> | Sends a batch of mutations. |
subscribe | (options: SubscribeOptions) => DeltaSubscription | Subscribes to real-time delta updates via WebSocket. |
fetchDeltas | (after: number, limit?: number) => Promise<DeltaPacket> | Fetches deltas via REST for catch-up. |
getConnectionState | () => ConnectionState | Returns the WebSocket connection state. |
onConnectionStateChange | (callback) => () => void | Subscribes to connection state changes. |
close | () => Promise<void> | Closes the WebSocket connection. |
Configuration
TransportOptions
interface TransportOptions {
/** GraphQL endpoint URL */
endpoint: string;
/** Base REST sync endpoint (such as https://api.example.com/sync) */
syncEndpoint: string;
/** WebSocket endpoint for subscriptions */
wsEndpoint: string;
/** Authentication provider */
auth: AuthProvider;
/** GraphQL mutation builder (optional -- uses REST mutations if omitted) */
mutationBuilder?: GraphQLMutationBuilder;
/** Request timeout in milliseconds */
timeout?: number;
/** Retry configuration */
retry?: RetryConfig;
/** Custom HTTP headers */
headers?: Record<string, string>;
/** Custom WebSocket implementation (for non-browser environments) */
webSocketFactory?: typeof WebSocket;
}AuthProvider
Called before each HTTP request and WebSocket subscription:
interface AuthProvider {
/** Gets the current access token */
getAccessToken(): Promise<string | null>;
/** Refreshes the access token (optional) */
refreshToken?(): Promise<string | null>;
/** Called when auth fails (optional) */
onAuthError?(error: Error): void;
}RetryConfig
Controls retry for HTTP requests and WebSocket reconnection:
interface RetryConfig {
/** Maximum number of retries */
maxRetries: number;
/** Base delay in milliseconds */
baseDelay: number;
/** Maximum delay in milliseconds */
maxDelay: number;
/** Jitter factor (0-1) for randomizing delays */
jitter?: number;
}Default configuration:
const DEFAULT_RETRY_CONFIG: RetryConfig = {
maxRetries: 3,
baseDelay: 1000,
maxDelay: 30_000,
jitter: 0.2,
};Bootstrap streaming
Streams initial data as NDJSON from the server:
const stream = transport.bootstrap({
type: "full",
schemaHash: "abc123",
syncGroups: ["workspace-1"],
});
for await (const row of stream) {
// row: { modelName: string, data: Record<string, unknown> }
await storage.put(row.modelName, row.data);
}
// The return value contains metadata
const metadata = await stream.return(); // BootstrapMetadatabatchLoad fetches specific model instances:
const stream = transport.batchLoad({
firstSyncId: 12345,
requests: [
{ modelName: "Task", groupId: "team-abc" },
{ modelName: "Comment", indexedKey: "taskId", keyValue: "task-123" },
],
});
for await (const row of stream) {
await storage.put(row.modelName, row.data);
}Mutations
Two modes: REST (default) and GraphQL.
REST mutations (default)
Without a mutationBuilder, mutations are sent as JSON to {syncEndpoint}/mutate.
GraphQL mutations
With a mutationBuilder, mutations are sent as a GraphQL document. The builder receives each transaction and returns a GraphQLMutationSpec.
import type {
GraphQLMutationBuilder,
GraphQLMutationSpec,
} from "@stratasync/transport-graphql";
const mutationBuilder: GraphQLMutationBuilder = (transaction, index) => {
const spec: GraphQLMutationSpec = {
mutation: `taskUpdate(input: $input${index}) { syncId success }`,
variables: { [`input${index}`]: transaction.payload },
variableTypes: { [`input${index}`]: "TaskUpdateInput!" },
};
return spec;
};
const transport = createGraphQLTransport({
mutationBuilder,
// ...
});GraphQLMutationSpec has three properties:
| Property | Type | Description |
|---|---|---|
mutation | string | GraphQL field invocation (such as taskUpdate(input: $input0) { syncId }). |
variables | Record<string, unknown> | Variables used by the mutation. |
variableTypes | Record<string, string> | GraphQL variable type definitions. |
WebSocket subscriptions
Real-time delta streaming over WebSocket:
const subscription = transport.subscribe({
afterSyncId: 12345,
groups: ["workspace-1"],
});
for await (const packet of subscription) {
// packet: DeltaPacket { actions: SyncAction[], lastSyncId: number, hasMore: boolean }
for (const action of packet.actions) {
await applyAction(action);
}
}
// Clean up
subscription.unsubscribe();WebSocketManager
Handles connection lifecycle internally:
- Reconnects with exponential backoff
- Re-subscribes after reconnection
- Queues messages while disconnected, flushes on reconnect
- Routes Yjs messages to
YjsTransportAdapter - Supports custom WebSocket via
webSocketFactory
Connection states
| State | Description |
|---|---|
"disconnected" | No active WebSocket connection. |
"connecting" | Initial connection in progress. |
"reconnecting" | Reconnecting after a disconnect. |
"connected" | WebSocket is open and ready. |
"error" | Connection failed after all retry attempts. |
Delta fetching
Fetch deltas via REST for catch-up. fetchDeltas for a single page, fetchAllDeltas for automatic pagination:
// Single page
const packet = await transport.fetchDeltas(lastSyncId, 1000);
// Paginated generator
for await (const action of fetchAllDeltas({
afterSyncId: lastSyncId,
auth,
batchSize: 1000,
syncEndpoint,
})) {
await applyAction(action);
}Yjs transport
Bridges Yjs protocol messages over the existing WebSocket: no separate connection needed.
const yjsTransport = transport.getYjsTransport();
// Use with YjsDocumentManager and YjsPresenceManager
presenceManager.setTransport(yjsTransport);
documentManager.setTransport(yjsTransport);The YjsTransportAdapter implements YjsTransport from @stratasync/y-doc:
class YjsTransportAdapter implements YjsTransport {
send(message: ClientMessage): void;
onMessage(callback: (message: ServerMessage) => void): () => void;
handleIncoming(message: ServerMessage): void;
isConnected(): boolean;
}Retry utilities
| Function | Description |
|---|---|
retryWithBackoff(fn, config, shouldRetry) | Retries an async function with exponential backoff. |
calculateBackoff(attempt, config) | Computes the delay for a given retry attempt with jitter. |
fetchWithTimeout(url, options, timeoutMs) | Wraps fetch() with an AbortController timeout. |
isNetworkError(error) | Checks for network-level errors (TypeError, ECONNREFUSED, and others). |
isTimeoutError(error) | Checks for timeout/abort errors. |
isRetryableError(error) | Returns true for network errors, timeouts, 5xx, and 429 status codes. |
isAuthError(error) | Returns true for 401/403 or Unauthorized/Forbidden errors. |
parseSyncId(value) | Validates a string-encoded sync ID without losing precision. |
Architecture role
Connects the sync client to the server.
sync-core (defines TransactionBatch, DeltaPacket, and more)
^-- sync-y-doc (defines YjsTransport interface)
^-- sync-transport-graphql (implements TransportAdapter + YjsTransport)
^-- sync-client (uses adapter for all server communication)
The same interface can be implemented with other protocols (gRPC, custom WebSocket).