Queue Service

The queue service provides reliable background job processing on top of Redis. It is suited to asynchronous tasks such as sending email, processing data, and running scheduled jobs.

Service Management

Start Queue Service

# Start queue service
lc start queue

# Start all services (including queue if configured)
lc start

Stop Queue Service

# Stop queue service
lc stop queue

# Stop all services
lc stop

Check Queue Status

# Check service status
lc status

# Check specific queue status
lc service status queue

Component Management

Add Queue to Project

# Add queue component
lc component add queue

# List available components
lc component list

Remove Queue from Project

# Remove queue component
lc component remove queue

Get Queue Information

# Get queue component details
lc component info queue

Service Configuration

Default Settings

  • Port: 6380
  • Host: localhost
  • Container: localcloud-redis-queue
  • Max Memory: 512MB
  • Memory Policy: noeviction (data is never evicted; writes return an error once the memory limit is reached)
  • Persistence: Enabled (AOF + RDB)
  • Append Only: true
  • Fsync: everysec

Connection Details

# Connection string
redis://localhost:6380

# No authentication by default
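
Some clients take the host and port as separate arguments rather than a URL. The connection string above can be split with Python's standard library. This is a minimal sketch; `parse_queue_url` is a hypothetical helper name, not something the queue service provides.

```python
from urllib.parse import urlsplit


def parse_queue_url(url):
    """Split a redis:// URL into the pieces most clients ask for."""
    parts = urlsplit(url)
    if parts.scheme != "redis":
        raise ValueError(f"expected a redis:// URL, got {url!r}")
    return {
        "host": parts.hostname,
        "port": parts.port or 6379,  # 6379 is the Redis default when no port is given
        "password": parts.password,  # None when the URL carries no credentials
    }


settings = parse_queue_url("redis://localhost:6380")
print(settings)
```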

Basic Usage Examples

Connect to Queue

# Using redis-cli (if installed)
redis-cli -h localhost -p 6380

# Or connect via Docker container
lc exec queue redis-cli

Basic Queue Operations

Once connected to redis-cli, run:

# Add job to queue
LPUSH jobs:email '{"to": "user@example.com", "subject": "Hello"}'

# Get job from queue (blocking)
BRPOP jobs:email 0

# Add job to a dedicated high-priority queue (consumers must read this queue first)
LPUSH jobs:high_priority '{"task": "urgent_task"}'

# Check queue length
LLEN jobs:email

# View jobs without removing
LRANGE jobs:email 0 -1

# Add job to specific queue
LPUSH jobs:image_processing '{"image_id": 123, "resize": "thumbnail"}'

Job Management

# Create delayed job (using sorted sets)
ZADD jobs:delayed 1640995200 '{"task": "send_reminder", "user_id": 123}'

# Get jobs ready to process
ZRANGEBYSCORE jobs:delayed -inf 1640995200

# Remove processed job
ZREM jobs:delayed '{"task": "send_reminder", "user_id": 123}'

# Failed job handling
LPUSH jobs:failed '{"original_queue": "email", "error": "smtp_error", "data": "{}"}'
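
The score passed to ZADD above is a Unix timestamp in seconds: 1640995200 corresponds to 2022-01-01 00:00:00 UTC. A small sketch of how such a score could be derived in Python; `delayed_score` is an illustrative name, not part of the service.

```python
from datetime import datetime, timezone


def delayed_score(run_at):
    """Return the sorted-set score (Unix seconds) for a timezone-aware datetime."""
    if run_at.tzinfo is None:
        # Naive datetimes are interpreted in local time, which makes scores ambiguous
        raise ValueError("run_at must be timezone-aware")
    return int(run_at.timestamp())


score = delayed_score(datetime(2022, 1, 1, tzinfo=timezone.utc))
print(score)
```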

Application Integration

Python Example

import redis
import json
import time

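# Connect to queue
queue = redis.Redis(host='localhost', port=6380, decode_responses=True)

# Placeholder handler (replace with your own job logic)
def process_job(job_data):
    print(f"Handling job: {job_data}")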

# Producer - Add jobs to queue
def add_job(queue_name, job_data):
    job_json = json.dumps(job_data)
    queue.lpush(f'jobs:{queue_name}', job_json)
    print(f"Added job to {queue_name}: {job_data}")

# Consumer - Process jobs from queue
def process_jobs(queue_name):
    while True:
        # Blocking pop: waits up to 5 seconds, then returns None
        result = queue.brpop(f'jobs:{queue_name}', timeout=5)
        if result:
            _, job_json = result
            job_data = json.loads(job_json)
            print(f"Processing job: {job_data}")
            
            # Process the job here
            process_job(job_data)
        else:
            print("No jobs, waiting...")

# Add some jobs
add_job('email', {'to': 'user@example.com', 'subject': 'Welcome'})
add_job('image', {'image_id': 123, 'operation': 'resize'})

# Process jobs
process_jobs('email')

Node.js Example

const redis = require('redis');

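// Connect to queue (node-redis v4+ takes a URL; call connect() before issuing commands)
const queue = redis.createClient({ url: 'redis://localhost:6380' });
queue.on('error', (error) => console.error('Redis client error:', error));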

// Producer
async function addJob(queueName, jobData) {
    const jobJson = JSON.stringify(jobData);
    await queue.lPush(`jobs:${queueName}`, jobJson);
    console.log(`Added job to ${queueName}:`, jobData);
}

// Consumer
async function processJobs(queueName) {
    while (true) {
        try {
            // Blocking pop with timeout
            const result = await queue.brPop(`jobs:${queueName}`, 5);
            if (result) {
                const jobData = JSON.parse(result.element);
                console.log('Processing job:', jobData);
                
                // Process the job
                await processJob(jobData);
            }
        } catch (error) {
            console.error('Queue error:', error);
            await new Promise(resolve => setTimeout(resolve, 1000));
        }
    }
}

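// Placeholder handler (replace with your own job logic)
async function processJob(jobData) {
    console.log('Handling job:', jobData);
}

async function main() {
    // node-redis v4+ requires an explicit connect() before any command
    await queue.connect();

    // Add jobs
    await addJob('email', {to: 'user@example.com', template: 'welcome'});
    await addJob('cleanup', {type: 'temp_files', older_than: '24h'});

    // Start processing (runs until the process is stopped)
    await processJobs('email');
}

main().catch(console.error);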

Go Example

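package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
)

type Job struct {
    ID   string                 `json:"id"`
    Type string                 `json:"type"`
    Data map[string]interface{} `json:"data"`
}

// processJob is a placeholder handler (replace with your own job logic)
func processJob(job Job) {
    fmt.Printf("Handling job %s\n", job.ID)
}

func main() {
    // Connect to queue
    queue := redis.NewClient(&redis.Options{
        Addr: "localhost:6380",
    })

    ctx := context.Background()

    // Producer
    job := Job{
        ID:   "job_123",
        Type: "email",
        Data: map[string]interface{}{
            "to":      "user@example.com",
            "subject": "Hello",
        },
    }

    jobJSON, err := json.Marshal(job)
    if err != nil {
        log.Fatalf("marshal job: %v", err)
    }
    if err := queue.LPush(ctx, "jobs:email", jobJSON).Err(); err != nil {
        log.Fatalf("enqueue job: %v", err)
    }

    // Consumer
    for {
        // Blocks for up to 5 seconds; the error is redis.Nil on timeout
        result, err := queue.BRPop(ctx, 5*time.Second, "jobs:email").Result()
        if err == redis.Nil {
            continue // no job arrived within the timeout
        }
        if err != nil {
            log.Printf("queue error: %v", err)
            time.Sleep(time.Second)
            continue
        }

        // result[0] is the queue name, result[1] is the payload
        var received Job
        if err := json.Unmarshal([]byte(result[1]), &received); err != nil {
            log.Printf("invalid job payload: %v", err)
            continue
        }
        fmt.Printf("Processing job: %+v\n", received)

        // Process the job here
        processJob(received)
    }
}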

Common Job Patterns

Priority Queues

# High priority queue
queue.lpush('jobs:high', json.dumps({'task': 'urgent'}))

# Normal priority queue  
queue.lpush('jobs:normal', json.dumps({'task': 'regular'}))

# Low priority queue
queue.lpush('jobs:low', json.dumps({'task': 'background'}))

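# Process by priority: BRPOP checks the keys in the order given,
# so jobs:high is served before jobs:normal and jobs:low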
def process_by_priority():
    queues = ['jobs:high', 'jobs:normal', 'jobs:low']
    result = queue.brpop(queues, timeout=5)
    if result:
        queue_name, job_json = result
        print(f"Processing from {queue_name}: {job_json}")
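
To make the ordering concrete, here is a sketch that runs without a Redis server. `InMemoryQueue` is a hypothetical stand-in that imitates only `lpush` and `brpop`, and `next_job` is an illustrative helper; with a real connection you would pass the `queue` client instead.

```python
import json

PRIORITY_QUEUES = ["jobs:high", "jobs:normal", "jobs:low"]


class InMemoryQueue:
    """Stand-in for the two Redis calls used below (illustration only)."""

    def __init__(self):
        self.lists = {}

    def lpush(self, key, value):
        self.lists.setdefault(key, []).insert(0, value)

    def brpop(self, keys, timeout=0):
        # Like Redis BRPOP, serve the first non-empty key in the order given
        for key in keys:
            if self.lists.get(key):
                return key, self.lists[key].pop()
        return None  # a real client would wait up to `timeout` seconds first


def next_job(client, timeout=5):
    """Pop the highest-priority job available, or return None on timeout."""
    result = client.brpop(PRIORITY_QUEUES, timeout=timeout)
    if result is None:
        return None
    queue_name, job_json = result
    return queue_name, json.loads(job_json)


client = InMemoryQueue()
client.lpush("jobs:low", json.dumps({"task": "background"}))
client.lpush("jobs:normal", json.dumps({"task": "regular"}))
client.lpush("jobs:high", json.dumps({"task": "urgent"}))

order = []
while (job := next_job(client)) is not None:
    order.append(job[0])
print(order)
```

Note that strict ordering like this can starve the low-priority queue while higher queues keep receiving jobs.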

Delayed Jobs

import time

# Schedule job for future execution
def schedule_job(job_data, delay_seconds):
    execute_at = time.time() + delay_seconds
    queue.zadd('jobs:delayed', {json.dumps(job_data): execute_at})

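# Process delayed jobs (call this periodically, for example once per second)
def process_delayed_jobs():
    now = time.time()
    jobs = queue.zrangebyscore('jobs:delayed', '-inf', now)
    
    for job_json in jobs:
        # ZREM returns 1 only for the caller that actually removed the job,
        # so two schedulers running at once cannot enqueue it twice
        if queue.zrem('jobs:delayed', job_json):
            # Move to immediate queue
            queue.lpush('jobs:immediate', job_json)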

# Schedule a reminder for 1 hour
schedule_job({'type': 'reminder', 'user_id': 123}, 3600)
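
One thing to watch with the sorted-set approach: members of a sorted set are unique, so scheduling two jobs with identical JSON leaves a single entry whose score is overwritten. A minimal sketch of one workaround is to tag each payload with a generated id before storing it. `with_job_id` and the `id` field are illustrative names, not part of the service.

```python
import json
import uuid


def with_job_id(job_data):
    """Return a JSON string for job_data with a unique 'id' field added."""
    tagged = dict(job_data)  # copy so the caller's dict is not modified
    tagged.setdefault("id", uuid.uuid4().hex)
    return json.dumps(tagged, sort_keys=True)


first = with_job_id({"type": "reminder", "user_id": 123})
second = with_job_id({"type": "reminder", "user_id": 123})
print(first != second)  # identical payloads now produce distinct members
```

The returned string could be passed to `queue.zadd('jobs:delayed', {member: execute_at})` in place of `json.dumps(job_data)`.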

Job Retry Logic

def process_with_retry(job_data, max_retries=3):
    retries = job_data.get('retries', 0)
    
    try:
        # Process the job
        result = process_job(job_data)
        return result
    except Exception as e:
        if retries < max_retries:
            # Increment retry count and requeue
            job_data['retries'] = retries + 1
            job_data['last_error'] = str(e)
            
            # Add delay before retry
            delay = 2 ** retries  # Exponential backoff
            schedule_job(job_data, delay)
        else:
            # Move to failed queue
            queue.lpush('jobs:failed', json.dumps({
                'original_data': job_data,
                'error': str(e),
                'failed_at': time.time()
            }))
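
The `2 ** retries` backoff above doubles the wait on every attempt. The sketch below prints the resulting schedule; the `cap` parameter is an assumption added for illustration (the original code has no upper bound), and `backoff_delay` is a hypothetical helper name.

```python
def backoff_delay(retries, base=1.0, cap=60.0):
    """Delay in seconds before retry number `retries` (0-based), capped at `cap`."""
    return min(cap, base * (2 ** retries))


delays = [backoff_delay(n) for n in range(8)]
print(delays)
```

With the default `max_retries=3`, only the first three delays (1, 2, and 4 seconds) are ever used; the cap matters only if you raise the retry limit.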

Monitoring and Debugging

View Queue Logs

# View queue logs
lc logs queue

# Follow logs in real-time
lc logs queue -f

Monitor Queue Status

# Connect and check queue lengths
lc exec queue redis-cli

# In redis-cli:
LLEN jobs:email
LLEN jobs:failed
ZCARD jobs:delayed

Queue Statistics

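# List all queue keys (KEYS walks the whole keyspace and blocks the server; prefer SCAN on large datasets)
KEYS jobs:*

# Stream every command the server receives (debugging only; adds noticeable overhead)
MONITOR

# Get memory usage
INFO memory

# Get the Unix timestamp of the last successful RDB save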
LASTSAVE
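
To get actual lengths for every queue from application code, a sketch along these lines iterates with SCAN rather than KEYS. It assumes a redis-py style client created with `decode_responses=True` (so `type` returns strings such as `'list'`); `queue_sizes` is an illustrative helper, not part of the service.

```python
def queue_sizes(client, pattern="jobs:*"):
    """Map each key matching `pattern` to its size, using SCAN instead of KEYS."""
    sizes = {}
    for key in client.scan_iter(match=pattern):
        kind = client.type(key)
        if kind == "list":  # regular queues such as jobs:email
            sizes[key] = client.llen(key)
        elif kind == "zset":  # delayed jobs stored in a sorted set
            sizes[key] = client.zcard(key)
    return sizes


# Usage with the connection from the Python example:
#   print(queue_sizes(queue))
```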

Configuration Options

Update Queue Settings

# Update queue configuration
lc component update queue

Available Configurations

# In .localcloud/config.yaml
services:
  queue:
    type: redis
    port: 6380
    max_memory: "512mb"
    max_memory_policy: "noeviction"
    persistence: true
    append_only: true
    append_fsync: "everysec"

Troubleshooting

Queue Service Won’t Start

Error: failed to start queue service
Solutions:
  1. Check if port is available: lsof -i :6380
  2. Check Docker status: lc doctor
  3. View logs: lc logs queue
  4. Restart service: lc restart queue

Jobs Not Processing

# Check queue lengths
lc exec queue redis-cli llen jobs:email

# List connected clients (consumers waiting in BRPOP show cmd=brpop and the b flag)
lc exec queue redis-cli client list

Memory Issues

# Check memory usage
lc exec queue redis-cli info memory

# Count keys matching jobs:* (the number of queues, not their lengths)
lc exec queue redis-cli eval "return #redis.call('keys', 'jobs:*')" 0
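
Because the default policy is noeviction, writes start failing once the memory limit is reached, so it can help to track how close the queue is to that limit. A minimal sketch, assuming the dictionary shape returned by redis-py's `info('memory')` (fields `used_memory` and `maxmemory`, in bytes); `memory_usage_ratio` is a hypothetical helper and the sample numbers are made up.

```python
def memory_usage_ratio(info):
    """Fraction of maxmemory in use, or None when no limit is configured."""
    limit = int(info.get("maxmemory", 0))
    if limit == 0:  # Redis reports 0 when maxmemory is unlimited
        return None
    return int(info["used_memory"]) / limit


# Example values only: 256 MB used out of the default 512 MB limit
sample = {"used_memory": 256 * 1024 * 1024, "maxmemory": 512 * 1024 * 1024}
print(memory_usage_ratio(sample))
```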

Data Persistence Issues

# Get the Unix timestamp of the last successful save
lc exec queue redis-cli lastsave

# Force save
lc exec queue redis-cli bgsave

# Check AOF status
lc exec queue redis-cli info persistence