Queue Service

The queue service provides reliable background job processing using Redis. It is well suited for asynchronous tasks such as email sending, data processing, and scheduled jobs.

Service Management

Start Queue Service

# Start queue service
lc start queue

# Start all services (including queue if configured)
lc start

Stop Queue Service

# Stop queue service
lc stop queue

# Stop all services
lc stop

Check Queue Status

# Check service status
lc status

# Check specific queue status
lc service status queue

Component Management

Add Queue to Project

# Add queue component
lc component add queue

# List available components
lc component list

Remove Queue from Project

# Remove queue component
lc component remove queue

Get Queue Information

# Get queue component details
lc component info queue

Service Configuration

Default Settings

  • Port: 6380
  • Host: localhost
  • Container: localcloud-redis-queue
  • Max Memory: 512MB
  • Memory Policy: noeviction (return errors instead of evicting jobs when the memory limit is reached)
  • Persistence: Enabled (AOF + RDB)
  • Append Only: true
  • Fsync: everysec
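
If the component was added with its default settings, you can confirm these values against the running container using standard Redis CONFIG GET queries and the lc exec queue redis-cli pattern shown later on this page:

# Check the configured memory limit and eviction policy
lc exec queue redis-cli CONFIG GET maxmemory
lc exec queue redis-cli CONFIG GET maxmemory-policy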

Connection Details

# Connection string
redis://localhost:6380

# No authentication by default
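
To confirm the connection string from application code, here is a minimal check using the redis-py client (any Redis client library works equivalently):

import redis

# PING returns True when the queue service is reachable
client = redis.Redis(host='localhost', port=6380, decode_responses=True)
print(client.ping())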

Basic Usage Examples

Connect to Queue

# Using redis-cli (if installed)
redis-cli -h localhost -p 6380

# Or connect via Docker container
lc exec queue redis-cli

Basic Queue Operations

Once connected to redis-cli:

# Add job to queue
LPUSH jobs:email '{"to": "user@example.com", "subject": "Hello"}'

# Get job from queue (blocking)
BRPOP jobs:email 0

# Add job with priority
LPUSH jobs:high_priority '{"task": "urgent_task"}'

# Check queue length
LLEN jobs:email

# View jobs without removing
LRANGE jobs:email 0 -1

# Add job to specific queue
LPUSH jobs:image_processing '{"image_id": 123, "resize": "thumbnail"}'

Job Management

# Create delayed job (using sorted sets)
ZADD jobs:delayed 1640995200 '{"task": "send_reminder", "user_id": 123}'

# Get jobs ready to process
ZRANGEBYSCORE jobs:delayed -inf 1640995200

# Remove processed job
ZREM jobs:delayed '{"task": "send_reminder", "user_id": 123}'

# Failed job handling
LPUSH jobs:failed '{"original_queue": "email", "error": "smtp_error", "data": "{}"}'
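
The scores used for jobs:delayed (1640995200 above) are Unix timestamps in seconds. Inside redis-cli, the current server time is available with the standard TIME command:

# Current server time as [seconds, microseconds]
TIME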

Application Integration

Python Example

import redis
import json
import time

# Connect to queue
queue = redis.Redis(host='localhost', port=6380, decode_responses=True)

# Producer - Add jobs to queue
def add_job(queue_name, job_data):
    job_json = json.dumps(job_data)
    queue.lpush(f'jobs:{queue_name}', job_json)
    print(f"Added job to {queue_name}: {job_data}")

# Consumer - Process jobs from queue
def process_jobs(queue_name):
    while True:
        # Blocking pop - waits for jobs
        result = queue.brpop(f'jobs:{queue_name}', timeout=5)
        if result:
            _, job_json = result
            job_data = json.loads(job_json)
            print(f"Processing job: {job_data}")
            
            # Process the job here
            process_job(job_data)
        else:
            print("No jobs, waiting...")

# Add some jobs
add_job('email', {'to': 'user@example.com', 'subject': 'Welcome'})
add_job('image', {'image_id': 123, 'operation': 'resize'})

# Process jobs
process_jobs('email')

Node.js Example

const redis = require('redis');

// Connect to queue (node-redis v4+ expects a connection URL and an explicit connect())
const queue = redis.createClient({
    url: 'redis://localhost:6380'
});
await queue.connect();

// Producer
async function addJob(queueName, jobData) {
    const jobJson = JSON.stringify(jobData);
    await queue.lPush(`jobs:${queueName}`, jobJson);
    console.log(`Added job to ${queueName}:`, jobData);
}

// Consumer
async function processJobs(queueName) {
    while (true) {
        try {
            // Blocking pop with timeout
            const result = await queue.brPop(`jobs:${queueName}`, 5);
            if (result) {
                const jobData = JSON.parse(result.element);
                console.log('Processing job:', jobData);
                
                // Process the job
                await processJob(jobData);
            }
        } catch (error) {
            console.error('Queue error:', error);
            await new Promise(resolve => setTimeout(resolve, 1000));
        }
    }
}

// Add jobs
await addJob('email', {to: 'user@example.com', template: 'welcome'});
await addJob('cleanup', {type: 'temp_files', older_than: '24h'});

// Start processing
processJobs('email');

Go Example

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/go-redis/redis/v8"
)

type Job struct {
    ID   string `json:"id"`
    Type string `json:"type"`
    Data map[string]interface{} `json:"data"`
}

func main() {
    // Connect to queue
    queue := redis.NewClient(&redis.Options{
        Addr: "localhost:6380",
    })
    
    ctx := context.Background()

    // Producer
    job := Job{
        ID:   "job_123",
        Type: "email",
        Data: map[string]interface{}{
            "to":      "user@example.com",
            "subject": "Hello",
        },
    }
    
    jobJSON, _ := json.Marshal(job)
    queue.LPush(ctx, "jobs:email", jobJSON)

    // Consumer
    for {
        result := queue.BRPop(ctx, 5*time.Second, "jobs:email")
        if len(result.Val()) > 0 {
            var job Job
            json.Unmarshal([]byte(result.Val()[1]), &job)
            fmt.Printf("Processing job: %+v\n", job)
            
            // Process the job here
            processJob(job)
        }
    }
}

Common Job Patterns

Priority Queues

# High priority queue
queue.lpush('jobs:high', json.dumps({'task': 'urgent'}))

# Normal priority queue  
queue.lpush('jobs:normal', json.dumps({'task': 'regular'}))

# Low priority queue
queue.lpush('jobs:low', json.dumps({'task': 'background'}))

# Process by priority: BRPOP checks the listed keys in order,
# so jobs:high is always drained before jobs:normal and jobs:low
def process_by_priority():
    queues = ['jobs:high', 'jobs:normal', 'jobs:low']
    result = queue.brpop(queues, timeout=5)
    if result:
        queue_name, job_json = result
        print(f"Processing from {queue_name}: {job_json}")

Delayed Jobs

import time

# Schedule job for future execution
def schedule_job(job_data, delay_seconds):
    execute_at = time.time() + delay_seconds
    queue.zadd('jobs:delayed', {json.dumps(job_data): execute_at})

# Process delayed jobs
def process_delayed_jobs():
    now = time.time()
    jobs = queue.zrangebyscore('jobs:delayed', '-inf', now)
    
    for job_json in jobs:
        # Move to immediate queue
        queue.lpush('jobs:immediate', job_json)
        queue.zrem('jobs:delayed', job_json)

# Schedule a reminder for 1 hour from now
schedule_job({'type': 'reminder', 'user_id': 123}, 3600)
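
process_delayed_jobs makes a single pass, so in practice a small worker polls the delayed set on an interval. A minimal sketch (the loop and its poll interval are illustrative, not a LocalCloud API):

# Move due jobs to the immediate queue roughly once per second
def run_delayed_scheduler(poll_interval=1):
    while True:
        process_delayed_jobs()
        time.sleep(poll_interval)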

Job Retry Logic

def process_with_retry(job_data, max_retries=3):
    retries = job_data.get('retries', 0)
    
    try:
        # Process the job
        result = process_job(job_data)
        return result
    except Exception as e:
        if retries < max_retries:
            # Increment retry count and requeue
            job_data['retries'] = retries + 1
            job_data['last_error'] = str(e)
            
            # Add delay before retry
            delay = 2 ** retries  # Exponential backoff
            schedule_job(job_data, delay)
        else:
            # Move to failed queue
            queue.lpush('jobs:failed', json.dumps({
                'original_data': job_data,
                'error': str(e),
                'failed_at': time.time()
            }))
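
Failed jobs stay in jobs:failed until something inspects them. Below is a sketch of a requeue helper, assuming the failed-job format produced above; the function name and default target queue are illustrative:

def requeue_failed(target_queue='jobs:email'):
    while True:
        failed_json = queue.rpop('jobs:failed')
        if failed_json is None:
            break
        failed = json.loads(failed_json)
        job_data = failed['original_data']
        job_data['retries'] = 0  # give the job a fresh set of retries
        queue.lpush(target_queue, json.dumps(job_data))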

Monitoring and Debugging

View Queue Logs

# View queue logs
lc logs queue

# Follow logs in real-time
lc logs queue -f

Monitor Queue Status

# Connect and check queue lengths
lc exec queue redis-cli

# In redis-cli:
LLEN jobs:email
LLEN jobs:failed
ZCARD jobs:delayed

Queue Statistics

# Check all queue lengths
KEYS jobs:*

# Monitor queue activity
MONITOR

# Get memory usage
INFO memory

# Check persistence status
LASTSAVE
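
These checks can also be scripted. A small monitoring sketch that reuses the queue client from the Python example above (SCAN is used instead of KEYS so large keyspaces are not blocked):

# Print the length of every jobs:* key, whether list or sorted set
for key in queue.scan_iter('jobs:*'):
    key_type = queue.type(key)
    if key_type == 'list':
        print(key, queue.llen(key))
    elif key_type == 'zset':
        print(key, queue.zcard(key))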

Configuration Options

Update Queue Settings

# Update queue configuration
lc component update queue

Available Configurations

# In .localcloud/config.yaml
services:
  queue:
    type: redis
    port: 6380
    max_memory: "512mb"
    max_memory_policy: "noeviction"
    persistence: true
    append_only: true
    append_fsync: "everysec"
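
A change to these values typically requires restarting the queue service so the container picks them up (lc restart queue also appears in the troubleshooting section below):

# Apply configuration changes
lc restart queue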

Troubleshooting

Queue Service Won’t Start

Error: failed to start queue service
Solutions:
  1. Check whether port 6380 is already in use: lsof -i :6380
  2. Check Docker status: lc doctor
  3. View logs: lc logs queue
  4. Restart service: lc restart queue

Jobs Not Processing

# Check queue lengths
lc exec queue redis-cli llen jobs:email

# Check for blocked consumers
lc exec queue redis-cli client list

Memory Issues

# Check memory usage
lc exec queue redis-cli info memory

# Check queue sizes
lc exec queue redis-cli eval "return #redis.call('keys', 'jobs:*')" 0

Data Persistence Issues

# Check last save
lc exec queue redis-cli lastsave

# Force save
lc exec queue redis-cli bgsave

# Check AOF status
lc exec queue redis-cli info persistence