Table of Contents

Introduction

Description

As stated in the official documentation, Dask is a "flexible library for parallel computing in Python". Besides supporting the development of parallel code, its main purpose is to make it easy to scale typical data-science problems and applications, which are usually developed on personal computers, to a cluster. It achieves this by imitating well-known data-processing APIs (such as NumPy and Pandas), by relying on an integrated job scheduler, and by being written entirely in Python.
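
As a minimal illustration of this API mimicry (a standalone sketch, not taken from the cluster examples below), the same NumPy-style expression can be written with dask.array, which splits the array into chunks and evaluates them lazily through the scheduler:

Code Block
languagepy
titledask.array illustration
import numpy as np
import dask.array as da

# NumPy: the whole array is held in memory and processed serially
x_np = np.random.random((10000, 10000))
print(x_np.mean(axis=0).sum())

# Dask: the same expression on a chunked array; nothing is computed
# until .compute() is called, and chunks can be processed in parallel
x_da = da.random.random((10000, 10000), chunks=(1000, 1000))
print(x_da.mean(axis=0).sum().compute())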

...

A closely related library is Dask-ML, which is intended for distributed machine learning through the familiar scikit-learn API; it enables scaling across multiple nodes via the joblib library, which scikit-learn uses to parallelize its algorithms. More about the typical problems addressed and usage examples for each of the interfaces can be found in the Dask online documentation.
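
Both usage patterns can be sketched as follows (a simplified, local illustration under assumed defaults; the full cluster examples are given below): Dask-ML's own estimators work directly on Dask collections, while existing scikit-learn code is distributed by switching the joblib backend to Dask.

Code Block
languagepy
titleDask-ML and joblib illustration
import joblib
from dask.distributed import Client

import dask_ml.datasets
import dask_ml.cluster

from sklearn.svm import SVC
from sklearn.datasets import load_digits
from sklearn.model_selection import GridSearchCV

# local client for illustration; on the cluster the address is provided by the wrapper
client = Client()

# 1) Dask-ML estimator operating directly on a chunked Dask array
X, _ = dask_ml.datasets.make_blobs(n_samples=100000, chunks=10000)
km = dask_ml.cluster.KMeans(n_clusters=5)
km.fit(X)

# 2) plain scikit-learn estimator, parallelized through the Dask joblib backend
digits = load_digits()
search = GridSearchCV(SVC(), {'C': [0.1, 1.0, 10.0]}, n_jobs=-1)
with joblib.parallel_backend('dask'):
    search.fit(digits.data, digits.target)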

Available versions

Version     Supek                           Padobran
2023.7.0    scientific/dask/2023.7.0-ghcr
2023.11.1                                   scientific/dask/2023.11.1

Note
titleUsing the application on Supek

Python applications and libraries on Supek are delivered as containers and require the use of wrappers, as described below.

More information about Python applications and containers on Supek is available at the following links:

Documentation

Usage

To use Dask on Supek, the python script has to be started with the dask-launcher.sh wrapper, which creates a Dask cluster through which jobs are distributed using the Client API.

Code Block
languagebash
titlePBS script example
linenumberstrue
#!/bin/bash

...

# activate dask
module load scientific/dask/2023.7.0-ghcr

# run the python program via the wrapper
dask-launcher.sh moj_program.py


Code Block
languagepy
titlepython script example
linenumberstrue
# load the required modules
import os
from dask.distributed import Client

# connect the client
client = Client(os.environ['SCHEDULER_ADDRESS'])

# rest of the program
...
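
The connected Client can then be used like any dask.distributed client. A hypothetical continuation of the script above (illustration only; the actual workload is whatever your program does) could submit individual functions to the workers or compute a Dask collection on the cluster:

Code Block
languagepy
titleClient usage illustration
import os
import dask.array as da
from dask.distributed import Client

# connect to the cluster created by dask-launcher.sh
client = Client(os.environ['SCHEDULER_ADDRESS'])

# submit individual tasks to the workers and gather the results
futures = client.map(lambda x: x ** 2, range(100))
print(sum(client.gather(futures)))

# or evaluate a Dask collection on the cluster
x = da.random.random((20000, 20000), chunks=(2000, 2000))
print(x.mean().compute())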

Examples

Below are examples of using Dask with the most popular ML libraries (scikit-learn, PyTorch and TensorFlow) as well as with its native ML interface, Dask-ML.

Dask-ML

Code Block
languagebash
titledask-kmeans.sh
linenumberstrue
collapsetrue
#!/bin/bash

#PBS -l select=4:ncpus=4
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
dask-launcher.sh dask-kmeans.py


Code Block
languagepy
titledask-kmeans.py
linenumberstrue
collapsetrue
import os
import time
import pprint

from dask.distributed import Client
import dask_ml.datasets
import dask_ml.cluster

def main():

    # connect the client
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # create the data
    n_clusters = 10
    n_samples = 3*10**7
    n_chunks = int(os.environ['PMI_SIZE'])
    X, _ = dask_ml.datasets.make_blobs(centers = n_clusters,
                                       chunks = n_samples//n_chunks,
                                       n_samples = n_samples)

    # compute
    km = dask_ml.cluster.KMeans(n_clusters=n_clusters)
    now = time.time()
    km.fit(X)
    print('GB: %f' % (int(X.nbytes)/1073741824))
    print('elapsed fit: %f' % (time.time()-now))

    # shutdown
    client.shutdown()

if __name__ == '__main__':
    main()

scikit

Code Block
languagebash
titlescikit-svc.sh
linenumberstrue
collapsetrue
#!/bin/bash

#PBS -l select=4:ncpus=4
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
dask-launcher.sh scikit-svc.py

Code Block
languagepy
titlescikit-svc.py
linenumberstrue
collapsetrue
import os
import time
import joblib
import pprint
import numpy as np

from dask.distributed import Client

from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

def main():

    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # data
    digits = load_digits()

    # model
    param_space = {
        'C': np.logspace(-6, 6, 30),
        'tol': np.logspace(-4, -1, 30),
        'gamma': np.logspace(-8, 8, 30),
        'class_weight': [None, 'balanced'],
    }
    model = SVC(kernel='rbf')
    search = RandomizedSearchCV(model,
                                param_space,
                                cv=3,
                                n_iter=1000,
                                verbose=10)

    # fit
    with joblib.parallel_backend('dask'):
        search.fit(digits.data,
                   digits.target)

    # shutdown
    client.shutdown()

if __name__ == '__main__':
    main()

PyTorch

Code Block
languagebash
titlepytorch-skorch.sh
linenumberstrue
collapsetrue
#!/bin/bash

#PBS -l select=2:ngpus=2:ncpus=2
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
dask-launcher.sh pytorch-skorch.py


Code Block
languagepy
titlepytorch-skorch.py
linenumberstrue
collapsetrue
import os
import time
import pprint
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

from dask.distributed import Client
from joblib import parallel_backend

from skorch import NeuralNetClassifier

from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV

class MyModule(nn.Module):
    def __init__(self, num_units=10, nonlin=nn.ReLU()):
        super().__init__()
        self.dense0 = nn.Linear(20, num_units)
        self.nonlin = nonlin
        self.dropout = nn.Dropout(0.5)
        self.dense1 = nn.Linear(num_units, num_units)
        self.output = nn.Linear(num_units, 2)

    def forward(self, X, **kwargs):
        X = self.nonlin(self.dense0(X))
        X = self.dropout(X)
        X = self.nonlin(self.dense1(X))
        X = self.output(X)
        return X

def main():

    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # vars
    n_samples = 256*100
    batch_size = 256
    max_epochs = 10

    # data
    X, y = make_classification(n_samples, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)

    # net
    net = NeuralNetClassifier(module = MyModule,
                              max_epochs = max_epochs,
                              criterion = nn.CrossEntropyLoss,
                              batch_size = batch_size,
                              train_split = None,
                              device = 'cuda')
    # search
    search = GridSearchCV(net,
                          param_grid = {'max_epochs': [1, 3, 10, 30],
                                        'module__num_units': [1, 10, 100, 1000],
                                        'module__nonlin': [nn.ReLU(), nn.Tanh()]},
                          scoring = 'accuracy',
                          error_score = 'raise',
                          refit = False,
                          verbose = 3,
                          cv = 3)

    # fit
    now = time.time()
    with parallel_backend('dask'):
        search.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))

    # shutdown
    client.shutdown()

if __name__ == "__main__":
    main()

TensorFlow

Code Block
languagebash
titletensorflow-scikeras.sh
linenumberstrue
collapsetrue
#!/bin/bash

#PBS -l select=1:ngpus=2:ncpus=2
#PBS -l place=scatter

# environment
module load scientific/dask/2023.7.0-ghcr

# cd
cd ${PBS_O_WORKDIR:-""}

# run
export TF_FORCE_GPU_ALLOW_GROWTH="true"
dask-launcher.sh tensorflow-scikeras.py


Code Block
languagepy
titletensorflow-scikeras.py
linenumberstrue
collapsetrue
import os
import time
import pprint
import numpy as np

import tensorrt
from tensorflow import keras
from scikeras.wrappers import KerasClassifier

from dask.distributed import Client
from joblib import parallel_backend

from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV

def get_model(hidden_layer_dim, meta):
    # note that meta is a special argument that will be
    # handed a dict containing input metadata
    n_features_in_ = meta["n_features_in_"]
    X_shape_ = meta["X_shape_"]
    n_classes_ = meta["n_classes_"]

    model = keras.models.Sequential()
    model.add(keras.layers.Dense(n_features_in_, input_shape=X_shape_[1:]))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(hidden_layer_dim))
    model.add(keras.layers.Activation("relu"))
    model.add(keras.layers.Dense(n_classes_))
    model.add(keras.layers.Activation("softmax"))
    return model

def main():

    # client
    client = Client(os.environ['SCHEDULER_ADDRESS'])

    # vars
    n_samples = 256*1000
    batch_size = 256
    max_epochs = 10

    # data
    X, y = make_classification(1000, 20, n_informative=10, random_state=0)
    X = X.astype(np.float32)
    y = y.astype(np.int64)

    # clf
    clf = KerasClassifier(get_model,
                          loss="sparse_categorical_crossentropy",
                          hidden_layer_dim=100)

    # gs
    params = {
        "hidden_layer_dim": [50, 100, 200],
        "loss": ["sparse_categorical_crossentropy"],
        "optimizer": ["adam", "sgd"],
        "optimizer__learning_rate": [0.0001, 0.001, 0.1],
    }
    gs = GridSearchCV(clf,
                      param_grid=params,
                      refit=False,
                      cv=3,
                      scoring='accuracy')

    # fit
    now = time.time()
    with parallel_backend('dask'):
        gs.fit(X, y)
    print('fit elapsed in %0.2f' % (time.time()-now))

if __name__ == "__main__":
    main()

...