Description
Dask is a "flexible library for parallel computing in Python". Besides supporting the development of parallel code, its main role is to make it easy to scale typical data science problems and applications, which are usually developed on personal computers, to a cluster. It achieves this by imitating well-known data-processing APIs (such as NumPy and Pandas), by relying on an integrated job scheduler, and by being written entirely in Python.
Its main interfaces are:
- Array - processing of tensors with the NumPy API
- DataFrame - processing of structured data with the Pandas API
- Bag - processing of lists and iterators, intended for textual data, JSON files or generic Python objects
- Delayed - direct parallelization of code via functions or decorators
- Futures - parallelization intended for running tasks that can execute concurrently
A closely related library is Dask-ML, which is intended for distributed machine learning through the familiar scikit-learn API and which enables scaling to multiple nodes via the joblib library, which scikit-learn uses to parallelize its algorithms. More about the typical problems being solved and usage examples for each of the interfaces can be found in the Dask online documentation.
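As a quick illustration (a minimal sketch, not taken from the official examples), the Array and Delayed interfaces can be used like this:
import dask
import dask.array as da

# Array interface: a large random matrix split into 1000x1000 chunks
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
y = (x + x.T).mean(axis=0)
print(y.compute())  # work is lazy and only runs when compute() is called

# Delayed interface: parallelize ordinary Python functions
@dask.delayed
def inc(i):
    return i + 1

total = dask.delayed(sum)([inc(i) for i in range(10)])
print(total.compute())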
Available versions
version | module | Supek | Padobran |
---|---|---|---|
2023.7.0 | scientific/dask/2023.7.0-ghcr | | |
Documentation
Usage
To use Dask on Supek, the Python script needs to be launched with the dask-launcher.sh wrapper, which creates a Dask cluster and through which jobs are distributed using the Client API.
#!/bin/bash
# activate dask
module load scientific/dask/2023.7.0-ghcr
# run the python program
dask-launcher.sh moj_program.py
# moj_program.py: import the required modules
import os
from dask.distributed import Client
# connect the client to the scheduler created by dask-launcher.sh
client = Client(os.environ['SCHEDULER_ADDRESS'])
# the rest of the program
...
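For reference, here is a minimal sketch of distributing independent tasks through the Futures interface of the same Client (the square function and the inputs are illustrative and not part of the launcher):
import os
from dask.distributed import Client

def square(x):
    return x * x

client = Client(os.environ['SCHEDULER_ADDRESS'])
# each submitted call returns a Future that runs on one of the workers
futures = client.map(square, range(100))
# gather blocks until all results are back on the client
print(sum(client.gather(futures)))
client.shutdown()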
Examples
Below are examples of using Dask with the most popular ML libraries (scikit-learn, PyTorch and TensorFlow), as well as with its native ML interface Dask-ML.
Dask-ML
#!/bin/bash
#PBS -l select=4:ncpus=4
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
dask-launcher.sh dask-kmeans.py
import os
import time
import pprint
from dask.distributed import Client
import dask_ml.datasets
import dask_ml.cluster

def main():
    # connect the client using the scheduler address from the environment
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # create the data; use one chunk per worker process (PMI_SIZE is set by the launcher)
    n_clusters = 10
    n_samples = 3*10**7
    n_chunks = int(os.environ['PMI_SIZE'])
    X, _ = dask_ml.datasets.make_blobs(centers=n_clusters,
                                       chunks=n_samples//n_chunks,
                                       n_samples=n_samples)
    # fit the model
    km = dask_ml.cluster.KMeans(n_clusters=n_clusters)
    now = time.time()
    km.fit(X)
    print('GB: %f' % (int(X.nbytes)/1073741824))
    print('elapsed fit: %f' % (time.time()-now))
    # shutdown
    client.shutdown()

if __name__ == '__main__':
    main()
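An optional refinement of the example above (an assumption, not part of the original script): persisting the generated array in worker memory before fitting avoids regenerating the random blobs during the KMeans iterations. A minimal sketch:
import os
from dask.distributed import Client
import dask_ml.datasets
import dask_ml.cluster

client = Client(os.environ['SCHEDULER_ADDRESS'])
X, _ = dask_ml.datasets.make_blobs(centers=10, n_samples=10**6, chunks=10**5)
X = X.persist()  # materialize the chunks in distributed memory once
km = dask_ml.cluster.KMeans(n_clusters=10).fit(X)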
scikit-learn
#!/bin/bash
#PBS -l select=4:ncpus=4
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
dask-launcher.sh scikit-svc.py
import os
import time
import joblib
import pprint
import numpy as np
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # data
    digits = load_digits()
    # model
    param_space = {
        'C': np.logspace(-6, 6, 30),
        'tol': np.logspace(-4, -1, 30),
        'gamma': np.logspace(-8, 8, 30),
        'class_weight': [None, 'balanced'],
    }
    model = SVC(kernel='rbf')
    search = RandomizedSearchCV(model,
                                param_space,
                                cv=3,
                                n_iter=1000,
                                verbose=10)
    # fit
    with joblib.parallel_backend('dask'):
        search.fit(digits.data,
                   digits.target)
    # shutdown
    client.shutdown()

if __name__ == '__main__':
    main()
PyTorch
#!/bin/bash
#PBS -l select=2:ngpus=2:ncpus=2
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
dask-launcher.sh pytorch-skorch.py
import os
import time
import pprint
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from dask.distributed import Client
from joblib import parallel_backend
from skorch import NeuralNetClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV

class MyModule(nn.Module):
    def __init__(self, num_units=10, nonlin=nn.ReLU()):
        super().__init__()
        self.dense0 = nn.Linear(20, num_units)
        self.nonlin = nonlin
        self.dropout = nn.Dropout(0.5)
        self.dense1 = nn.Linear(num_units, num_units)
        self.output = nn.Linear(num_units, 2)

    def forward(self, X, **kwargs):
        X = self.nonlin(self.dense0(X))
        X = self.dropout(X)
        X = self.nonlin(self.dense1(X))
        X = self.output(X)
        return X

def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # vars
    n_samples = 256*100
    batch_size = 256
    max_epochs = 10
    # data
    X, y = make_classification(n_samples, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)
    # net
    net = NeuralNetClassifier(module=MyModule,
                              max_epochs=max_epochs,
                              criterion=nn.CrossEntropyLoss,
                              batch_size=batch_size,
                              train_split=None,
                              device='cuda')
    # search
    search = GridSearchCV(net,
                          param_grid={'max_epochs': [1, 3, 10, 30],
                                      'module__num_units': [1, 10, 100, 1000],
                                      'module__nonlin': [nn.ReLU(), nn.Tanh()]},
                          scoring='accuracy',
                          error_score='raise',
                          refit=False,
                          verbose=3,
                          cv=3)
    # fit
    now = time.time()
    with parallel_backend('dask'):
        search.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))
    # shutdown
    client.shutdown()

if __name__ == "__main__":
    main()
TensorFlow
#!/bin/bash
#PBS -l select=1:ngpus=2:ncpus=2
#PBS -l place=scatter
# environment
module load scientific/dask/2023.7.0-ghcr
# cd
cd ${PBS_O_WORKDIR:-""}
# run
export TF_FORCE_GPU_ALLOW_GROWTH="true"
dask-launcher.sh tensorflow-scikeras.py
import os
import time
import pprint
import numpy as np
import tensorrt
from tensorflow import keras
from scikeras.wrappers import KerasClassifier
from dask.distributed import Client
from joblib import parallel_backend
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV

def get_model(hidden_layer_dim, meta):
    # note that meta is a special argument that will be
    # handed a dict containing input metadata
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    n_classes_ = meta["n_classes_"]
    model = keras.models.Sequential()
    model.add(keras.layers.Dense(n_features_in_, input_shape=X_shape_[1:]))
    model.add(keras.layers.Activation("relu"))
    # ten hidden layers of the same width
    for _ in range(10):
        model.add(keras.layers.Dense(hidden_layer_dim))
        model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(n_classes_))
    model.add(keras.layers.Activation("softmax"))
    return model

def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])
    # vars
    n_samples = 256*1000
    batch_size = 256
    max_epochs = 10
    # data
    X, y = make_classification(1000, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)
    # clf
    clf = KerasClassifier(get_model,
                          loss="sparse_categorical_crossentropy",
                          hidden_layer_dim=100)
    # gs
    params = {
        "hidden_layer_dim": [50, 100, 200],
        "loss": ["sparse_categorical_crossentropy"],
        "optimizer": ["adam", "sgd"],
        "optimizer__learning_rate": [0.0001, 0.001, 0.1],
    }
    gs = GridSearchCV(clf,
                      param_grid=params,
                      refit=False,
                      cv=3,
                      scoring='accuracy')
    # fit
    now = time.time()
    with parallel_backend('dask'):
        gs.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))
    # shutdown
    client.shutdown()

if __name__ == "__main__":
    main()
main()