You are viewing an old version of this page. View the current version.
Compare with Current
View Page History
« Previous
Version 2
Next »
Uvod
Kao što je navedeno u službenoj dokumentaciji, Dask je "fleksibilna knjižnica namijenjena paralelizaciji
proračuna u Pythonu". Osim što je usmjerena razvoju paralelnog koda, glavna uloga je omogućiti lagano
skaliranje tipičnih data science problema i aplikacija na klaster, koje se tipično razvijaju na osobnim
računalima. Ovo postiže kroz kroz imitaciju poznatijih API-ja usmjerenih obradi podataka (poput Numpya i
Pandasa), oslanjanje na integriran raspoređivač poslova i činjenicu da je u potpunosti napisana u Pythonu.
Glavna su joj sučelja:
- Array - obrada tenzora s Numpy API-jem
- DataFrame - obrada strukturiranih podataka s Pandas API-jem
- Bag - obrada lista i iteratora namijenjeno tekstualnim podacima, JSON datotekama ili Python objektima
- Delayed - direktna paralelizacija koda funkcijama ili dekoratorima
- Futures - paralelizacija namijenjena izvršavanju poslova koji se mogu vršiti istodobno
Jedna od srodnih knjižnica je i Dask-ML, koja je namijenjena distribuiranom strojnom učenju putem poznatog
scikit API-ja. Više o tipičnim problemima koji se rješavaju i primjerima korištenja svakog od sučelja možete naći
na online stranicama Daska.
Dostupne verzije
Verzija | Modul |
---|
2022.11.1 | dask/2022.11.1 |
Korištenje
Za širenje na Isabelli putem SGE-a, potrebno je koristiti Dask-MPI knjižnicu kojom se stvara Dask klaster i putem
kojeg se distribuiraju poslovi korištenjem Client API-ja. Primjer obrade tipičnog dataframea ili korištenja algoritma
K sredina podnošenjem na *mpi paralelnu okolinu se nalaze ispod.
Dataframe
#$ -cwd
#$ -o output/
#$ -e output/
#$ -P SRCE-18-0000-11-2222
#$ -pe *mpi 4
# aktiviraj dask
module load dask
# pokreni dask klaster
mpirun -np $NSLOTS dask-mpi \
--nthreads 1 \
--interface ib0 \
--worker-class distributed.Worker \
--scheduler-file scheduler.json &
# pričekaj
sleep 10
# potjeraj python skriptu
python example.py
import time
import dask
from dask.distributed import Client
if __name__ == '__main__':
# spoji klijenta putem datoteke scheduler.json
client = Client(scheduler_file="scheduler.json")
# kreiraj dataframe
df = dask.datasets.timeseries(freq='10ms')
# izračunaj
now = time.time()
computed_df = df.describe().compute()
df.info(memory_usage=True)
print('compute elapsed: %f' % (time.time()-now))
K-means
#$ -cwd
#$ -o output/
#$ -e output/
#$ -pe *mpi 4
# aktiviraj modul
module load dask
# pokreni dask klaster
mpirun -np $NSLOTS dask-mpi \
--nthreads 1 \
--worker-class distributed.Worker \
--scheduler-file scheduler.json &
# pričekaj
sleep 10
# potjeraj python skriptu
python run_kmeans.py
# https://examples.dask.org/machine-learning/training-on-large-datasets.html
import time
from dask_mpi import initialize
from dask.distributed import Client
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt
if __name__ == '__main__':
# spoji klijenta putem datoteke scheduler.json
client = Client(scheduler_file="scheduler.json")
# kreiraj podatke
n_clusters = 10
n_samples = 10**4
n_chunks = int(os.environ['NSLOTS'])-2
X, _ = dask_ml.datasets.make_blobs(
centers = n_clusters,
n_samples = n_samples,
chunks = n_samples//n_chunks,
)
# izračunaj
km = dask_ml.cluster.KMeans(n_clusters=n_clusters, oversampling_factor=10)
now = time.time()
km.fit(X)
print('GB: %f' % (int(X.nbytes)/1073741824))
print('elapsed fit: %f' % (time.time()-now))
Broj jezgara | Dataframe | K-means |
---|
4 | 111 | 352 |
8 | 98 | 158 |
16 | 84 | 93 |
32 | 16 | 137 |
64 | 17 | 81 |