Intelligent Spark Agents: A Modular LangGraph Framework for Scalable, Visualized, and Enhanced Big Data Machine Learning Workflows
Abstract. This paper presents a Spark-based modular LangGraph framework, designed to enhance machine learning workflows through scalability, visualization, and intelligent process optimization. At its core, the framework introduces Agent AI, a pivotal innovation that leverages Spark’s distributed computing capabilities and integrates with LangGraph for workflow orchestration.
Agent AI facilitates the automation of data preprocessing, feature engineering, and model evaluation while dynamically interacting with data through Spark SQL and DataFrame agents. Through LangGraph's graph-structured workflows, the agents execute complex tasks, adapt to new inputs, and provide real-time feedback, ensuring seamless decision-making and execution in distributed environments. This system simplifies machine learning processes by allowing users to visually design workflows, which are then converted into Spark-compatible code for high-performance execution.
The framework also incorporates large language models through the LangChain ecosystem, enhancing interaction with unstructured data and enabling advanced data analysis. Experimental evaluations demonstrate significant improvements in process efficiency and scalability, as well as accurate data-driven decision-making in diverse application scenarios.
This paper emphasizes the integration of Spark with intelligent agents and graph-based workflows to redefine the development and execution of machine learning tasks in big data environments, paving the way for scalable and user-friendly AI solutions.
Keywords: Large Language Model · Agent · LangChain · LangGraph · ChatGPT · ERNIE-4 · GLM-4 · Big Data · Machine Learning · Apache Spark · Data Analysis.
1 Introduction
The development of information technology brings convenience to daily life along with fast-growing volumes of data. With the maturity of big data analysis technology, represented by machine learning, big data has had a tremendous effect on social and economic life and provides substantial support for business decision-making. For example, in the e-commerce industry, Taobao recommends suitable goods to users by analyzing large volumes of transaction data; in the advertising industry, online advertisers predict users' preferences by tracking their clicks in order to improve the user experience.
However, traditional relational database management systems cannot cope with the characteristics of big data, including large volume, diversity, and high dimensionality [1]. To solve the problem of big data analysis, distributed computing has been widely adopted, and Apache Hadoop [2] has been one of the most widely used distributed systems in recent years. Hadoop adopts MapReduce as a strict computing framework, and its emergence promoted the popularity of large-scale data processing platforms. Spark [3], a big data architecture developed by the AMPLab at the University of California, Berkeley, is also widely used. Spark integrates batch analysis, stream analysis, SQL processing, graph analysis, and machine learning applications. Compared with Hadoop, Spark is fast, flexible, and fault-tolerant, making it an ideal choice for running machine learning analysis programs. However, Spark is a tool for developers: it requires analysts to have certain computer skills and to spend a lot of time creating, deploying, and maintaining systems.
The results of machine learning depend heavily on data quality and model logic, so this paper designs and implements a Spark-based, process-oriented machine learning analysis tool that enables analysts to concentrate on the process itself rather than on compiling, running, and parallelizing the analysis program. In form, each machine learning analysis task is decomposed into stages composed of components, which reduces the user's learning cost. In implementation, general algorithms are encapsulated into reusable component packages, and training processes are differentiated by setting parameters, which reduces the time cost of creating machine learning analysis programs. Users can flexibly organize their own analysis process by dragging and dropping algorithm components, improving the efficiency of application creation and execution.
Spark, as a powerful distributed computing system, has significant advantages in the field of big data processing and analysis. With the rapid development of large model technology, the application of Spark combined with large model agents has gradually become a research hotspot. Large model agents can perceive the environment more accurately, react and make judgments, and form and execute decisions. Supported by Spark, large model agents can process large-scale datasets, achieving efficient data analysis and decision-making. LangGraph is an agent workflow framework officially launched by LangChain; it defines workflows based on graph structures, supports complex loops and conditional branches, and provides fine-grained agent control. The integration of Spark large model agents with the advanced tools and workflow management of the LangChain and LangGraph frameworks is driving innovative applications in various fields, including large model data analysis.
This paper presents the characteristics of the tool by comparing it with existing products, and then describes in detail the system architecture design, the business model with use cases, and the functional operation of the main system modules. Finally, the paper provides a technical summary and outlines future research directions for LangGraph-based Spark agents.
2 Brief Introduction of Related Technologies
2.1 AML Machine Learning Service
Azure Machine Learning (AML) [4] is a Web-based machine learning service launched by Microsoft on its public cloud Azure. It contains more than 20 algorithms for classification, regression, and clustering based on supervised and unsupervised learning, and the number is still growing. However, AML is based on Hadoop and can only be used within Azure. Unlike AML, the tool in this paper is designed and implemented on Spark, and can therefore be deployed on different virtual machines or cloud environments.
2.2 Responsive Apache Zeppelin Based on Spark
Apache Zeppelin [5] is a Spark-based responsive data analysis system whose goal is to build interactive, visualized, and shareable Web applications that integrate multiple algorithm libraries. It has become an open-source, notebook-based analysis tool that supports a large number of algorithm libraries and languages. However, Zeppelin does not provide a user-friendly graphical interface: every analysis requires users to write scripts to submit and run, which raises the programming skills required of users. The tool presented in this paper provides graphical components and a large number of machine learning algorithms, so users can define a machine learning process simply and quickly and run it to obtain results.
2.3 Haflow Big Data Analysis Service Platform
Reference [6] introduces Haflow, a big data analysis service platform. The system uses a component design that lets users build analysis programs by drag and drop, and exposes an extension interface through which developers can create custom analysis algorithm components. At present, Haflow only supports MapReduce algorithm components on the Hadoop platform; the tool presented in this paper builds on Haflow so that it can support Spark component applications, and it provides a large number of machine learning algorithms that run in the Spark environment.
2.4 Large Language Models Usher in a New Era of Data Management and Analysis
With the rapid advancement of artificial intelligence technology, large model technology has become a hot topic in the AI field. In the realm of data management and analysis, the emergence of large language models (LLMs) such as GPT-4o, Llama 3.2, ERNIE-4, and GLM-4 has initiated a new era filled with challenges. These large models possess powerful semantic understanding and efficient dialogue management capabilities, which have a profound impact on data ingestion, data lakes, and data warehouses. Natural language understanding based on large language models simplifies data management tasks and enables advanced analysis of big data, promoting innovation and efficiency in the field of big data.
2.5 LangChain and LangGraph Frameworks
LangChain is an application development framework designed for creating applications based on Large Language Models (LLMs). It is compatible with a variety of language models and integrates seamlessly with OpenAI ChatGPT. The LangChain framework offers tools and agents that "chain" different components together to create more advanced LLM use cases, including prompt templates, LLMs, agents, and memory, which help users improve efficiency and generate meaningful responses.
LangGraph is a framework within the LangChain ecosystem that allows developers to define cyclic edges and nodes within a graph structure and provides state management capabilities. Each node in the graph makes decisions and performs actions based on the current state, enabling developers to design agents with complex control flows, including single agents, multi-agents, hierarchical structures, and sequential control flows, which can robustly handle complex scenarios in the real world.
The combination of Spark with LangChain and LangGraph allows data enthusiasts from various backgrounds to effectively engage in data-driven tasks. Through Spark SQL agents and Spark DataFrame agents, users are enabled to interact with, explore, and analyze data using natural language.
3 Process-oriented Machine Learning Analysis Tool Based on Spark
3.1 Machine Learning Process
This paper aims to design a process-oriented machine learning tool for data analysts, so it must implement the functions of common machine learning processes. Machine learning can be divided into supervised learning and unsupervised learning, depending mainly on whether specific labels exist. A label is the prediction target associated with the observation data; observation data are the samples used to train and test machine learning models; and a feature is an attribute of the observation data. A machine learning algorithm trains prediction rules from the features of the observation data.
In practice, a machine learning process is composed of a series of stages, including data preprocessing, feature processing, model fitting, and result verification or prediction. For example, classifying a group of text documents involves word segmentation, cleaning, feature extraction, training a classification model, and outputting the classification results [7].

Fig. 1. Typical machine learning process.
These stages can be seen as black-box processes and packaged into components. Although many algorithm libraries and software packages provide programs for each stage, these programs are seldom prepared for large-scale data sets or distributed environments, and they do not naturally support process-oriented composition, requiring developers to connect each stage by hand to form a complete process.
Therefore, while providing a large number of machine learning algorithm components, the system also needs to complete the function of automatic process execution, taking into account the operational efficiency of the process.
3.2 Design Process of Business Module
The system provides components to users as its main business function. Analysts can freely combine the existing components into different analysis processes. To cover commonly used machine learning processes, the system provides the following categories of business modules: an input/output module, a data preprocessing module, a feature processing module, a model fitting module, and a results prediction module. Unlike other systems, the business modules of this tool are defined around the stages of the process itself.
- Input and output module. This module realizes data acquisition and writing and deals with heterogeneous data sources; it is the starting point and endpoint of the machine learning process. To handle different types of data, the system provides input and output functions for structured data (such as CSV), unstructured data (such as TXT), and semi-structured data (such as HTML).
- Data preprocessing module. This module includes data cleaning, filtering, join/fork operations, type changes, and so on. Data quality determines the upper limit of the accuracy of a machine learning model, so the data must be preprocessed before feature extraction. This module can clean up null or abnormal values, change data types, and filter out unqualified data.
- Feature processing module. Feature processing is the most important step before modeling the data, including feature selection and feature extraction. The system currently contains 25 commonly used feature processing algorithms.
3.3 Feature Extraction and Classification
Feature selection chooses the most valuable features from a multi-dimensional feature space by algorithm; the selected features are a subset of the original features. According to the algorithm used, selectors can be divided into information gain selectors, chi-square selectors, and Gini coefficient selectors.
Feature extraction transforms the features of the observed data into new variables according to a certain algorithm. Compared with data preprocessing, the transformation rules are more complex. The extracted features are a mapping of the original features and include the following categories:
- Standardized components. Standardization is an algorithm that maps the numerical features of data onto a unified scale. Standardized features share the same frame of reference, which makes the trained model more accurate and converge faster during training. Different standardization components map with different statistics, such as the Normalizer, StandardScaler, and MinMaxScaler components.
- Text processing components. Text features cannot be calculated directly and must be mapped to new numerical variables. Common algorithms include the Tokenizer component for word segmentation, the TF-IDF component that weights terms produced by segmentation, the OneHotEncoder component for one-hot encoding, and so on.
- Dimension-reducing components. These components compress the original feature information through a certain algorithm and express it with fewer features, such as the PCA component for principal component analysis.
- Custom UDF components. Users can supply SQL user-defined functions for custom feature processing.
- Model fitting module. Model training uses a chosen algorithm to learn from data, and the resulting model can be used for subsequent prediction. At present, the system provides a large number of supervised learning model components, which can be divided into classification models and regression models according to the nature of the observation data labels.
- Results prediction module. This module includes two functions: results prediction and verification.
Through the above general business modules, users can create a variety of common machine learning analysis processes in the system environment.
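The mappings applied by the standardization components listed above reduce to simple per-feature formulas; the following plain-Python sketch writes them out for a single feature column (Spark MLlib's StandardScaler and MinMaxScaler apply the same formulas, only in a distributed fashion):

```python
from statistics import mean, pstdev

def standard_scale(xs):
    """StandardScaler mapping: z = (x - mean) / stddev, giving zero mean and unit variance."""
    mu, sigma = mean(xs), pstdev(xs)
    return [(x - mu) / sigma for x in xs]

def min_max_scale(xs, lo=0.0, hi=1.0):
    """MinMaxScaler mapping: rescale each value linearly into the range [lo, hi]."""
    x_min, x_max = min(xs), max(xs)
    return [lo + (x - x_min) * (hi - lo) / (x_max - x_min) for x in xs]

scaled = min_max_scale([10.0, 20.0, 40.0])   # -> values in [0, 1]
```

Because both mappings are determined by a handful of per-column statistics (mean, stddev, min, max), they parallelize naturally: Spark computes the statistics in one distributed pass and then applies the formula row by row.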
3.4 System Architecture Design
The system provides a user interface through the Web, and the overall architecture is based mainly on the MVC framework. The system also provides the business modules of machine learning and the execution modules of processes. The system architecture is shown in Figure 2.

Fig. 2. System architecture diagram and flow chart.
Users create formal machine learning processes through the Web interface provided by the system and submit them to the system. The system converts the received raw process into a logical flow chart and verifies the validity of the flow chart. Validating the process is a necessary step before actual execution: when the process has obvious errors such as logical or data mismatches, the system can return the error immediately rather than waiting for the corresponding component to fail during execution, which improves the efficiency of the system.
The execution engine of the system is the key module, which implements the multi-user and multi-task process execution function. It translates the validated logical flow chart into the corresponding execution model, which is the data structure identifiable by the system and used to schedule the corresponding business components. The translation of the execution model is a complex process, which will be introduced in detail in Section 4.3.
3.5 Architecture Design of Spark Agent Based on LangGraph
As shown in Figure 3, the architecture of the agent implementation that combines Apache Spark with LangChain and LangGraph is designed to enhance the level of intelligence in data processing and decision-making. This architectural diagram displays a Spark-based agent system capable of accomplishing complex tasks by integrating various technological components.

Fig. 3. Architectural Design of Spark Agent Based on LangGraph.

LangChain and LangGraph frameworks are introduced on top of Spark. LangChain is a framework for building and deploying large language models, enabling agents to interact with users or other systems through natural language. LangGraph provides a graph-structured workflow, allowing agents to plan and execute tasks in a more flexible manner. The core of the agent is the LLM (Large Language Model), which is responsible for understanding and generating natural language. The LLM receives instructions (Instruction), observations (Observation), and feedback (FeedBack) from LangChain and integrates these with data insights from Spark to form thoughts (Thought). These thoughts are then translated into specific action plans (Plan and Action), which are executed by the action executor (Action Executor).
The action executor is tightly integrated with Spark, processing structured data through Spark DataFrame Agent and Spark SQL Agent, performing complex data analysis and data processing tasks, and feeding the results back to the LLM. The LLM adjusts its action plans based on this feedback and new observations, forming a closed-loop learning and optimization process that further refines its performance and decision-making quality.
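The closed Instruction → Thought → Plan → Action → Observation loop described above can be sketched as a plain-Python state machine. The LLM and the Spark action executor are stubbed out with hypothetical logic here, so the sketch shows only the control flow, not a real LangGraph or Spark integration:

```python
from dataclasses import dataclass, field

@dataclass
class AgentState:
    instruction: str
    observations: list = field(default_factory=list)
    plan: str = ""
    done: bool = False

def llm_think(state):
    """Stub for the LLM node: turn the instruction plus feedback into the next plan.
    (Hypothetical decision logic for illustration only.)"""
    if any("row_count" in o for o in state.observations):
        state.done = True                      # feedback already answers the instruction
    else:
        state.plan = "SELECT COUNT(*) FROM sales"
    return state

def action_executor(state):
    """Stub for the Spark SQL / DataFrame agent that executes the plan and feeds back."""
    state.observations.append(f"row_count=1000 (from: {state.plan})")
    return state

state = AgentState(instruction="How many sales records are there?")
steps = 0
while not state.done and steps < 10:           # closed loop: think -> act -> observe -> think
    state = llm_think(state)
    if not state.done:
        state = action_executor(state)
    steps += 1
```

In a real deployment, `llm_think` and `action_executor` would be LangGraph nodes over a shared state object, with the executor delegating to Spark; the termination condition and step cap play the same role as LangGraph's conditional edges and recursion limit.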
4 Research on System Implementation and Key Technologies
4.1 Storage of Intermediate Data
DataFrame-based Unified Storage Architecture Throughout the machine learning process, data is in flow, and components with sequential dependencies need to transfer intermediate data. To avoid heterogeneity of intermediate data, the system stipulates that components communicate using a unified storage structure based on DataFrame [8], a Spark-supported distributed data set that serves as the main data structure. Conceptually, it is similar to a relational database "table", but Spark heavily optimizes the execution of operations on it. In this way, the relationships of structured data are retained, special data attributes can be defined, and features and labels are designated in the data header as required by the model fitting stage, facilitating the validation and execution of the process.
This storage structure can be quickly persisted to the intermediate data storage layer and quickly restored to the required data objects when later components need it.
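The persist/restore contract between components can be sketched as follows; the class name and the pickle-plus-local-path storage are illustrative stand-ins for the DataFrame-plus-Alluxio mechanism described above:

```python
import pickle
import tempfile
from pathlib import Path

class IntermediateStore:
    """Persist each component's output under its node id; restore it for downstream components."""
    def __init__(self, root):
        self.root = Path(root)

    def save(self, node_id, data):
        path = self.root / f"{node_id}.pkl"
        with open(path, "wb") as f:
            pickle.dump(data, f)
        return path                               # recorded location handed to the next component

    def load(self, node_id):
        with open(self.root / f"{node_id}.pkl", "rb") as f:
            return pickle.load(f)

    def clear(self):
        for p in self.root.glob("*.pkl"):         # after the process finishes, delete everything
            p.unlink()

store = IntermediateStore(tempfile.mkdtemp())
store.save("tokenizer-01", {"columns": ["words"], "rows": 4})
restored = store.load("tokenizer-01")
store.clear()
```

The key property, shared by the real DataFrame-based design, is that every component reads and writes one uniform representation keyed by node id, so any two components can be connected without format negotiation.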
Alluxio Virtual Distributed Storage System Intermediate data needs different management at different stages of its life cycle. While a component is processing the previous component's data, that is, during the generation phase of intermediate data, the system records where the intermediate data is generated and passes the location to the next component. After the process finishes, all intermediate data generated by the process is no longer used and is deleted by the system. Meanwhile, the intermediate data storage space of a single process has a specified upper limit: when too much intermediate data is generated, the process's resource manager uses the Least Recently Used (LRU) algorithm [9] to evict data, preventing memory overflow caused by excessive intermediate data.
To ensure the I/O efficiency of intermediate data, Alluxio [10] is used as the intermediate storage layer, keeping all intermediate data in memory. Alluxio is a memory-centric virtual distributed storage system that can greatly accelerate data reading and writing.
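The per-process cap with LRU eviction mentioned above can be sketched with an `OrderedDict`; the class and method names are hypothetical, and real entries would be paths to persisted DataFrames rather than strings:

```python
from collections import OrderedDict

class LRUIntermediateCache:
    """Cap the intermediate-data slots of one process; evict the least recently used when full."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def put(self, node_id, data):
        if node_id in self.entries:
            self.entries.move_to_end(node_id)
        self.entries[node_id] = data
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)      # evict the least recently used entry

    def get(self, node_id):
        self.entries.move_to_end(node_id)         # mark as recently used
        return self.entries[node_id]

cache = LRUIntermediateCache(capacity=2)
cache.put("clean", "df-a")
cache.put("scale", "df-b")
cache.get("clean")                # "clean" becomes most recently used
cache.put("train", "df-c")        # capacity exceeded -> "scale" is evicted
```

Evicting by recency fits the sequential nature of a flow: the outputs of long-finished upstream nodes are the least likely to be read again.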
4.2 Implementation Method of Business Components of Machine Learning
Machine Learning Analysis Components Based on Spark MLlib Section 3.2 described the design of the machine learning modules in detail. These modules complete the main data processing and modeling functions in the form of components. To provide as many algorithm components as possible quickly, only a small portion of the components, such as input/output components and data cleaning components, are hand-programmed for the characteristics of the machine learning process; most component functions are converted automatically into Spark jobs using Spark MLlib [11], Spark's own machine learning algorithm library, which contains a large number of classification, regression, clustering, dimensionality reduction, and other algorithms. For example, to classify with a Random Forest, the system's execution engine instantiates a RandomForestClassifier object with the corresponding parameters according to the node information of the process, calls its fit method on the input data to produce the corresponding Model object, and then serializes and saves the model through the intermediate data management module for subsequent prediction or verification components to use.
Sharing the Spark Context Across Components in a Process There are two ways to run the components of a process. One is to invoke each component as an independent Spark program, starting a SparkContext for each run. When a Spark program starts, it creates a context environment that determines resource allocation, such as how many threads and how much memory to use, and then schedules tasks accordingly. A typical machine learning process consists of many components, so repeatedly starting and switching contexts wastes a great deal of running time. The other way is to share the same context within a process, treating the whole process as one large Spark program. In this case, the system's execution engine must create and manage the context for each process and release the context object to reclaim resources when the process ends.
To achieve context sharing, each component inherits SparkJobLife or one of its subclasses and implements the methods createInstance and execute. Figure 4 shows the design and inheritance diagram of the component classes, in which Transformers, Models, and Predictors are respectively the parents of the data cleaning and preprocessing components, the learning and training components, and the validation and prediction components.
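The component contract can be sketched in Python as an abstract base class. The names SparkJobLife and Transformers follow Figure 4 and the text; the constructor signature and the method bodies are hypothetical stand-ins:

```python
from abc import ABC, abstractmethod

class SparkJobLife(ABC):
    """Base contract every component implements so all nodes share one SparkContext."""
    def __init__(self, shared_context):
        self.ctx = shared_context     # the process-wide Spark context, created once per process

    @abstractmethod
    def createInstance(self, params):
        """Configure the component from the node's parameters."""

    @abstractmethod
    def execute(self, inputs):
        """Run the component's Spark job on the shared context."""

class Transformers(SparkJobLife):
    """Parent of data cleaning / preprocessing components (per Figure 4)."""
    def createInstance(self, params):
        self.params = params
        return self

    def execute(self, inputs):
        return [x for x in inputs if x is not None]   # stand-in for a real cleaning job

ctx = object()                        # stand-in for the shared SparkContext
cleaner = Transformers(ctx).createInstance({"drop_nulls": True})
out = cleaner.execute([1, None, 2])
```

Since the context is injected rather than created inside each component, the engine pays the context start-up cost once per process instead of once per node.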

Fig. 4. Component class design and inheritance diagram.

4.3 Logical Analysis Flow of Machine Learning
Process Creation After the user has designed and submitted the machine learning analysis process through the graphical interface, the system starts to create the logical analysis process. First, the system performs a topological analysis of the original process and generates a logical flow chart expressed as a Directed Acyclic Graph (DAG). The logical flow chart captures the dependencies and parallelism of the components, as well as their input/output information and parameters.
Verification Steps After the logical structure of the current process has been generated, the validity of the overall process is verified. The specific steps are as follows:
1. Check the input, output, and other necessary parameter information of each node in the graph, and return an error if anything is missing. For example, feature processing components must define an input column and an output column;
- 检查图中每个节点的输入和输出以及其他必要的参数信息,如果缺失则返回错误。例如,特征处理的组件用户必须定义输入列和输出列;
- Check the integrity of the whole process: there must be at least one input component and one output component serving as the beginning and the end; otherwise, return an error;
- Check whether there are self-loops in the flow chart, and return an error if so;
- Check whether each component conforms to the pre- and post-dependencies of the machine learning process, for example, feature processing must precede model fitting, and return an error if it does not.
- 检查整个流程的完整性,确保至少有一个输入组件和输出组件作为开始和结束,否则返回错误;
- 检查流程图中是否存在自循环,否则返回错误;
- 检查每个组件是否符合机器学习过程的前后依赖关系,例如特征处理必须在模型拟合之前,如果不符合则返回错误。
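The four checks above can be condensed into a small validator. The component schema and stage names here are illustrative, not the system's actual API.

```python
# Assumed stage ordering for the dependency check (illustrative).
STAGE_ORDER = {"input": 0, "feature": 1, "fit": 2, "predict": 3, "output": 4}

def validate(components, edges):
    errors = []
    # 1. every node must declare its required parameters
    for name, c in components.items():
        for p in c.get("requires", []):
            if p not in c:
                errors.append(f"{name}: missing parameter '{p}'")
    # 2. at least one input and one output component
    stages = [c["stage"] for c in components.values()]
    if "input" not in stages or "output" not in stages:
        errors.append("flow must begin with an input and end with an output")
    # 3. no self-loops in the flow chart
    errors += [f"self-loop on {a}" for a, b in edges if a == b]
    # 4. edges must respect the machine learning stage order
    for a, b in edges:
        if STAGE_ORDER[components[a]["stage"]] > STAGE_ORDER[components[b]["stage"]]:
            errors.append(f"{a} must come before {b}")
    return errors

comps = {
    "src":  {"stage": "input"},
    "ohe":  {"stage": "feature", "requires": ["input_col", "output_col"]},
    "lr":   {"stage": "fit"},
    "sink": {"stage": "output"},
}
errs = validate(comps, [("src", "ohe"), ("ohe", "lr"), ("lr", "sink")])
# "ohe" declares no input/output columns, so two errors are reported
```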
After the process has been validated, the flow chart is submitted to the execution engine. First, the system represents the logical flow chart as a model that can be executed directly, and then converts it into machine learning algorithm components based on Spark MLlib, which can be executed serially or in parallel. This process is called process translation and execution.
在验证流程后,流程图将被提交到执行引擎。首先,系统需要将逻辑流程图表示为可以直接执行的模型,然后将其转换为基于 SparkMLlib 的机器学习算法组件,这些组件可以串行或并行执行。这个过程称为流程翻译与执行。
MLlib [11] is Spark's built-in distributed machine learning algorithm library, which optimizes the parallel storage and processing of large-scale data and models. With Spark MLlib, a large number of efficient component programs can be developed quickly. This section focuses on how the system translates the process into an executable model to speed up the machine learning analysis process.
MLlib [11] 是一个内置 Spark 支持的分布式机器学习算法库,它优化了大规模数据和模型的并行存储与操作。通过 Spark MLlib,可以快速开发大量高效的组件程序。本节将重点介绍系统如何将流程转换为可执行模型,以加速机器学习分析过程的运行。
Translation Method for Multiple Join/Fork Parallel Tasks
A join component merges different data sets into one data set and has a many-to-one relationship with its preceding components. A fork component applies the same data set to different process branches and has a one-to-many relationship with its succeeding components. Join/fork components have many applications in practice. Taking a collaborative filtering algorithm for commodity recommendation as an example, all kinds of related data, such as transaction data, brand data, and users' birth and residence information, need to be joined at the same time in order to profile each user. The resulting user profile is then forked to each commodity to obtain the corresponding preference probability [12].
多连接/分叉并行任务的翻译方法
连接组件是将不同数据集合并为同一数据集的组件,与前一个组件存在多对一关系。分叉组件则是将同一数据集应用于不同处理分支的组件,与后一个组件存在一对多关系。连接/分叉组件在实际应用中有广泛用途。以商品推荐的协同过滤算法为例,该算法需要同时连接交易数据、品牌数据、用户出生地和居住信息等各种相关数据,以描绘用户信息。随后,将获得的特定用户画像分叉至每个商品,以获取相应的偏好概率 [12]。
When multiple data sets are joined at the same time, a divide-and-conquer algorithm is used to execute the different join branches separately and merge them at the end, so that the process runs efficiently. When multiple process branches are forked from the same data set, executing the branches in parallel does not affect the final model results. In short, machine learning processes with multiple join and fork tasks should be executed in parallel as much as possible to improve operating efficiency.
当多个数据集同时进行连接时,为了高效执行流程,采用分治算法分别执行不同的连接分支并最终合并。当从同一数据集分叉生成多个流程分支时,每个流程分支的并行执行不会影响最终的模型结果。总之,对于具有多个连接和分叉任务的机器学习流程,应尽可能并行执行以提高操作效率。
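The divide-and-conquer treatment of a multi-way join can be sketched as follows: each branch is prepared independently (here on a thread pool standing in for Spark executors), and the results are merged at the end. The data sources and keys are made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

def prepare(branch):
    """Stand-in for the per-branch preprocessing of one data source."""
    name, rows = branch
    return {user: f"{name}:{value}" for user, value in rows}

# Two join branches (e.g. transaction data and brand data), keyed by user id.
branches = [
    ("trade", [(1, "bought-A"), (2, "bought-B")]),
    ("brand", [(1, "prefers-X"), (2, "prefers-Y")]),
]

# Divide: run each branch independently in parallel.
with ThreadPoolExecutor() as pool:
    prepared = list(pool.map(prepare, branches))

# Conquer: merge all branch results into one user profile.
profile = {}
for part in prepared:
    for user, value in part.items():
        profile.setdefault(user, []).append(value)
```

`pool.map` preserves the input order, so the merged profile is deterministic even though the branches run concurrently.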
Translation Method for Compound Processes
The previous section introduced the translation method used when multiple join/fork parallel tasks occur in a process. An actual machine learning process, however, is not simply serial or parallel but a combination of serial and parallel tasks, and is therefore more complex. The difficulty in converting a complex process for the execution engine lies in executing it in parallel as much as possible without disrupting the data dependencies between components. The translation method for compound processes is as follows:
复合流程的翻译方法
上一节介绍了当流程中出现多个连接/分叉并行任务时使用的翻译方法。但实际的机器学习流程并不是简单的串行或并行,而是串行任务和并行任务的组合,因此实际的机器学习流程更加复杂。将复杂流程转换为执行引擎的难点在于在不破坏组件之间数据依赖关系的情况下尽可能并行执行流程。以下是复合流程的翻译方法:
- The flowchart is traversed breadth-first to determine the topological relationships between business components.
- 流程图按广度优先遍历以确定业务组件之间的拓扑关系。
- Divide the sub-processes of the same stage according to the stages of data preprocessing, feature processing, model fitting, and prediction.
- 根据数据预处理、特征处理、模型拟合和预测的阶段,划分同一阶段的子过程。
- A critical path algorithm is used to judge the internal execution of each sub-process and determine the hierarchical relationship of the branches within it.
- The same-level branches obtained in the previous step are optimized with the algorithm from the previous section.
- 关键路径算法用于判断每个子进程的内部执行,以确定子进程中分支的层次关系。
- 上一步获得的同级分支根据前一节中的算法进行优化。
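Steps 3 and 4 hinge on the critical-path length of each branch: branches with the same remaining path length sit at the same level and can then be parallelized by the join/fork method of the previous section. A minimal sketch, with a made-up DAG, is:

```python
def critical_path_len(node, succ, memo=None):
    """Longest path (in nodes) from `node` to a sink of the DAG."""
    memo = {} if memo is None else memo
    if node not in memo:
        memo[node] = 1 + max(
            (critical_path_len(m, succ, memo) for m in succ.get(node, [])),
            default=0)
    return memo[node]

# Hypothetical sub-process: one cleaning step forking into two feature
# branches that join again before model fitting.
succ = {"clean": ["tfidf", "onehot"], "tfidf": ["join"], "onehot": ["join"]}
lengths = {n: critical_path_len(n, succ)
           for n in ["clean", "tfidf", "onehot", "join"]}
# "tfidf" and "onehot" have equal critical-path lengths: same level,
# so they are candidates for parallel execution
```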
S: dataset; P: data preprocessing & feature processing; MF: model fitting
S: 数据集;P: 数据预处理与特征处理;MF: 模型拟合

Fig. 5. Translation Method for Multiple Join/fork Parallel Tasks.

图 5: 多连接/分叉并行任务的翻译方法。
4.5 Key Technologies of Spark Agent Based on LangGraph
4.5 基于 LangGraph 的 Spark Agent 关键技术
The implementation of Spark agents based on LangGraph involves the following key steps:
基于 LangGraph 的 Spark 智能体实现涉及以下关键步骤:
- Within the LangChain framework, detailed interaction prompts are defined for Spark SQL, clarifying the role and functionality of the agent. The agent is designed to interact with the Spark SQL database: it receives questions, constructs and executes syntactically correct Spark SQL queries, analyzes the results, and provides accurate answers based on them. To improve efficiency and relevance, query results are limited to at most the top k entries and can be sorted by relevant columns to surface representative examples from the database. The agent queries only the columns directly related to the question, uses specific database interaction tools, and relies on the information returned by these tools to construct the final response. Before executing a query, the agent strictly checks the query statement to ensure it is error-free; if problems are found, the agent rewrites the statement and tries again. Additionally, the agent is strictly prohibited from executing any DML (Data Manipulation Language) statements, such as INSERT, UPDATE, DELETE, or DROP, to maintain the integrity of the database. For questions unrelated to the database, the agent responds with "I don't know."
- 在 LangChain 框架内,为 Spark SQL 定义了详细的交互提示,明确了代理的角色和功能。该代理旨在与 Spark SQL 数据库交互,接收问题,构建并执行语法正确的 Spark SQL 查询,分析结果,并根据这些结果提供准确的答案。为了提高效率和相关性,查询结果限制为最多前 k 条记录,并可按相关列排序以突出数据库中的示例。代理仅查询与问题直接相关的列,使用特定的数据库交互工具,并依赖这些工具返回的信息构建最终响应。在执行查询之前,代理会对查询语句进行严格检查,确保其无误。如果遇到问题,代理会重写查询语句并再次尝试。此外,代理严格禁止执行任何 DML(数据操作语言)语句,如 INSERT、UPDATE、DELETE、DROP 等,以维护数据库的完整性。对于与数据库无关的问题,代理将明确回应“我不知道”。
- By instantiating SparkSQLToolkit and passing the llm and toolkit as parameters to the create_spark_sql_agent method, an agent executor instance is constructed. This instance integrates four types of Spark SQL tools: QuerySparkSQLTool, InfoSparkSQLTool, ListSparkSQLTool, and QueryCheckerTool, which give the agent the capability to interact effectively with the Spark SQL database.
- 通过实例化 SparkSQLToolkit 并将 llm 和工具包作为参数传递给 create_spark_sql_agent 方法,构建了一个 agent executor 实例。该实例集成了四种类型的 Spark SQL 工具:QuerySparkSQLTool、InfoSparkSQLTool、ListSparkSQLTool 和 QueryCheckerTool,这些工具为 AI智能体 提供了与 Spark SQL 数据库高效交互的能力。
- The agent executor’s invoke method is used to respond to user questions. Within the LangChain framework, the agent performs tasks through three core components: Thought (thinking), Action (action), and Observation (observation). Initially, the agent conducts an in-depth analysis and reasoning upon receiving input to determine the best course of action. Following this, the agent executes specific operations based on the results of its thinking. Then, the agent provides feedback and evaluates the outcomes of its actions, recording observations that serve as new inputs for the next round of thinking. Through the iterative cycle of these three steps, the agent can dynamically handle complex tasks and continuously optimize its behavior to achieve its goals.
- 代理执行器的调用方法用于响应用户问题。在LangChain框架中,代理通过三个核心组件执行任务:思考 (Thought)、行动 (Action) 和观察 (Observation)。首先,代理在接收到输入后进行深入分析和推理,以确定最佳行动方案。接着,代理根据思考结果执行具体操作。然后,代理提供反馈并评估其行动的结果,记录观察结果作为下一轮思考的新输入。通过这三个步骤的迭代循环,代理能够动态处理复杂任务,并不断优化其行为以实现目标。
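The query check and the Thought–Action–Observation cycle described above can be sketched together as follows. The stub `policy` stands in for the LLM, the tool set and row count are hypothetical, and the loop's fallback answer mirrors the prompt's "I don't know" behavior.

```python
import re

# Reject DML before anything reaches Spark SQL (keyword list per the prompt).
FORBIDDEN = re.compile(r"^\s*(INSERT|UPDATE|DELETE|DROP)\b", re.IGNORECASE)

def check_query(sql):
    if FORBIDDEN.match(sql):
        raise ValueError("DML statements are not allowed")
    return sql

def run_agent(question, tools, policy, max_steps=5):
    observations = []
    for _ in range(max_steps):
        thought = policy(question, observations)              # Thought
        if thought["action"] == "finish":
            return thought["answer"]
        result = tools[thought["action"]](thought["input"])   # Action
        observations.append(result)                           # Observation
    return "I don't know."

# Mocked tools; a real agent would wrap the Spark SQL toolkit here.
tools = {
    "list_tables": lambda _: ["crimes"],
    "query": lambda sql: {"rows": 1000, "sql": check_query(sql)},
}

def policy(question, obs):
    """Stub LLM: list tables, count rows, then answer."""
    if not obs:
        return {"action": "list_tables", "input": None}
    if len(obs) == 1:
        return {"action": "query", "input": "SELECT COUNT(*) FROM crimes"}
    return {"action": "finish", "answer": f"There are {obs[1]['rows']} records."}

answer = run_agent("How many records are in the crimes table?", tools, policy)
```

Each observation is appended to the state and fed back into the next Thought step, which is the iterative cycle the text describes.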
5 Experimental analysis
5 实验分析
5.1 Experimentation Environment and Explanation of Experimentation Data
5.1 实验环境与实验数据说明
At present, the system is still at the prototype stage. To test its functionality, this paper deploys a pseudo-distributed environment for the experiment on a machine with a four-core processor, 8 GB of memory, and a 64-bit Ubuntu system.
目前,系统仍处于原型阶段。为了测试系统的功能,本文使用四核处理器、8G内存和64位Ubuntu系统部署了一个伪分布式环境进行实验。
The experimental data comes from a public dataset on Kaggle [13]. Using the crime record data of Los Angeles city from 2003 to 2015, the crime category is modeled. To facilitate the process description, three original features are selected in this paper, and a common machine learning analysis method is used to create the process. The characteristics of the features and labels are shown in Table 1. In summary, the features and labels are mainly character strings, which require data preprocessing to extract features and map them to numerical features.
实验数据来自 Kaggle 的公开数据集 [13]。通过洛杉矶市 2003 年至 2015 年的犯罪记录数据,对犯罪类别进行建模。为了便于过程描述,本文选择了三个原始特征,并使用常见的机器学习分析方法创建过程。特征和标签的数据特征如表 1 所示。总之,字符和标签主要是字符串,需要进行数据预处理以提取特征并将其映射为数值特征。
Table 1 Description of data features
表 1: 数据特征描述
| 名称 (Name) | 描述 (Description) | 特征 (Characteristics) | 示例 (Example) |
|---|---|---|---|
| DayOfWeek | 发生时间 | 仅限周一到周日 | Monday |
| PdDistrict | 发生区域 | 字符串类型,十个区域 | BAYVIEW |
| Address | 发生地址 | 长文本数据,包含标点符号 | 800 Block of INGE... |
| Category | 案件分类 | 多类别标签,字符串 | ARSON |
To convert the original features into numerical feature vectors that can be processed by the training model, a series of data preprocessing tasks must be carried out. Table 2 describes each feature processing method. All parameters are left at their default values; any changes will be noted.
为了将原始特征转换为可由训练模型计算的数值特征向量,需要实施一系列数据预处理任务。表 2 中展示了每种特征处理方法。所有参数均设置为默认值,如有更改将予以注明。
Table 2 Explanation of Data Preprocessing
表 2: 数据预处理说明
| 特征名称 (FeatureName) | 处理说明 (Explanation of Processing) | 处理后的特征 (Feature Processed) |
|---|---|---|
| DayOfWeek | 二值化,周末为1,其他为0 | 0或1 |
| PdDistrict | 独热编码,映射为独热编码 | [0, 9] |
| Address | Tokenizer,地址分段 | [0, block... |
| Tok_output | TF-IDF,计算地址分段的重要性 | (100,[27,33... |
| Category | StringIndexer,映射到[0,38] | [0, 38] |
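A pure-Python sketch of the Table 2 transformations (binarization, one-hot encoding, tokenization) is shown below; the real process uses the corresponding Spark MLlib transformers, and the district list here is truncated to three of the ten actual values.

```python
# Truncated, illustrative district vocabulary (the real data has ten).
DISTRICTS = ["BAYVIEW", "CENTRAL", "INGLESIDE"]

def preprocess(row):
    day, district, address = row
    weekend = 1 if day in ("Saturday", "Sunday") else 0        # Binarizer
    onehot = [1 if district == d else 0 for d in DISTRICTS]    # OneHotEncoder
    tokens = address.lower().split()                           # Tokenizer
    return weekend, onehot, tokens

features = preprocess(("Monday", "BAYVIEW", "800 Block of INGE"))
# Monday is not a weekend, BAYVIEW maps to the first one-hot slot,
# and the address is split into lowercase tokens
```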
The features obtained after preprocessing are merged into feature vectors by Join components. After TF-IDF, the feature vectors are high-dimensional but sparse. ChiSqSelector is used to select the 100 features with the largest chi-square statistics for model fitting. Logistic Regression with L-BFGS is used to fit the multi-classification model; the test data is then predicted with the trained model, and the output results are saved as a CSV file. The interface of the above analysis process after creation in the system is shown in Fig. 6.
预处理后获得的特征将通过 Join 组件合并为特征向量。经过 TF-IDF 处理后,特征向量具有高维度但稀疏。Chi Sq Selector 用于选择 100 个具有最大卡方信息的特征来拟合模型。使用带有 LBFGS 的 Logistic Regression 来拟合多分类模型,然后通过训练好的模型预测测试数据,并将输出结果保存为 CSV 文件。系统创建后,上述分析过程的界面如图 6 所示。

Fig. 6. Created Flow Chart Interface.

图 6: 创建的流程图界面。
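ChiSqSelector ranks each feature by its chi-square statistic against the label and keeps the top k. A minimal sketch of that statistic, computed over a made-up feature-versus-label contingency table, is:

```python
def chi_square(observed):
    """Chi-square statistic; observed[i][j] counts feature value i with label j."""
    total = sum(sum(r) for r in observed)
    rows = [sum(r) for r in observed]
    cols = [sum(c) for c in zip(*observed)]
    stat = 0.0
    for i, row in enumerate(observed):
        for j, o in enumerate(row):
            e = rows[i] * cols[j] / total   # expected count under independence
            stat += (o - e) ** 2 / e
    return stat

# A feature perfectly aligned with the label scores high; one that is
# independent of the label scores zero, so selection keeps the former.
informative = chi_square([[20, 0], [0, 20]])
uninformative = chi_square([[10, 10], [10, 10]])
```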
Comparing the predicted values of the test data with the actual labels gives an accuracy of about $72.54\%$. If more features are added to the process, the complexity of the model increases and the accuracy can improve further. With this system, machine learning processes can be created easily and quickly, letting users focus on improving the analysis method.
通过比较测试数据的预测值与实际标签,发现准确率约为 $72.54\%$。如果在过程中添加更多特征,模型的复杂度会增加,准确率也会提高。借助该系统,可以轻松快速地创建机器学习流程,用户可以将注意力集中在分析方法的改进上。
The parallel execution optimization of the process was introduced in Section 4. To test the effectiveness of the optimization method, the experimental data is randomly sampled into ten groups of $10\%$, $20\%$, $30\%$, ..., $100\%$ of the full data, and the analysis process is executed on each group both with and without the optimization. "Without optimization" means executing the components of the process sequentially. The running time of each process, in ms, is shown in Figure 7.
第4节介绍了流程的并行执行优化。为了测试优化方法的有效性,本实验的数据被随机抽取并分为十组,包括 $10\%$、$20\%$、$30\%$、...、$100\%$,分别使用优化方法和非优化方法执行本实验的分析流程。非优化指的是按顺序执行流程中的组件,以获取每个流程的运行时间(单位为毫秒),如图7所示。

Fig. 7. Comparison of time efficiency with and without optimization.

图 7: 优化与未优化的时间效率对比图。
It can be seen that as the data volume grows linearly, the execution time of the non-optimized process increases faster, and its growth rate rises further in the later stages. In contrast, the execution time of the optimized scheme grows relatively slowly as the data volume increases, which demonstrates the effectiveness of the system's optimization scheme.
可以看出,随着数据量的线性增长,未优化流程的执行时间增长更快,且后期时间增长速度趋于加快。而随着数据量的增加,优化后的流程执行方案的时间增长相对较慢,这表明通过实施优化方案,系统的有效性得到了体现。
5.4 Experimental Analysis of Spark Agent Based on LangGraph
5.4 基于 LangGraph 的 Spark Agent 实验分析
This paper uses California housing price information as a case study to explore the practical application of Spark agents based on LangGraph. The dataset provides a wealth of information for analysis, including Longitude, Latitude, Housing Median Age, Total Rooms, Total Bedrooms, Population, Households, Median Income, and Median House Value, as shown in Figure 8.
本文以加州房价信息为案例,探讨了基于LangGraph的Spark智能体的实际应用。该数据集为分析提供了丰富的信息,包括经度、纬度、房屋中位年龄、总房间数、总卧室数、人口、家庭数、收入中位数和房屋价值中位数,如图8所示。
In the practical project, the California dataset was loaded via Spark, and a Spark agent was constructed using LangGraph. This agent, powered by a large language model, is capable of processing and analyzing the data to provide in-depth insights into California's housing prices. The LangGraph framework uses a StateGraph class to define a workflow graph, which is used to construct the agent's execution process. It allows conditional looping between different nodes, execution of various tasks, and a decision on whether to continue or terminate the workflow based on the responses from the tools, as shown in Figure 9.
在实际项目中,加州数据集通过 Spark 加载,并使用 LangGraph 构建了一个 Spark 智能体。该智能体由大语言模型驱动,能够处理和分析数据,从而提供对加州房价的深入洞察。LangGraph 框架利用 StateGraph 类来定义一个工作流图,用于构建智能体的执行过程。它允许在不同节点之间进行条件循环、执行各种任务,并根据工具的响应决定是继续还是终止工作流程,如图 9 所示。

Fig. 8. California Housing Price Data.

图 8: 加州房价数据。

Fig. 9. Agent Workflow Based on LangGraph.
图 9: 基于 LangGraph 的 AI智能体工作流程
As shown in Figure 10, by invoking the agent_executor.invoke method, complex data analysis tasks can be executed within the Spark environment, such as retrieving information about database tables or calculating average housing prices. During the agent's execution, parsing the output of the Large Language Model (LLM) plays a crucial role: if the parsing fails, the analysis can be interrupted. To ensure the accuracy of data analysis, a more powerful language model can be introduced to enhance text parsing capabilities and thereby optimize the agent's performance.
如图 10 所示,通过调用 agent executor.invoke 方法,可以在 Spark 环境中执行复杂的数据分析任务,以检索数据库表的信息或计算平均房价。在执行 AI 智能体时,大语言模型 (LLM) 输出的解析起着至关重要的作用。如果解析过程出现问题,可能会导致分析中断。为了确保数据分析的准确性,可以引入更强大的语言模型来增强文本解析能力,从而优化 AI 智能体的性能。

Fig. 10. SparkAgent Operation Example.
图 10: SparkAgent 操作示例。
6 Conclusion
6 结论
To solve the problems that data analysts encounter when using Spark for machine learning analysis of large-scale data, this paper designs and implements a prototype of a distributed, flow-based analysis system that supports multiple machine learning algorithms. Section 3 introduced the business model and architecture of the system as a whole. Section 4 described the key technologies of each module in detail, including the storage and management of intermediate data, the implementation of machine learning business components, the creation and validation of machine learning processes, and their translation and execution. The system also logically optimizes the execution of complex machine learning processes and translates the logical flow chart into a model that can be executed in parallel as efficiently as possible in the physical execution phase.
为了解决数据分析师使用 Spark 进行大规模数据机器学习分析时出现的问题,本文设计并实现了一个支持多种机器学习算法的分布式、基于流的分析系统原型。本文第 3 节整体介绍了系统的业务模型和架构。本文第 4 节详细描述了各模块的关键技术,包括中间数据的存储和管理、机器学习业务组件的实现、机器学习流程的创建和验证、机器学习流程的翻译和执行。此外,本文还对复杂机器学习流程的执行进行了逻辑优化,并将逻辑流程图转换为在物理执行阶段尽可能高效并行执行的模型。
At present, the system automatically converts all Spark MLlib algorithms into components; in practice, the algorithm library will still need to be expanded. Meanwhile, future research can address data dependence: for example, the system could automatically partition the data set and assign the processing tasks for different features of the same data set to different distributed nodes for parallel processing, improving the efficiency of feature processing tasks and the utilization of distributed resources.
目前,系统自动将所有 Spark MLlib 算法转换为组件,这在实际应用中扩展算法库是必要的。同时,未来可以在数据依赖性方面进行相关研究;例如,系统可以自动切片数据集,将同一数据集的不同特征处理任务分配到不同的分布式节点进行并行处理,从而提高特征处理任务的性能效率和分布式资源的利用率。
The Spark agent based on LangGraph provides a powerful tool for big data analysis systems: it not only simplifies the data analysis process but also enhances the scalability of the system. As Spark agent technology continues to advance, it will play an even more critical role in data analysis and machine learning.
基于 LangGraph 的 Spark 智能体为大数据分析系统提供了强大的工具,不仅简化了数据分析流程,还提升了系统的可扩展性。随着 Spark 智能体技术的不断进步,它将在未来的数据分析和机器学习领域发挥更加关键的作用。
