Створення системи черги завдань за допомогою PostgreSQL та Go: Практичний посібник

У цьому посібнику ми побудуємо надійну систему черги завдань, використовуючи PostgreSQL і Go. Ми створимо щось подібне до популярних систем черг завдань, таких як Sidekiq або Bull, але орієнтоване на додатки на Go та використання потужних можливостей PostgreSQL.

Уявіть, що ви будуєте платформу електронної комерції, де користувачі можуть завантажувати свої каталоги продуктів. Обробка цих каталогів включає кілька етапів: зміна розміру зображень, перевірка даних та оновлення інвентарю. Виконання цих завдань під час HTTP-запиту зробить сайт повільним і неприємним для користувачів. Ось тут і стає в пригоді система черги завдань!

Життєвий цикл завдання

Важливо розуміти життєвий цикл завдання. Дозвольте пояснити життєві цикли завдань простим способом, який зрозуміє навіть 20-річний:

Життєвий цикл завдання: від початку до кінця

pic

Життєвий цикл завдання (bull)

Уявіть, що завдання — це як замовлення, яке ви подаєте через додаток для доставки їжі. З моменту, коли ви робите замовлення, до моменту його доставки до дверей, воно проходить через різні етапи. Ось як працюють завдання в нашій системі черг:

Коли ви вперше подаєте завдання, воно може розпочатися двома способами:

  1. Звичайне очікування: Більшість завдань починають саме з цього. Це як стояти в черзі в кав'ярні — ви чекаєте свою чергу, поки бариста (worker) не стане вільним для вашого замовлення.
  2. Затримане початок: Це як запланувати доставку їжі на пізніше. Завдання чекає до запланованого часу, після чого переміщується на передній план.

Як тільки worker забирає ваше завдання, воно потрапляє в "активний" стан. Як коли водій доставки забирає вашу їжу і вирушає до вашої адреси. Завдання залишається активним, поки не буде оброблено.

Нарешті, відбувається одне з двох:

  • Успіх: Завдання завершується, як отримання вашої доставки.
  • Помилка: Щось йде не так, як коли доставка не може бути виконана.

Якщо завдання зазнає невдачі, система може спробувати знову, як це може статися в додатку для доставки їжі, якщо перша спроба не вдалася.

Ось і все! Кожне завдання проходить через ці етапи: очікування/затримка → активне → завершене/невдало. Все дуже просто!

Розуміння черг завдань на реальному прикладі

Уявімо, що власник магазину завантажує CSV файл з 1000 продуктів. Замість того, щоб обробляти його відразу, наша система буде:

  1. Приймати файл і швидко реагувати на користувача.
  2. Створювати завдання в черзі.
  3. Обробляти файл у фоновому режимі.
  4. Оновлювати користувача через повідомлення, коли обробка завершена.

Це допоможе зрозуміти, як завдання проходять через систему і як workers взаємодіють з чергою.

pic

Схема послідовності робочого процесу черги (mermaid)

Пояснення:

  1. Успішна обробка завдання
  • Клієнт подає завдання через API
  • Завдання вставляється в PostgreSQL зі статусом «pending»
  • Worker 1 отримує завдання за допомогою FOR UPDATE SKIP LOCKED
  • Завдання обробляється та позначається як «completed»
  1. Спроба одночасної обробки
  • Поки Worker 1 обробляє завдання, Worker 2 намагається отримати завдання
  • Завдяки SKIP LOCKED, Worker 2 не блокується і відразу бачить, що завдань немає
    Сценарій з помилкою при виконанні завдання

  • Клієнт подає нове завдання

  • Worker 2 бере завдання, але стикається з помилкою

  • Завдання позначається для повторної спроби з інкрементованим лічильником спроб

Проєктування нашої таблиці черги

Спочатку створимо таблицю, яка зберігатиме наші завдання:

CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'failed', 'retry');  

CREATE TABLE jobs (  
 id BIGSERIAL PRIMARY KEY,  
 queue_name TEXT NOT NULL,  
 payload JSONB NOT NULL,  
 status job_status DEFAULT 'pending',  
 priority INTEGER DEFAULT 0,  
 max_retries INTEGER DEFAULT 3,  
 retry_count INTEGER DEFAULT 0,  
 error_message TEXT,  
 scheduled_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,  
 started_at TIMESTAMPTZ,  
 completed_at TIMESTAMPTZ,  
 created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,  
 locked_by TEXT,  
 locked_at TIMESTAMPTZ,  

 -- Додаємо індекс для швидшого отримання завдань  
 CONSTRAINT valid_retry_count CHECK (retry_count <= max_retries)  
);  

-- Індекс для пошуку завдань для обробки  
CREATE INDEX idx_jobs_status_scheduled ON jobs (status, scheduled_at)   
WHERE status = 'pending' OR status = 'retry';

Розглянемо, чому ми вибрали ці стовпці:

  • queue_name: Різні черги для різних типів завдань (наприклад, "email", "catalog_processing")
  • payload: Зберігає дані, специфічні для завдання, у форматі JSON (гнучкість для різних типів завдань)
  • priority: Завдання з вищим пріоритетом обробляються першими
  • locked_by: Перешкоджає кільком workers обробляти одне і те ж завдання
  • scheduled_at: Дозволяє запланувати виконання завдання на пізніше

