Queue Service (Redis)

LocalCloud includes Redis configured as a job queue for processing background tasks and asynchronous operations without blocking your application's request path.

What is Redis Queue?

Redis queues in LocalCloud provide reliable background job processing using Redis data structures. Perfect for:
  • Background processing - Handle tasks outside the request cycle
  • Asynchronous operations - Decouple time-consuming tasks
  • Task scheduling - Delayed and recurring jobs
  • Workflow management - Multi-step processes

Connection Details

The queue service runs on its own port and database, separate from the cache service:
  • Host: localhost
  • Port: 6380 (separate port for queues)
  • Database: 1 (separate from cache db 0)
  • Password: None (local development)

Connection String

redis://localhost:6380/1
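
From application code you can use this URL directly; a minimal connectivity check with the redis-py client (assumes redis is installed via pip):

import redis

# Connect using the queue connection string
r = redis.Redis.from_url("redis://localhost:6380/1")
print(r.ping())  # True if the queue service is reachable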

Getting Started

Start Queue Service

# Start all services (including queue)
lc start

# Start only queue service
lc start queue

# Check queue status
lc status queue

Connect to Queue Redis

# Connect to queue Redis instance
redis-cli -h localhost -p 6380 -n 1

# Or through LocalCloud
lc redis connect --port 6380 --db 1

Queue Patterns

Simple Job Queue (FIFO)

# Producer: Add job to queue
LPUSH jobs:email '{"to":"user@example.com","subject":"Welcome"}'

# Consumer: Process job from queue
BRPOP jobs:email 0  # Blocking pop (timeout 0 = wait indefinitely)
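
The same pattern from Python, sketched with redis-py; the processing step is a placeholder:

import json
import redis

r = redis.Redis.from_url("redis://localhost:6380/1")

# Producer: add a job to the queue
r.lpush("jobs:email", json.dumps({"to": "user@example.com", "subject": "Welcome"}))

# Consumer: block until a job arrives, then process it
_, raw = r.brpop("jobs:email")
job = json.loads(raw)
print(f"Processing email to {job['to']}")  # placeholder for real handling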

Priority Queues

# Add jobs with different priorities
LPUSH jobs:high '{"type":"urgent_notification"}'
LPUSH jobs:medium '{"type":"email"}'
LPUSH jobs:low '{"type":"cleanup"}'

# Process high priority first
BRPOP jobs:high jobs:medium jobs:low 5
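
redis-py exposes the same multi-key behavior; a short sketch (keys are scanned left to right, so high-priority jobs are taken first):

import redis

r = redis.Redis.from_url("redis://localhost:6380/1")

# Keys are checked in order, so jobs:high wins when it has entries
result = r.brpop(["jobs:high", "jobs:medium", "jobs:low"], timeout=5)
if result:
    queue_name, raw_job = result
    print(f"Got job from {queue_name.decode()}")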

Delayed Jobs

# Schedule job for future execution (timestamp)
ZADD jobs:delayed 1640995200 '{"task":"send_reminder","user_id":123}'

# Check for ready jobs
ZRANGEBYSCORE jobs:delayed 0 1640995200 LIMIT 0 10
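
Reading the sorted set does not remove a job, so a poller must claim each entry before running it. A minimal sketch; the ZREM return value keeps two workers from claiming the same job:

import json
import time
import redis

r = redis.Redis.from_url("redis://localhost:6380/1")

while True:
    now = int(time.time())
    # Grab up to 10 jobs whose scheduled time has passed
    for raw in r.zrangebyscore("jobs:delayed", 0, now, start=0, num=10):
        # ZREM returns 1 only for the worker that removed the entry first
        if r.zrem("jobs:delayed", raw):
            job = json.loads(raw)
            print(f"Running delayed task: {job['task']}")  # placeholder
    time.sleep(1)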

Job Processing Libraries

Python with Celery

# celery_app.py
from celery import Celery

app = Celery('localcloud_tasks', broker='redis://localhost:6380/1')

@app.task
def send_email(to, subject, body):
    # Process email sending
    print(f"Sending email to {to}: {subject}")
    # Actual email sending logic here
    return f"Email sent to {to}"

@app.task
def process_document(document_id):
    # Background document processing
    document = get_document(document_id)
    # Process document (OCR, analysis, etc.)
    return f"Document {document_id} processed"

# Usage
send_email.delay("user@example.com", "Welcome", "Thank you for joining!")
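
Tasks only execute once a worker is consuming from the broker; start one with the standard Celery CLI:

# Start a Celery worker for this app
celery -A celery_app worker --loglevel=info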

Python with RQ (Redis Queue)

# jobs.py
import time
from rq import Queue
from redis import Redis

# Connect to queue Redis
redis_conn = Redis(host='localhost', port=6380, db=1)
q = Queue(connection=redis_conn)

def long_running_task(duration):
    print(f"Starting task that will take {duration} seconds")
    time.sleep(duration)
    return f"Task completed after {duration} seconds"

def process_ai_request(prompt, model="llama3.2"):
    # Simulate AI processing
    import requests
    response = requests.post('http://localhost:11434/api/generate', json={
        'model': model,
        'prompt': prompt,
        'stream': False
    })
    return response.json()

# Enqueue jobs
job1 = q.enqueue(long_running_task, 10)
job2 = q.enqueue(process_ai_request, "Explain quantum computing")

print(f"Job {job1.id} queued")
print(f"Job {job2.id} queued")

# Worker (run in a separate process: python jobs.py)
if __name__ == '__main__':
    from rq import Worker
    worker = Worker(['default'], connection=redis_conn)
    worker.work()
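
Instead of embedding the worker in jobs.py, RQ also ships a standalone worker command; point it at the queue instance:

# Start an RQ worker against the queue database
rq worker --url redis://localhost:6380/1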

Node.js with Bull

// queue.js
const Queue = require('bull');

const emailQueue = new Queue('email processing', {
    redis: { port: 6380, host: 'localhost', db: 1 }
});

const aiQueue = new Queue('ai processing', {
    redis: { port: 6380, host: 'localhost', db: 1 }
});

// Define job processors
// Processor name must match the name passed to add()
emailQueue.process('send-email', async (job) => {
    const { to, subject, body } = job.data;
    
    console.log(`Processing email job: ${to}`);
    // Email sending logic
    await sendEmail(to, subject, body);
    
    return { status: 'sent', to };
});

aiQueue.process('ai-request', async (job) => {
    const { prompt, model } = job.data;
    
    console.log(`Processing AI request: ${prompt.substring(0, 50)}...`);
    
    // Call local AI service
    const response = await fetch('http://localhost:11434/api/generate', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ model, prompt, stream: false })
    });
    
    const result = await response.json();
    return result.response;
});

// Add jobs to queue
async function queueEmail(to, subject, body) {
    const job = await emailQueue.add('send-email', { to, subject, body });
    return job.id;
}

async function queueAIRequest(prompt, model = 'llama3.2') {
    const job = await aiQueue.add('ai-request', { prompt, model }, {
        delay: 1000,  // Delay 1 second
        attempts: 3,  // Retry 3 times if failed
    });
    return job.id;
}

module.exports = { queueEmail, queueAIRequest };

Go with go-redis

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "time"
    "github.com/go-redis/redis/v8"
)

type Job struct {
    ID   string      `json:"id"`
    Type string      `json:"type"`
    Data interface{} `json:"data"`
}

type EmailJob struct {
    To      string `json:"to"`
    Subject string `json:"subject"`
    Body    string `json:"body"`
}

var ctx = context.Background()

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6380",
        DB:   1,
    })

    // Producer: Add jobs to queue
    emailJob := Job{
        ID:   "email_123",
        Type: "email",
        Data: EmailJob{
            To:      "user@example.com",
            Subject: "Welcome",
            Body:    "Thank you for joining!",
        },
    }

    jobData, _ := json.Marshal(emailJob)
    rdb.LPush(ctx, "jobs:email", jobData)

    // Consumer: process jobs in a blocking loop. (A bare goroutine here
    // would never run: main would return and the program would exit.)
    for {
        result, err := rdb.BRPop(ctx, 30*time.Second, "jobs:email").Result()
        if err != nil {
            continue // timeout or transient error; keep polling
        }

        var job Job
        json.Unmarshal([]byte(result[1]), &job)

        fmt.Printf("Processing job: %s\n", job.ID)
        processEmailJob(job)
    }
}

func processEmailJob(job Job) {
    // Process the email job
    fmt.Printf("Sending email job: %+v\n", job.Data)
}

Advanced Queue Features

Job Retry Logic

from rq import Queue, Retry
from redis import Redis

redis_conn = Redis(host='localhost', port=6380, db=1)
q = Queue(connection=redis_conn)

def unreliable_task():
    import random
    if random.random() < 0.7:  # 70% chance of failure
        raise Exception("Task failed randomly")
    return "Success!"

# Enqueue with retry (RQ expects a Retry object, not a bare int)
job = q.enqueue(
    unreliable_task,
    retry=Retry(max=3),  # Retry up to 3 times
    job_timeout='30s'
)
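
Retry also accepts an interval (in seconds) so attempts are spaced out instead of retried immediately:

# Wait 10s, 30s, then 60s between the three retry attempts
job = q.enqueue(unreliable_task, retry=Retry(max=3, interval=[10, 30, 60]))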

Scheduled Jobs

// Schedule recurring jobs
const cron = require('node-cron');

// Every day at midnight
cron.schedule('0 0 * * *', () => {
    emailQueue.add('daily-report', { type: 'daily_summary' });
});

// Every 5 minutes
cron.schedule('*/5 * * * *', () => {
    aiQueue.add('health-check', { service: 'ollama' });
});

Job Progress Tracking

from rq import get_current_job

def long_task_with_progress(items):
    job = get_current_job()
    total = len(items)
    
    for i, item in enumerate(items):
        # Update progress
        job.meta['progress'] = f"{i+1}/{total}"
        job.save_meta()
        
        # Process item
        process_item(item)
        
    return "All items processed"

# Check job progress (refresh() pulls the latest meta from Redis)
job = q.enqueue(long_task_with_progress, [1, 2, 3, 4, 5])
job.refresh()
print(job.meta.get('progress', 'Not started'))
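
A caller can poll that metadata to drive a progress display; a small sketch:

import time

while not job.is_finished and not job.is_failed:
    job.refresh()  # pull latest meta from Redis
    print(job.meta.get('progress', 'Not started'))
    time.sleep(1)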

Common Queue Use Cases

AI Application Queues

# Document processing pipeline
import requests
from celery import chain

@app.task
def process_document_pipeline(document_id):
    # Step 1: Extract text
    text = extract_text(document_id)

    # Steps 2-3 as a chain: the embedding result is passed as the first
    # argument to the next task. (Calling .get() inside a task blocks the
    # worker and is disallowed by Celery's defaults.)
    chain(generate_embeddings.s(text), store_embeddings.s(document_id))()

    return f"Document {document_id} queued for processing"

@app.task
def generate_embeddings(text):
    response = requests.post('http://localhost:11434/api/embeddings', json={
        'model': 'nomic-embed-text',
        'prompt': text
    })
    return response.json()['embedding']

Notification System

@app.task
def send_notification(user_id, message, channels=None):
    channels = channels or ['email', 'push']
    
    user = get_user(user_id)
    
    if 'email' in channels:
        send_email.delay(user.email, "Notification", message)
    
    if 'push' in channels:
        send_push_notification.delay(user.device_token, message)
    
    if 'sms' in channels:
        send_sms.delay(user.phone, message)

Data Processing Workflows

from celery import chain, group

@app.task
def data_ingestion_workflow(source_url):
    # Chain the steps: download -> clean -> then store and report in
    # parallel, each running as its own task (no blocking .get() calls)
    chain(
        download_data.s(source_url),
        clean_data.s(),
        group(store_data.s(), generate_report.s()),
    )()

Monitoring and Management

Queue Monitoring

from rq import Queue
from redis import Redis

redis_conn = Redis(host='localhost', port=6380, db=1)

def queue_stats():
    q = Queue(connection=redis_conn)
    
    stats = {
        'pending': len(q),
        'started': q.started_job_registry.count,
        'finished': q.finished_job_registry.count,
        'failed': q.failed_job_registry.count,
    }
    
    return stats
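
Beyond counting, failed jobs can be requeued through RQ's registry API; a sketch (assumes a reasonably recent RQ release):

from rq.registry import FailedJobRegistry

registry = FailedJobRegistry(queue=q)
for job_id in registry.get_job_ids():
    registry.requeue(job_id)  # send the job back to its queue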

Using LocalCloud CLI

# View queue logs
lc logs queue

# Monitor queue status
lc queue status

# Clear failed jobs
lc queue clear-failed

# Get queue statistics
lc queue stats

Configuration

Configure the queue service in .localcloud/config.yaml:
services:
  queue:
    port: 6380
    memory_limit: 512MB
    redis:
      databases: 16
      maxmemory_policy: "allkeys-lru"
    workers:
      default: 2
      ai: 1
      email: 1

Best Practices

Job Design

# Good: Small, focused jobs
@app.task
def resize_image(image_id, size):
    image = get_image(image_id)
    resized = resize(image, size)
    save_image(resized)
    return resized.id

# Bad: Large, monolithic jobs
@app.task
def process_entire_album(album_id):
    # This could take hours and block other jobs
    pass

Error Handling

@app.task(bind=True, max_retries=3)
def robust_task(self, data):
    try:
        # Task logic here
        return process_data(data)
    except Exception as exc:
        # Log error
        print(f"Task failed: {exc}")
        
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

Resource Management

# Use connection pooling (Redis must be imported alongside the pool)
from redis import ConnectionPool, Redis

pool = ConnectionPool(host='localhost', port=6380, db=1, max_connections=10)
redis_conn = Redis(connection_pool=pool)

Troubleshooting

Queue Issues

# Check queue service status
lc status queue

# View queue logs
lc logs queue

# Restart queue service
lc restart queue

# Clear stuck jobs
lc queue clear-all

Performance Issues

# Monitor queue memory usage
lc redis info memory --port 6380

# Check for long-running jobs
lc queue jobs --status running

# Monitor job throughput
lc queue metrics

Integration with AI Services

Queue services work seamlessly with LocalCloud’s AI components:
# Queue AI processing tasks
@app.task
def process_chat_message(user_id, message):
    # Generate AI response in background
    response = requests.post('http://localhost:11434/api/generate', json={
        'model': 'llama3.2',
        'prompt': message,
        'stream': False
    })
    answer = response.json()['response']

    # Store conversation
    store_conversation(user_id, message, answer)

    # Send response to user
    send_response_to_user(user_id, answer)
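
From a request handler, enqueue the task and return immediately:

# Fire-and-forget: the HTTP response doesn't wait for the model
process_chat_message.delay(user_id, message)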
This pattern makes LocalCloud well suited to responsive AI applications: request handlers return immediately while complex AI operations run in the background.