Background Jobs
Fluxbase provides a powerful background jobs system for executing long-running tasks asynchronously with progress tracking, retry logic, and role-based access control.
Features
Section titled “Features”- Asynchronous execution - Submit jobs and track progress without blocking
- Real-time progress updates - Subscribe to live job updates via WebSockets
- Retry logic - Automatic retry on failure with configurable limits
- Role-based permissions - Control who can submit specific jobs
- User context - Jobs inherit user identity for RLS-aware database access
- Scheduled execution - Run jobs at specific times or on cron schedules
- Worker pool management - Scalable job processing with multiple workers
- Progress tracking - Report granular progress with custom messages and data
Configuration
Section titled “Configuration”Configure the jobs system via environment variables:
# Enable background jobsFLUXBASE_JOBS_ENABLED=true
# Directory for job function filesFLUXBASE_JOBS_DIR=./jobs
# Auto-load job functions on server startupFLUXBASE_JOBS_AUTO_LOAD_ON_BOOT=true
# Number of embedded worker threadsFLUXBASE_JOBS_EMBEDDED_WORKER_COUNT=4
# Default timeout for job execution (seconds)FLUXBASE_JOBS_DEFAULT_TIMEOUT=300
# Default maximum retry attemptsFLUXBASE_JOBS_DEFAULT_MAX_RETRIES=3Worker Architecture
Section titled “Worker Architecture”Fluxbase uses an embedded worker architecture where job workers run as goroutines within the main server process. This provides a simple, single-binary deployment model with no external dependencies beyond PostgreSQL.
How Workers Operate
Section titled “How Workers Operate”graph TB Server[Fluxbase Server]
subgraph Workers["Worker Pool (Goroutines)"] W1[Worker 1] W2[Worker 2] W3[Worker 3] W4[Worker 4] end
DB[(PostgreSQL<br/>Job Queue)]
W1 -->|Poll every 1s| DB W2 -->|Poll every 1s| DB W3 -->|Poll every 1s| DB W4 -->|Poll every 1s| DB
W1 -.->|Heartbeat every 10s| DB W2 -.->|Heartbeat every 10s| DB W3 -.->|Heartbeat every 10s| DB W4 -.->|Heartbeat every 10s| DB
Server --> Workers
style Server fill:#e1f5ff style DB fill:#fff4e1Worker Lifecycle:
- Startup - Workers register in database with unique ID and hostname
- Polling - Each worker polls database for pending jobs (default: every 1 second)
- Job Claiming - When available, worker atomically claims next job from queue
- Execution - Job runs in isolated Deno runtime with configured permissions
- Heartbeat - Worker sends heartbeat every 10 seconds to prove it’s alive
- Completion - Worker updates job status and becomes available for next job
- Shutdown - Graceful shutdown waits up to 30 seconds for in-flight jobs
Embedded Workers (Default)
Section titled “Embedded Workers (Default)”Embedded workers run within the Fluxbase server process:
jobs: enabled: true worker_mode: "embedded" # Default mode embedded_worker_count: 4 # Number of worker goroutines max_concurrent_per_worker: 5 # Max jobs per worker poll_interval: "1s" # How often to check for jobs worker_heartbeat_interval: "10s" # Heartbeat frequency worker_timeout: "30s" # Stale worker cleanup thresholdPros:
- ✅ Simple deployment (single binary/container)
- ✅ No additional infrastructure needed
- ✅ Fast startup and low overhead
- ✅ Shared memory and resources
Cons:
- ⚠️ Workers share resources with API server
- ⚠️ All workers restart when server restarts
- ⚠️ Limited horizontal scaling (vertical only)
Deployment Patterns
Section titled “Deployment Patterns”Single Binary/Container (Recommended)
Section titled “Single Binary/Container (Recommended)”Deploy one Fluxbase instance with embedded workers:
# Dockerdocker run -e FLUXBASE_JOBS_EMBEDDED_WORKER_COUNT=4 \ ghcr.io/nimbleflux/fluxbase:latest
# BinaryFLUXBASE_JOBS_EMBEDDED_WORKER_COUNT=4 ./fluxbaseThis is the recommended approach for most use cases:
- Handles 1000s of jobs per day
- Scales vertically (increase worker count and resources)
- Simple operations and monitoring
Scaling Workers
Section titled “Scaling Workers”To handle higher job throughput, increase worker count and resources:
jobs: embedded_worker_count: 8 # More workers max_concurrent_per_worker: 10 # More concurrent jobs per worker max_concurrent_per_namespace: 50 # Higher namespace limitCapacity calculation:
Max concurrent jobs = embedded_worker_count × max_concurrent_per_workerExample: 8 workers × 10 jobs = 80 concurrent jobsExternal Workers (Not Currently Supported)
Section titled “External Workers (Not Currently Supported)”While the codebase includes worker_mode: "standalone" configuration, external worker deployment is not currently exposed as a CLI feature. The architecture supports it (database-backed queue, worker registration, heartbeats), but you would need to build custom tooling to run standalone workers.
Future consideration: If your use case requires separate worker processes (e.g., different resource limits, isolated failure domains), please open a GitHub issue to discuss adding standalone worker support.
Worker Configuration Reference
Section titled “Worker Configuration Reference”| Setting | Default | Description |
|---|---|---|
worker_mode | embedded | Worker mode: embedded, standalone, disabled |
embedded_worker_count | 4 | Number of worker goroutines |
max_concurrent_per_worker | 5 | Max jobs each worker can run simultaneously |
max_concurrent_per_namespace | 20 | Max concurrent jobs per namespace |
poll_interval | 1s | How often workers check for new jobs |
worker_heartbeat_interval | 10s | Heartbeat frequency |
worker_timeout | 30s | Time before worker is considered dead |
default_progress_timeout | 300s | Kill job if no progress reported |
Monitoring Workers
Section titled “Monitoring Workers”Check worker health via the admin API:
// List active workersconst { data: workers } = await client.admin.jobs.listWorkers();
workers?.forEach((worker) => { console.log(`${worker.name}`); console.log(` Status: ${worker.status}`); console.log(` Current jobs: ${worker.current_job_count}`); console.log(` Max concurrent: ${worker.max_concurrent_jobs}`); console.log(` Last heartbeat: ${worker.last_heartbeat_at}`);});Installation
Section titled “Installation”npm install @nimbleflux/fluxbase-sdkQuick Start
Section titled “Quick Start”Submit a Job
Section titled “Submit a Job”import { createClient } from "@nimbleflux/fluxbase-sdk";
const client = createClient("http://localhost:8080", { apiKey: "your-anon-key",});
// Submit a background jobconst { data: job, error } = await client.jobs.submit("process-data", { items: [1, 2, 3, 4, 5],});
if (error) { console.error("Failed to submit job:", error);} else { console.log("Job submitted:", job.id); console.log("Status:", job.status); // "pending"}Track Progress (Polling)
Section titled “Track Progress (Polling)”// Poll for job statusasync function waitForJob(jobId: string) { let completed = false;
while (!completed) { const { data: job } = await client.jobs.get(jobId);
console.log(`Status: ${job.status}`); if (job.progress) { console.log( `Progress: ${job.progress.percent}% - ${job.progress.message}` ); }
if (job.status === "completed") { console.log("Result:", job.result); completed = true; } else if (job.status === "failed") { console.error("Error:", job.error_message); completed = true; } else if (job.status === "cancelled") { console.log("Job was cancelled"); completed = true; }
if (!completed) { await new Promise((resolve) => setTimeout(resolve, 2000)); } }}
await waitForJob(job.id);Real-Time Progress Tracking
Section titled “Real-Time Progress Tracking”Jobs are fully integrated with Fluxbase’s PostgreSQL LISTEN/NOTIFY system. Subscribe to live updates via WebSockets:
Basic Realtime Subscription
Section titled “Basic Realtime Subscription”// Submit jobconst { data: job } = await client.jobs.submit("process-data", { items: [1, 2, 3],});
// Subscribe to realtime updatesconst channel = client.realtime.channel("table:jobs.queue");
channel .on("UPDATE", (payload) => { const updatedJob = payload.new;
// Filter for our specific job if (updatedJob.id === job.id) { console.log( `[${updatedJob.progress.percent}%] ${updatedJob.progress.message}` );
// Handle completion if (updatedJob.status === "completed") { console.log("Result:", updatedJob.result); channel.unsubscribe(); }
// Handle failure if (updatedJob.status === "failed") { console.error("Error:", updatedJob.error_message); console.error("Logs:", updatedJob.logs); channel.unsubscribe(); } } }) .subscribe();What Information is Exposed
Section titled “What Information is Exposed”When a job updates, subscribers receive the complete job record (respecting Row-Level Security):
interface RealtimeJobUpdate { eventType: "UPDATE"; schema: "jobs"; table: "queue"; commit_timestamp: string;
// New state after the update new: { id: string; // Job UUID job_name: string; // Name of job function namespace: string; // Job namespace status: "pending" | "running" | "completed" | "failed" | "cancelled";
// Progress tracking progress: { percent: number; // 0-100 message: string; // Human-readable status data?: { // Optional custom data processed?: number; total?: number; currentStep?: string; [key: string]: any; }; };
// Timestamps created_at: string; started_at?: string; completed_at?: string; last_progress_at?: string;
// Results and errors result?: any; // Job output (when completed) error_message?: string; // Failure reason (when failed) logs?: string; // Captured stdout/stderr
// User context (if own job) created_by: string; // User UUID user_role: string; user_email: string;
// Worker information worker_id?: string; retry_count: number; max_retries: number; };
// Previous state before the update old: { /* same structure */ };}RLS Filtering on Realtime
Section titled “RLS Filtering on Realtime”Realtime subscriptions respect Row-Level Security policies:
Regular users see only their own jobs:
// User A subscribes to job updatesconst channel = client.realtime.channel("table:jobs.queue");
channel .on("UPDATE", (payload) => { // Only receives updates for jobs where created_by = userA_uuid // Other users' jobs are automatically filtered by RLS console.log("My job updated:", payload.new); }) .subscribe();Admins see all jobs:
// Admin or dashboard_admin role can see all jobsconst channel = adminClient.realtime.channel("table:jobs.queue");
channel .on("UPDATE", (payload) => { // Receives updates for ALL jobs across all users console.log("Job updated:", payload.new); console.log("User:", payload.new.user_email); }) .subscribe();Advanced: Filter Subscriptions
Section titled “Advanced: Filter Subscriptions”Subscribe only to jobs with specific statuses:
const channel = client.realtime.channel("jobs:running");
channel .on( "postgres_changes", { event: "UPDATE", schema: "jobs", table: "queue", filter: "status=eq.running", // Only running jobs }, (payload) => { console.log("Running job progress:", payload.new.progress); } ) .subscribe();Realtime Architecture
Section titled “Realtime Architecture”sequenceDiagram participant Job as Job Function participant Worker as Worker Process participant DB as PostgreSQL participant Server as Fluxbase Server participant Client as Browser/Client
Client->>Server: Subscribe to channel<br/>'table:jobs.queue' Server-->>Client: Subscription confirmed
Job->>Job: job.reportProgress(50, "Processing...") Job->>Worker: Return progress update Worker->>DB: UPDATE jobs.queue<br/>SET progress = {...}
Note over DB: Trigger fires after UPDATE DB->>DB: jobs.notify_realtime_change() DB->>DB: pg_notify('fluxbase_changes', JSON)
DB-->>Server: NOTIFY received Note over Server: RLS check:<br/>Filter by created_by
Server->>Client: WebSocket broadcast<br/>{new: {...}, old: {...}} Client->>Client: Update UI with progressJob States
Section titled “Job States”Jobs transition through the following states:
stateDiagram-v2 [*] --> pending: Job submitted pending --> running: Worker picks up running --> completed: Success running --> failed: Error occurred running --> cancelled: User cancels pending --> cancelled: User cancels failed --> pending: Retry completed --> [*] failed --> [*] cancelled --> [*]- pending - Job is queued, waiting for a worker
- running - Job is currently being executed
- completed - Job finished successfully
- failed - Job failed (can be retried)
- cancelled - Job was cancelled by user or admin
Creating Job Functions
Section titled “Creating Job Functions”Job functions are TypeScript/JavaScript files with a handler export and optional annotations. The handler receives four parameters:
| Parameter | Type | Description |
|---|---|---|
req | Request | HTTP Request object (for compatibility) |
fluxbase | FluxbaseClient | null | SDK client with user’s RLS context - can only access data the user can access |
fluxbaseService | FluxbaseClient | null | SDK client with service role - bypasses RLS for system-wide access |
job | JobUtils | Job utilities for progress reporting, context access, and cancellation checking |
Basic Job Function
Section titled “Basic Job Function”/** * Process user data * @fluxbase:timeout 300 * @fluxbase:description Processes user-specific data items */export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const context = job.getJobContext(); const { items } = context.payload;
// User context is automatically available console.log("Running for user:", context.user?.email);
for (let i = 0; i < items.length; i++) { // Process item await processItem(items[i]);
// Report progress const percent = Math.floor(((i + 1) / items.length) * 100); job.reportProgress(percent, `Processed ${i + 1}/${items.length}`); }
// Query user's own data (RLS applies) const { data } = await fluxbase .from("app.my_table") .select("*") .eq("user_id", context.user.id);
return { success: true, processed: items.length };}Job Function Annotations
Section titled “Job Function Annotations”Control job behavior with JSDoc-style annotations:
@fluxbase:namespace <name>- Specify namespace (overrides CLI--namespaceflag)@fluxbase:require-role <role>- Require specific user role (admin, authenticated, custom)@fluxbase:timeout <seconds>- Maximum execution time (default: 300)@fluxbase:max-retries <count>- Number of retry attempts (default: 3)@fluxbase:schedule <cron>- Cron expression for scheduled execution@fluxbase:description <text>- Human-readable job description
Admin-Only Job
Section titled “Admin-Only Job”/** * Generate system-wide report * @fluxbase:require-role admin * @fluxbase:timeout 600 */export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const context = job.getJobContext();
// Only admins can submit this job console.log("Admin:", context.user?.email);
job.reportProgress(25, "Fetching data...");
// Use service client to access all data (bypasses RLS) const { data } = await fluxbaseService.from("app.system_data").select("*");
job.reportProgress(50, "Generating report..."); const report = await generateReport(data);
job.reportProgress(75, "Saving results...");
// Save report using service role await fluxbaseService .from("app.reports") .insert({ data: report, generated_at: new Date().toISOString() });
job.reportProgress(100, "Complete");
return { report, recordCount: data?.length };}Scheduled Job
Section titled “Scheduled Job”/** * Cleanup old data daily at 2 AM * @fluxbase:schedule 0 2 * * * * @fluxbase:require-role admin * @fluxbase:timeout 1800 */export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const context = job.getJobContext(); const { retention_days = 30 } = context.payload;
job.reportProgress(10, "Starting cleanup...");
const cutoffDate = new Date(); cutoffDate.setDate(cutoffDate.getDate() - retention_days);
// Use service role to access all records across users const { data: deleted } = await fluxbaseService .from("app.old_records") .delete() .lt("created_at", cutoffDate.toISOString()) .select();
job.reportProgress(100, "Cleanup complete");
return { deleted: deleted?.length ?? 0 };}Available APIs in Job Context
Section titled “Available APIs in Job Context”Job handlers receive the Fluxbase SDK clients and job utilities as parameters:
Job Utilities (job parameter)
Section titled “Job Utilities (job parameter)”// Get job contextconst context = job.getJobContext()// Returns: { job_id, job_name, namespace, retry_count, payload, user }
// Report progress (0-100)job.reportProgress(percent, message, data?)
// Check if job was cancelled (call periodically in long-running jobs)if (job.checkCancellation()) { return { cancelled: true }}
// Get payload directly (convenience method)const payload = job.getJobPayload()Fluxbase SDK Client (fluxbase parameter)
Section titled “Fluxbase SDK Client (fluxbase parameter)”The fluxbase client has the user’s RLS context - it can only access data the user has permission for:
// Query database with RLS appliedconst { data } = await fluxbase.from("app.my_table").select("*");
// Access storage (user's permissions apply)const { data: files } = await fluxbase.storage.from("user-uploads").list();
// Submit follow-up jobsawait fluxbase.jobs.submit("process-next", { batch: 2 });Service Role Client (fluxbaseService parameter)
Section titled “Service Role Client (fluxbaseService parameter)”The fluxbaseService client bypasses RLS for system-wide access:
// Access all data across usersconst { data: allRecords } = await fluxbaseService .from("app.all_data") .select("count");
// System-level storage operationsawait fluxbaseService.storage.from("system-bucket").upload("report.json", blob);When to Use Each Client
Section titled “When to Use Each Client”| Use Case | Client | Reason |
|---|---|---|
| Query user’s own data | fluxbase | Respects RLS, user can only see their data |
| Read user-uploaded files | fluxbase | User has permission to access their uploads |
| Generate user-specific exports | fluxbase | Ensures data isolation |
| System cleanup jobs | fluxbaseService | Needs access to all records |
| Admin reports across all users | fluxbaseService | Aggregates data system-wide |
| Write to system tables | fluxbaseService | May need elevated permissions |
Environment Variables
Section titled “Environment Variables”The following environment variables are automatically available in jobs:
| Variable | Description |
|---|---|
FLUXBASE_URL | Server URL for SDK client (set automatically) |
FLUXBASE_JOB_ID | Current job UUID |
FLUXBASE_JOB_NAME | Job function name |
FLUXBASE_JOB_NAMESPACE | Job namespace |
FLUXBASE_JOB_CANCELLED | "true" if job was cancelled |
Custom FLUXBASE_* variables from your server environment are also available (except blocked secrets like JWT keys).
// Read job-specific environment variablesconst jobId = Deno.env.get("FLUXBASE_JOB_ID");
// Read custom environment variables (must be prefixed with FLUXBASE_)const apiKey = Deno.env.get("FLUXBASE_EXTERNAL_API_KEY");
// Log output (captured in job.logs)console.log("Info message");console.error("Error message");Secrets
Section titled “Secrets”Jobs can access secrets the same way as Edge Functions. The secrets object is automatically available - no import needed:
export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { // Get secrets with automatic fallback: user -> system const stripeKey = secrets.getRequired("stripe_api_key"); const webhookSecret = secrets.get("webhook_secret");
job.reportProgress(25, "Processing payment...");
// Use the secret for external API calls const response = await fetch("https://api.stripe.com/v1/charges", { method: "POST", headers: { Authorization: `Bearer ${stripeKey}`, }, body: new URLSearchParams({ amount: "1000", currency: "usd" }), });
job.reportProgress(100, "Complete"); return { success: true };}Managing secrets: Use the CLI to create system secrets:
fluxbase settings secrets set stripe_api_key "sk_live_..."fluxbase settings secrets set webhook_secret "whsec_..."fluxbase settings secrets listSee the Edge Functions guide for full secrets API reference.
Security & Permissions
Section titled “Security & Permissions”Row-Level Security (RLS)
Section titled “Row-Level Security (RLS)”Jobs use PostgreSQL Row-Level Security to enforce data isolation:
graph TB subgraph "User Job Flow" U[User] -->|1. Submit job| API[POST /api/v1/jobs/submit] API -->|2. Validate auth| AUTH[Auth Middleware] AUTH -->|3. Extract user_id| CTX[Request Context] CTX -->|4. Create job record| DB[(Database)] DB -->|created_by = user_id| JOB[Job Record] end
subgraph "Job Queue Table RLS" JOB -->|RLS Policy| POLICY{Policy Check} POLICY -->|SELECT| READ[Users can read their own jobs:<br/>WHERE created_by = auth.uid] POLICY -->|INSERT| WRITE[Users can submit jobs:<br/>WITH CHECK created_by = auth.uid] POLICY -->|UPDATE| CANCEL[Users can cancel own jobs:<br/>WHERE created_by = auth.uid<br/>AND status IN pending,running] end
subgraph "Admin Access" ADMIN[Admin/Service Role] -->|Bypass RLS| ALL[Full access to all jobs] endRLS Policies
Section titled “RLS Policies”Four policies control access to the jobs.queue table:
1. Users can read their own jobs:
CREATE POLICY "Users can read their own jobs" ON jobs.queue FOR SELECT TO authenticated USING (created_by = auth.uid());2. Users can submit jobs:
CREATE POLICY "Users can submit jobs" ON jobs.queue FOR INSERT TO authenticated WITH CHECK (created_by = auth.uid());3. Users can cancel their own jobs:
CREATE POLICY "Users can cancel their own pending/running jobs" ON jobs.queue FOR UPDATE TO authenticated USING (created_by = auth.uid() AND status IN ('pending', 'running')) WITH CHECK (status = 'cancelled');4. Service role has full access:
CREATE POLICY "Service role can manage all jobs" ON jobs.queue FOR ALL TO service_role USING (true) WITH CHECK (true);Job Submission Permission Flow
Section titled “Job Submission Permission Flow”graph LR A[User submits job] --> B{Job has<br/>require_role?} B -->|No| D[Check RLS policy] B -->|Yes| C{User role<br/>matches?} C -->|No| E[403 Forbidden] C -->|Yes| D D -->|created_by = auth.uid| F[Create job] D -->|Fails policy| G[401 Unauthorized] F --> H[Job in queue]Job Lifecycle with Security
Section titled “Job Lifecycle with Security”sequenceDiagram participant User participant API participant Auth participant DB participant Worker
User->>API: POST /api/v1/jobs/submit API->>Auth: Validate JWT token Auth-->>API: user_id, role, email API->>API: Check job function permissions<br/>(require_role if set) API->>DB: INSERT INTO queue<br/>created_by = user_id Note over DB: RLS checks:<br/>created_by = auth.uid() DB-->>API: Job created (pending) API-->>User: {id, status: "pending"}
Worker->>DB: SELECT pending jobs<br/>(service_role) Note over Worker: Bypasses RLS Worker->>Worker: Execute job code Worker->>DB: UPDATE progress<br/>(service_role)
User->>API: GET /api/v1/jobs/{id} API->>DB: SELECT * WHERE id = ? Note over DB: RLS filters:<br/>created_by = auth.uid() DB-->>API: Job details (if owned) API-->>User: {status, progress, result}Security Mechanisms
Section titled “Security Mechanisms”- RLS Enforcement - Users only access jobs they created
- Role-Based Submission - Jobs can require specific roles
- Service Role Workers - Workers bypass RLS to execute jobs
- User Context Preservation - Jobs store user identity at submission
Monitoring & Operations
Section titled “Monitoring & Operations”List Jobs
Section titled “List Jobs”// List user's own jobsconst { data: jobs } = await client.jobs.list({ status: "running", limit: 20, offset: 0,});
jobs?.forEach((job) => { console.log(`${job.job_name}: ${job.status} (${job.progress?.percent}%)`);});Cancel a Job
Section titled “Cancel a Job”const { error } = await client.jobs.cancel(jobId);
if (!error) { console.log("Job cancelled");}Retry a Failed Job
Section titled “Retry a Failed Job”const { data: newJob, error } = await client.jobs.retry(jobId);
console.log("Retry job ID:", newJob.id);Admin Operations
Section titled “Admin Operations”Admins can manage all jobs across all users:
// List all jobs (admin only)const { data: allJobs } = await client.admin.jobs.listJobs({ status: "running", namespace: "default",});
// Get statisticsconst { data: stats } = await client.admin.jobs.getStats("default");console.log("Pending:", stats.pending);console.log("Running:", stats.running);console.log("Completed:", stats.completed);console.log("Failed:", stats.failed);
// List active workersconst { data: workers } = await client.admin.jobs.listWorkers();workers?.forEach((worker) => { console.log(`Worker ${worker.id}: ${worker.current_jobs} jobs`);});
// Terminate a job immediatelyawait client.admin.jobs.terminate(jobId);Deploying Job Functions
Section titled “Deploying Job Functions”Method 1: SDK Upload
Section titled “Method 1: SDK Upload”import { createClient } from "@nimbleflux/fluxbase-sdk";import { readFile } from "fs/promises";
const client = createClient(process.env.FLUXBASE_URL!, { serviceKey: process.env.FLUXBASE_SERVICE_KEY!,});
// Read job fileconst code = await readFile("./jobs/process-data.ts", "utf-8");
// Create job functionconst { data, error } = await client.admin.jobs.create({ name: "process-data", namespace: "default", code, enabled: true,});Method 2: Filesystem Auto-Load
Section titled “Method 2: Filesystem Auto-Load”Configure in .env:
FLUXBASE_JOBS_DIR=./jobsFLUXBASE_JOBS_AUTO_LOAD_ON_BOOT=truePlace job files in the jobs directory:
mkdir -p jobscp my-job.ts jobs/Restart the server - jobs will be automatically loaded.
Method 3: Sync API
Section titled “Method 3: Sync API”// Sync all jobs from filesystem to databaseconst { data, error } = await client.admin.jobs.sync({ namespace: "default", path: "./jobs",});
console.log(`Created: ${data.summary.created}`);console.log(`Updated: ${data.summary.updated}`);console.log(`Deleted: ${data.summary.deleted}`);Advanced Patterns
Section titled “Advanced Patterns”Long-Running Import
Section titled “Long-Running Import”export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const context = job.getJobContext(); const { file_url } = context.payload;
// Download file job.reportProgress(10, "Downloading file..."); const response = await fetch(file_url); const data = await response.json();
// Process in batches const batchSize = 100; let processed = 0;
for (let i = 0; i < data.length; i += batchSize) { // Check for cancellation if (job.checkCancellation()) { return { cancelled: true, processed }; }
const batch = data.slice(i, i + batchSize);
// Insert batch - RLS ensures user can only insert to their tables await fluxbase.from("app.imports").insert( batch.map((item) => ({ ...item, user_id: context.user?.id, })) );
processed += batch.length; const progress = 10 + Math.floor((processed / data.length) * 90); job.reportProgress( progress, `Imported ${processed}/${data.length} records` ); }
return { success: true, imported: processed };}Parallel Processing
Section titled “Parallel Processing”export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const { items } = job.getJobPayload();
// Process items in parallel const results = await Promise.all( items.map(async (item: any, index: number) => { const result = await processItem(item);
// Update progress const progress = Math.floor(((index + 1) / items.length) * 100); job.reportProgress(progress, `Processed ${index + 1}/${items.length}`);
return result; }) );
return { success: true, results };}External API Integration
Section titled “External API Integration”export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const context = job.getJobContext(); const apiKey = Deno.env.get("FLUXBASE_EXTERNAL_API_KEY");
job.reportProgress(25, "Calling external API...");
const response = await fetch("https://api.example.com/data", { headers: { Authorization: `Bearer ${apiKey}`, "Content-Type": "application/json", }, });
const data = await response.json();
job.reportProgress(50, "Storing results...");
// Store with user's RLS context await fluxbase.from("app.api_results").insert({ data, user_id: context.user?.id, fetched_at: new Date().toISOString(), });
job.reportProgress(100, "Complete");
return { success: true, records: data.length };}Storage Operations
Section titled “Storage Operations”export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const context = job.getJobContext(); const { source_file } = context.payload;
job.reportProgress(10, "Downloading source file...");
// Download file from user's storage (RLS applies) const { data: fileData, error } = await fluxbase.storage .from("temp-files") .download(source_file);
if (error) { throw new Error(`Failed to download: ${error.message}`); }
job.reportProgress(50, "Processing file..."); const processed = await processFile(fileData);
job.reportProgress(80, "Uploading result...");
// Upload result to user's storage const resultBlob = new Blob([JSON.stringify(processed)], { type: "application/json", }); await fluxbase.storage .from("exports") .upload(`results/${context.job_id}.json`, resultBlob);
job.reportProgress(100, "Complete");
return { success: true, output_file: `results/${context.job_id}.json` };}Submitting Follow-up Jobs
Section titled “Submitting Follow-up Jobs”export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const context = job.getJobContext(); const { total_batches, current_batch = 0 } = context.payload;
job.reportProgress( 0, `Processing batch ${current_batch + 1}/${total_batches}` );
// Process current batch await processBatch(current_batch, fluxbase);
job.reportProgress(100, `Batch ${current_batch + 1} complete`);
// Submit next batch job if not done if (current_batch + 1 < total_batches) { await fluxbase.jobs.submit("process-batch", { total_batches, current_batch: current_batch + 1, }); }
return { batch: current_batch, remaining: total_batches - current_batch - 1 };}Troubleshooting
Section titled “Troubleshooting”Jobs not loading at startup
Section titled “Jobs not loading at startup”- Check
FLUXBASE_JOBS_ENABLED=truein environment - Verify
FLUXBASE_JOBS_DIRpoints to correct directory - Check
FLUXBASE_JOBS_AUTO_LOAD_ON_BOOT=true - Review server logs for error messages
Job stuck in “pending” status
Section titled “Job stuck in “pending” status”- Check workers are running:
await client.admin.jobs.listWorkers()should return active workers - Verify
FLUXBASE_JOBS_EMBEDDED_WORKER_COUNT > 0 - Review server logs for worker errors
- Check database connectivity
Permission denied errors
Section titled “Permission denied errors”- Regular users cannot submit jobs with
@fluxbase:require-role admin - Verify user’s role matches the required role
- Use service key or admin token for admin-only jobs
- Check RLS policies on jobs.queue table
Job fails immediately
Section titled “Job fails immediately”- Check job logs:
await client.jobs.get(jobId)and inspectlogsfield - Look for syntax errors in job code
- Verify database permissions (RLS policies)
- Check environment variables are set correctly
Environment variables not available
Section titled “Environment variables not available”- Only
FLUXBASE_*prefixed variables are accessible - Verify variable is set in
.env.localor environment - Restart server after changing environment variables
- Sensitive secrets are blocked for security
Realtime updates not received
Section titled “Realtime updates not received”- Check WebSocket connection is established
- Verify subscription to correct channel
- Review RLS policies - users only see their own jobs
- Check browser console for connection errors
AI Capabilities
Section titled “AI Capabilities”Jobs can leverage AI capabilities via the job.ai object (same as utils.ai in edge functions) for chat completions and embeddings.
AI Chat Completions
Section titled “AI Chat Completions”/** * AI-powered data analysis job * @fluxbase:allow-net * @fluxbase:timeout 300 */export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { job.reportProgress(10, "Fetching data...");
const { data: records } = await fluxbaseService .from("sales_data") .select("*") .gte("created_at", new Date(Date.now() - 7 * 24 * 60 * 60 * 1000).toISOString()) .execute();
job.reportProgress(40, "Analyzing with AI...");
// Use AI to generate insights const response = await job.ai.chat({ messages: [ { role: "system", content: "You are a business analyst. Provide concise, actionable insights." }, { role: "user", content: `Analyze this week's sales data and provide key insights:\n${JSON.stringify(records, null, 2)}` } ], maxTokens: 500 });
job.reportProgress(80, "Saving report...");
// Store the AI-generated report await fluxbaseService .from("reports") .insert({ type: "weekly_analysis", content: response.content, generated_at: new Date().toISOString() }) .execute();
job.reportProgress(100, "Complete");
return { analysis: response.content };}AI Embeddings for Batch Processing
Section titled “AI Embeddings for Batch Processing”/** * Generate embeddings for documents * @fluxbase:allow-net * @fluxbase:timeout 600 */export async function handler( req: Request, fluxbase: FluxbaseClient, fluxbaseService: FluxbaseClient, job: JobUtils) { const { document_ids } = job.getJobPayload();
job.reportProgress(0, "Starting embedding generation...");
for (let i = 0; i < document_ids.length; i++) { // Get document content const { data: doc } = await fluxbaseService .from("documents") .select("id, content") .eq("id", document_ids[i]) .single() .execute();
if (doc) { // Generate embedding const { embedding } = await job.ai.embed({ text: doc.content });
// Store embedding await fluxbaseService .from("document_embeddings") .upsert({ document_id: doc.id, embedding: embedding, updated_at: new Date().toISOString() }) .execute(); }
job.reportProgress( Math.round(((i + 1) / document_ids.length) * 100), `Processed ${i + 1}/${document_ids.length} documents` ); }
return { processed: document_ids.length };}JobUtils AI Interface
Section titled “JobUtils AI Interface”The job parameter includes AI capabilities:
interface JobUtils { // Progress and context reportProgress(percent: number, message?: string, data?: any): void; isCancelled(): Promise<boolean>; getJobContext(): JobContext; getJobPayload(): any;
// AI capabilities (requires allow-net permission) ai: { chat(options: { messages: Array<{ role: string; content: string }>; provider?: string; model?: string; maxTokens?: number; temperature?: number; }): Promise<{ content: string; model: string; finish_reason?: string; usage?: { prompt_tokens: number; completion_tokens: number; total_tokens: number }; }>;
embed(options: { text: string; provider?: string; }): Promise<{ embedding: number[]; model: string; }>;
listProviders(): Promise<{ providers: Array<{ name: string; type: string; model: string; enabled: boolean }>; default: string; }>; };}Note: AI capabilities require @fluxbase:allow-net to be set in the job annotations.
Next Steps
Section titled “Next Steps”- Explore the Edge Functions guide for serverless functions
- Learn about Realtime subscriptions for live updates
- Review Row Level Security for data isolation
- Check the Admin Dashboard for job management UI