Наразі я працюю над нейронною мережею для передавання повідомлень з напрямленими зв'язками під назвою Chemprop, яка є передовою моделлю для прогнозування властивостей на основі молекулярної структури. Оскільки я студент і маю обмежені ресурси — лише 8 ГБ або 16 ГБ ОЗУ, — мені потрібно працювати з набором даних, що містить 1 мільйон записів для мого проєкту.
Для використання Chemprop на вхід подаються SMILES-рядки та цільові значення, що зберігаються у CSV-файлі. Ці SMILES-рядки перетворюються в MoleculeDataset, який включає молекулярний граф, цільові значення та інші дані. MoleculeDataset потім передається в DataLoader для створення пакетів під час циклів навчання.
Мій ноутбук з 8 ГБ ОЗУ здатен завантажити весь CSV-файл, але неможливо одночасно перетворити всі дані на MoleculeDataset. Тому я шукав альтернативний підхід для:
- Обробки цього кроку конвертації
- Сумісності з фреймворком Chemprop.
Зрештою, я знайшов можливість використання IterableDataset у PyTorch, що ефективно вирішує це питання. Усі необхідні скрипти можна знайти на моєму GitHub.
Ілюстрація архітектури Chemprop
Скрипти
Спочатку я маю визначити кілька корисних функцій для перетворення Pandas DataFrame, що містить SMILES-рядки, в MoleculeDataset. Якщо просто викликати функцію dataset_preparator()
, весь Pandas DataFrame буде перетворено в MoleculeDataset, що неможливо при наявності лише 8 ГБ ОЗУ.
import pandas as pd
from chemprop import data, featurizers
import pandas as pd
import torch
from torch.utils.data import IterableDataset
from chemprop import data, featurizers
from sklearn.preprocessing import StandardScaler
import psutil
import os
import gc
import time
import numpy as np
def datapointpreparator(df,smilescolumn,targetcolumn):
smis = df.loc[:,smilescolumn].values
ys = df.loc[:,[target_column]].values
datapoints = [data.MoleculeDatapoint.from_smi(smi,y) for smi, y in zip(smis,ys)]
return datapoints
def datasetpreparator(df, smilescolumn, targetcolumn, featurizer = featurizers.SimpleMoleculeMolGraphFeaturizer()):
datapoints = datapointpreparator(df=df, smilescolumn=smilescolumn, targetcolumn=targetcolumn)
dataset = data.MoleculeDataset(datapoints, featurizer=featurizer)
return dataset
Таким чином, стратегія з використанням IterableDataset від PyTorch була реалізована.
class IterableMolDatapoints(IterableDataset):
'''Клас для підготовки даних для поточного потоку, який є підкласом IterableDataset.
Вихід - генератор, який по черзі повертає один об'єкт chemprop.data.datasets.Datum.
'''
def init(self, df, smilescolumn, targetcolumn, scaler = None, sizeattime=100, shuffle=True):
'''Параметри:
df (pd.DataFrame): Pandas DataFrame, що містить дані.
smilescolumn (str): Назва стовпця, що містить SMILES-рядки.
targetcolumn (str): Назва стовпця, що містить цільові значення.
scaler (StandardScaler): Об'єкт StandardScaler (вже підготовлений) для нормалізації цільових значень.
sizeattime (int): Кількість зразків, що перетворюються на chemprop.data.datasets.Datum за один раз.
shuffle (boolean): Чи потрібно перемішувати дані в df.'''
super().init()
self.df = df
self.smilescolumn = smilescolumn
self.targetcolumn = targetcolumn
self.sizeattime = sizeattime
self.shuffle= shuffle
self.scaler = scaler
def len(self):
return len(self.df)
def iter(self):
'''Функція для визначення логіки ітерації. Вона бере всі дані з CSV, перемішує їх, потім доступна лише підмножина даних для перетворення.
Вихід - генератор, який по черзі повертає chemprop.data.datasets.Datum, готовий до передачі в DataLoader.'''
'''
if self.shuffle:
dfshuffled = self.df.sample(frac=1).resetindex(drop=True)
else:
df_shuffled = self.df.copy()
# Перетворюємо Pandas DataFrame на MoleculeDataset відповідно до sizeattime, щоб уникнути перевантаження пам'яті. Це дозволяє збалансувати використання пам'яті та швидкість.
for i in range(0, len(dfshuffled), self.sizeattime):
dfattime = dfshuffled.iloc[i:i + self.sizeattime]
dfprocess = datasetpreparator(df=dfattime, smilescolumn=self.smilescolumn, targetcolumn=self.targetcolumn)
if self.scaler != None:
dfprocess.normalizetargets(self.scaler)
# Обробка паралельних процесів вручну
workerinfo = torch.utils.data.getworkerinfo()
if workerinfo is None:
for mol in dfprocess:
yield mol
else:
numworkers = workerinfo.numworkers
workerid = workerinfo.id
for i, mol in enumerate(dfprocess):
if i % numworkers == worker_id:
yield mol
Ось приклад, як використовувати клас IterableMolDatapoints
. Зверніть увагу, що якщо ви хочете нормалізувати цільові значення, це потрібно зробити вручну і потім передати об'єкт scaler
в IterableMolDatapoints
.
Підготовка даних
smilescolumn = 'smiles'
targetcolumn = 'dockingscore'
dftrain = pd.readcsv('ontheflydata.csv')
scaler = StandardScaler().fit(df[[target_column]])
Створення ітераційних даних
iterabledataset = IterableMolDatapoints(
df=dftrain,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn,
sizeattime=100, scaler=scaler, shuffle=True
)
iterabletrainloader = data.builddataloader(
iterabledataset,
batch_size=5, shuffle=False)
Використання DataLoader
for epoch in range(50):
for batch in iterabletrainloader:
# Навчання моделі або виконання інших операцій
Інший важливий момент — це параметр size_at_time
. Цей параметр допомагає збалансувати використання пам'яті з швидкістю обробки. Якщо ми встановимо size_at_time
рівним 1, клас поводитиметься як стандартний IterableDataset
, обробляючи один запис за раз. Однак, якщо ми збільшимо size_at_time
до вищого значення, скажімо, 100, процес трансформації буде працювати з 100 зразками одночасно, утримуючи їх до моменту, коли DataLoader запросить їх. Хоча цей підхід потребує більше пам'яті, він може забезпечити швидше виконання викликів DataLoader.
Використання IterableDataset
схоже на використання MapStyle Dataset
(за замовчуванням індексованого набору даних), як показано нижче.
Підготовка даних
smilescolumn = 'smiles'
targetcolumn = 'dockingscore'
dftrain = pd.readcsv('ontheflydata.csv')
scaler = StandardScaler().fit(df[[target_column]])
Створення Map даних
mapdataset = datasetpreparator(dftrain, smilescolumn, targetcolumn)
mapdataset.normalizetargets(scaler)
maploader = data.builddataloader(mapdataset, batch_size=5, shuffle=False)
Використання DataLoader
for epoch in range(50):
for batch in map_loader:
# Навчання моделі або виконання інших операцій
Два основні виклики: перемішування та паралелізація
Є два основні виклики при використанні ітераційного набору даних: перемішування та паралелізація. Оскільки я можу завантажити весь Pandas DataFrame, я перемішую його вручну за допомогою цього рядка.
dfshuffled = self.df.sample(frac=1).resetindex(drop=True)
Це можна вказати через аргумент shuffle
при ініціалізації класу. Важливо зазначити, що перемішування буде керуватися в межах IterableDataset
; при використанні DataLoader аргумент shuffle
завжди має бути вимкнений.
Щодо паралелізації, то IterableDataset
має значні проблеми з num_workers > 0
. Оскільки IterableDataset
не можна доступати за індексом, він не може вибирати індекси та розподіляти їх між робітниками.
Тому нам потрібно обробити це вручну в методі __iter__()
.
workerinfo = torch.utils.data.getworkerinfo()
if workerinfo is None:
for mol in dfprocess:
yield mol
else:
numworkers = workerinfo.numworkers
workerid = workerinfo.id
for i, mol in enumerate(dfprocess):
if i % numworkers == worker_id:
yield mol
Тест продуктивності
Я провів три тести щодо реалізації IterableDataset
.
Тест 1: Використання пам'яті при створенні наборів даних
Підготовка даних
datapath = 'ontheflydata.csv'
smilescolumn = 'smiles'
targetcolumn = 'docking_score'
df = pd.readcsv(datapath)
df = df.sample(100000)
scaler = StandardScaler().fit(df[[target_column]])
Функція для запису використання пам'яті
def memoryrecord():
process = psutil.Process(os.getpid())
mem = process.memoryinfo().rss / 1024 ** 2 # в МБ
return mem
Для Iterable Dataset
gc.collect()
starttime = time.time()
memorybefore = memoryrecord()
iterabledataset = IterableMolDatapoints(
df=df,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn,
sizeattime=100, scaler=None, shuffle=True
)
memoryafter =memoryrecord()
end_time = time.time()
gc.collect()
print(f'Використання пам\'яті для завантаження iterable dataset: {memoryafter-memorybefore} МБ ')
print(f'Час для завантаження iterable dataset: {endtime-starttime} с ')
Для Mapstyle Dataset
gc.collect()
starttime = time.time()
memorybefore = memoryrecord()
dataset = datasetpreparator(
df=df,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn
)
memoryafter = memoryrecord()
end_time = time.time()
gc.collect()
print(f'Використання пам\'яті для завантаження map dataset: {memoryafter-memorybefore} МБ ')
print(f'Час для завантаження map dataset: {endtime-starttime} с ')
Використання пам'яті для завантаження iterable dataset: 0.0 МБ
Час для завантаження iterable dataset: 0.0011718273162841797 с
Використання пам'яті для завантаження map dataset: 677.5 МБ
Час для завантаження map dataset: 9.14915418624878 с
Тест 2: Схоже поводження при передаванні IterableDataset та Mapstyle Dataset до DataLoader
Підготовка даних
smilescolumn = 'smiles'
targetcolumn = 'docking_score'
dftrain = pd.readcsv('ontheflydata.csv')
dftrain = df_train.sample(1000) # Вибираємо невеликий підмножину для ілюстрації
Створення map даних
mapdataset = datasetpreparator(dftrain, smilescolumn, targetcolumn)
maploader = data.builddataloader(mapdataset, batch_size=5, shuffle=False)
Створення iterable даних
iterabledataset = IterableMolDatapoints(
df=dftrain,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn,
sizeattime=5, shuffle=False, #scaler=scaler
)
iterableloader = data.builddataloader(iterabledataset, batchsize=5, shuffle=False)
def compare_loader(loader1, loader2):
"""
Перевіряємо, чи два DataLoader генерують однакові дані в однаковому порядку.
Параметри:
- loader1, loader2: екземпляри DataLoader для порівняння
Повертає:
- bool: True, якщо загрузчики генерують однакові дані в одному порядку
Якщо їх довжина однакова:
if len(loader1) != len(loader2):
print(f"Загрузчики мають різні довжини: {len(loader1)} vs {len(loader2)}")
return False
- Порівнюємо атрибути кожної партії
for i, (batch1, batch2) in enumerate(zip(loader1, loader2)):
# Порівнюємо об'єкти MolGraph
samenodes = np.arrayequal(batch1.bmg.V, batch2.bmg.V)
sameedges = np.arrayequal(batch1.bmg.E, batch2.bmg.E)
if samenodes and sameedges:
print(f"MolGraphs ідентичні в партії {i}")
else:
print(f"MolGraphs різні в партії {i}")
return False
# Порівнюємо цілі
sametarget = np.arrayequal(batch1.Y, batch2.Y)
if same_target:
print(f"Цілі ідентичні в партії {i}")
else:
print(f"Цілі різні в партії {i}")
return False
- Порівнюємо інші атрибути, якщо потрібно
Повертаємо True, якщо всі перевірки пройшли успішно
return True
- Тестуємо схожість між двома загрузчиками даних:
iterableloader = data.builddataloader(iterabledataset, batchsize=2, shuffle=False)
maploader = data.builddataloader(mapdataset, batchsize=2, shuffle=False)
if compareloader(iterableloader, map_loader):
print("Загрузчики даних містять однакові дані в одному порядку")
else:
print("Загрузчики даних відрізняються")
MolGraphs ідентичні в партії 0
Цілі ідентичні в партії 0
MolGraphs ідентичні в партії 1
Цілі ідентичні в партії 1
...
MolGraphs ідентичні в партії 499
Цілі ідентичні в партії 499
Загрузчики даних містять однакові дані в одному порядку
Тест 3: Можливість перемішування
Підготовка даних
smilescolumn = 'smiles'
targetcolumn = 'docking_score'
dftrain = pd.readcsv('ontheflydata.csv')
dftrain10 = dftrain.sample(10) # Вибираємо невеликий підмножину для прикладу
Створення iterable даних
iterabledataset = IterableMolDatapoints(
df=dftrain10,
smilescolumn=smilescolumn,
targetcolumn=targetcolumn,
sizeat_time=5, scaler=None, shuffle=True
)
iterabletrainloader = data.builddataloader(
iterabledataset,
batch_size=5, shuffle=False)
print('Пакети даних з не нормалізованими цільовими значеннями:')
for epoch in range(2):
print(f'Епоха {epoch+1}')
for i, batch in enumerate(iterabletrainloader):
print(f'Пакет {i+1}')
print(batch.Y)
Пакети даних з не нормалізованими цільовими значеннями:
Епоха 1
Пакет 1
tensor([[-6.2346],
[-5.8180],
[-7.0793],
[-7.8406],
[-6.3524]])
Пакет 2
tensor([[-6.5732],
[-5.1087],
[-6.5070],
[-4.9264],
[-7.2945]])
Епоха 2
Пакет 1
tensor([[-7.0793],
[-6.5070],
[-6.3524],
[-6.2346],
[-5.8180]])
Пакет 2
tensor([[-4.9264],
[-7.2945],
[-7.8406],
[-5.1087],
[-6.5732]])
Основні повідомлення
IterableDataset
допомагає запобігти перевантаженню пам'яті. Ми можемо вибрати, скільки
Перекладено з: Large dataset on 8GB RAM? Let IterableDataset handle