Dask

Dask is a scalable data analytics framework for Python with interfaces to NumPy, pandas, and scikit-learn. It also forms one of the backbones of RAPIDS.

Dask on Blue Waters

An old version of dask is included in bwpy; for single-node parallelism it can be used like this:

from dask.distributed import Client, progress
# start a purely local "cluster" in this process: threads only, no extra processes
client = Client(processes=False, threads_per_worker=4,
                n_workers=8, memory_limit='8GB')

import dask.array as da
# a lazily evaluated 10000x10000 random array split into 1000x1000 chunks
x = da.random.random((10000, 10000), chunks=(1000, 1000))

Newer versions can be installed manually with pip, for example inside a virtual environment. However, for the interesting use case of combining jupyter with Dask this fails, since jupyter (more precisely, the ipython kernels) tends to escape from the virtual environment and does not see the manually installed copy of dask. Instead of letting pip's --user flag scatter installed software into $HOME/.local, we use Python's $PYTHONUSERBASE environment variable to redirect the user installation into a directory of our choosing.
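
Once $PYTHONUSERBASE has been exported as shown in the next section, a minimal check from the matching python interpreter shows where user-installed packages now go:

import site
# with PYTHONUSERBASE set, the user install locations move out of $HOME/.local
print(site.USER_BASE)   # the directory $PYTHONUSERBASE points to
print(site.USER_SITE)   # e.g. <PYTHONUSERBASE>/lib/python3.6/site-packages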

Installing dask

We start from a fresh login with default modules loaded:

Currently Loaded Modulefiles:
  1) modules/3.2.10.4                      17) PrgEnv-cray/5.2.82
  2) eswrap/1.3.3-1.020200.1280.0          18) cray-mpich/7.7.4
  3) cce/8.7.7                             19) craype-interlagos
  4) craype-network-gemini                 20) torque/6.0.4
  5) craype/2.5.16                         21) moab/9.1.2-sles11
  6) cray-libsci/18.12.1                   22) openssh/7.5p1
  7) udreg/2.3.2-1.0502.10518.2.17.gem     23) xalt/0.7.6.local
  8) ugni/6.0-1.0502.10863.8.28.gem        24) scripts
  9) pmi/5.0.14                            25) OpenSSL/1.0.2m
 10) dmapp/7.0.1-1.0502.11080.8.74.gem     26) cURL/7.59.0
 11) gni-headers/4.0-1.0502.10859.7.8.gem  27) git/2.17.0
 12) xpmem/0.1-2.0502.64982.5.3.gem        28) wget/1.19.4
 13) dvs/2.5_0.9.0-1.0502.2188.1.113.gem   29) user-paths
 14) alps/5.2.4-2.0502.9774.31.12.gem      30) gnuplot/5.0.5
 15) rca/1.0.0-2.0502.60530.1.63.gem       31) darshan/3.1.3
 16) atp/2.0.4

then install dask like so:

module load bwpy/2.0.4
module swap cray-mpich/7.7.4 cray-mpich/7.5.0
module load bwpy-mpi
# helps for jupyter
module load gcc/5.3.0

# run our own virtualenv to work around jupyter escaping otherwise
export PYTHONUSERBASE=$PWD
# use python3.6 to get dask new enough for Security.temporary() to exist
export EPYTHON=python3.6

# limit package versions to avoid issues with setuptools
pip install --user aiohttp==3.6.3
pip install --user jupyter-server-proxy==1.6.0
jupyter serverextension enable --user jupyter_server_proxy

# pin msgpack: even the most current distributed only lists 0.6.0 as a requirement,
# and msgpack 1.0.0 with dask < 2.11 causes failures, see:
# https://github.com/dask/distributed/issues/3491
pip install --user msgpack==0.6.0
pip install --user bokeh==1.0.0
# apparently pandas 0.23 has bugs with python3.6 that lead to
# AttributeError: module 'pandas' has no attribute 'core'
pip install --user pandas==0.23.4
#pip install --user bottleneck==1.2.1 pandas==0.25.0
# this warns about the existing (very old) dask, but the warning can be ignored
# fix dask, dask-mpi and distributed versions to versions known to work
pip install --user --force-reinstall dask==2.21.0
pip install --user distributed==2.21.0 dask_mpi==2.21.0
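
As a quick sanity check, the following Python snippet (run with the same modules and $PYTHONUSERBASE/$EPYTHON settings as above) should show that the freshly installed dask shadows the old copy bundled with bwpy:

import dask, distributed
# both should report 2.21.0, with dask located under $PYTHONUSERBASE
print(dask.__version__, dask.__file__)
print(distributed.__version__)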

Dask security

The default dask setup does not require any authentication to connect to the dask cluster. This would allow any other user on the machine to connect to the dask scheduler and run arbitrary code with the dask user's access. To avoid this, dask can use TLS authentication to restrict access to the scheduler and workers, and it can also encrypt all network traffic.

On Blue Waters the network itself is trustworthy, so encryption is not required, but authentication is. Using dask-mpi we construct a temporary set of security certificates and share them with all ranks of the dask cluster.

# add some authentication so that not everyone can connect to the Dask
# scheduler / workers and read our files
from distributed.security import Security
# require authentication but use "null" encryption for speed
security = Security.temporary()
# newer versions of Security.temporary() may take arguments; ours does not yet
security.tls_ciphers = "eNULL"
from mpi4py import MPI
security = MPI.COMM_WORLD.bcast(security)

# only way to get security into workers and scheduler for MPI
import dask.config
dask.config.set({
    'distributed.comm.require-encryption': security.require_encryption,
    'distributed.comm.tls.ciphers': security.tls_ciphers,
    'distributed.comm.tls.ca-file': security.tls_ca_file,
    'distributed.comm.tls.client.key': security.tls_client_key,
    'distributed.comm.tls.client.cert': security.tls_client_cert,
    'distributed.comm.tls.scheduler.key': security.tls_scheduler_key,
    'distributed.comm.tls.scheduler.cert': security.tls_scheduler_cert,
    'distributed.comm.tls.worker.key': security.tls_worker_key,
    'distributed.comm.tls.worker.cert': security.tls_worker_cert})

Batch processing with dask

Dask supports a large variety of deployment methods to start the scheduler and the workers that handle the actual heavy lifting. Here we use dask-mpi, which maps MPI ranks to the different roles: rank 0 runs the scheduler, rank 1 runs the client (user) code, and the remaining ranks run the workers.

#!/bin/bash
#PBS -l walltime=0:15:0
#PBS -l nodes=3:xk:ppn=16
#PBS -q debug
#PBS -e batch.err
#PBS -o batch.out

cd $PBS_O_WORKDIR

module load bwpy/2.0.4
module swap cray-mpich/7.7.4 cray-mpich/7.5.0
module load bwpy-mpi
# helps for jupyter
module load gcc/5.3.0

set -x

# run our own virtualenv to work around jupyter escaping otherwise
export PYTHONUSERBASE=$PWD
export EPYTHON=python3.6

# pacify some Python warnings about utf8
export LC_ALL=C.UTF-8
export LANG=C.UTF-8

aprun -n 48 -d 1 python ./batch.py

and the dask Python script batch.py (adapted from the dask docs):

#!/usr/bin/env python

# add some authentication so that not everyone can connect to the Dask
# scheduler / workers and read our files
from distributed.security import Security
# require authentication but use "null" encryption for speed
security = Security.temporary()
# newer versions of Security.temporary() may take arguments; ours does not yet
security.tls_ciphers = "eNULL"
from mpi4py import MPI
security = MPI.COMM_WORLD.bcast(security)

# only way to get security into workers and scheduler for MPI
import dask.config
dask.config.set({
    'distributed.comm.require-encryption': security.require_encryption,
    'distributed.comm.tls.ciphers': security.tls_ciphers,
    'distributed.comm.tls.ca-file': security.tls_ca_file,
    'distributed.comm.tls.client.key': security.tls_client_key,
    'distributed.comm.tls.client.cert': security.tls_client_cert,
    'distributed.comm.tls.scheduler.key': security.tls_scheduler_key,
    'distributed.comm.tls.scheduler.cert': security.tls_scheduler_cert,
    'distributed.comm.tls.worker.key': security.tls_worker_key,
    'distributed.comm.tls.worker.cert': security.tls_worker_cert})

from dask_mpi import initialize
# nannies fail on BW due to MPI DMA support interfering with fork()
initialize(interface="ipogif0", protocol="tls", nanny=False, dashboard=False)
# if dashboard is needed one needs to use:
# dashboard=True, dashboard_address="127.0.0.1:8787"
# and a way to (securely) make that port accessible

from distributed import Client
client = Client(security=security)

import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))

import time
start = time.time()

m = x.mean()
m.compute()

stop = time.time()

print("I took %gs" % (stop-start))

Interactive processing with dask

Using a file to store the scheduler connection information, one can use dask-mpi to start a pool of workers together with a scheduler and then connect to it from any Python code, including from a jupyter notebook, as shown in the dask documentation.
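
For reference, a minimal sketch of that scheduler-file approach (we do not use it below; the file name and launcher options are only illustrative):

# the scheduler and workers would be started from the batch job, roughly like
#   aprun -n 48 dask-mpi --scheduler-file scheduler.json --interface ipogif0 --no-nanny
# after which any Python session that can read the file can connect:
from dask.distributed import Client
client = Client(scheduler_file="scheduler.json")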

However, this method has the disadvantage that the dask dashboard is either inaccessible (if bound to 127.0.0.1) or accessible without authentication to the whole machine (if bound to the Gemini interface). Since, on Blue Waters, it is impossible to ssh to a compute node, a different way to forward (proxy) the port and provide authentication is needed.

Here, we run the jupyter notebook server on the same compute node as the scheduler and use jupyter-server-proxy to forward the port.

This has two advantages:

  1. the notebook does not run on a login node, so it has more resources available to it
  2. the proxying is done by jupyter, so it automatically benefits from whatever port forwarding is used to reach the jupyter notebook server

jupyter-server-proxy lets one forward to arbitrary IP addresses (127.0.0.1 in our case) and ports (8787 in our case) using this syntax:

<notebook-base>/proxy/<host>:<port>

For example, with the ssh port forwarding shown below, the dashboard becomes reachable from the workstation at http://127.0.0.1:35509/proxy/127.0.0.1:8787/status.

Jupyter notebook setup

First we set up jupyter's notebook server as shown on the Python notebooks page in the "Jupyter notebooks on the login nodes" section.
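
For orientation, the relevant part of $HOME/.jupyter/jupyter_notebook_config.py looks roughly like this sketch; the values below are placeholders, and the Python notebooks page has the authoritative instructions, including how to generate the password hash:

# illustrative sketch of $HOME/.jupyter/jupyter_notebook_config.py
c.NotebookApp.port = 8888                # the port that shows up in the *.ER file
c.NotebookApp.password = 'sha1:...'      # hashed password from notebook.auth.passwd()
c.NotebookApp.open_browser = False       # there is no browser on the compute node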

Once the notebook server is running on the compute node, we use the same port forwarding as shown in the "Jupyter notebooks on compute nodes" section to connect to it; the only difference is that dask-mpi's startup routine redirects the connection information to the *.ER file rather than the *.OU file.

Starting dask with Jupyter notebooks on compute nodes

The job submission script is essentially the same as for the batch job:

#!/bin/bash
#PBS -l walltime=0:15:0
#PBS -l nodes=3:xk:ppn=16
#PBS -q debug
#PBS -e interactive.err
#PBS -o interactive.out

cd $PBS_O_WORKDIR

module load bwpy/2.0.4
module swap cray-mpich/7.7.4 cray-mpich/7.5.0
module load bwpy-mpi
# helps for jupyter
module load gcc/5.3.0

set -x

# run our own virtualenv to work around jupyter escaping otherwise
export PYTHONUSERBASE=$PWD
export EPYTHON=python3.6

# check that serverextension is enabled
jupyter serverextension list

# pacify some Python warnings about utf8
export LC_ALL=C.UTF-8
export LANG=C.UTF-8

echo "Check in $PBS_JOBID.ER for jupyter forwarding information."

aprun -n 48 -d 1 python ./interactive.py

with the difference being in the Python script interactive.py:

#!/usr/bin/env python

# add some authentication so that not everyone can connect to the Dask
# scheduler / workers and read our files
from distributed.security import Security
# require authentication but use "null" encryption for speed
security = Security.temporary()
# newer versions of Security.temporary() may take arguments; ours does not yet
security.tls_ciphers = "eNULL"
from mpi4py import MPI
security = MPI.COMM_WORLD.bcast(security)

# only way to get security into workers and scheduler for MPI
import dask.config
dask.config.set({
    'distributed.comm.require-encryption': security.require_encryption,
    'distributed.comm.tls.ciphers': security.tls_ciphers,
    'distributed.comm.tls.ca-file': security.tls_ca_file,
    'distributed.comm.tls.client.key': security.tls_client_key,
    'distributed.comm.tls.client.cert': security.tls_client_cert,
    'distributed.comm.tls.scheduler.key': security.tls_scheduler_key,
    'distributed.comm.tls.scheduler.cert': security.tls_scheduler_cert,
    'distributed.comm.tls.worker.key': security.tls_worker_key,
    'distributed.comm.tls.worker.cert': security.tls_worker_cert})

from dask_mpi import initialize
# nannies fail on BW due to MPI DMA support interfering with fork()
initialize(interface="ipogif0", protocol="tls", nanny=False,
    dashboard=True, dashboard_address="127.0.0.1:8787")

# make scheduler connection available to children
from distributed import Client
client = Client()
import codecs, os, pickle
os.environ["DASK_SCHEDULER"] = client.scheduler_info()['address']
os.environ["DASK_SECURITY"] = codecs.encode(pickle.dumps(security), "base64").decode()

# start jupyter notebook server
from notebook.notebookapp import NotebookApp
app = NotebookApp()
app.initialize([])
app.start()

which starts the workers and scheduler in the same way as for the batch processing example, except that this time it explicitly enables the dashboard and binds it to port 8787 on the localhost (127.0.0.1) address.

The remaining lines (the client code) record scheduler contact information in the environment variables DASK_SCHEDULER and DASK_SECURITY and manually start a jupyter notebook.

Connecting to Jupyter notebooks on compute nodes

The *.ER file will contain lines like this:

[I 17:14:53.493 NotebookApp] The Jupyter Notebook is running at:
[I 17:14:53.493 NotebookApp] http://10.128.73.57:8888/

showing the Gemini network IP address (in this example 10.128.73.57) and the port number to use to connect to the jupyter notebook server on the compute node. This IP address is accessible only from within the Blue Waters network, so you must use ssh to forward a port on your workstation to this IP address and port, as described on the Python notebooks page, e.g.:

ssh -L 127.0.0.1:35509:10.128.73.57:8888 username@h2ologin-duo.ncsa.illinois.edu

Jupyter notebook using dask

Inside the notebook, the following cells (markdown and code cells shown in order) connect to the dask cluster started by interactive.py and run the same example computation as the batch job:

import codecs, os, pickle
from distributed import Client
# env variable is set in interactive.py to communicate with child process (us)
security = pickle.loads(codecs.decode(os.environ["DASK_SECURITY"].encode(), "base64"))
client = Client(os.environ["DASK_SCHEDULER"], security=security)

## Jupyter notebooks on compute node

Started via `notebook.notebookapp.NotebookApp()` as part of dask in
`interactive.py`; password and connection port are set in
`$HOME/.jupyter/jupyter_notebook_config.py` as described on
https://bluewaters.ncsa.illinois.edu/pythonnotebooks.
## (Secure) dashboard connection via jupyter-server-proxy

Installed via:
```
pip install --user jupyter-server-proxy
jupyter serverextension enable --user jupyter_server_proxy
```

Connect to Dask [Dashboard](/proxy/127.0.0.1:8787/status)

import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
import time
start = time.time()

m = x.mean()
m.compute()

stop = time.time()
print("I took %gs" % (stop-start))
client.scheduler_info()