Великий набір даних на 8 ГБ RAM? Нехай з цим впорається IterableDataset.

Наразі я працюю над нейронною мережею для передавання повідомлень з напрямленими зв'язками під назвою Chemprop, яка є передовою моделлю для прогнозування властивостей на основі молекулярної структури. Оскільки я студент і маю обмежені ресурси — лише 8 ГБ або 16 ГБ ОЗУ, — мені потрібно працювати з набором даних, що містить 1 мільйон записів для мого проєкту.

Для використання Chemprop на вхід подаються SMILES-рядки та цільові значення, що зберігаються у CSV-файлі. Ці SMILES-рядки перетворюються в MoleculeDataset, який включає молекулярний граф, цільові значення та інші дані. MoleculeDataset потім передається в DataLoader для створення пакетів під час циклів навчання.

Мій ноутбук з 8 ГБ ОЗУ здатен завантажити весь CSV-файл, але неможливо одночасно перетворити всі дані на MoleculeDataset. Тому я шукав альтернативний підхід для:

  • Обробки цього кроку конвертації
  • Сумісності з фреймворком Chemprop.

Зрештою, я знайшов можливість використання IterableDataset у PyTorch, що ефективно вирішує це питання. Усі необхідні скрипти можна знайти на моєму GitHub.

pic

Ілюстрація архітектури Chemprop

Скрипти

Спочатку я маю визначити кілька корисних функцій для перетворення Pandas DataFrame, що містить SMILES-рядки, в MoleculeDataset. Якщо просто викликати функцію dataset_preparator(), весь Pandas DataFrame буде перетворено в MoleculeDataset, що неможливо при наявності лише 8 ГБ ОЗУ.

import pandas as pd
from chemprop import data, featurizers
import pandas as pd
import torch
from torch.utils.data import IterableDataset
from chemprop import data, featurizers
from sklearn.preprocessing import StandardScaler
import psutil
import os
import gc
import time
import numpy as np

def datapointpreparator(df,smilescolumn,targetcolumn):
smis = df.loc[:,smiles
column].values
ys = df.loc[:,[target_column]].values

datapoints = [data.MoleculeDatapoint.from_smi(smi,y) for smi, y in zip(smis,ys)]
return datapoints

def datasetpreparator(df, smilescolumn, targetcolumn, featurizer = featurizers.SimpleMoleculeMolGraphFeaturizer()):
datapoints = datapoint
preparator(df=df, smilescolumn=smilescolumn, targetcolumn=targetcolumn)
dataset = data.MoleculeDataset(datapoints, featurizer=featurizer)
return dataset

Таким чином, стратегія з використанням IterableDataset від PyTorch була реалізована.

class IterableMolDatapoints(IterableDataset):
'''Клас для підготовки даних для поточного потоку, який є підкласом IterableDataset.
Вихід - генератор, який по черзі повертає один об'єкт chemprop.data.datasets.Datum.
'''

def init(self, df, smilescolumn, targetcolumn, scaler = None, sizeattime=100, shuffle=True):
'''Параметри:


df (pd.DataFrame): Pandas DataFrame, що містить дані.
smilescolumn (str): Назва стовпця, що містить SMILES-рядки.
target
column (str): Назва стовпця, що містить цільові значення.
scaler (StandardScaler): Об'єкт StandardScaler (вже підготовлений) для нормалізації цільових значень.
sizeattime (int): Кількість зразків, що перетворюються на chemprop.data.datasets.Datum за один раз.
shuffle (boolean): Чи потрібно перемішувати дані в df.'''

super().init()
self.df = df
self.smilescolumn = smilescolumn
self.targetcolumn = targetcolumn
self.sizeattime = sizeattime
self.shuffle= shuffle
self.scaler = scaler

def len(self):
return len(self.df)

def iter(self):
'''Функція для визначення логіки ітерації. Вона бере всі дані з CSV, перемішує їх, потім доступна лише підмножина даних для перетворення.
Вихід - генератор, який по черзі повертає chemprop.data.datasets.Datum, готовий до передачі в DataLoader.'''

