Description
Ray is a framework for distributed computation with the most popular machine learning libraries, designed to ease the development and use of ML applications across multiple processing units and/or nodes. At its core is the Ray Core API, which enables the development of distributed code through decorators, but most of its functionality comes from the libraries built on top of it that distribute the stages of a typical machine learning pipeline.
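As a quick illustration, here is a minimal sketch of the decorator-based Ray Core API (not specific to Supek; square is a hypothetical example function):

import ray

ray.init()  # on a cluster: ray.init(address='auto')

# the decorator turns an ordinary function into a task
# that Ray can schedule anywhere in the cluster
@ray.remote
def square(x):
    return x ** 2

# .remote() returns a future immediately; ray.get() collects the results
futures = [square.remote(i) for i in range(8)]
print(ray.get(futures))  # [0, 1, 4, 9, 16, 25, 36, 49]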
Versions
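The modules used in the examples on this page:
- scientific/ray/2.4.0-rayproject (Supek)
- scientific/ray/v2.10.0 (Padobran)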
Documentation
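- https://docs.ray.io/en/latest/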
Examples of running on Supek
Below are some examples of the functionality Ray provides, such as:
- the Ray Train API for training a PyTorch model
- the Ray Tune API for hyperparameter optimization of a TensorFlow model
- the Ray AIR API for finding the best Scikit-Learn classification model
Ray Train + PyTorch
#!/bin/bash
#PBS -l select=2:ngpus=2:ncpus=16
# environment
module load scientific/ray/2.4.0-rayproject
# cd
cd ${PBS_O_WORKDIR:-""}
# run
ray-launcher.sh train-pytorch.py
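The ray-launcher.sh wrapper comes with the module; it appears to start the Ray head process on the first allocated node and worker processes on the remaining nodes, set the NODE_IP_ADDRESS environment variable, and then run the given script against the cluster (the ray.init(address='auto', ...) calls in the examples below rely on this).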
# source
# - https://docs.ray.io/en/latest/train/examples/pytorch/torch_resnet50_example.html#torch-fashion-mnist-ex
# - https://pytorch.org/vision/stable/generated/torchvision.datasets.FakeData.html#torchvision.datasets.FakeData
import os
from typing import Dict

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.models import resnet50
from torchvision.transforms import ToTensor

import ray
import ray.train as train
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

# synthetic stand-in for a real image dataset
training_data = datasets.FakeData(size=2560,
                                  transform=ToTensor())
test_data = datasets.FakeData(size=256,
                              transform=ToTensor())
def train_epoch(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) // session.get_world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // session.get_world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n "
          f"Accuracy: {(100 * correct):>0.1f}%, "
          f"Avg loss: {test_loss:>8f} \n")
    return test_loss
def train_func(config: Dict):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]
    worker_batch_size = batch_size

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)
    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    # Create model.
    model = resnet50()
    model = train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    loss_results = []
    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        loss_results.append(loss)
        session.report(dict(loss=loss))

    # return required for backwards compatibility with the old API
    # TODO(team-ml) clean up and remove return
    return loss_results
def main():
    # node & worker info
    resources = ray.cluster_resources()
    gpus = int(resources['GPU'])
    cpus = int(resources['CPU'])
    resources_per_worker = {'GPU': 1,
                            'CPU': (cpus - 1) // gpus}

    # scaling config
    scaling_config = ScalingConfig(use_gpu=True,
                                   num_workers=gpus,
                                   resources_per_worker=resources_per_worker)

    # trainer
    trainer = TorchTrainer(train_func,
                           scaling_config=scaling_config,
                           train_loop_config={"lr": 1e-3,
                                              "epochs": 3,
                                              "batch_size": 256})
    result = trainer.fit()
    print(f"Results: {result.metrics}")
if __name__ == "__main__":
    ray.init(address='auto',
             _node_ip_address=os.environ['NODE_IP_ADDRESS'])
    main()
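The number of Ray Train workers is derived from the allocation: one worker per GPU, with one CPU of the total left over (presumably for the driver process) and the rest divided evenly. The job requested above (2 nodes, each with 2 GPUs and 16 CPUs) therefore runs 4 workers, each with 1 GPU and 7 CPUs.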
Ray Tune + TensorFlow
#!/bin/bash
#PBS -l select=2:ngpus=2:ncpus=16
# environment
module load scientific/ray/2.4.0-rayproject
# cd
cd ${PBS_O_WORKDIR:-""}
# run
ray-launcher.sh tune-tensorflow.py
import os

# note: some container builds need tensorrt imported before tensorflow
# so that the TensorRT libraries are found
import tensorrt
import numpy as np
import tensorflow as tf

import ray
from ray import air, tune
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.integration.keras import TuneReportCallback
def train_mnist(config):
    # synthetic data: ImageNet-shaped inputs with random labels
    samples = config["samples"]
    X = np.random.uniform(size=[samples, 224, 224, 3])
    y = np.random.uniform(size=[samples, 1], low=0, high=999).astype("int64")

    # model
    model = tf.keras.applications.ResNet50(weights=None)
    loss = tf.keras.losses.SparseCategoricalCrossentropy()
    optimizer = tf.optimizers.SGD(learning_rate=config["lr"],
                                  momentum=config["momentum"])
    model.compile(optimizer=optimizer,
                  loss=loss,
                  metrics=["accuracy"])

    # fit, reporting accuracy to Tune after every epoch
    model.fit(X,
              y,
              batch_size=config["batch_size"],
              epochs=config["epochs"],
              verbose=0,
              callbacks=[TuneReportCallback({"mean_accuracy": "accuracy"})])
def main():
    # fixed training parameters, passed to the trainable through the search space
    epochs = 10
    batch_size = 256
    samples = batch_size * 10

    # resources
    resources = ray.cluster_resources()
    gpus = int(resources['GPU'])
    cpus = int(resources['CPU'])
    resources_per_worker = {'GPU': 1,
                            'CPU': (cpus - 1) // gpus}

    # scheduler
    sched = AsyncHyperBandScheduler(time_attr="training_iteration",
                                    max_t=400,
                                    grace_period=20)

    # tuner
    trainable = tune.with_resources(train_mnist,
                                    resources=resources_per_worker)
    param_space = {"lr": tune.uniform(0.001, 0.1),
                   "momentum": tune.uniform(0.1, 0.9),
                   "epochs": epochs,
                   "batch_size": batch_size,
                   "samples": samples}
    tune_config = tune.TuneConfig(metric="mean_accuracy",
                                  mode="max",
                                  scheduler=sched,
                                  num_samples=10)
    run_config = air.RunConfig(name="exp",
                               stop={"mean_accuracy": 1e-4,
                                     "training_iteration": 100})
    tuner = tune.Tuner(trainable=trainable,
                       tune_config=tune_config,
                       run_config=run_config,
                       param_space=param_space)
    results = tuner.fit()
    print("Best hyperparameters found were: ", results.get_best_result().config)


if __name__ == "__main__":
    ray.init(address='auto',
             _node_ip_address=os.environ['NODE_IP_ADDRESS'])
    main()
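With num_samples=10, Tune draws ten (lr, momentum) combinations from the search space. The AsyncHyperBandScheduler (ASHA) terminates underperforming trials early: every trial is guaranteed grace_period=20 reported training iterations before it can be stopped, and none runs past max_t=400. The stop criteria additionally end a trial once mean_accuracy reaches 1e-4 or 100 iterations elapse; on this random data the low accuracy threshold mainly keeps the demo runs short.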
Ray AIR + Scikit-Learn
#!/bin/bash
#PBS -l select=12:ngpus=1:ncpus=4
# environment
module load scientific/ray/2.4.0-rayproject
# cd
cd ${PBS_O_WORKDIR:-""}
# run
ray-launcher.sh sklearn-automl.py
# sources:
# 1) data - https://archive.ics.uci.edu/dataset/15/breast+cancer+wisconsin+original
import os
import itertools

import ray
from ray import tune
from ray.air import session

from sklearn.model_selection import cross_validate
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
def get_estimators(estimators):
    # yield one estimator instance per combination of the given options
    for model, options in estimators.items():
        options_cross = itertools.product(*options.values())
        options_keys = options.keys()
        for options_row in options_cross:
            yield model(**{key: value for key, value in zip(options_keys, options_row)})
def cross_validate_config(config, data):
    # X, y
    df = data.to_pandas()
    X = df.drop(columns=['id-number', 'diagnosis'])
    y = df['diagnosis'].map({'M': 1, 'B': 0})

    # cv
    result = cross_validate(estimator=config['estimator'],
                            X=X,
                            y=y,
                            cv=config['n_splits'],
                            scoring="f1",
                            n_jobs=1)

    # output
    results = {"f1": result['test_score'].mean()}
    session.report(results)
def main():
    # train, test
    data = ray.data.read_csv('wdbc.data')
    train, test = data.train_test_split(test_size=0.2)

    # estimator space
    estimators = {
        SVC: {
            'kernel': ['linear', 'poly', 'rbf', 'sigmoid'],
            'C': [1, 4, 16],
        },
        DecisionTreeClassifier: {
            'max_depth': [None, 2, 8, 32],
            'splitter': ['best', 'random'],
        },
        KNeighborsClassifier: {
            'algorithm': ['auto', 'ball_tree', 'kd_tree', 'brute'],
            'weights': ['uniform', 'distance'],
        },
        MLPClassifier: {
            'hidden_layer_sizes': [10, 40, 160],
            'activation': ['identity', 'logistic', 'tanh'],
        },
    }
    estimators = list(get_estimators(estimators))

    # grid search
    trainable = tune.with_parameters(cross_validate_config, data=train)
    param_space = {"estimator": tune.grid_search(estimators),
                   "n_splits": 5}
    tune_config = tune.TuneConfig(metric="f1",
                                  mode="max")
    tuner = tune.Tuner(trainable=trainable,
                       param_space=param_space,
                       tune_config=tune_config)
    result_grid = tuner.fit()
    best_result = result_grid.get_best_result()
    print(best_result)
    print(best_result.config)


if __name__ == '__main__':
    ray.init(address='auto',
             _node_ip_address=os.environ['NODE_IP_ADDRESS'])
    main()
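get_estimators expands the dictionary above into the Cartesian product of options per model: 12 SVC, 8 DecisionTreeClassifier, 8 KNeighborsClassifier and 9 MLPClassifier instances, so tune.grid_search launches 37 trials in total, each reporting the mean F1 score over 5-fold cross-validation on the training split.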
Example of running on Padobran
#!/bin/bash
#PBS -q cpu
#PBS -l select=1:ncpus=20:mem=20GB
# environment
module load scientific/ray/v2.10.0
# cd
cd ${PBS_O_WORKDIR}
# run
ray-run python3 your_script.py
Notes for running on Supek
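The Supek module runs Ray from a container image. Assuming ray-launcher.sh reads the IMAGE_PATH variable (an assumption based on the module environment), a locally stored copy of the image can be used by exporting it before the run:

export IMAGE_PATH="${PWD}/ray-2.4.0-rayproject.sif"
export PATH=${PWD}:$PATH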