Обробка завдань в Go

Ось як ми реалізуємо систему workers в Go:

// Job представляє завдання в нашій черзі  
type Job struct {  
 ID int64 `db:"id"`  
 QueueName string `db:"queue_name"`  
 Payload json.RawMessage `db:"payload"`  
 Status string `db:"status"`  
 RetryCount int `db:"retry_count"`  
 MaxRetries int `db:"max_retries"`  
 ScheduledAt time.Time `db:"scheduled_at"`  
}  

// Worker обробляє завдання  
type Worker struct {  
 db *sqlx.DB  
 workerID string  
 handlers map[string]JobHandler  
}  

type JobHandler func(context.Context, json.RawMessage) error  

// FetchJob намагається отримати та заблокувати завдання для обробки  
func (w *Worker) FetchJob(ctx context.Context) (*Job, error) {  
 var job Job  

 // Використовуємо транзакцію, щоб гарантувати атомарне отримання завдання  
 tx, err := w.db.BeginTxx(ctx, nil)  
 if err != nil {  
 return nil, err  
 }  
 defer tx.Rollback()  

 // Спробуємо знайти та заблокувати завдання  
 err = tx.QueryRowxContext(ctx, `  
 UPDATE jobs   
 SET status = 'running',  
 locked_by = $1,  
 locked_at = CURRENT_TIMESTAMP,  
 started_at = CURRENT_TIMESTAMP  
 WHERE id = (  
 SELECT id   
 FROM jobs   
 WHERE (status = 'pending' OR status = 'retry')  
 AND scheduled_at <= CURRENT_TIMESTAMP  
 AND locked_by IS NULL  
 ORDER BY priority DESC, scheduled_at ASC  
 FOR UPDATE SKIP LOCKED  
 LIMIT 1  
 )  
 RETURNING *  
 `, w.workerID).StructScan(&job)  

 if err == sql.ErrNoRows {  
 return nil, nil // Завдання не знайдено  
 }  
 if err != nil {  
 return nil, err  
 }  

 return &job, tx.Commit()  
}

Ключова роль тут полягає у використанні клаузи FOR UPDATE SKIP LOCKED — вона дозволяє кільком workers одночасно отримувати завдання без конфліктів.
Коли один worker блокує завдання, інші пропускають його і переходять до наступного доступного завдання.

Реалізація обробників завдань

Тепер реалізуємо реальний обробник для обробки каталогів товарів:

type CatalogProcessingPayload struct {  
 FileURL string `json:"file_url"`  
 StoreID int64 `json:"store_id"`  
 NotifyEmail string `json:"notify_email"`  
}  

func ProcessCatalogHandler(ctx context.Context, payload json.RawMessage) error {  
 var data CatalogProcessingPayload  
 if err := json.Unmarshal(payload, &data); err != nil {  
 return fmt.Errorf("неправильний формат даних: %w", err)  
 }  

 // Завантажуємо та обробляємо файл каталогу  
 products, err := downloadAndParseCatalog(data.FileURL)  
 if err != nil {  
 return fmt.Errorf("не вдалося обробити каталог: %w", err)  
 }  

 // Оновлюємо інвентар у пакетах  
 if err := updateInventory(ctx, data.StoreID, products); err != nil {  
 return fmt.Errorf("не вдалося оновити інвентар: %w", err)  
 }  

 // Сповіщаємо користувача  
 if err := sendNotificationEmail(data.NotifyEmail); err != nil {  
 // Логування помилки, але не зупиняємо завдання через помилки сповіщення  
 log.Printf("Не вдалося надіслати сповіщення: %v", err)  
 }  

 return nil  
}

Обробка помилок та повторні спроби

Коли завдання не вдається виконати, ми хочемо спробувати його знову, збільшуючи затримки між спробами.
Ось як ми обробляємо це:

func (w *Worker) handleJobFailure(ctx context.Context, job *Job, err error) error {  
 // Обчислюємо затримку для наступної спроби за допомогою експоненціального зворотного зв'язку  
 retryDelay := time.Minute * time.Duration(math.Pow(2, float64(job.RetryCount)))  

 status := "failed"  
 if job.RetryCount < job.MaxRetries {  
 status = "retry"  
 }  

 _, err = w.db.ExecContext(ctx, `  
 UPDATE jobs   
 SET status = $1,  
 error_message = $2,  
 retry_count = retry_count + 1,  
 scheduled_at = CASE   
 WHEN retry_count < max_retries   
 THEN CURRENT_TIMESTAMP + $3::interval   
 ELSE scheduled_at   
 END,  
 locked_by = NULL,  
 locked_at = NULL  
 WHERE id = $4  
 `, status, err.Error(), retryDelay.String(), job.ID)  

 return err  
}

Створення пулу workers

Нарешті, давайте створимо пул workers для паралельної обробки завдань:

type WorkerPool struct {  
 workers []*Worker  
 quit chan struct{}  
}  

