Introduction
Description
As stated in the official documentation, Dask is a "flexible library for parallel computing in Python". Beyond supporting the development of parallel code, its main role is to make it easy to scale typical data science problems and applications, which are usually developed on personal computers, out to a cluster. It achieves this by imitating well-known data-processing APIs (such as NumPy and Pandas), by relying on an integrated job scheduler, and by being written entirely in Python.
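The core idea behind that scaling is that Dask records operations as a lazy task graph and hands the graph to a scheduler. The sketch below is not Dask's implementation, just a minimal standard-library illustration of that idea: building a `Delayed` object does no work, and `compute()` lets a pool run independent branches of the graph concurrently.

```python
from concurrent.futures import ThreadPoolExecutor, Future

class Delayed:
    """A recorded call: constructing one does no work, compute() runs the graph."""
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def compute(self, pool):
        # deferred inputs are dispatched to the pool, so independent
        # branches of the graph can execute concurrently
        futs = [pool.submit(a.compute, pool) if isinstance(a, Delayed) else a
                for a in self.args]
        vals = [f.result() if isinstance(f, Future) else f for f in futs]
        return self.fn(*vals)

inc = lambda x: x + 1
add = lambda x, y: x + y
graph = Delayed(add, Delayed(inc, 1), Delayed(inc, 10))  # nothing has run yet
with ThreadPoolExecutor(max_workers=2) as pool:
    print(graph.compute(pool))  # prints 13
```

Dask does the same at much larger scale: the graph's leaves become chunks of arrays or dataframes, and the scheduler places tasks on cluster workers instead of local threads.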
...
One of the related libraries is Dask-ML, which is intended for distributed machine learning through the familiar scikit-learn API and which enables scaling across multiple nodes via the joblib library, which scikit-learn itself uses to parallelize its algorithms. More on the typical problems solved and usage examples for each of the interfaces can be found in Dask's online documentation.
Available versions
Version | Module
---|---
2023.7.0 | scientific/dask/2023.7.0-ghcr
Documentation
Usage
To use Dask on Supek, the python script needs to be started through the dask-launcher.sh wrapper, which creates a Dask cluster through which jobs are distributed using the Client API.
Example PBS script:

```bash
#!/bin/bash
...
# activate dask
module load scientific/dask/2023.7.0-ghcr
# run the python program
dask-launcher.sh moj_program.py
```
Example python script:

```py
# import the modules
import os
from dask.distributed import Client
# connect the client
client = Client(os.environ['SCHEDULER_ADDRESS'])
# rest of the program
...
```
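Dask's distributed `Client` follows the standard `concurrent.futures` submit/result pattern, so the same workflow can be prototyped locally with the standard library before moving it to the cluster. A local stand-in (not Dask itself, and no scheduler address needed):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

# stands in for `client = Client(...)`; submit/result work the same way
with ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(square, i) for i in range(8)]   # like client.submit
    results = [f.result() for f in futures]              # like future.result
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```

On the cluster, the same `submit`/`result` calls go through the Client connected via `SCHEDULER_ADDRESS`, and the work runs on the distributed workers instead of local threads.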
Examples
Below are examples of using Dask with the best-known ML libraries (scikit-learn, PyTorch and TensorFlow) as well as with its native ML interface, Dask-ML.
Dask-ML
dask-kmeans.sh:

```bash
#!/bin/bash
#PBS -l select=4:ncpus=4
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
dask-launcher.sh dask-kmeans.py
```
dask-kmeans.py:

```py
import os
import time
import pprint
from dask.distributed import Client
import dask_ml.datasets
import dask_ml.cluster

def main():
    # connect the client
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # create the data
    n_clusters = 10
    n_samples = 3*10**7
    n_chunks = int(os.environ['PMI_SIZE'])
    X, _ = dask_ml.datasets.make_blobs(centers = n_clusters,
                                       chunks = n_samples//n_chunks,
                                       n_samples = n_samples)
    # compute
    km = dask_ml.cluster.KMeans(n_clusters = n_clusters)
    now = time.time()
    km.fit(X)
    print('GB: %f' % (int(X.nbytes)/1073741824))
    print('elapsed fit: %f' % (time.time()-now))
    # shutdown
    client.shutdown()

if __name__ == '__main__':
    main()
```
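The K-means script sizes its chunks from the MPI world size and reports the array size in GiB. The arithmetic behind those two numbers, assuming make_blobs' default of 2 features in float64 and a world size of 16 (matching the select=4:ncpus=4 request; both defaults are assumptions, not taken from the page):

```python
n_samples = 3 * 10**7
n_features = 2                  # make_blobs default number of features (assumption)
n_chunks = 16                   # hypothetical PMI_SIZE for select=4:ncpus=4
chunk_rows = n_samples // n_chunks
gib = n_samples * n_features * 8 / 1073741824   # float64 = 8 bytes
print(chunk_rows, round(gib, 3))  # 1875000 0.447
```

Each of the 16 workers therefore holds one ~1.9-million-row chunk, and `X.nbytes` divided by 1073741824 (2^30) gives the size in GiB that the script prints.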
scikit
scikit-svc.sh:

```bash
#!/bin/bash
#PBS -l select=4:ncpus=4
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
dask-launcher.sh scikit-svc.py
```
scikit-svc.py:

```py
import os
import time
import joblib
import pprint
import numpy as np
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # data
    digits = load_digits()
    # model
    param_space = {
        'C': np.logspace(-6, 6, 30),
        'tol': np.logspace(-4, -1, 30),
        'gamma': np.logspace(-8, 8, 30),
        'class_weight': [None, 'balanced'],
    }
    model = SVC(kernel='rbf')
    search = RandomizedSearchCV(model,
                                param_space,
                                cv=3,
                                n_iter=1000,
                                verbose=10)
    # fit
    with joblib.parallel_backend('dask'):
        search.fit(digits.data,
                   digits.target)
    # shutdown
    client.shutdown()

if __name__ == '__main__':
    main()
```
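The SVC example samples the parameter space instead of exhausting it. Counting the configurations (the grid sizes, n_iter and cv values come from the script above) shows why RandomizedSearchCV is the right tool here:

```python
# size of the full grid spanned by param_space in scikit-svc.py
full_grid = 30 * 30 * 30 * 2      # C x tol x gamma x class_weight
sampled = 1000                    # n_iter in RandomizedSearchCV
fits = sampled * 3                # each sampled configuration is fit on cv=3 folds
print(full_grid, sampled, fits)   # 54000 1000 3000
```

An exhaustive GridSearchCV would need 54000 × 3 = 162000 fits; the randomized search does 3000, and the dask joblib backend spreads those independent fits across the cluster workers.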
PyTorch
pytorch-skorch.sh:

```bash
#!/bin/bash
#PBS -l select=2:ngpus=2:ncpus=2
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
dask-launcher.sh pytorch-skorch.py
```
pytorch-skorch.py:

```py
import os
import time
import pprint
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from dask.distributed import Client
from joblib import parallel_backend
from skorch import NeuralNetClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV

class MyModule(nn.Module):
    def __init__(self, num_units=10, nonlin=nn.ReLU()):
        super().__init__()
        self.dense0 = nn.Linear(20, num_units)
        self.nonlin = nonlin
        self.dropout = nn.Dropout(0.5)
        self.dense1 = nn.Linear(num_units, num_units)
        self.output = nn.Linear(num_units, 2)

    def forward(self, X, **kwargs):
        X = self.nonlin(self.dense0(X))
        X = self.dropout(X)
        X = self.nonlin(self.dense1(X))
        X = self.output(X)
        return X

def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # vars
    n_samples = 256*100
    batch_size = 256
    max_epochs = 10
    # data
    X, y = make_classification(n_samples, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)
    # net
    net = NeuralNetClassifier(module = MyModule,
                              max_epochs = max_epochs,
                              criterion = nn.CrossEntropyLoss,
                              batch_size = batch_size,
                              train_split = None,
                              device = 'cuda')
    # search
    search = GridSearchCV(net,
                          param_grid = {'max_epochs': [1, 3, 10, 30],
                                        'module__num_units': [1, 10, 100, 1000],
                                        'module__nonlin': [nn.ReLU(), nn.Tanh()]},
                          scoring = 'accuracy',
                          error_score = 'raise',
                          refit = False,
                          verbose = 3,
                          cv = 3)
    # fit
    now = time.time()
    with parallel_backend('dask'):
        search.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))
    # shutdown
    client.shutdown()

if __name__ == "__main__":
    main()
```
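The grid above varies `module__num_units`, so each candidate model has a different size. A quick count of MyModule's learnable parameters (weights plus biases for its three Linear layers; the helper below is illustrative, not part of skorch) shows how quickly the model grows across the searched values:

```python
def my_module_params(num_units, n_in=20, n_out=2):
    """Learnable parameters of MyModule: three Linear layers, weights + biases."""
    dense0 = n_in * num_units + num_units        # nn.Linear(20, num_units)
    dense1 = num_units * num_units + num_units   # nn.Linear(num_units, num_units)
    output = num_units * n_out + n_out           # nn.Linear(num_units, 2)
    return dense0 + dense1 + output

print([my_module_params(n) for n in (1, 10, 100, 1000)])
# → [27, 342, 12402, 1024002]
```

The dropout and activation layers add no parameters, so the quadratic `dense1` term dominates for large `num_units`, which is why the 1000-unit candidates are by far the most expensive fits in the grid.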
TensorFlow
tensorflow-scikeras.sh:

```bash
#!/bin/bash
#PBS -l select=1:ngpus=2:ncpus=2
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
export TF_FORCE_GPU_ALLOW_GROWTH="true"
dask-launcher.sh tensorflow-scikeras.py
```
tensorflow-scikeras.py:

```py
import os
import time
import pprint
import numpy as np
import tensorrt
from tensorflow import keras
from scikeras.wrappers import KerasClassifier
from dask.distributed import Client
from joblib import parallel_backend
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV

def get_model(hidden_layer_dim, meta):
    # note that meta is a special argument that will be
    # handed a dict containing input metadata
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    n_classes_ = meta["n_classes_"]
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(n_features_in_, input_shape=X_shape_[1:]))
    model.add(keras.layers.Activation("relu"))
    # ten identical hidden blocks
    for _ in range(10):
        model.add(keras.layers.Dense(hidden_layer_dim))
        model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(n_classes_))
    model.add(keras.layers.Activation("softmax"))
    return model

def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # vars
    n_samples = 256*1000
    batch_size = 256
    max_epochs = 10
    # data
    X, y = make_classification(1000, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)
    # clf
    clf = KerasClassifier(get_model,
                          loss="sparse_categorical_crossentropy",
                          hidden_layer_dim=100)
    # gs
    params = {
        "hidden_layer_dim": [50, 100, 200],
        "loss": ["sparse_categorical_crossentropy"],
        "optimizer": ["adam", "sgd"],
        "optimizer__learning_rate": [0.0001, 0.001, 0.1],
    }
    gs = GridSearchCV(clf,
                      param_grid=params,
                      refit=False,
                      cv=3,
                      scoring='accuracy')
    # fit
    now = time.time()
    with parallel_backend('dask'):
        gs.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))

if __name__ == "__main__":
    main()
```
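scikeras builds the `meta` dict that `get_model` receives by inspecting the `(X, y)` passed to `fit`. A plain-Python sketch of how the three fields used above would be derived (toy data; the field names follow the code above, the derivation itself is an illustrative assumption):

```python
X = [[0.0] * 20 for _ in range(8)]   # 8 samples, 20 features (toy stand-in)
y = [0, 1] * 4                       # two classes
meta = {
    "n_features_in_": len(X[0]),          # width of the input layer
    "X_shape_": (len(X), len(X[0])),      # input_shape comes from X_shape_[1:]
    "n_classes_": len(set(y)),            # width of the softmax output layer
}
print(meta)
```

This is why `get_model` never hard-codes layer sizes: the input Dense layer and the softmax width are recomputed from `meta` for whatever data the wrapped classifier is fit on.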
...