'''

if self.shuffle:
dfshuffled = self.df.sample(frac=1).resetindex(drop=True)
else:
df_shuffled = self.df.copy()

# Перетворюємо Pandas DataFrame на MoleculeDataset відповідно до sizeattime, щоб уникнути перевантаження пам'яті. Це дозволяє збалансувати використання пам'яті та швидкість.
for i in range(0, len(dfshuffled), self.sizeattime):
df
attime = dfshuffled.iloc[i:i + self.sizeattime]
dfprocess = datasetpreparator(df=dfattime, smilescolumn=self.smilescolumn, targetcolumn=self.targetcolumn)

if self.scaler != None:
dfprocess.normalizetargets(self.scaler)

# Обробка паралельних процесів вручну
workerinfo = torch.utils.data.getworkerinfo()
if worker
info is None:
for mol in dfprocess:
yield mol
else:
num
workers = workerinfo.numworkers
workerid = workerinfo.id
for i, mol in enumerate(dfprocess):
if i % num
workers == worker_id:
yield mol

Ось приклад, як використовувати клас IterableMolDatapoints. Зверніть увагу, що якщо ви хочете нормалізувати цільові значення, це потрібно зробити вручну і потім передати об'єкт scaler в IterableMolDatapoints.

Підготовка даних

smilescolumn = 'smiles'
target
column = 'dockingscore'
df
train = pd.readcsv('ontheflydata.csv')
scaler = StandardScaler().fit(df[[target_column]])

Створення ітераційних даних

iterabledataset = IterableMolDatapoints(
df=df
train,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn,
sizeattime=100, scaler=scaler, shuffle=True
)

iterabletrainloader = data.builddataloader(
iterable
dataset,
batch_size=5, shuffle=False)

Використання DataLoader

for epoch in range(50):
for batch in iterabletrainloader:
# Навчання моделі або виконання інших операцій

Інший важливий момент — це параметр size_at_time. Цей параметр допомагає збалансувати використання пам'яті з швидкістю обробки. Якщо ми встановимо size_at_time рівним 1, клас поводитиметься як стандартний IterableDataset, обробляючи один запис за раз. Однак, якщо ми збільшимо size_at_time до вищого значення, скажімо, 100, процес трансформації буде працювати з 100 зразками одночасно, утримуючи їх до моменту, коли DataLoader запросить їх. Хоча цей підхід потребує більше пам'яті, він може забезпечити швидше виконання викликів DataLoader.

Використання IterableDataset схоже на використання MapStyle Dataset (за замовчуванням індексованого набору даних), як показано нижче.

Підготовка даних

smilescolumn = 'smiles'
target
column = 'dockingscore'
df
train = pd.readcsv('ontheflydata.csv')
scaler = StandardScaler().fit(df[[target_column]])

Створення Map даних

mapdataset = datasetpreparator(dftrain, smilescolumn, targetcolumn)
map
dataset.normalizetargets(scaler)
map
loader = data.builddataloader(mapdataset, batch_size=5, shuffle=False)

Використання DataLoader

for epoch in range(50):
for batch in map_loader:
# Навчання моделі або виконання інших операцій

Два основні виклики: перемішування та паралелізація

Є два основні виклики при використанні ітераційного набору даних: перемішування та паралелізація. Оскільки я можу завантажити весь Pandas DataFrame, я перемішую його вручну за допомогою цього рядка.

dfshuffled = self.df.sample(frac=1).resetindex(drop=True)

Це можна вказати через аргумент shuffle при ініціалізації класу. Важливо зазначити, що перемішування буде керуватися в межах IterableDataset; при використанні DataLoader аргумент shuffle завжди має бути вимкнений.

Щодо паралелізації, то IterableDataset має значні проблеми з num_workers > 0. Оскільки IterableDataset не можна доступати за індексом, він не може вибирати індекси та розподіляти їх між робітниками.
Тому нам потрібно обробити це вручну в методі __iter__().

workerinfo = torch.utils.data.getworkerinfo()
if worker
info is None:
for mol in dfprocess:
yield mol
else:
num
workers = workerinfo.numworkers
workerid = workerinfo.id
for i, mol in enumerate(dfprocess):
if i % num
workers == worker_id:
yield mol

Тест продуктивності

Я провів три тести щодо реалізації IterableDataset.

Тест 1: Використання пам'яті при створенні наборів даних

Підготовка даних

datapath = 'ontheflydata.csv'
smilescolumn = 'smiles'
target
column = 'docking_score'

df = pd.readcsv(datapath)
df = df.sample(100000)
scaler = StandardScaler().fit(df[[target_column]])

Функція для запису використання пам'яті

def memoryrecord():
process = psutil.Process(os.getpid())
mem = process.memory
info().rss / 1024 ** 2 # в МБ
return mem

Для Iterable Dataset

gc.collect()
starttime = time.time()
memory
before = memoryrecord()
iterable
dataset = IterableMolDatapoints(
df=df,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn,
sizeattime=100, scaler=None, shuffle=True
)
memoryafter =memoryrecord()
end_time = time.time()
gc.collect()

print(f'Використання пам\'яті для завантаження iterable dataset: {memoryafter-memorybefore} МБ ')
print(f'Час для завантаження iterable dataset: {endtime-starttime} с ')

Для Mapstyle Dataset

gc.collect()
starttime = time.time()
memory
before = memoryrecord()
dataset = dataset
preparator(
df=df,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn
)
memoryafter = memoryrecord()
end_time = time.time()
gc.collect()

print(f'Використання пам\'яті для завантаження map dataset: {memoryafter-memorybefore} МБ ')
print(f'Час для завантаження map dataset: {endtime-starttime} с ')

Використання пам'яті для завантаження iterable dataset: 0.0 МБ
Час для завантаження iterable dataset: 0.0011718273162841797 с

Використання пам'яті для завантаження map dataset: 677.5 МБ
Час для завантаження map dataset: 9.14915418624878 с

Тест 2: Схоже поводження при передаванні IterableDataset та Mapstyle Dataset до DataLoader

Підготовка даних

smilescolumn = 'smiles'
target
column = 'docking_score'

dftrain = pd.readcsv('ontheflydata.csv')
df
train = df_train.sample(1000) # Вибираємо невеликий підмножину для ілюстрації

Створення map даних

mapdataset = datasetpreparator(dftrain, smilescolumn, targetcolumn)
map
loader = data.builddataloader(mapdataset, batch_size=5, shuffle=False)

Створення iterable даних

iterabledataset = IterableMolDatapoints(
df=df
train,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn,
sizeattime=5, shuffle=False, #scaler=scaler
)
iterableloader = data.builddataloader(iterabledataset, batchsize=5, shuffle=False)

def compare_loader(loader1, loader2):
"""
Перевіряємо, чи два DataLoader генерують однакові дані в однаковому порядку.

Параметри:
- loader1, loader2: екземпляри DataLoader для порівняння

Повертає:
- bool: True, якщо загрузчики генерують однакові дані в одному порядку

Якщо їх довжина однакова:

if len(loader1) != len(loader2):
print(f"Загрузчики мають різні довжини: {len(loader1)} vs {len(loader2)}")
return False

  • Порівнюємо атрибути кожної партії

for i, (batch1, batch2) in enumerate(zip(loader1, loader2)):

# Порівнюємо об'єкти MolGraph
samenodes = np.arrayequal(batch1.bmg.V, batch2.bmg.V)
sameedges = np.arrayequal(batch1.bmg.E, batch2.bmg.E)
if samenodes and sameedges:
print(f"MolGraphs ідентичні в партії {i}")
else:
print(f"MolGraphs різні в партії {i}")
return False

# Порівнюємо цілі
sametarget = np.arrayequal(batch1.Y, batch2.Y)
if same_target:
print(f"Цілі ідентичні в партії {i}")
else:
print(f"Цілі різні в партії {i}")
return False

  • Порівнюємо інші атрибути, якщо потрібно

Повертаємо True, якщо всі перевірки пройшли успішно

return True

  • Тестуємо схожість між двома загрузчиками даних:

iterableloader = data.builddataloader(iterabledataset, batchsize=2, shuffle=False)
maploader = data.builddataloader(mapdataset, batchsize=2, shuffle=False)

if compareloader(iterableloader, map_loader):
print("Загрузчики даних містять однакові дані в одному порядку")
else:
print("Загрузчики даних відрізняються")

MolGraphs ідентичні в партії 0
Цілі ідентичні в партії 0
MolGraphs ідентичні в партії 1
Цілі ідентичні в партії 1
...
MolGraphs ідентичні в партії 499
Цілі ідентичні в партії 499
Загрузчики даних містять однакові дані в одному порядку

Тест 3: Можливість перемішування

Підготовка даних

smilescolumn = 'smiles'
target
column = 'docking_score'

dftrain = pd.readcsv('ontheflydata.csv')
df
train10 = dftrain.sample(10) # Вибираємо невеликий підмножину для прикладу

Створення iterable даних

iterabledataset = IterableMolDatapoints(
df=df
train10,
smiles
column=smilescolumn,
target
column=targetcolumn,
size
at_time=5, scaler=None, shuffle=True
)

iterabletrainloader = data.builddataloader(
iterable
dataset,
batch_size=5, shuffle=False)

print('Пакети даних з не нормалізованими цільовими значеннями:')
for epoch in range(2):
print(f'Епоха {epoch+1}')
for i, batch in enumerate(iterabletrainloader):
print(f'Пакет {i+1}')
print(batch.Y)

Пакети даних з не нормалізованими цільовими значеннями:
Епоха 1
Пакет 1
tensor([[-6.2346],
[-5.8180],
[-7.0793],
[-7.8406],
[-6.3524]])
Пакет 2
tensor([[-6.5732],
[-5.1087],
[-6.5070],
[-4.9264],

[-7.2945]])

Епоха 2
Пакет 1
tensor([[-7.0793],
[-6.5070],
[-6.3524],
[-6.2346],
[-5.8180]])
Пакет 2
tensor([[-4.9264],
[-7.2945],
[-7.8406],
[-5.1087],

[-6.5732]])

Основні повідомлення

  • IterableDataset допомагає запобігти перевантаженню пам'яті. Ми можемо вибрати, скільки

Перекладено з: Large dataset on 8GB RAM? Let IterableDataset handle