func NewWorkerPool(db *sqlx.DB, numWorkers int) *WorkerPool {  
 pool := &WorkerPool{  
 workers: make([]*Worker, numWorkers),  
 quit: make(chan struct{}),  
 }  

 for i := 0; i < numWorkers; i++ {  
 pool.workers[i] = &Worker{  
 db: db,  
 workerID: fmt.Sprintf("worker-%d", i),  
 handlers: make(map[string]JobHandler),  
 }  
 }  

 return pool  
}  

func (p *WorkerPool) Start(ctx context.Context) {  
 for _, worker := range p.workers {  
 go func(w *Worker) {  
 for {  
 select {  
 case <-ctx.Done():  
 return  
 case <-p.quit:  
 return  
 default:  
 if job, err := w.FetchJob(ctx); err != nil {  
 log.Printf("Помилка при отриманні завдання: %v", err)  
 time.Sleep(time.Second)  
 } else if job != nil {  
 if handler, ok := w.handlers[job.QueueName]; ok {  
 if err := handler(ctx, job.Payload); err != nil {  
 w.handleJobFailure(ctx, job, err)  
 } else {  
 w.completeJob(ctx, job)  
 }  
 }  
 } else {  
 // Завдань немає, чекаємо трохи  
 time.Sleep(time.Second)  
 }  
 }  
 }  
 }(worker)  
 }  
}

Використання черги у вашій програмі

Ось як використовувати цю систему у вашій програмі:

func main() {  
 db := setupDatabase()  

 // Створюємо пул workers з 5 workers  
 pool := NewWorkerPool(db, 5)  

 // Регіструємо обробники завдань  
 pool.RegisterHandler("catalog_processing", ProcessCatalogHandler)  

 // Запускаємо пул workers  
 ctx := context.Background()  
 pool.Start(ctx)  

 // Приклад: додаємо нове завдання до черги  
 _, err := db.ExecContext(ctx, `  
 INSERT INTO jobs (queue_name, payload)   
 VALUES ($1, $2)  
 `, "catalog_processing", `{  
 "file_url": "https://example.com/catalog.csv",  
 "store_id": 123,  
 "notify_email": "[email protected]"  
 }`)  
}

Кращі практики та поради

  • Моніторинг вашої черги: Відслідковуйте час обробки завдань, рівень помилок та глибину черги.
  • Створення сповіщень: Налаштуйте сповіщення для випадків, коли щось йде не так.
  • Обробка коректного завершення роботи: Переконайтесь, що ваші workers можуть завершити поточні завдання перед завершенням роботи.
  • Очищення старих завдань: Періодично архівуйте або видаляйте завершені та неуспішні завдання, щоб підтримувати розмір таблиці в межах.
  • Використання транзакцій: Завжди використовуйте транзакції при оновленні статусу завдання, щоб уникнути умов гонки.
  • Розгляд залежностей завдань: Для складних робочих процесів ви можете захотіти додати підтримку залежностей завдань, де одне завдання може початись тільки після завершення інших.

Коли використовувати цю систему

Ця система черги завдань на основі PostgreSQL ідеально підходить для:

  • Малих та середніх додатків
  • Команд, які вже використовують PostgreSQL
  • Додатків з базовими потребами в обробці завдань
  • Ситуацій, де важлива операційна простота

Розгляньте використання спеціалізованих систем черг завдань, таких як RabbitMQ або рішення на основі Redis, коли вам потрібно:

  • Дуже висока пропускна здатність (тисячі завдань на секунду)
  • Складні шаблони маршрутизації
  • Вбудоване моніторинг і управлінський інтерфейс
  • Гарантії збереження повідомлень

Підсумок

Ми побудували надійну систему черги завдань, яка:

  • Обробляє паралельну обробку завдань
  • Забезпечує надійне виконання завдань з повторними спробами
  • Підтримує плановані та пріоритетні завдання
  • Використовує потужні можливості PostgreSQL для блокування завдань та атомарних оновлень

Найкраща частина? Вона побудована за допомогою інструментів, які ви, ймовірно, вже маєте в своєму стеку! Не соромтеся розширювати цю систему новими функціями, такими як: скасування завдань, звіти про прогрес, залежності завдань, веб-дашборд для моніторингу

Не забувайте завжди моніторити вашу чергу завдань і налаштовувати відповідні сповіщення, щоб виявляти проблеми на ранніх етапах!

Якщо вам цікаво зануритися глибше в проектування систем і розробку бекенду, обов'язково слідкуйте за мною для отримання додаткових ідей, порад та практичних прикладів. Разом ми можемо досліджувати тонкощі створення ефективних систем, оптимізації продуктивності баз даних і освоєння інструментів, які приводять до успіху сучасні додатки. Приєднуйтесь до мене в цій подорожі, щоб покращити свої навички та залишатися в курсі останніх тенденцій у світі технологій! 🚀

Прочитайте дизайн системи на bahasa на iniakunhuda.com

Перекладено з: Building a Job Queue System with PostgreSQL and Go: A Practical Guide

Leave a Reply

Your email address will not be published. Required fields are marked *