Realtime Subscriptions
Fluxbase provides real-time database change notifications via WebSockets, powered by PostgreSQL’s LISTEN/NOTIFY system.
Installation
Section titled “Installation”npm install @nimbleflux/fluxbase-sdkBasic Usage
Section titled “Basic Usage”import { createClient } from "@nimbleflux/fluxbase-sdk";
const client = createClient("http://localhost:8080", "your-anon-key");
// Subscribe to table changesconst channel = client.realtime .channel("table:public.products") .on("INSERT", (payload) => { console.log("New product:", payload.new_record); }) .on("UPDATE", (payload) => { console.log("Updated:", payload.new_record); console.log("Previous:", payload.old_record); }) .on("DELETE", (payload) => { console.log("Deleted:", payload.old_record); }) .subscribe();
// Or use wildcard for all eventsconst channel = client.realtime .channel("table:public.products") .on("*", (payload) => { console.log("Event:", payload.type); // INSERT, UPDATE, or DELETE }) .subscribe();
// Unsubscribechannel.unsubscribe();Payload Structure
Section titled “Payload Structure”interface RealtimeChangePayload { type: "INSERT" | "UPDATE" | "DELETE"; schema: string; table: string; new_record?: Record<string, unknown>; // INSERT and UPDATE old_record?: Record<string, unknown>; // UPDATE and DELETE timestamp: string;}React Hook Example
Section titled “React Hook Example”import { useEffect, useState } from 'react'import { createClient } from '@nimbleflux/fluxbase-sdk'
function useRealtimeTable(tableName) { const [data, setData] = useState([]) const client = createClient('http://localhost:8080', 'your-anon-key')
useEffect(() => { const channel = client.realtime .channel(`table:public.${tableName}`) .on('INSERT', (payload) => { setData(prev => [...prev, payload.new_record]) }) .on('UPDATE', (payload) => { setData(prev => prev.map(item => item.id === payload.new_record.id ? payload.new_record : item ) ) }) .on('DELETE', (payload) => { setData(prev => prev.filter(item => item.id !== payload.old_record.id)) }) .subscribe()
return () => channel.unsubscribe() }, [tableName])
return data}
// Usagefunction ProductList() { const products = useRealtimeTable('products')
return ( <div> {products.map(product => ( <div key={product.id}>{product.name}</div> ))} </div> )}Filtering Updates
Section titled “Filtering Updates”Subscribe to specific rows using RLS policies:
// Only receive updates for rows user has access to// Access control is enforced via Row-Level Security policiesconst channel = client.realtime .channel("table:public.posts") .on("*", (payload) => { // Only events matching RLS policies are received console.log("Post update:", payload); }) .subscribe();Multiple Subscriptions
Section titled “Multiple Subscriptions”Subscribe to multiple tables:
const productsChannel = client.realtime .channel("table:public.products") .on("*", handleProductChange) .subscribe();
const ordersChannel = client.realtime .channel("table:public.orders") .on("*", handleOrderChange) .subscribe();
// CleanupproductsChannel.unsubscribe();ordersChannel.unsubscribe();Connection States
Section titled “Connection States”Monitor connection status:
const channel = client.realtime .channel("table:public.products") .on("*", handleChange) .subscribe((status) => { if (status === "SUBSCRIBED") { console.log("Connected and listening"); } else if (status === "CHANNEL_ERROR") { console.error("Subscription error"); } else if (status === "CLOSED") { console.log("Connection closed"); } });Enabling Realtime on Tables
Section titled “Enabling Realtime on Tables”By default, realtime is disabled on user tables. Use the admin API to enable it:
// Enable realtime on a table (all events)await client.admin.realtime.enableRealtime("products");
// Enable with specific events onlyawait client.admin.realtime.enableRealtime("orders", { events: ["INSERT", "UPDATE"], // Skip DELETE events});
// Exclude large columns from notifications (helps with 8KB pg_notify limit)await client.admin.realtime.enableRealtime("posts", { exclude: ["content", "raw_html"],});
// Enable on a custom schemaawait client.admin.realtime.enableRealtime("events", { schema: "analytics",});What Happens Under the Hood
Section titled “What Happens Under the Hood”When you enable realtime, Fluxbase automatically:
- Sets
REPLICA IDENTITY FULLon the table (required for UPDATE/DELETE to include old values) - Creates a trigger that calls
pg_notify('fluxbase_changes', ...)on INSERT/UPDATE/DELETE - Registers the table in
realtime.schema_registryfor subscription validation
Managing Realtime Tables
Section titled “Managing Realtime Tables”// List all realtime-enabled tablesconst { tables, count } = await client.admin.realtime.listTables();console.log(`${count} tables have realtime enabled`);
// Check status of a specific tableconst status = await client.admin.realtime.getStatus("public", "products");if (status.realtime_enabled) { console.log("Events:", status.events.join(", "));}
// Update configuration (change events or excluded columns)await client.admin.realtime.updateConfig("public", "products", { events: ["INSERT"], // Only track inserts now exclude: ["metadata"],});
// Disable realtime on a tableawait client.admin.realtime.disableRealtime("public", "products");Manual SQL Setup (Alternative)
Section titled “Manual SQL Setup (Alternative)”You can also enable realtime manually with SQL:
-- Enable realtime for a tableALTER TABLE products REPLICA IDENTITY FULL;
-- Register in schema registryINSERT INTO realtime.schema_registry (schema_name, table_name, realtime_enabled, events)VALUES ('public', 'products', true, ARRAY['INSERT', 'UPDATE', 'DELETE']);
-- Create trigger using the shared notify functionCREATE TRIGGER products_realtime_notifyAFTER INSERT OR UPDATE OR DELETE ON productsFOR EACH ROW EXECUTE FUNCTION public.notify_realtime_change();Architecture
Section titled “Architecture”Fluxbase uses PostgreSQL’s LISTEN/NOTIFY system:
- Database triggers detect changes (INSERT, UPDATE, DELETE)
- Trigger calls
pg_notify()to broadcast change - Fluxbase server listens to notifications
- Server forwards changes to subscribed WebSocket clients
- Clients receive real-time updates
This architecture is lightweight and scales well for moderate traffic.
graph LR A[Client App] -->|WebSocket| B[Fluxbase Server] B -->|LISTEN| C[(PostgreSQL)] C -->|pg_notify| B D[Database Trigger] -->|INSERT/UPDATE/DELETE| C B -->|Broadcast| A B -->|Broadcast| E[Client App 2] B -->|Broadcast| F[Client App 3]
style C fill:#336791,color:#fff style B fill:#3178c6,color:#fffSingle-Instance Architecture
Section titled “Single-Instance Architecture”In a single-instance deployment, all WebSocket connections are handled by one Fluxbase server. This is the default setup and works well for most use cases.
Multi-Instance Architecture (Horizontal Scaling)
Section titled “Multi-Instance Architecture (Horizontal Scaling)”Fluxbase supports two types of realtime notifications, each handled differently in multi-instance deployments:
Database Change Notifications
Section titled “Database Change Notifications”Database changes (INSERT/UPDATE/DELETE) are automatically broadcast to all instances via PostgreSQL’s LISTEN/NOTIFY. No extra configuration needed.
graph TB A[Client 1] -->|WebSocket| LB[Load Balancer<br/>Session Stickiness] B[Client 2] -->|WebSocket| LB C[Client 3] -->|WebSocket| LB
LB -->|Sticky to Instance 1| FB1[Fluxbase Instance 1] LB -->|Sticky to Instance 2| FB2[Fluxbase Instance 2] LB -->|Sticky to Instance 3| FB3[Fluxbase Instance 3]
FB1 -->|LISTEN| DB[(PostgreSQL)] FB2 -->|LISTEN| DB FB3 -->|LISTEN| DB
DB -->|pg_notify broadcasts<br/>to all instances| FB1 DB -->|pg_notify broadcasts<br/>to all instances| FB2 DB -->|pg_notify broadcasts<br/>to all instances| FB3
style DB fill:#336791,color:#fff style FB1 fill:#3178c6,color:#fff style FB2 fill:#3178c6,color:#fff style FB3 fill:#3178c6,color:#fff style LB fill:#ff6b6b,color:#fffApplication Broadcasts (Cross-Instance)
Section titled “Application Broadcasts (Cross-Instance)”For application-level broadcasts (chat messages, custom events, presence), configure a distributed pub/sub backend:
# Option 1: PostgreSQL LISTEN/NOTIFY (default, no extra dependencies)FLUXBASE_SCALING_BACKEND=postgres
# Option 2: Redis/Dragonfly (for high-scale deployments)FLUXBASE_SCALING_BACKEND=redisFLUXBASE_SCALING_REDIS_URL=redis://dragonfly:6379When configured, broadcasts sent via BroadcastGlobal() are delivered to clients on all instances, not just the originating one.
Key points for horizontal scaling:
- Each Fluxbase instance maintains its own PostgreSQL LISTEN connection
- PostgreSQL broadcasts
pg_notify()to all listening instances simultaneously - Load balancer uses session stickiness (source IP or cookie-based) to route each client to the same Fluxbase instance
- Requires external PostgreSQL (which also stores authentication sessions shared across instances)
- With
postgresorredisscaling backend, rate limiting is shared across instances
See Deployment: Scaling for configuration details.
Connection Management
Section titled “Connection Management”Auto-reconnect: SDK automatically reconnects on connection loss
Heartbeat: Periodic ping/pong to detect stale connections
Cleanup: Always unsubscribe when done to prevent memory leaks
// Good: cleanup in effectuseEffect(() => { const channel = client.realtime .channel("table:public.products") .on("*", handleChange) .subscribe();
return () => channel.unsubscribe(); // Cleanup}, []);Presence Tracking
Section titled “Presence Tracking”Presence tracking enables real-time user online/offline status and custom state sharing. This is useful for collaborative applications, chat systems, and multiplayer features.
Tracking User Presence
Section titled “Tracking User Presence”Track when users join and leave channels:
import { createClient } from '@nimbleflux/fluxbase-sdk'
const client = createClient('http://localhost:8080', 'your-anon-key')
// Track presence with custom stateconst channel = client.realtime .channel('presence:online_users') .on('presence', { event: 'sync' }, (payload) => { // Initial presence state (all users currently online) console.log('Current users:', payload.presences) }) .on('presence', { event: 'join' }, (payload) => { // User came online console.log('User joined:', payload.key) console.log('User state:', payload.presence) }) .on('presence', { event: 'leave' }, (payload) => { // User went offline console.log('User left:', payload.key) }) .subscribe()
// Track your own presence with custom statechannel.track({ online: true, last_seen: new Date().toISOString(), status: 'working',})Presence State Structure
Section titled “Presence State Structure”Presence data is sent as JSON objects:
interface PresenceState { online: boolean last_seen: string status?: 'online' | 'away' | 'busy' | 'offline' custom_field?: string // Any custom data}
interface PresencePayload { event: 'sync' | 'join' | 'leave' key: string // Unique identifier (user ID, session ID, etc.) presence: PresenceState presences: Record<string, PresenceState[]> // All presences for 'sync' event}Presence Events
Section titled “Presence Events”| Event | Description | When It Fires |
|---|---|---|
sync | Initial presence state | When you first subscribe |
join | User came online | When a user tracks their presence |
leave | User went offline | When a user disconnects or untracks |
Custom Presence State
Section titled “Custom Presence State”Update your presence state in real-time:
// Update presence statechannel.track({ online: true, status: 'busy', inMeeting: true, meetingId: 'abc-123',})
// Update state laterchannel.track({ online: true, status: 'available', inMeeting: false,})Untracking Presence
Section titled “Untracking Presence”Remove your presence when done:
// Stop being trackedchannel.untrack()
// Or unsubscribe to automatically untrackchannel.unsubscribe()Channel Patterns
Section titled “Channel Patterns”Presence works with any channel name:
// Global presence across appconst globalPresence = client.realtime .channel('presence:global')
// Room-specific presenceconst roomPresence = client.realtime .channel('presence:room:123')
// Document collaboration presenceconst docPresence = client.realtime .channel('presence:document:abc')React Example
Section titled “React Example”Build a user list with online indicators:
import { useEffect, useState } from 'react'import { createClient } from '@nimbleflux/fluxbase-sdk'
interface UserPresence { key: string state: { online: boolean status: string last_seen: string }}
function OnlineUsers({ documentId }: { documentId: string }) { const [users, setUsers] = useState<UserPresence[]>([]) const client = createClient('http://localhost:8080', 'your-anon-key')
useEffect(() => { const channel = client.realtime .channel(`presence:document:${documentId}`) .on('presence', { event: 'sync' }, (payload) => { // Convert presences object to array const userArray = Object.entries(payload.presences).map(([key, states]) => ({ key, state: states[0], })) setUsers(userArray) }) .on('presence', { event: 'join' }, (payload) => { setUsers(prev => [...prev, { key: payload.key, state: payload.presence, }]) }) .on('presence', { event: 'leave' }, (payload) => { setUsers(prev => prev.filter(u => u.key !== payload.key)) }) .subscribe()
// Track your own presence channel.track({ online: true, last_seen: new Date().toISOString(), })
return () => { channel.untrack() channel.unsubscribe() } }, [documentId])
return ( <div> <h3>Online Users ({users.filter(u => u.state.online).length})</h3> <ul> {users.map(user => ( <li key={user.key}> <span style={{ display: 'inline-block', width: 10, height: 10, borderRadius: '50%', backgroundColor: user.state.online ? 'green' : 'gray', marginRight: 8, }} /> {user.key} ({user.state.status}) </li> ))} </ul> </div> )}Presence with RLS
Section titled “Presence with RLS”Presence channels respect authentication but not RLS policies:
// Authenticated presenceconst authChannel = client.realtime .channel('presence:room:private') .subscribe() // Requires authentication
// Anonymous presence (if allowed)const anonChannel = client.realtime .channel('presence:public') .subscribe()Rate Limiting
Section titled “Rate Limiting”Presence tracking is subject to rate limits:
realtime: presence: max_presences_per_channel: 1000 # Max users tracking presence in a channel max_channels_per_connection: 10 # Max presence channels per connectionSecurity
Section titled “Security”Realtime subscriptions respect Row-Level Security policies. Users only receive updates for rows they have permission to view.
-- Example: Users only see their own postsCREATE POLICY "Users see own posts"ON postsFOR SELECTUSING (current_setting('app.user_id', true)::uuid = user_id);When authenticated, users receive realtime updates only for their own posts.
Best Practices
Section titled “Best Practices”Performance:
- Limit number of active subscriptions per client
- Unsubscribe from unused channels
- Use wildcard (
*) when listening to all event types
Security:
- Always use RLS policies to control data access
- Validate JWT tokens for authenticated subscriptions
- Never expose sensitive data in realtime payloads
Reliability:
- Handle connection errors gracefully
- Implement reconnection logic for long-running apps
- Cache local state to handle brief disconnections
Debugging:
- Monitor connection status
- Log payload structures during development
- Use browser DevTools to inspect WebSocket traffic
Raw WebSocket Protocol
Section titled “Raw WebSocket Protocol”For non-JavaScript environments, see the Realtime SDK Documentation for WebSocket protocol details.
Troubleshooting
Section titled “Troubleshooting”No updates received:
- Verify realtime is enabled on table (triggers exist)
- Check RLS policies allow access to rows
- Confirm WebSocket connection is established
- Verify channel name matches table:
table:schema.table_name
Connection drops:
- Check network stability
- Verify Fluxbase server is running
- Review firewall/proxy WebSocket support
- Ensure JWT token is valid (not expired)
Performance issues:
- Reduce number of subscriptions
- Optimize RLS policies (avoid slow queries)
- Consider aggregating rapid changes client-side
- Monitor PostgreSQL NOTIFY queue size
Next Steps
Section titled “Next Steps”- Row-Level Security - Control data access
- Authentication - Secure subscriptions
- Monitoring - Track realtime performance
- Scaling Guide - Configure realtime for horizontal scaling