Функція обробки пакетів в Sidekiq Pro є потужним інструментом для управління згрупованими асинхронними завданнями. У цьому пості я поділюсь тим, як я побудував надійну структуру обробки пакетів, використовуючи Sidekiq Pro, з фокусом на масштабованість, обробку помилок і модульність.
Ключові компоненти
- Модуль
BatchProcessable
Цей модуль визначає контракт для будь-якого класу обробки пакетів. Він містить три ключові методи:
start_job_batch
: Ініціалізує пакет і ставить завдання в чергу.process_job_batch
: Обробляє індивідуальні завдання.finish_job_batch
: Завершує пакет після того, як всі завдання будуть виконані.
module BatchProcessable
extend ActiveSupport::Concern
module ClassMethods
def start_job_batch(_owner, _now, *_args)
raise 'You must implement this method in your subclass'
end
def process_job_batch(_owner, _now, *_args)
raise 'You must implement this method in your subclass'
end
def finish_job_batch(_owner, _now, *_args)
raise 'You must implement this method in your subclass'
end
end
end
Ця структура забезпечує, щоб кожен клас, що реалізує BatchProcessable
, дотримувався єдиного інтерфейсу.
2. Основні класи Worker
a. BatchWorker
Базовий клас Worker, який обробляє спільну логіку, таку як встановлення batch_class
і визначення черг.
class BatchWorker < BaseWorker
attr_accessor :batch_class, :owner, :now
def perform(owner_id, epoch, class_name, *args)
set_owner
self.batch_class = class_name.constantize
# Виконати завдання
execute # повинно бути реалізовано в дочірніх класах
end
private
def set_owner
self.owner = batch_class.find_owner owner_id
end
end
b. BatchStarterWorker
Обробляє ініціалізацію пакету та постановку завдань на обробку. Використовує start_job_batch
для генерації аргументів завдання та керує батьківсько-дочірніми відносинами для пакетів.
class BatchStarterWorker < BatchWorker
def execute(*start_job_batch_args)
batch_class.start_job_batch(owner, now, *start_job_batch_args) do |*job_args|
# Ставимо завдання в чергу тут
end
end
end
c. BatchProcessorWorker
Обробляє окремі завдання за допомогою process_job_batch
. Він гарантує, що специфічний для пакету контекст, як-от власник і часовий штамп, передається в завдання.
class BatchProcessorWorker < BatchWorker
def execute(*args)
batch_class.process_job_batch(owner, now, *args)
end
end
d. BatchFinisherWorker
Обробляє очищення після пакету за допомогою finish_job_batch
.
Це також керує зворотними викликами для завершення пакету, включаючи координацію батьківських і дочірніх пакетів.
class BatchFinisherWorker < BatchWorker
def execute(*args)
batch_class.finish_job_batch(owner, now, *args)
end
end
Як це працює
Крок 1: Визначити клас пакету
Створіть клас, який включає модуль BatchProcessable
та реалізує його методи.
class BackgroundStuff
include BatchProcessable
def self.start_job_batch(facility, now)
# Логіка для початку пакету і передавання аргументів завдання
# скажімо, знаходимо пакет і змінюємо стан на обробку
end
def self.process_job_batch(facility, now, unit_id)
# Логіка для обробки індивідуальних завдань
end
def self.finish_job_batch(facility, now, aggregator_id)
# Логіка для завершення пакету
# скажімо, знаходимо пакет і змінюємо стан на завершено
end
end
Крок 2: Запустіть пакет
Використовуйте BatchStarterWorker
, щоб ініціалізувати пакет.
BatchStarterWorker.perform_async_on_class_queue(BackgroundStuff, facility.id, Time.now.to_f)
Крок 3: Обробка завдань
BatchProcessorWorker
обробляє індивідуальні завдання асинхронно, передаючи необхідний контекст.
Крок 4: Завершення пакету
BatchFinisherWorker
спрацьовує після завершення всіх завдань для фіналізації пакету.
Розширені можливості
Обробка батьківських і дочірніх пакетів
Ця структура підтримує ієрархічну обробку пакетів, координуючи батьківські і дочірні пакети.
Обробка помилок і логування
Кожен Worker включає надійне логування та обробку помилок для забезпечення можливості відстеження проблем.
Налаштування
Ви можете налаштувати поведінку пакету (наприклад, зберігання sidekiq_batch_id
), переозначивши конкретні методи в модулі BatchProcessable
.
Ключові переваги
- Масштабованість: Використовує розподілену архітектуру Sidekiq для ефективної обробки великих навантажень.
- Повторне використання: Модуль
BatchProcessable
гарантує єдиний інтерфейс для всіх класів обробки пакетів. - Гнучкість: Підтримує користувацькі аргументи завдань, батьківсько-дочірні відносини та зворотні виклики.
Висновок
Ця структура забезпечує надійне, масштабоване рішення для обробки пакетів за допомогою Sidekiq Pro. Завдяки модульності робочого процесу, ми створили багаторазову і розширювану систему, яка може легко обробляти складні вимоги до обробки пакетів.
Перекладено з: Building a Batch Processing Framework with Sidekiq Pro