# Scaling AI with Project Ray, the Successor to Spark

###### Download PDF: https://medium.com/@ravishankar.nair/scaling-ai-with-project-ray-the-successor-to-spark-7b63570a0f60

Usually a project at Berkeley lasts for about five years. Around 2006, some smart users and students were complaining about Hadoop, the elephant project that introduced the MapReduce compute paradigm to the common man. Researchers at AMPLab (Algorithms, Machines and People) were looking for a better alternative. Their complaints included the long latency incurred every time they ran an iteration and the tight coupling to the Map-Sort-Reduce structure. The inability to operate on anything other than the so-called Hadoop File System was another concern, among many smaller disappointments. Matei Zaharia wanted some kind of caching system that did not require going to disk every time, and that was the genesis of Spark. Today, it is one of the most widely used unified compute engines. When the founders left Berkeley to start Databricks, AMPLab too closed down, as the project had been completed very successfully.

# Hadoop vs Spark

Assume that you are staying in a fictitious hotel where only basic amenities are provided. You need to go to a nearby restaurant for breakfast, another one for lunch, and yet another for dinner. For laundry services, you rely on an external shop. For an ATM, you need to go to a nearby shopping mall. Isn't this a pain? This is what Hadoop offers the community. For MapReduce applications, plain vanilla Hadoop with HDFS; for OLAP applications, we need to go to Hive; for OLTP workloads, we rely on HBase; for data ingestion, we depend on technologies like Sqoop or Flume, and so on and so forth. So the Hadoop ecosystem looks like this:

Figure 2: Spark — Unified Compute and Analytics — All in One

# Designer's Dilemma: Fast or Concurrent?

This is in no way an issue or problem with Spark; it is how Spark is designed. The designers gave priority to simplicity. Although Spark is monumentally faster than MapReduce, it still retains some core elements of MapReduce's batch-oriented workflow paradigm. For details, please go through links 1 and 2, though I am summarizing here. Suppose there is a function called *slowFunction* which takes five seconds to execute and a *fastFunction* which takes one second to execute.

Step 1:

Start Spark shell with one core on local mode

Figure 3: spark-shell on local mode with one core

Let’s write two functions as below

Figure 4: A slow function and fast function

Observe the output below:

Figure 5: Without explicit constructs like future (in Scala), the output is sequential

Do not be surprised: as mentioned before, Spark is designed for speed. Using an asynchronous I/O paradigm would make Spark harder to use, harder to maintain, and harder to debug, and would increase the number of failure modes it has to deal with; none of this fits with what Spark wants to be: easy, small, lightweight. If you want to take on a challenge, you can use async features such as foreachPartitionAsync, but from personal experience this makes debugging difficult. The async APIs are limited too. Spark is synchronous by design.

What we ideally want is concurrent execution: without waiting for each slowFunction call to complete, let all the fastFunction calls execute, and then let the system finish the slowFunction calls on its own. In this way, if I have a pipeline of complex functions, some of the tasks will already be complete.
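The figures above demonstrate the sequential behaviour in Scala. As a rough illustration of the behaviour we would like, here is a small plain-Python sketch (not Spark code; slow_function and fast_function are hypothetical stand-ins) using a thread pool:

```python
# Illustration only: fast work finishes while slow work is still in flight.
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_function(x):
    time.sleep(5)                # stands in for the 5-second slowFunction
    return f"slow({x})"

def fast_function(x):
    time.sleep(1)                # stands in for the 1-second fastFunction
    return f"fast({x})"

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(slow_function, 1)] + \
              [pool.submit(fast_function, i) for i in range(3)]
    for f in as_completed(futures):
        print(f.result())        # all fast results appear before the slow one
```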

Note that in Spark, the executor JVMs run tasks, and each task is generally just a single thread running the serialized code written for that particular task. The code within a task will be single-threaded and synchronous unless you explicitly make it otherwise (for example, by using the limited async functions or Futures).

The question is, does this approach fit well for machine learning and reinforcement learning applications?

# A Primer: Synchronous vs Asynchronous

Regardless of the parallel architecture used, parallel programs face the problem of controlling access to shared resources among the computing nodes. These resources could be files on disk, any physical device the program has access to, or simply some data in memory relevant to the computations.

Reading from shared data normally does not pose any problems. However, when changing a resource's state, e.g., writing to a file or changing a variable in memory, race conditions occur among computing nodes, in which the resulting state depends on the sequence of uncontrollable events. Sections of a program which contain shared resources are called critical sections. The programmer has to ensure consistent results by removing the race conditions via mutual exclusion. You use *semaphores* (remembering Edsger W. Dijkstra now!) or mutex-like structures to avoid these; think of the famous synchronized blocks in Java, for example, in multithreaded applications.
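For instance, here is a minimal, generic Python sketch (not Spark code) of guarding a critical section with a mutex so that concurrent writers cannot race on shared state:

```python
# Protecting a shared counter with a mutex.
import threading

balance = 0                      # shared, mutable state
lock = threading.Lock()          # mutex guarding the critical section

def deposit(amount, times):
    global balance
    for _ in range(times):
        with lock:               # only one thread at a time may enter
            balance += amount    # read-modify-write: the critical section

threads = [threading.Thread(target=deposit, args=(1, 100_000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(balance)  # always 400000 with the lock; unpredictable without it
```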

Since there is no global memory shared among all the distinct nodes of your Spark cluster, defining a global lock common to all the nodes is harder. Instead, one defines reader/writer policies when giving access to shared information. The simplest policy is a *single reader/single writer* policy, i.e., a *master-worker* framework, in which the master is responsible for all the reading and writing of the shared data, and the workers make requests to the master. In Spark, remember the broadcast variables, which are read-only, and the accumulators, which can be used for associative functions across nodes; isn't the SparkContext/SparkSession responsible for creating these? One possible way of handling distributed computing is to exploit popular "serverless" systems, but none currently offers facilities for managing distributed, mutable state. Developers must resort to keeping all state in a database when using serverless systems, but the database can become a bottleneck and a single point of failure.
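As a reminder of what those Spark primitives look like, here is a minimal PySpark sketch (with made-up data) of a read-only broadcast variable and an associative accumulator:

```python
# Read-only and write-only sharing in Spark: broadcast variables and accumulators.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("shared-state").getOrCreate()
sc = spark.sparkContext

tax_rates = sc.broadcast({"NY": 1.08, "NJ": 1.06})  # read-only copy shipped to every executor
missing = sc.accumulator(0)                          # executors may only add; driver reads the total

def apply_tax(record):
    state, price = record
    rate = tax_rates.value.get(state)
    if rate is None:
        missing.add(1)
        return price
    return price * rate

result = sc.parallelize([("NY", 100.0), ("TX", 50.0)]).map(apply_tax).collect()
print(result, "records without a rate:", missing.value)
spark.stop()
```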

Synchronization mechanisms are also used to communicate global information that requires the attention of all computing nodes at the same time (process synchronization) and to wait for a specific event to occur (event synchronization). However, they reduce the performance, and hence the efficiency, of parallel programs due to idle waiting times in critical sections. Algorithms that require a significant amount of synchronization among nodes are called *synchronous* algorithms, whereas those that can tolerate asynchrony are called *asynchronous* algorithms.

Why does it matter? Well, for most ETL workloads and usual scenarios, the choice between synchronous and asynchronous does not make much of a difference, but there is a class of problems for which it has an enormous effect unless we use asynchronous mechanisms: machine learning, deep learning, and reinforcement learning. Why?

Traditionally, machine learning algorithms are designed under the assumption of synchronous operation. At each iteration of an algorithm, the goal is to find a new parameter value that results in a decrease in the cost. In the popular batch gradient descent method, for example, this is achieved by computing the gradient of the cost at the current value and then taking a step in its negative direction. When the number of parameters is large, the task of computing the gradient can be split into smaller tasks and mapped to different computing nodes. In synchronous optimization algorithms, each node calculates its part of the gradient independently and then synchronizes with the other nodes at the end of each iteration to calculate the new value. Such synchronization means that the algorithm runs at the pace of the slowest computing node. Moreover, it risks bringing the algorithm into a deadlock, a state in which each computing node is waiting for some other node, should one of the nodes fail. In Spark-like architectures, even if a retry occurs, the chance of slowing down the overall job is significant for larger computations.
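As a toy illustration of the synchronous pattern, here is a small NumPy sketch (made-up data, simulated workers) in which each shard computes a partial gradient and the update waits for all of them:

```python
# Synchronous parallel gradient descent for least squares, simulated on one machine.
import numpy as np

rng = np.random.default_rng(0)
X, y = rng.normal(size=(1000, 2)), rng.normal(size=1000)
w = np.zeros(2)
shards = np.array_split(np.arange(1000), 4)          # four simulated workers

def partial_gradient(w, idx):
    Xi, yi = X[idx], y[idx]
    return Xi.T @ (Xi @ w - yi) / len(X)             # this shard's share of the full gradient

for _ in range(100):
    grads = [partial_gradient(w, idx) for idx in shards]  # synchronization barrier:
    w -= 0.1 * np.sum(grads, axis=0)                      # the step waits for the slowest worker
print(w)
```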

We can gain some advantages from asynchronous implementations of these algorithms. First, fewer global synchronization points mean reduced idle waiting times and less congestion in the interconnection networks. Second, fast computing nodes are able to execute more updates of the algorithm. Similarly, the overall system is more robust to individual node failures. On the flip side, however, asynchronous execution runs the risk of rendering an otherwise convergent algorithm divergent. Studies have shown, though, that asynchronous optimization algorithms often still converge, albeit under more restrictive conditions than their synchronous counterparts.

# Project Ray

You need something much more like a just-in-time, data-flow type of architecture, where a task runs as soon as all the tasks it depends on have finished. Building a system that supports this, and retains all the desirable features of Hadoop and Spark, is the goal of a project called Ray. Ray comes from RISELab, the successor to AMPLab. Ray maintains the state of the computation across the various nodes in the cluster, but keeps as little state as possible, which maximizes robustness.

Another important aspect of Ray is to unify all aspects of the machine learning lifecycle, just as Spark unified the individual siloed components that were prominent in the Hadoop-based ecosystem. AI applications need support for distributed training, distributed reinforcement learning, model serving, hyper-parameter search, data processing, and streaming. Right now, each of these problems is handled independently by a specialized distributed system.


Figure 6 : Siloed aspects of AI applications

What we need is a unified architecture which can handle all of these:

Figure 7 : Project Ray

Until now, we have had a separate distributed computing framework solving each specific part of the machine learning lifecycle. With Ray, a high-performance distributed computing system, and the libraries built on top of it to support all these types of workflows, we can avoid those overheads and leverage the performance of building on one system. The libraries include RLlib for reinforcement learning and Tune for hyper-parameter tuning; both demonstrate Ray's unique capabilities. These libraries, and other custom applications written with Ray, are already used in many production deployments. In programming languages like Java, we write classes and functions; Ray exposes functions as tasks and classes as actors. Functions are stateless and classes are stateful, so actors provide a stateful complement to the stateless tasks. The kernel is written in C++.
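Here is a minimal sketch of those two abstractions: a stateless task (a decorated function) and a stateful actor (a decorated class):

```python
# Tasks and actors in Ray.
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def square(x):                 # a task: stateless, runs anywhere in the cluster
    return x * x

@ray.remote
class Counter:                 # an actor: stateful, lives in its own worker process
    def __init__(self):
        self.n = 0
    def increment(self):
        self.n += 1
        return self.n

futures = [square.remote(i) for i in range(4)]   # returns object refs immediately
print(ray.get(futures))                          # [0, 1, 4, 9]

counter = Counter.remote()                       # instantiate the actor
print(ray.get([counter.increment.remote() for _ in range(3)]))  # [1, 2, 3]
```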

Ray uses Plasma, an in-memory object store, as well as the Apache Arrow format for efficient transport and representation of data. Plasma was handed over to the Apache Arrow project for further development.


# Ray API

To demonstrate the power of Ray, let's dive deep. The Ray API is carefully designed to enable users to scale their applications, even across a cluster, with minimal code changes. First, let's install Ray; we will use Python.

We have two interesting functions: echo, which takes a string as a parameter and returns it as is, and ohce, which takes a string as a parameter and returns its reverse. Let's use the Ray API and expose these as remote services. Here is a summary:

First let’s have the following steps done:

#STEP 1: Create a new environment named "ray". I am using Anaconda
conda create --name ray python=3.5

#STEP 2: Activate the environment we created in STEP 1
conda activate ray

#STEP 3: Install Ray in this environment
pip install ray

#STEP 4: Start conda from your installation directory of anaconda
bin/anaconda-navigator

#STEP 5: Start Jupyter. If Jupyter is not available, please install it


A service is basically a function or task in Ray, so we define the tasks as follows:

Look at the normal functions and note how they differ from the Ray definitions. Next we initialize Ray and invoke the Ray functions. Again, observe the difference.

Normal output of regular functions and Ray remote invocations

Now, see how we achieved parallelism.

Ray tasks are executed in parallel
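As a sketch of what the figures show, assuming the echo and ohce functions described earlier, the task definitions and remote invocations could look like this:

```python
# Turning plain functions into Ray tasks and invoking them in parallel.
import time
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def echo(s):
    time.sleep(1)              # simulate some work
    return s                   # return the string as-is

@ray.remote
def ohce(s):
    time.sleep(1)
    return s[::-1]             # return the reversed string

start = time.time()
refs = [echo.remote("ray"), ohce.remote("ray")]   # both tasks start immediately
print(ray.get(refs))                              # ['ray', 'yar']
print(f"elapsed: {time.time() - start:.1f}s")     # roughly 1s on a multi-core machine, not 2s
```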

Next, a more detailed example: we are going to predict house prices, using Ray to illustrate the concepts.

The collection contains 781 data records and is available for download in CSV format from the code repository mentioned below. Among the 8 available features, for simplicity we are going to focus on only two of them: the Size and the Price. For each of the 781 records, the Size, in square feet, will be our input feature, and the Price our target value.

We will build a simple parameter server, which is a key-value store used for training machine learning models in a cluster. The values are the parameters of a machine learning model; the **keys** index the model parameters. A house's price depends on parameters such as the number of bedrooms, living area, location, etc. If we apply machine learning to these parameters, we can estimate house valuations in a given geographical area. In our case we have two features: size and price, as mentioned above.

Figure 8: Our HousingPrizeParameterServer (Asynchronous)

The asynchronous parameter server itself is implemented as an actor (class), which exposes the methods push and pull (tasks). You start off with plain functions and classes, and to turn them into remote tasks and actors you simply add the @ray.remote decorator; Ray then schedules them in the background. Calling a task is just like calling a function, except that you add .remote. When developing an application, it is really important to understand what code runs locally versus what code runs in the cloud; this helps developers avoid bugs and quickly understand the code. Calling a task instantly returns an object ID while the task executes in the background. The object ID acts as a future for the result of the remote task.
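A minimal sketch of such an actor (the names and parameter shape are assumptions, not the article's exact HousingPrizeParameterServer class) could look like this:

```python
# An asynchronous parameter server actor exposing push and pull.
import numpy as np
import ray

@ray.remote
class ParameterServer:
    def __init__(self, dim):
        self.params = np.zeros(dim)       # the model parameters, e.g. [slope, intercept]

    def push(self, grad, lr=0.01):
        self.params -= lr * grad          # apply an update from any worker, at any time

    def pull(self):
        return self.params                # hand the current parameters to a worker
```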

Next, we start Ray by invoking the init method.

Figure 9: Starting Ray — Omitted some parameters for convenience

Figure 10: Ray Dashboard (As shown in previous figure, default Ray URL is http://localhost:8265)
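For reference, a minimal sketch of starting Ray (assuming a reasonably recent Ray version, where init reports the address information for the local instance, including the dashboard served by default at http://localhost:8265):

```python
# Start (or connect to) a local Ray instance and print its address information.
import ray

info = ray.init(ignore_reinit_error=True)   # safe even if Ray is already running
print(info)
```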

We now define a worker task, which takes the parameter server as an argument and submits requests to it.

Figure 11: The worker task for connecting to parameter server for updates

compute_grad is the function that should ideally calculate the gradient. The snippet above shows its usage; you would write your own, possibly complex, logic to calculate it.
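A minimal sketch of such a worker task, with compute_grad as a stand-in for your own gradient logic for a linear price = w * size + b model, could look like this:

```python
# A worker task that repeatedly pulls parameters and pushes gradients back.
import numpy as np
import ray

def compute_grad(params, x, y):
    # placeholder: gradient of squared error for a linear model price = w*size + b
    pred = params[0] * x + params[1]
    err = pred - y
    return np.array([err * x, err])

@ray.remote
def worker(ps, x, y, iterations=100):
    for _ in range(iterations):
        params = ray.get(ps.pull.remote())   # fetch the latest parameters
        grad = compute_grad(params, x, y)
        ps.push.remote(grad)                 # fire-and-forget: no barrier with other workers
    return True
```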

Finally, create the main code, in which we initialize Ray, start the workers, and get the results:

Figure 12: Main code

The first line avoids an error if Ray is already running; you can also solve that error by passing parameters to init.
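Putting it together, a minimal sketch of the driver, building on the ParameterServer and worker sketches above (the data values are made up for illustration):

```python
# Driver: restart Ray, create the parameter-server actor, launch workers, read the result.
import ray

ray.shutdown()                                   # first line: avoids the "already running" error
ray.init()                                       # alternatively: ray.init(ignore_reinit_error=True)

ps = ParameterServer.remote(2)                   # the actor sketched earlier
sizes, prices = [1500.0, 2000.0, 2500.0], [300.0, 400.0, 500.0]

done = [worker.remote(ps, s, p) for s, p in zip(sizes, prices)]
ray.get(done)                                    # wait for the workers to finish
print(ray.get(ps.pull.remote()))                 # the learned parameters
```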

# Code

The code discussed in the article is available here.