Будова фреймворку для обробки пакетів з використанням Sidekiq Pro

Функція обробки пакетів в Sidekiq Pro є потужним інструментом для управління згрупованими асинхронними завданнями. У цьому пості я поділюсь тим, як я побудував надійну структуру обробки пакетів, використовуючи Sidekiq Pro, з фокусом на масштабованість, обробку помилок і модульність.

Ключові компоненти

  1. Модуль BatchProcessable

Цей модуль визначає контракт для будь-якого класу обробки пакетів. Він містить три ключові методи:

  • start_job_batch: Ініціалізує пакет і ставить завдання в чергу.
  • process_job_batch: Обробляє індивідуальні завдання.
  • finish_job_batch: Завершує пакет після того, як всі завдання будуть виконані.
module BatchProcessable  
 extend ActiveSupport::Concern  

 module ClassMethods  
 def start_job_batch(_owner, _now, *_args)  
 raise 'You must implement this method in your subclass'  
 end  

 def process_job_batch(_owner, _now, *_args)  
 raise 'You must implement this method in your subclass'  
 end  

 def finish_job_batch(_owner, _now, *_args)  
 raise 'You must implement this method in your subclass'  
 end  
 end  
end

Ця структура забезпечує, щоб кожен клас, що реалізує BatchProcessable, дотримувався єдиного інтерфейсу.

2. Основні класи Worker

a. BatchWorker

Базовий клас Worker, який обробляє спільну логіку, таку як встановлення batch_class і визначення черг.

class BatchWorker < BaseWorker  
 attr_accessor :batch_class, :owner, :now  
 def perform(owner_id, epoch, class_name, *args)  
 set_owner  
 self.batch_class = class_name.constantize  
 # Виконати завдання  
 execute # повинно бути реалізовано в дочірніх класах   
 end  

 private  

 def set_owner  
 self.owner = batch_class.find_owner owner_id  
 end  
end

b. BatchStarterWorker

Обробляє ініціалізацію пакету та постановку завдань на обробку. Використовує start_job_batch для генерації аргументів завдання та керує батьківсько-дочірніми відносинами для пакетів.

class BatchStarterWorker < BatchWorker  
 def execute(*start_job_batch_args)  
 batch_class.start_job_batch(owner, now, *start_job_batch_args) do |*job_args|  
 # Ставимо завдання в чергу тут  
 end  
 end  
end

c. BatchProcessorWorker

Обробляє окремі завдання за допомогою process_job_batch. Він гарантує, що специфічний для пакету контекст, як-от власник і часовий штамп, передається в завдання.

class BatchProcessorWorker < BatchWorker  
 def execute(*args)  
 batch_class.process_job_batch(owner, now, *args)  
 end  
end

d. BatchFinisherWorker

Обробляє очищення після пакету за допомогою finish_job_batch.
Це також керує зворотними викликами для завершення пакету, включаючи координацію батьківських і дочірніх пакетів.

class BatchFinisherWorker < BatchWorker  
 def execute(*args)  
 batch_class.finish_job_batch(owner, now, *args)  
 end  
end

Як це працює

Крок 1: Визначити клас пакету

Створіть клас, який включає модуль BatchProcessable та реалізує його методи.

class BackgroundStuff  
 include BatchProcessable  

 def self.start_job_batch(facility, now)  
 # Логіка для початку пакету і передавання аргументів завдання  
 # скажімо, знаходимо пакет і змінюємо стан на обробку  
 end  

 def self.process_job_batch(facility, now, unit_id)  
 # Логіка для обробки індивідуальних завдань  
 end  

 def self.finish_job_batch(facility, now, aggregator_id)  
 # Логіка для завершення пакету  
 # скажімо, знаходимо пакет і змінюємо стан на завершено  
 end  
end

Крок 2: Запустіть пакет

Використовуйте BatchStarterWorker, щоб ініціалізувати пакет.

BatchStarterWorker.perform_async_on_class_queue(BackgroundStuff, facility.id, Time.now.to_f)

Крок 3: Обробка завдань

BatchProcessorWorker обробляє індивідуальні завдання асинхронно, передаючи необхідний контекст.

Крок 4: Завершення пакету

BatchFinisherWorker спрацьовує після завершення всіх завдань для фіналізації пакету.

Розширені можливості

Обробка батьківських і дочірніх пакетів

Ця структура підтримує ієрархічну обробку пакетів, координуючи батьківські і дочірні пакети.

Обробка помилок і логування

Кожен Worker включає надійне логування та обробку помилок для забезпечення можливості відстеження проблем.

Налаштування

Ви можете налаштувати поведінку пакету (наприклад, зберігання sidekiq_batch_id), переозначивши конкретні методи в модулі BatchProcessable.

Ключові переваги

  • Масштабованість: Використовує розподілену архітектуру Sidekiq для ефективної обробки великих навантажень.
  • Повторне використання: Модуль BatchProcessable гарантує єдиний інтерфейс для всіх класів обробки пакетів.
  • Гнучкість: Підтримує користувацькі аргументи завдань, батьківсько-дочірні відносини та зворотні виклики.

Висновок

Ця структура забезпечує надійне, масштабоване рішення для обробки пакетів за допомогою Sidekiq Pro. Завдяки модульності робочого процесу, ми створили багаторазову і розширювану систему, яка може легко обробляти складні вимоги до обробки пакетів.

Перекладено з: Building a Batch Processing Framework with Sidekiq Pro

Leave a Reply

Your email address will not be published. Required fields are marked *