Task
Process numpy arrays in parallel
Solution
dask
Geophysical models run at higher and higher resolutions, producing more and more data. However, numpy arrays and pandas data frames only work with data that fit into memory. For many of us this means that before the real analysis we have to subsample or aggregate the initial data with some heavy-lifting tools (like cdo) and only then switch to the convenience and beauty of Python. These times might soon come to an end with the introduction of dask, a library that helps to parallelize computations on big chunks of data. It allows you to analyze data that do not (or barely) fit into your computer's memory, as well as to utilize the multiprocessing capabilities of your machine.
Below I will briefly describe dask arrays and show the results of some simple benchmarks.
Installation
dask
If you don't use conda, it's time to start now. The following will install dask along with some additional libraries that allow visualization of dask task graphs:
conda install dask pydot networkx
If you still prefer pip:
pip install dask[array]
should do the trick, but I am not sure whether the visualization will work in that case.
cdo
If you use Ubuntu 14.04, don't install cdo from apt-get: the packaged binaries are compiled without netCDF4 support. Instructions on how to install it properly can be found here. I am not sure whether the problem also appears in the latest versions of Ubuntu.
Mac users should be able to install it with:
brew install cdo
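A quick way to check whether your cdo build has netCDF4 support (just a sanity check I find useful, not part of the original instructions) is to look at the output of cdo -V; nc4 should appear among the listed file types:
cdo -V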
Necessary imports
import os
import dask.array as da
from netCDF4 import Dataset
from IPython.display import Image
Download the data
As usual I am going to use good old NCEP reanalysis surface temperature data. This time we need to download all years and then merge them into one file with cdo. Uncomment the code below to do so:
#for y in range(1948,2015):
# print(y)
# os.system('wget ftp://ftp.cdc.noaa.gov/Datasets/ncep.reanalysis/surface/air.sig995.{}.nc'.format(str(y)))
#!cdo mergetime air.sig995.*.nc ncep_temperature.nc
Small file
First, let's play with a small file that easily fits into memory. My test computer is a 5-year-old machine with only 3 GB of memory and 2 cores. No SSD, just a classical spinning hard drive.
We are going to calculate the overall mean for one year. The file is about 22 MB on disk, but the array will occupy about 60 MB in memory (the difference is due to netCDF4 compression). There are 4 time steps per day (1464 in total) and 73x144 points in every field.
f = Dataset('air.sig995.1948.nc')
air = f.variables['air'][:]
air.nbytes
air.shape
f.close()
numpy.array
This is our usual way with a numpy array: open the file, load the data, calculate the mean:
%%timeit
f = Dataset('air.sig995.1948.nc')
numpyresult = f.variables['air'][:].mean()
f.close()
cdo
The same thing with cdo, for comparison:
%%timeit
!cdo -s fldmean -timmean air.sig995.1948.nc t.nc
dask.array
And finally dask arrays:
%%timeit
f = Dataset('air.sig995.1948.nc')
daskarray = da.from_array(f.variables['air'], chunks=(500,73,144))
daskresult = daskarray.mean().compute()
f.close()
Seems to be just a tiny bit faster, but not significantly. Let's stop here and have a look at what we did. Before doing any calculations we construct a dask array with the from_array method, using the handle of the netCDF variable. As an additional parameter we provide chunks, which defines how we split the array into parts. Then we create the daskresult variable by specifying some operation (mean in our case) on daskarray and calling compute(). The latter executes the computation, but the interesting part happens before that. Let's create daskresult without computing it:
f = Dataset('air.sig995.1948.nc')
daskarray = da.from_array(f.variables['air'], chunks=(500,73,144))
daskresult = daskarray.mean()
What is created is a recipe for how to perform your operation in parallel. It can be accessed via the .dask attribute:
daskresult.dask
In this form it is not crystal clear, so let's turn it into a graph:
daskresult._visualize()
Image(filename='mydask.png',width=300)
You should read it from bottom to top: the initial array is split into three parts (remember our chunks size), the mean is then computed for each of these parts (in parallel), and finally the results are combined into one number. What happens when you call compute() is that this recipe gets executed by dask's own parallel scheduler. However, you can also write your own scheduler that is better suited to your specific task or system.
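As a small aside (a hedged sketch, not part of the original benchmark; the scheduler interface has changed between dask versions, and recent ones accept a scheduler keyword in compute()), you can inspect how the array was chunked and choose the scheduler explicitly:
# how the array was split: three chunks along time, one along lat and lon
daskarray.chunks       # ((500, 500, 464), (73,), (144,))
daskarray.numblocks    # (3, 1, 1)

# execute the same recipe with an explicitly chosen scheduler
daskarray.mean().compute(scheduler='threads')      # threaded scheduler, default for dask.array
daskarray.mean().compute(scheduler='synchronous')  # single-threaded, handy for debugging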
Calculating the total mean was a fairly trivial task, but when you do something a bit trickier, your recipes become more complicated:
(daskarray - daskarray.mean())._visualize()
Image(filename='mydask.png',width=300)
Or you can get even more wild:
((daskarray.T - daskarray.mean())/daskarray.std())._visualize()
Image(filename='mydask.png',width=400)
So in essence dask allows you to create a task graph, which can then be executed by a parallel scheduler (dask's own or your own).
Or you can simply think of dask as a faster way to do your numpy.array operations :)
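For example, computing anomalies relative to the time mean looks exactly the same with both libraries; only the final compute() call is dask-specific (a small illustrative sketch, not one of the benchmarks below):
# numpy: the whole result is computed in memory at once
numpy_anomaly = air - air.mean(axis=0)

# dask: the same expression builds a lazy recipe, evaluated chunk by chunk
dask_anomaly = (daskarray - daskarray.mean(axis=0)).compute()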
A file that does not fit into memory
Here I am just going to show the results that I get with dask, on my relatively old computer, with a file that does not fit into memory (it is about 4 GB, and I have only 3 GB). You should only look at the Wall time; for cdo the CPU times don't make sense.
Here we just compute the overall mean.
numpy.array
I didn't even try to compute this one; it immediately raises a MemoryError:
#%%timeit
#f = Dataset('ncep_temperature.nc')
#numpyresult = f.variables['air'][:].mean()
#f.close()
cdo
%%time
!cdo -s fldmean -timmean ./ncep_temperature.nc t.nc
dask.array
%%time
f = Dataset('ncep_temperature.nc')
daskarray = da.from_array(f.variables['air'], chunks=(5000,73,144))
daskresult = daskarray.mean().compute()
f.close()
A bit faster, but not very impressive. Let's try something a bit more complicated - the standard deviation (std):
cdo
%%time
!cdo -s timstd ./ncep_temperature.nc t.nc
dask.array
%%time
f = Dataset('ncep_temperature.nc')
daskarray = da.from_array(f.variables['air'], chunks=(5000,73,144))
daskresult = daskarray.std(axis=0).compute()
f.close()
This is already something - more than 1.5 times faster. How about computing the global mean for every time step?
%%time
!cdo -s fldmean ./ncep_temperature.nc t.nc
%%time
f = Dataset('ncep_temperature.nc')
daskarray = da.from_array(f.variables['air'], chunks=(5000,73,144))
daskresult = daskarray.mean(axis=(1,2)).compute()
f.close()
Maybe not twice as fast, but still about 30 seconds faster, and if you process a lot of files this can add up to some good hours :)
I have also tried these tests on a faster computer with many cores (24) and a lot of memory (256 GB). The results are even more impressive: for some of the tests I got more than a 15-fold speedup compared with cdo. So even if your big arrays fit into memory, you can seriously benefit from parallel computations with dask.
Matthew Rocklin, the lead developer of dask, recently gave a very nice presentation at PyData Berlin; it is really worth watching if you are interested in dask.