Dask.distributed

Dask.distributed is a lightweight library for distributed computing in Python. It extends both the concurrent.futures and dask APIs to moderate-sized clusters.

 

Chart Details

This chart deploys the following:

  • 1 x Dask scheduler with ports 8786 (scheduler) and 80 (Web UI) exposed on an external LoadBalancer
  • 3 x Dask workers that connect to the scheduler
  • 1 x Jupyter notebook with port 80 exposed on an external LoadBalancer
  • All using Kubernetes Deployments

Motivation

 

Dask.distributed complements the existing PyData analysis stack. In particular, it meets the following needs:

  • Low latency: Each task suffers about 1ms of overhead. A small computation and network roundtrip can complete in less than 10ms.
  • Peer-to-peer data sharing: Workers communicate with each other to share data. This removes central bottlenecks for data transfer.
  • Complex Scheduling: Supports complex workflows (not just map/filter/reduce) which are necessary for sophisticated algorithms used in nd-arrays, machine learning, image processing, and statistics.
  • Pure Python: Built in Python using well-known technologies. This eases installation, improves efficiency (for Python users), and simplifies debugging.
  • Data Locality: Scheduling algorithms cleverly execute computations where data lives. This minimizes network traffic and improves efficiency.
  • Familiar APIs: Compatible with the concurrent.futures API in the Python standard library. Compatible with the dask API for parallel algorithms.
  • Easy Setup: As a pure Python package, distributed is pip installable and easy to set up on your own cluster.

 

Architecture

Dask.distributed is a centrally managed, distributed, dynamic task scheduler. The central dask-scheduler process coordinates the actions of several dask-worker processes spread across multiple machines and the concurrent requests of several clients.

The scheduler is asynchronous and event driven, simultaneously responding to requests for computation from multiple clients and tracking the progress of multiple workers. This design lets it handle a variety of workloads from multiple users at the same time while coping with a fluid worker population in which workers may fail or be added. Workers communicate with each other over TCP for bulk data transfer.

Internally, the scheduler tracks all work as a constantly changing directed acyclic graph of tasks. A task is a Python function operating on Python objects, which can be the results of other tasks. This graph of tasks grows as users submit more computations, fills out as workers complete tasks, and shrinks as users leave or lose interest in previous results.
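To make the idea of a task graph concrete, here is a toy sketch using only the Python standard library (Python 3.9+ for graphlib). It is not how the Dask scheduler is implemented: the graph layout, the key names, and the helper functions `dependencies` and `run` are hypothetical and chosen purely for illustration. Each key maps either to a plain value or to a tuple of a function followed by the keys whose results it consumes.

```python
from graphlib import TopologicalSorter
from operator import add, mul

# Hypothetical toy graph: key -> literal value, or (function, *keys of inputs).
# In this sketch every task argument must be the key of another entry.
graph = {
    "x": 1,
    "y": 2,
    "sum": (add, "x", "y"),          # depends on x and y
    "product": (mul, "sum", "y"),    # depends on the result of another task
}


def dependencies(task):
    """Return the keys a task depends on (none for literal values)."""
    return list(task[1:]) if isinstance(task, tuple) else []


def run(graph):
    """Evaluate every entry in dependency order and return all results."""
    sorter = TopologicalSorter({key: dependencies(task) for key, task in graph.items()})
    results = {}
    for key in sorter.static_order():  # predecessors are yielded first
        task = graph[key]
        if isinstance(task, tuple):
            func, *arg_keys = task
            results[key] = func(*(results[k] for k in arg_keys))
        else:
            results[key] = task
    return results


results = run(graph)
print(results["product"])
```

A real scheduler differs in the ways the paragraph above describes: the graph changes while it runs, tasks execute on remote workers, and results are released once no client needs them.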

Users interact by connecting a local Python session to the scheduler and submitting work, either by individual calls to the simple interface client.submit(function, *args, **kwargs) or by using the large data collections and parallel algorithms of the parent dask library. The collections in the dask library like dask.array and dask.dataframe provide easy access to sophisticated algorithms and familiar APIs like NumPy and Pandas, while the simple client.submit interface provides users with custom control when they want to break out of canned “big data” abstractions and submit fully custom workloads.
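As a minimal sketch of the client.submit interface, the example below starts an in-process scheduler and workers with `Client(processes=False)` rather than connecting to a deployed cluster; that choice, and the helper functions `inc` and `add`, are assumptions made to keep the example self-contained. To use a running scheduler instead, you would pass its address to `Client`.

```python
from dask.distributed import Client


def inc(x):
    return x + 1


def add(x, y):
    return x + y


# Assumption for this demo: run the scheduler and workers inside this process
# instead of connecting to a remote scheduler address.
client = Client(processes=False)

a = client.submit(inc, 1)          # returns a Future immediately
b = client.submit(inc, 10)
total = client.submit(add, a, b)   # futures can be passed as arguments to other tasks

result = total.result()            # block until the value is available
print(result)

client.close()
```

Passing futures as arguments lets the scheduler keep the intermediate results on the workers and only send the final value back to the local session.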

Futures

Dask supports a real-time task framework that extends Python’s concurrent.futures interface. This interface is good for arbitrary task scheduling, like dask.delayed, but is immediate rather than lazy, which provides some more flexibility in situations where the computations may evolve over time.
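Because the Dask interface extends concurrent.futures, the standard library version is enough to illustrate what "immediate rather than lazy" means. The sketch below uses `ThreadPoolExecutor` from the standard library, not Dask itself; the function `square` is a hypothetical placeholder. The Dask `Client` offers `submit` and `Future.result` under the same names, so the same pattern carries over.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    # Work is scheduled as soon as submit is called; nothing waits for a
    # separate "compute" step.
    first = pool.submit(square, 3)

    # The next task is chosen only after inspecting an earlier result, which
    # is how a computation can evolve over time.
    follow_up = pool.submit(square, first.result())
    value = follow_up.result()

print(value)
```

With a lazy interface, the whole graph would have to be described before any of it ran, so a step that depends on inspecting an intermediate value is harder to express.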

These features depend on the second-generation task scheduler found in dask.distributed (which, despite its name, runs very well on a single machine).

 

Single Machine: Dask.distributed

The dask.distributed scheduler works well on a single machine. It is sometimes preferred over the default scheduler for the following reasons:

  • It provides access to an asynchronous API, notably Futures.
  • It provides a diagnostic dashboard that can give valuable insight into performance and progress.
  • It handles data locality with more sophistication, and so can be more efficient than the multiprocessing scheduler on workloads that require multiple processes.

You can create a dask.distributed scheduler by importing and creating a Client with no arguments. This overrides whatever default was previously set.
