У цьому посібнику ми побудуємо надійну систему черги завдань, використовуючи PostgreSQL і Go. Ми створимо щось подібне до популярних систем черг завдань, таких як Sidekiq або Bull, але орієнтоване на додатки на Go та використання потужних можливостей PostgreSQL.
Уявіть, що ви будуєте платформу електронної комерції, де користувачі можуть завантажувати свої каталоги продуктів. Обробка цих каталогів включає кілька етапів: зміна розміру зображень, перевірка даних та оновлення інвентарю. Виконання цих завдань під час HTTP-запиту зробить сайт повільним і неприємним для користувачів. Ось тут і стає в пригоді система черги завдань!
Життєвий цикл завдання
Важливо розуміти життєвий цикл завдання. Дозвольте пояснити життєві цикли завдань простим способом, який зрозуміє навіть 20-річний:
Життєвий цикл завдання: від початку до кінця
Життєвий цикл завдання (bull)
Уявіть, що завдання — це як замовлення, яке ви подаєте через додаток для доставки їжі. З моменту, коли ви робите замовлення, до моменту його доставки до дверей, воно проходить через різні етапи. Ось як працюють завдання в нашій системі черг:
Коли ви вперше подаєте завдання, воно може розпочатися двома способами:
- Звичайне очікування: Більшість завдань починають саме з цього. Це як стояти в черзі в кав'ярні — ви чекаєте свою чергу, поки бариста (worker) не стане вільним для вашого замовлення.
- Затримане початок: Це як запланувати доставку їжі на пізніше. Завдання чекає до запланованого часу, після чого переміщується на передній план.
Як тільки worker забирає ваше завдання, воно потрапляє в "активний" стан. Як коли водій доставки забирає вашу їжу і вирушає до вашої адреси. Завдання залишається активним, поки не буде оброблено.
Нарешті, відбувається одне з двох:
- Успіх: Завдання завершується, як отримання вашої доставки.
- Помилка: Щось йде не так, як коли доставка не може бути виконана.
Якщо завдання зазнає невдачі, система може спробувати знову, як це може статися в додатку для доставки їжі, якщо перша спроба не вдалася.
Ось і все! Кожне завдання проходить через ці етапи: очікування/затримка → активне → завершене/невдало. Все дуже просто!
Розуміння черг завдань на реальному прикладі
Уявімо, що власник магазину завантажує CSV файл з 1000 продуктів. Замість того, щоб обробляти його відразу, наша система буде:
- Приймати файл і швидко реагувати на користувача.
- Створювати завдання в черзі.
- Обробляти файл у фоновому режимі.
- Оновлювати користувача через повідомлення, коли обробка завершена.
Це допоможе зрозуміти, як завдання проходять через систему і як workers взаємодіють з чергою.
Схема послідовності робочого процесу черги (mermaid)
Пояснення:
- Успішна обробка завдання
- Клієнт подає завдання через API
- Завдання вставляється в PostgreSQL зі статусом «pending»
- Worker 1 отримує завдання за допомогою FOR UPDATE SKIP LOCKED
- Завдання обробляється та позначається як «completed»
- Спроба одночасної обробки
- Поки Worker 1 обробляє завдання, Worker 2 намагається отримати завдання
-
Завдяки SKIP LOCKED, Worker 2 не блокується і відразу бачить, що завдань немає
Сценарій з помилкою при виконанні завдання -
Клієнт подає нове завдання
-
Worker 2 бере завдання, але стикається з помилкою
-
Завдання позначається для повторної спроби з інкрементованим лічильником спроб
Проєктування нашої таблиці черги
Спочатку створимо таблицю, яка зберігатиме наші завдання:
CREATE TYPE job_status AS ENUM ('pending', 'running', 'completed', 'failed', 'retry');
CREATE TABLE jobs (
id BIGSERIAL PRIMARY KEY,
queue_name TEXT NOT NULL,
payload JSONB NOT NULL,
status job_status DEFAULT 'pending',
priority INTEGER DEFAULT 0,
max_retries INTEGER DEFAULT 3,
retry_count INTEGER DEFAULT 0,
error_message TEXT,
scheduled_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
locked_by TEXT,
locked_at TIMESTAMPTZ,
-- Додаємо індекс для швидшого отримання завдань
CONSTRAINT valid_retry_count CHECK (retry_count <= max_retries)
);
-- Індекс для пошуку завдань для обробки
CREATE INDEX idx_jobs_status_scheduled ON jobs (status, scheduled_at)
WHERE status = 'pending' OR status = 'retry';
Розглянемо, чому ми вибрали ці стовпці:
queue_name
: Різні черги для різних типів завдань (наприклад, "email", "catalog_processing")payload
: Зберігає дані, специфічні для завдання, у форматі JSON (гнучкість для різних типів завдань)priority
: Завдання з вищим пріоритетом обробляються першимиlocked_by
: Перешкоджає кільком workers обробляти одне і те ж завданняscheduled_at
: Дозволяє запланувати виконання завдання на пізніше
Обробка завдань в Go
Ось як ми реалізуємо систему workers в Go:
// Job представляє завдання в нашій черзі
type Job struct {
ID int64 `db:"id"`
QueueName string `db:"queue_name"`
Payload json.RawMessage `db:"payload"`
Status string `db:"status"`
RetryCount int `db:"retry_count"`
MaxRetries int `db:"max_retries"`
ScheduledAt time.Time `db:"scheduled_at"`
}
// Worker обробляє завдання
type Worker struct {
db *sqlx.DB
workerID string
handlers map[string]JobHandler
}
type JobHandler func(context.Context, json.RawMessage) error
// FetchJob намагається отримати та заблокувати завдання для обробки
func (w *Worker) FetchJob(ctx context.Context) (*Job, error) {
var job Job
// Використовуємо транзакцію, щоб гарантувати атомарне отримання завдання
tx, err := w.db.BeginTxx(ctx, nil)
if err != nil {
return nil, err
}
defer tx.Rollback()
// Спробуємо знайти та заблокувати завдання
err = tx.QueryRowxContext(ctx, `
UPDATE jobs
SET status = 'running',
locked_by = $1,
locked_at = CURRENT_TIMESTAMP,
started_at = CURRENT_TIMESTAMP
WHERE id = (
SELECT id
FROM jobs
WHERE (status = 'pending' OR status = 'retry')
AND scheduled_at <= CURRENT_TIMESTAMP
AND locked_by IS NULL
ORDER BY priority DESC, scheduled_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *
`, w.workerID).StructScan(&job)
if err == sql.ErrNoRows {
return nil, nil // Завдання не знайдено
}
if err != nil {
return nil, err
}
return &job, tx.Commit()
}
Ключова роль тут полягає у використанні клаузи FOR UPDATE SKIP LOCKED
— вона дозволяє кільком workers одночасно отримувати завдання без конфліктів.
Коли один worker блокує завдання, інші пропускають його і переходять до наступного доступного завдання.
Реалізація обробників завдань
Тепер реалізуємо реальний обробник для обробки каталогів товарів:
type CatalogProcessingPayload struct {
FileURL string `json:"file_url"`
StoreID int64 `json:"store_id"`
NotifyEmail string `json:"notify_email"`
}
func ProcessCatalogHandler(ctx context.Context, payload json.RawMessage) error {
var data CatalogProcessingPayload
if err := json.Unmarshal(payload, &data); err != nil {
return fmt.Errorf("неправильний формат даних: %w", err)
}
// Завантажуємо та обробляємо файл каталогу
products, err := downloadAndParseCatalog(data.FileURL)
if err != nil {
return fmt.Errorf("не вдалося обробити каталог: %w", err)
}
// Оновлюємо інвентар у пакетах
if err := updateInventory(ctx, data.StoreID, products); err != nil {
return fmt.Errorf("не вдалося оновити інвентар: %w", err)
}
// Сповіщаємо користувача
if err := sendNotificationEmail(data.NotifyEmail); err != nil {
// Логування помилки, але не зупиняємо завдання через помилки сповіщення
log.Printf("Не вдалося надіслати сповіщення: %v", err)
}
return nil
}
Обробка помилок та повторні спроби
Коли завдання не вдається виконати, ми хочемо спробувати його знову, збільшуючи затримки між спробами.
Ось як ми обробляємо це:
func (w *Worker) handleJobFailure(ctx context.Context, job *Job, err error) error {
// Обчислюємо затримку для наступної спроби за допомогою експоненціального зворотного зв'язку
retryDelay := time.Minute * time.Duration(math.Pow(2, float64(job.RetryCount)))
status := "failed"
if job.RetryCount < job.MaxRetries {
status = "retry"
}
_, err = w.db.ExecContext(ctx, `
UPDATE jobs
SET status = $1,
error_message = $2,
retry_count = retry_count + 1,
scheduled_at = CASE
WHEN retry_count < max_retries
THEN CURRENT_TIMESTAMP + $3::interval
ELSE scheduled_at
END,
locked_by = NULL,
locked_at = NULL
WHERE id = $4
`, status, err.Error(), retryDelay.String(), job.ID)
return err
}
Створення пулу workers
Нарешті, давайте створимо пул workers для паралельної обробки завдань:
type WorkerPool struct {
workers []*Worker
quit chan struct{}
}
func NewWorkerPool(db *sqlx.DB, numWorkers int) *WorkerPool {
pool := &WorkerPool{
workers: make([]*Worker, numWorkers),
quit: make(chan struct{}),
}
for i := 0; i < numWorkers; i++ {
pool.workers[i] = &Worker{
db: db,
workerID: fmt.Sprintf("worker-%d", i),
handlers: make(map[string]JobHandler),
}
}
return pool
}
func (p *WorkerPool) Start(ctx context.Context) {
for _, worker := range p.workers {
go func(w *Worker) {
for {
select {
case <-ctx.Done():
return
case <-p.quit:
return
default:
if job, err := w.FetchJob(ctx); err != nil {
log.Printf("Помилка при отриманні завдання: %v", err)
time.Sleep(time.Second)
} else if job != nil {
if handler, ok := w.handlers[job.QueueName]; ok {
if err := handler(ctx, job.Payload); err != nil {
w.handleJobFailure(ctx, job, err)
} else {
w.completeJob(ctx, job)
}
}
} else {
// Завдань немає, чекаємо трохи
time.Sleep(time.Second)
}
}
}
}(worker)
}
}
Використання черги у вашій програмі
Ось як використовувати цю систему у вашій програмі:
func main() {
db := setupDatabase()
// Створюємо пул workers з 5 workers
pool := NewWorkerPool(db, 5)
// Регіструємо обробники завдань
pool.RegisterHandler("catalog_processing", ProcessCatalogHandler)
// Запускаємо пул workers
ctx := context.Background()
pool.Start(ctx)
// Приклад: додаємо нове завдання до черги
_, err := db.ExecContext(ctx, `
INSERT INTO jobs (queue_name, payload)
VALUES ($1, $2)
`, "catalog_processing", `{
"file_url": "https://example.com/catalog.csv",
"store_id": 123,
"notify_email": "[email protected]"
}`)
}
Кращі практики та поради
- Моніторинг вашої черги: Відслідковуйте час обробки завдань, рівень помилок та глибину черги.
- Створення сповіщень: Налаштуйте сповіщення для випадків, коли щось йде не так.
- Обробка коректного завершення роботи: Переконайтесь, що ваші workers можуть завершити поточні завдання перед завершенням роботи.
- Очищення старих завдань: Періодично архівуйте або видаляйте завершені та неуспішні завдання, щоб підтримувати розмір таблиці в межах.
- Використання транзакцій: Завжди використовуйте транзакції при оновленні статусу завдання, щоб уникнути умов гонки.
- Розгляд залежностей завдань: Для складних робочих процесів ви можете захотіти додати підтримку залежностей завдань, де одне завдання може початись тільки після завершення інших.
Коли використовувати цю систему
Ця система черги завдань на основі PostgreSQL ідеально підходить для:
- Малих та середніх додатків
- Команд, які вже використовують PostgreSQL
- Додатків з базовими потребами в обробці завдань
- Ситуацій, де важлива операційна простота
Розгляньте використання спеціалізованих систем черг завдань, таких як RabbitMQ або рішення на основі Redis, коли вам потрібно:
- Дуже висока пропускна здатність (тисячі завдань на секунду)
- Складні шаблони маршрутизації
- Вбудоване моніторинг і управлінський інтерфейс
- Гарантії збереження повідомлень
Підсумок
Ми побудували надійну систему черги завдань, яка:
- Обробляє паралельну обробку завдань
- Забезпечує надійне виконання завдань з повторними спробами
- Підтримує плановані та пріоритетні завдання
- Використовує потужні можливості PostgreSQL для блокування завдань та атомарних оновлень
Найкраща частина? Вона побудована за допомогою інструментів, які ви, ймовірно, вже маєте в своєму стеку! Не соромтеся розширювати цю систему новими функціями, такими як: скасування завдань, звіти про прогрес, залежності завдань, веб-дашборд для моніторингу
Не забувайте завжди моніторити вашу чергу завдань і налаштовувати відповідні сповіщення, щоб виявляти проблеми на ранніх етапах!
Якщо вам цікаво зануритися глибше в проектування систем і розробку бекенду, обов'язково слідкуйте за мною для отримання додаткових ідей, порад та практичних прикладів. Разом ми можемо досліджувати тонкощі створення ефективних систем, оптимізації продуктивності баз даних і освоєння інструментів, які приводять до успіху сучасні додатки. Приєднуйтесь до мене в цій подорожі, щоб покращити свої навички та залишатися в курсі останніх тенденцій у світі технологій! 🚀
Прочитайте дизайн системи на bahasa на iniakunhuda.com
Перекладено з: Building a Job Queue System with PostgreSQL and Go: A Practical Guide