This work is supported by Continuum Analytics, the XDATA Program, and the Data Driven Discovery Initiative from the Moore Foundation.

Summary

Dask just released version 0.14.0. This release contains some significant internal changes as well as the usual set of increased API coverage and bug fixes. This blogpost outlines some of the major changes since the last release on January 27th, 2017.

  1. Structural sharing of graphs between collections
  2. Refactor communications system
  3. Many small dataframe improvements
  4. Top-level persist function

You can install the new version using Conda or Pip:

conda install -c conda-forge dask distributed

or

pip install dask[complete] distributed --upgrade

Share Graphs between Collections

Dask collections (arrays, bags, dataframes, delayed) hold onto task graphs that have all of the tasks necessary to create the desired result. For larger datasets or complex calculations these graphs may have thousands, or sometimes even millions of tasks. In some cases the overhead of handling these graphs can become significant.

This is especially true because dask collections don’t modify their graphs in place; instead they make new graphs with updated computations. Copying graph data structures with millions of nodes can take seconds and interrupt interactive workflows.

To address this, dask.array and dask.delayed collections now use special graph data structures with structural sharing. This significantly cuts down on the overhead when building repetitive computations.

import dask.array as da

x = da.ones(1000000, chunks=(1000,))  # 1000 chunks of size 1000

Version 0.13.0

%time for i in range(100): x = x + 1
CPU times: user 2.69 s, sys: 96 ms, total: 2.78 s
Wall time: 2.78 s

Version 0.14.0

%time for i in range(100): x = x + 1
CPU times: user 756 ms, sys: 8 ms, total: 764 ms
Wall time: 763 ms

The difference in this toy problem is moderate, but for real-world cases this difference can grow fairly large. This was also one of the blockers identified by the climate science community preventing them from handling petabyte-scale analyses.

We chose to roll this out for arrays and delayed first because those are the two collections that typically produce large task graphs. Dataframes and bags remain as before for the time being.
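
To give a feel for what structural sharing means here, below is a minimal sketch of the idea; the ShareLayers class is a made-up illustration, not Dask's internal data structure. Rather than copying the entire task dictionary on each operation, a new collection keeps references to the previous graph's layers and only adds a small layer of its own.

# Illustrative sketch of structural sharing of task graphs.
# ShareLayers is a hypothetical stand-in, not Dask's actual implementation.

class ShareLayers:
    """A mapping-like task graph built from shared, immutable layers."""
    def __init__(self, layers=()):
        self.layers = list(layers)   # keep references to prior layers, no copying

    def add_layer(self, new_tasks):
        # Cost is independent of the size of the existing graph
        return ShareLayers(self.layers + [new_tasks])

    def __getitem__(self, key):
        for layer in reversed(self.layers):
            if key in layer:
                return layer[key]
        raise KeyError(key)

    def __len__(self):
        return sum(len(layer) for layer in self.layers)


# Each `x = x + 1` step adds one small layer instead of rebuilding the whole graph
graph = ShareLayers([{('ones', i): 'ones-chunk' for i in range(1000)}])
for step in range(100):
    graph = graph.add_layer({('add', step, i): ('add', ('ones', i), 1)
                             for i in range(1000)})
print(len(graph))  # 101000 tasks in total, but no graph was ever copied wholesale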

Communications System

Dask communicates over TCP sockets. It uses Tornado’s IOStreams to handle non-blocking communication, framing, etc. We’ve run into some performance issues with Tornado when moving large amounts of data. Some of this has been improved upstream in Tornado directly, but we still want the ability to optionally drop Tornado’s byte-handling communication stack in the future. This is especially important as Dask gets used in institutions with faster and more exotic interconnects (supercomputers). We’ve been asked a few times to support other transport mechanisms like MPI.

The first step (and probably the hardest) was to make Dask’s communication system pluggable so that we can use different communication options without significant source-code changes. We managed this a month ago, and now it is possible to add other transports to Dask relatively easily. TCP remains the only real choice today, though there is also an experimental ZeroMQ option (which provides little-to-no performance benefit over TCP) as well as a fully in-memory option in development.

For users the main difference you’ll see is that tcp:// is now prepended to addresses in many places. For example:

$ dask-scheduler
distributed.scheduler - INFO - -----------------------------------------------
distributed.scheduler - INFO -   Scheduler at:  tcp://192.168.1.115:8786
...
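
On the client side you can pass such an address directly to the Client; here is a minimal sketch, reusing the example address from the log above:

from dask.distributed import Client

# Connect to an already-running scheduler by its tcp:// address
# (this address is just the one from the example log above)
client = Client('tcp://192.168.1.115:8786')
print(client)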

Variety of Dataframe Changes

As usual, the Pandas API has been more fully covered by community contributors. Some representative changes include the following (a short combined example follows the list):

  1. Support non-uniform categoricals: We no longer need to do a full pass through the data when categorizing a column. Instead we categorize each partition independently (even if the partitions have different category values) and then unify these categories only when necessary.

    df['x'] = df['x'].astype('category')  # this is now fast
    
  2. Groupby cumulative reductions

    df.groupby('x').cumsum()
    
  3. Support appending to Parquet collections

    df.to_parquet('/path/to/foo.parquet', append=True)
    
  4. A new string and HTML representation of dask.dataframes. Typically Pandas prints dataframes on the screen by rendering the first few rows of data. However, because Dask dataframes are lazy we don’t have this data, and so we typically render some metadata about the dataframe instead.

    >>> df  # version 0.13.0
    dd.DataFrame<make-ti..., npartitions=366, divisions=(Timestamp('2000-01-01
    00:00:00', freq='D'), Timestamp('2000-01-02 00:00:00', freq='D'),
    Timestamp('2000-01-03 00:00:00', freq='D'), ..., Timestamp('2000-12-31
    00:00:00', freq='D'), Timestamp('2001-01-01 00:00:00', freq='D'))>
    

    This rendering, while informative, can be improved. Now we render dataframes as a Pandas dataframe, but place metadata in the dataframe instead of the actual data.

    >>> df  # version 0.14.0
    Dask DataFrame Structure:
                           x        y      z
    npartitions=366
    2000-01-01       float64  float64  int64
    2000-01-02           ...      ...    ...
    ...                  ...      ...    ...
    2000-12-31           ...      ...    ...
    2001-01-01           ...      ...    ...
    Dask Name: make-timeseries, 366 tasks
    

    Additionally, this renders nicely as an HTML table in a Jupyter notebook.
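
As a rough combined sketch of the items above (the column names and data here are made up for illustration):

import pandas as pd
import dask.dataframe as dd

# A small pandas frame split into a few partitions
pdf = pd.DataFrame({'key': ['a', 'b', 'a', 'c'] * 25,
                    'value': range(100)})
df = dd.from_pandas(pdf, npartitions=4)

# Groupby cumulative reductions (item 2)
running = df.groupby('key').cumsum()

# Partition-local categorization (item 1): no full pass over the data
df['key'] = df['key'].astype('category')

# The new repr (item 4) shows structure (columns, dtypes, partitions), not data
print(df)
print(running.head())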

Variety of Distributed System Changes

There have also been a wide variety of changes to the distributed system. I’ll include a representative sample here to give a flavor of what has been happening:

  1. Ensure first-come-first-served priorities when dealing with multiple clients
  2. Send small amounts of data through Channels. Channels are a way for multiple clients/users connected to the same scheduler to publish and exchange data between themselves. Previously they only transmitted Futures (which could in turn point to larger data living on the cluster). However we found that it was useful to communicate small bits of metadata as well, for example to signal progress or stopping criteria between clients collaborating on the same workloads. Now you can publish any msgpack-serializable data on Channels.

    # Publishing Client
    scores = client.channel('scores')
    scores.append(123.456)
    
    # Subscribing Client
    scores = client.channel('scores')
    while scores.data[-1] < THRESHOLD:
        pass  # ... continue working ...
    
  3. We’re better at estimating the in-memory size of SciPy sparse matrices and Keras models. This allows Dask to make smarter choices about when it should and should not move data around for load balancing. Additionally, Dask can now also serialize Keras models.
  4. To help people deploying on clusters that have a shared network file system (as is often the case in scientific or academic institutions), the scheduler and workers can now communicate connection information through a file using the --scheduler-file keyword:

    dask-scheduler --scheduler-file /path/to/scheduler.json
    dask-worker --scheduler-file /path/to/scheduler.json
    dask-worker --scheduler-file /path/to/scheduler.json
    
    >>> client = Client(scheduler_file='/path/to/scheduler.json')
    

    Previously we needed to communicate the address of the scheduler directly, which could be challenging when we didn’t know ahead of time on which node the scheduler would run.

Other

There are a number of smaller details not mentioned in this blogpost. For more information visit the changelogs and documentation.

Additionally a great deal of Dask work over the last month has happened outside of these core dask repositories.

You can install or upgrade using Conda or Pip:

conda install -c conda-forge dask distributed

or

pip install dask[complete] distributed --upgrade

Acknowledgements

Since the last 0.13.0 release on January 27th, the following developers have contributed to the dask/dask repository:

  • Antoine Pitrou
  • Chris Barber
  • Daniel Davis
  • Elmar Ritsch
  • Erik Welch
  • jakirkham
  • Jim Crist
  • John Crickett
  • jspreston
  • Juan Luis Cano Rodríguez
  • kayibal
  • Kevin Ernst
  • Markus Gonser
  • Matthew Rocklin
  • Martin Durant
  • Nir
  • Sinhrks
  • Talmaj Marinc
  • Vlad Frolov
  • Will Warner

And the following developers have contributed to the dask/distributed repository:

  • Antoine Pitrou
  • Ben Schreck
  • bmaisonn
  • Brett Naul
  • Demian Wassermann
  • Israel Saeta Pérez
  • John Crickett
  • Joseph Crail
  • Malte Gerken
  • Martin Durant
  • Matthew Rocklin
  • Min RK
  • strets123
