Dask je "fleksibilna knjižnica namijenjena paralelizaciji proračuna u Pythonu". Osim što je usmjerena razvoju paralelnog koda, glavna uloga je omogućiti lagano skaliranje tipičnih data science problema i aplikacija na klaster, koje se tipično razvijaju na osobnim računalima. Ovo postiže kroz kroz imitaciju poznatijih API-ja usmjerenih obradi podataka (poput Numpya i Pandasa), oslanjanje na integriran raspoređivač poslova i činjenicu da je u potpunosti napisana u Pythonu.
Its main interfaces are:

- Array: a parallel, chunked counterpart of the NumPy ndarray
- DataFrame: a parallel, partitioned counterpart of the Pandas DataFrame
- Bag: a parallel collection for processing semi-structured data
- Delayed: lazy wrapping of arbitrary Python functions into task graphs
- Futures: immediate, asynchronous task execution through the Client
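As a taste of the API mimicry, here is a minimal sketch (not taken from the official examples) of the Array and DataFrame interfaces; both build lazy task graphs that only run on `compute()`:

```python
import dask.array as da
import dask.dataframe as dd
import pandas as pd

# NumPy-like: a large array split into chunks that can be processed in parallel
x = da.random.random((10000, 10000), chunks=(1000, 1000))
y = (x + x.T).mean()   # only builds a task graph, nothing runs yet
print(y.compute())     # triggers the actual (parallel) computation

# Pandas-like: the same lazy model over partitioned dataframes
df = dd.from_pandas(pd.DataFrame({'a': range(1000), 'b': 1.0}), npartitions=4)
print(df.groupby('a').b.sum().compute())
```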
One related library is Dask-ML, which is intended for distributed machine learning through the familiar scikit-learn API and enables scaling across multiple nodes via the joblib library, which scikit-learn uses to parallelize its algorithms. More about the typical problems solved and usage examples for each interface can be found in the Dask online documentation.
| version | module | Supek | Padobran |
|---|---|---|---|
| 2023.7.0 | scientific/dask/2023.7.0-ghcr | ✓ | |
Python applications and libraries on Supek are delivered as containers and require the use of a wrapper, as described below. More information about Python applications and containers on Supek is available at the following links:
To use Dask on Supek, the Python script must be launched with the dask-launcher.sh wrapper, which creates a Dask cluster and through which jobs are distributed using the Client API.
```bash
#!/bin/bash

# activate dask
module load scientific/dask/2023.7.0-ghcr

# run the python program
dask-launcher.sh moj_program.py
```
```python
# import the required modules
import os
from dask.distributed import Client

# connect the client to the cluster created by dask-launcher.sh
client = Client(os.environ['SCHEDULER_ADDRESS'])

# rest of the program
...
```
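Once connected, the same client can also distribute work directly through the futures interface, in addition to serving as a joblib backend as in the examples below. A minimal hedged sketch (the function `square` is a hypothetical stand-in for real work):

```python
import os
from dask.distributed import Client

client = Client(os.environ['SCHEDULER_ADDRESS'])

# a hypothetical function, executed remotely on the workers
def square(x):
    return x ** 2

# client.map submits one task per item and returns futures immediately;
# client.gather blocks and collects the results
futures = client.map(square, range(100))
print(sum(client.gather(futures)))

client.shutdown()
```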
Below are examples of using Dask with the most popular ML libraries (scikit-learn, PyTorch and TensorFlow), as well as with its native ML interface, Dask-ML.
```bash
#!/bin/bash

#PBS -l select=4:ncpus=4
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
dask-launcher.sh dask-kmeans.py
```
```python
import os
import time

from dask.distributed import Client
import dask_ml.datasets
import dask_ml.cluster


def main():
    # connect the client via the SCHEDULER_ADDRESS environment variable
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # create the data, one chunk per worker process
    n_clusters = 10
    n_samples = 3*10**7
    n_chunks = int(os.environ['PMI_SIZE'])
    X, _ = dask_ml.datasets.make_blobs(centers=n_clusters,
                                       chunks=n_samples//n_chunks,
                                       n_samples=n_samples)

    # compute
    km = dask_ml.cluster.KMeans(n_clusters=n_clusters)
    now = time.time()
    km.fit(X)
    print('GB: %f' % (int(X.nbytes)/1073741824))
    print('elapsed fit: %f' % (time.time()-now))

    # shutdown
    client.shutdown()


if __name__ == '__main__':
    main()
```
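After the fit, the estimator exposes the usual scikit-learn-style attributes; a short hedged sketch of reading the results back (placed before `client.shutdown()` in the example above):

```python
# cluster centers are materialized as a regular NumPy array
print(km.cluster_centers_.shape)   # (n_clusters, n_features)

# per-sample assignments stay lazy; compute() materializes them
labels = km.labels_.compute()
```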
```bash
#!/bin/bash

#PBS -l select=4:ncpus=4
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
dask-launcher.sh scikit-svc.py
```
```python
import os
import joblib
import numpy as np

from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC


def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # data
    digits = load_digits()

    # model
    param_space = {
        'C': np.logspace(-6, 6, 30),
        'tol': np.logspace(-4, -1, 30),
        'gamma': np.logspace(-8, 8, 30),
        'class_weight': [None, 'balanced'],
    }
    model = SVC(kernel='rbf')
    search = RandomizedSearchCV(model, param_space, cv=3, n_iter=1000, verbose=10)

    # fit, distributing the search over the Dask workers
    with joblib.parallel_backend('dask'):
        search.fit(digits.data, digits.target)

    # shutdown
    client.shutdown()


if __name__ == '__main__':
    main()
```
```bash
#!/bin/bash

#PBS -l select=2:ngpus=2:ncpus=2
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
dask-launcher.sh pytorch-skorch.py
```
```python
import os
import time
import numpy as np
import torch.nn as nn

from dask.distributed import Client
from joblib import parallel_backend
from skorch import NeuralNetClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV


class MyModule(nn.Module):
    def __init__(self, num_units=10, nonlin=nn.ReLU()):
        super().__init__()
        self.dense0 = nn.Linear(20, num_units)
        self.nonlin = nonlin
        self.dropout = nn.Dropout(0.5)
        self.dense1 = nn.Linear(num_units, num_units)
        self.output = nn.Linear(num_units, 2)

    def forward(self, X, **kwargs):
        X = self.nonlin(self.dense0(X))
        X = self.dropout(X)
        X = self.nonlin(self.dense1(X))
        X = self.output(X)
        return X


def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # vars
    n_samples = 256*100
    batch_size = 256
    max_epochs = 10

    # data
    X, y = make_classification(n_samples, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)

    # net
    net = NeuralNetClassifier(module=MyModule,
                              max_epochs=max_epochs,
                              criterion=nn.CrossEntropyLoss,
                              batch_size=batch_size,
                              train_split=None,
                              device='cuda')

    # search
    search = GridSearchCV(net,
                          param_grid={'max_epochs': [1, 3, 10, 30],
                                      'module__num_units': [1, 10, 100, 1000],
                                      'module__nonlin': [nn.ReLU(), nn.Tanh()]},
                          scoring='accuracy',
                          error_score='raise',
                          refit=False,
                          verbose=3,
                          cv=3)

    # fit, distributing the grid search over the Dask workers
    now = time.time()
    with parallel_backend('dask'):
        search.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))

    # shutdown
    client.shutdown()


if __name__ == "__main__":
    main()
```
```bash
#!/bin/bash

#PBS -l select=1:ngpus=2:ncpus=2
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
export TF_FORCE_GPU_ALLOW_GROWTH="true"
dask-launcher.sh tensorflow-scikeras.py
```
```python
import os
import time
import numpy as np
import tensorrt
from tensorflow import keras
from scikeras.wrappers import KerasClassifier

from dask.distributed import Client
from joblib import parallel_backend
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV


def get_model(hidden_layer_dim, meta):
    # note that meta is a special argument that will be
    # handed a dict containing input metadata
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    n_classes_ = meta["n_classes_"]

    model = keras.models.Sequential()
    model.add(keras.layers.Dense(n_features_in_, input_shape=X_shape_[1:]))
    model.add(keras.layers.Activation("relu"))
    # 11 hidden layers of the same width
    for _ in range(11):
        model.add(keras.layers.Dense(hidden_layer_dim))
        model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(n_classes_))
    model.add(keras.layers.Activation("softmax"))
    return model


def main():
    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # vars
    n_samples = 256*1000
    batch_size = 256
    max_epochs = 10

    # data
    X, y = make_classification(n_samples, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)

    # clf
    clf = KerasClassifier(get_model,
                          loss="sparse_categorical_crossentropy",
                          hidden_layer_dim=100,
                          batch_size=batch_size,
                          epochs=max_epochs)

    # gs
    params = {
        "hidden_layer_dim": [50, 100, 200],
        "loss": ["sparse_categorical_crossentropy"],
        "optimizer": ["adam", "sgd"],
        "optimizer__learning_rate": [0.0001, 0.001, 0.1],
    }
    gs = GridSearchCV(clf, param_grid=params, refit=False, cv=3, scoring='accuracy')

    # fit, distributing the grid search over the Dask workers
    now = time.time()
    with parallel_backend('dask'):
        gs.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))

    # shutdown
    client.shutdown()


if __name__ == "__main__":
    main()
```