Panel |
---|
borderColor | lightgray |
---|
borderWidth | 1 |
---|
titleBGColor | mediumgray |
---|
borderStyle | solid |
---|
title | Sadržaj |
---|
|
|
Uvod
Kao što je navedeno u službenoj dokumentaciji, Dask je "fleksibilna knjižnica namijenjena paralelizaciji
...
Jedna od srodnih knjižnica je i Dask-ML, koja je namijenjena distribuiranom strojnom učenju putem poznatog
scikit API-ja i koja omogućava skaliranje na više čvorova putem knjižnice joblib, s kojom scikit paralelizira svoje
algoritme. Više o tipičnim problemima koji se rješavaju i primjerima korištenja svakog od sučelja možete naći na
na online stranicama Daska.
Dostupne verzije
Verzija | Modul |
---|
2022.11.1 | dask/2022.11.1 |
Korištenje
Za širenje na Isabelli putem SGE-a, potrebno je koristiti Dask-MPI knjižnicu kojom se stvara Dask klaster i putem
kojeg se distribuiraju poslovi korištenjem Client API-ja. Primjer obrade tipičnog dataframea ili korištenja algoritma. Dva su načina na koji se ovo može postignuti:
- dask-mpi: Kreiranjem dask klastera u SGE skripti prije zvanja python programa
- initialize: Inicijalizacijom dask klastera unutar python programa
U prvom slučaju (nakon zvanja dask modula u SGE skripti) potrebno je pozvati dask-mpi naredbu prije izvršavanja
python programa, dok se u python skripti treba inicijalizirati klijent s kreiranom scheduler.json datotekom:
Code Block |
---|
language | bash |
---|
title | dask-mpi SGE primjer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
...
# aktiviraj dask
module load dask
# pokreni dask klaster putem dask-mpi
mpirun -np $NSLOTS dask-mpi \
--nthreads 1 \
--interface ib0 \
--worker-class distributed.Worker \
--scheduler-file scheduler.json &
# pokreni python program
python moj_program.py |
Code Block |
---|
language | py |
---|
title | dask-mpi python primjer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
# pozovi python modul
from dask.distributed import Client
# spoji klijenta
client = Client(scheduler_file='scheduler.json')
# ostatak programa
... |
U drugom slučaju, dask klaster se inicijalizira unutar python skripte dok se u SGE poziva putem naredbe mpirun:
Warning |
---|
Pri inicijalizaciji u python skripti putem dask_mpi.initialize modula, nužno je izvršiti inicijalizaciju prije pozivanja dask.distributed.Client modula kao što je navedeno ispod |
Code Block |
---|
language | bash |
---|
title | initialize SGE primjer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
...
# aktiviraj dask
module load dask
# pokreni python program
mpirun -np $NSLOTS python moj_program.py |
Code Block |
---|
language | py |
---|
title | initialize python primjer |
---|
linenumbers | true |
---|
collapse | true |
---|
|
# pozovi i inicijaliziraj klaster
from dask_mpi import initialize
initialize()
# pozovi i definiraj klijenta
from dask.distributed import Client
client = Client()
# izvrši program
... |
Primjeri
Primjeri obrade tipičnog dataframea, korištenja algoritma K sredina ili izabira najboljeg ML modela podnošenjem
K sredina podnošenjem na *mpi paralelnu okolinu se nalaze ispod.
...
Code Block |
---|
language | bash |
---|
title | dataframe.sge |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#$ -cwd
#$ -o output/
#$ -e output/
#$ -P SRCE-18-0000-11-2222
#$ -pe *mpi 4
# aktiviraj dask
module load dask
# pokreni dask klaster
mpirun -np $NSLOTS dask-mpi \
--nthreads 1 \
--interface ib0 \
--worker-class distributed.Worker \
--scheduler-file scheduler.json &
# pričekaj
sleep 10
# potjeraj python skriptu
python example.py |
...
Code Block |
---|
language | py |
---|
title | kmeans.py |
---|
linenumbers | true |
---|
collapse | true |
---|
|
# https://examples.dask.org/machine-learning/training-on-large-datasets.html
import time
from dask_mpi import initialize
from dask.distributed import Client
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt
if __name__ == '__main__':
# spoji klijenta putem datoteke scheduler.json
client = Client(scheduler_file="scheduler.json")
# kreiraj podatke
n_clusters = 10
n_samples = 10**4
n_chunks = int(os.environ['NSLOTS'])-2
X, _ = dask_ml.datasets.make_blobs(
centers = n_clusters,
n_samples = n_samples,
chunks = n_samples//n_chunks,
)
# izračunaj
km = dask_ml.cluster.KMeans(n_clusters=n_clusters, oversampling_factor=10)
now = time.time()
km.fit(X)
print('GB: %f' % (int(X.nbytes)/1073741824))
print('elapsed fit: %f' % (time.time()-now)) |
Joblib
Code Block |
---|
language | bash |
---|
title | run_joblib.sge |
---|
linenumbers | true |
---|
collapse | true |
---|
|
#$ -cwd
#$ -o output/
#$ -e output/
#$ -pe *mpi 8
# aktiviraj modul
module load dask
# pokreni dask klaster
mpirun -np $NSLOTS dask-mpi \
--nthreads 1 \
--worker-class distributed.Worker \
--scheduler-file scheduler.json &
# pričekaj
sleep 10
# potjeraj python skriptu
python run_joblib.py |
Code Block |
---|
language | py |
---|
title | run_joblib.py |
---|
linenumbers | true |
---|
collapse | true |
---|
|
# source
# https://ml.dask.org/joblib.html
import time
import numpy as np
from dask.distributed import Client
import joblib
import pandas as pd
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC
if __name__ == '__main__':
# client
client = Client(scheduler_file="scheduler.json")
# data
digits = load_digits()
# space
param_space = {
'C': np.logspace(-6, 6, 20),
'gamma': np.logspace(-8, 8, 20),
'tol': np.logspace(-4, -1, 20),
'class_weight': [None, 'balanced'],
}
# fit
model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=10, n_iter=10**3, verbose=1)
now = time.time()
with joblib.parallel_backend('dask'):
search.fit(digits.data, digits.target)
elapsed = time.time()-now
# print
cv_results = pd.DataFrame(search.cv_results_)
print(cv_results)
print('elapsed: %is' % elapsed) |
Performanse
Broj jezgara | Dataframe [s] | K-means [s] | Joblib [s] |
---|
4 | 111 | 352 | 1287 |
8 | 98 | 158 | 1295 |
16 | 84 | 93 | 317 |
32 | 16 | 137 | 153 |
64 | 17 | 81 | 76 |
128 | 18.48 | 89 | 52 |