Функція обробки пакетів в Sidekiq Pro є потужним інструментом для управління згрупованими асинхронними завданнями. У цьому пості я поділюсь тим, як я побудував надійну структуру обробки пакетів, використовуючи Sidekiq Pro, з фокусом на масштабованість, обробку помилок і модульність.
Ключові компоненти
- Модуль
BatchProcessable
Цей модуль визначає контракт для будь-якого класу обробки пакетів. Він містить три ключові методи:
start_job_batch: Ініціалізує пакет і ставить завдання в чергу.process_job_batch: Обробляє індивідуальні завдання.finish_job_batch: Завершує пакет після того, як всі завдання будуть виконані.
module BatchProcessable
extend ActiveSupport::Concern
module ClassMethods
def start_job_batch(_owner, _now, *_args)
raise 'You must implement this method in your subclass'
end
def process_job_batch(_owner, _now, *_args)
raise 'You must implement this method in your subclass'
end
def finish_job_batch(_owner, _now, *_args)
raise 'You must implement this method in your subclass'
end
end
end
Ця структура забезпечує, щоб кожен клас, що реалізує BatchProcessable, дотримувався єдиного інтерфейсу.
2. Основні класи Worker
a. BatchWorker
Базовий клас Worker, який обробляє спільну логіку, таку як встановлення batch_class і визначення черг.
class BatchWorker < BaseWorker
attr_accessor :batch_class, :owner, :now
def perform(owner_id, epoch, class_name, *args)
set_owner
self.batch_class = class_name.constantize
# Виконати завдання
execute # повинно бути реалізовано в дочірніх класах
end
private
def set_owner
self.owner = batch_class.find_owner owner_id
end
end
b. BatchStarterWorker
Обробляє ініціалізацію пакету та постановку завдань на обробку. Використовує start_job_batch для генерації аргументів завдання та керує батьківсько-дочірніми відносинами для пакетів.
class BatchStarterWorker < BatchWorker
def execute(*start_job_batch_args)
batch_class.start_job_batch(owner, now, *start_job_batch_args) do |*job_args|
# Ставимо завдання в чергу тут
end
end
end
c. BatchProcessorWorker
Обробляє окремі завдання за допомогою process_job_batch. Він гарантує, що специфічний для пакету контекст, як-от власник і часовий штамп, передається в завдання.
class BatchProcessorWorker < BatchWorker
def execute(*args)
batch_class.process_job_batch(owner, now, *args)
end
end
d. BatchFinisherWorker
Обробляє очищення після пакету за допомогою finish_job_batch.
Це також керує зворотними викликами для завершення пакету, включаючи координацію батьківських і дочірніх пакетів.
class BatchFinisherWorker < BatchWorker
def execute(*args)
batch_class.finish_job_batch(owner, now, *args)
end
end
Як це працює
Крок 1: Визначити клас пакету
Створіть клас, який включає модуль BatchProcessable та реалізує його методи.
class BackgroundStuff
include BatchProcessable
def self.start_job_batch(facility, now)
# Логіка для початку пакету і передавання аргументів завдання
# скажімо, знаходимо пакет і змінюємо стан на обробку
end
def self.process_job_batch(facility, now, unit_id)
# Логіка для обробки індивідуальних завдань
end
def self.finish_job_batch(facility, now, aggregator_id)
# Логіка для завершення пакету
# скажімо, знаходимо пакет і змінюємо стан на завершено
end
end
Крок 2: Запустіть пакет
Використовуйте BatchStarterWorker, щоб ініціалізувати пакет.
BatchStarterWorker.perform_async_on_class_queue(BackgroundStuff, facility.id, Time.now.to_f)
Крок 3: Обробка завдань
BatchProcessorWorker обробляє індивідуальні завдання асинхронно, передаючи необхідний контекст.
Крок 4: Завершення пакету
BatchFinisherWorker спрацьовує після завершення всіх завдань для фіналізації пакету.
Розширені можливості
Обробка батьківських і дочірніх пакетів
Ця структура підтримує ієрархічну обробку пакетів, координуючи батьківські і дочірні пакети.
Обробка помилок і логування
Кожен Worker включає надійне логування та обробку помилок для забезпечення можливості відстеження проблем.
Налаштування
Ви можете налаштувати поведінку пакету (наприклад, зберігання sidekiq_batch_id), переозначивши конкретні методи в модулі BatchProcessable.
Ключові переваги
- Масштабованість: Використовує розподілену архітектуру Sidekiq для ефективної обробки великих навантажень.
- Повторне використання: Модуль
BatchProcessableгарантує єдиний інтерфейс для всіх класів обробки пакетів. - Гнучкість: Підтримує користувацькі аргументи завдань, батьківсько-дочірні відносини та зворотні виклики.
Висновок
Ця структура забезпечує надійне, масштабоване рішення для обробки пакетів за допомогою Sidekiq Pro. Завдяки модульності робочого процесу, ми створили багаторазову і розширювану систему, яка може легко обробляти складні вимоги до обробки пакетів.
Перекладено з: Building a Batch Processing Framework with Sidekiq Pro