Research on the Application of a Spark Streaming Real-Time Data Analysis System and Large Language Model Intelligent Agents
Jialin Wang$^{1}$ and Zhihua Duan$^{2}$
$^{1}$ Executive Vice President, Ferret Relationship Intelligence, Burlingame, CA 94010, USA
jialinwangspace@gmail.com
https://www.linkedin.com/in/starspacenlp/
$^{2}$ Intelligent Cloud Network Monitoring Department, China Telecom Shanghai Company, Shanghai, China
duanzh.sh@chinatelecom.cn
Abstract. This study explores the integration of Agent AI with LangGraph to enhance real-time data analysis systems in big data environments. The proposed framework overcomes the limitations of static workflows, inefficient stateful computations, and lack of human intervention by leveraging LangGraph's graph-based workflow construction and dynamic decision-making capabilities. LangGraph allows large language models (LLMs) to dynamically determine control flows, invoke tools, and assess the necessity of further actions, improving flexibility and efficiency. The system architecture incorporates Apache Spark Streaming, Kafka, and LangGraph to create a high-performance sentiment analysis system. LangGraph's capabilities include precise state management, dynamic workflow construction, and robust memory checkpointing, enabling seamless multi-turn interactions and context retention. Human-in-the-loop mechanisms are integrated to refine sentiment analysis, particularly in ambiguous or high-stakes scenarios, ensuring greater reliability and contextual relevance.
Key features such as real-time state streaming, debugging via LangGraph Studio, and efficient handling of large-scale data streams make this framework ideal for adaptive decision-making. Experimental results confirm the system’s ability to classify inquiries, detect sentiment trends, and escalate complex issues for manual review, demonstrating a synergistic blend of LLM capabilities and human oversight.
This work presents a scalable, adaptable, and reliable solution for real-time sentiment analysis and decision-making, advancing the use of Agent AI and LangGraph in big data applications.
Keywords: Large Language Model · Agent · LangChain · ChatGPT · ERNIE-4 · GLM-4 · Qwen2.5 · Big Data · Spark Streaming · Real-Time Data Analysis System · Sentiment Analysis
1 Introduction
In the internet era, higher requirements are placed on the effectiveness and granularity of data analysis. Social, e-commerce, and new media applications all generate massive amounts of data. To mine and analyze the information value in these data streams in real time, a real-time data processing system for big data analysis needs to be designed.
Frameworks and technologies for big data processing have developed continuously, from MapReduce, the parallel distributed processing framework of the Hadoop platform, to Storm stream processing and the Spark ecosystem. These provide distributed big data processing methods and programming interfaces that have greatly advanced the development of massive data analysis programs.
Cardinality computation, i.e., counting the number of distinct elements in the data, is often used in data analysis [1]. For example, consider counting the number of unique IP addresses of a website within 5 minutes. An exact cardinality algorithm can solve this problem, but it needs to cache all IP addresses within the 5 minutes, which consumes too many resources. A cardinality estimation algorithm, by contrast, is based on probabilistic statistics and estimates the number of distinct elements in a given data set, improving computational efficiency by sacrificing a certain amount of accuracy [2].
The main contents of this paper are as follows: Section 2 designs a real-time data acquisition and analysis system based on Spark Streaming; Section 3 compares and analyzes exact computation and estimation methods for cardinality in stateful computing operations, and verifies that HyperLogLog++-based cardinality estimation is more suitable for real-time cardinality statistics over big data; Section 4 analyzes Spark machine learning algorithms; Section 5 analyzes the experimental results.
With the rapid development of large model technology, real-time data analysis based on intelligent agents is becoming increasingly important. This paper explores the construction of an efficient real-time data analysis system using advanced technologies such as Apache Spark, Spark Streaming, large language model agents, and Apache Kafka. Apache Spark, as a powerful distributed computing framework, not only handles large-scale datasets but also supports real-time data stream processing. Combined with real-time analysis based on large model agents, it can provide dynamic decision support.
2 Real-time Data Acquisition and Analysis System
A real-time data acquisition and analysis system must not only meet concurrency requirements but also ensure real-time data processing and a degree of data disaster tolerance. Based on open-source components, this paper designs a real-time data acquisition and analysis system comprising a data acquisition service, a data queue service, and a data analysis service [3,4]. The overall architecture includes the data acquisition client side, the OpenResty data acquisition server, a Kafka (distributed publish/subscribe messaging system) cluster, and a data analysis and computing program based on Spark Streaming.
2.1 Client-Side of Data Acquisition and its Format Definition
To make it easier for third-party applications to implement data integration and uniform data processing, the client side of data acquisition is designed and the format of the collected data is defined. Log information is defined according to the subject of the event, the category of the event, the attributes involved, and the time, location, and result of the event. It mainly comprises the following domains: application identification, device information, user identification, action event, action object, action time, action geography, and action result, as shown in Table 1.
Table 1. Definition of Log Information Domain.
| Log Information Domain | Description |
|---|---|
| Application identification domain | Defines the application's identifier, version, and type. |
| Device information domain | Defines the device's operating system, model, resolution, and user agent. |
| User identification domain | Defines the device identifier and the user identifier. |
| Action event domain | Defines common browsing events, exception events, playback-related events, search, comment, share, and other events; custom events are also supported. |
| Action object domain | Defines the business attributes related to the corresponding event domain, such as node identifier, URL address, stream address, and playback time point. |
| Action time domain | Defines the start and end times of the event. |
| Action geography domain | Defines the latitude and longitude, network type, IP address, etc., of the event. |
| Action result domain | Defines the result of the event. |
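As an illustration, a log record covering these domains might look as follows. This is a hypothetical sketch: the field names and values are illustrative, not prescribed by the system.

```python
# Hypothetical log record covering the domains of Table 1; all field
# names and values are illustrative.
log_record = {
    "app":    {"id": "news-app", "version": "3.2.1", "type": "android"},     # application identification
    "device": {"os": "Android 12", "model": "SM-G991B",
               "resolution": "1080x2340", "user_agent": "okhttp/4.9"},       # device information
    "user":   {"device_id": "d-8f2c01", "user_id": "u-10293"},               # user identification
    "event":  {"category": "play", "name": "video_start"},                   # action event
    "object": {"node_id": "n-552", "url": "https://example.com/v/552",
               "stream": "rtmp://example.com/live/552", "position_sec": 0},  # action object
    "time":   {"start": "2024-05-01T08:00:00Z",
               "end": "2024-05-01T08:03:25Z"},                               # action time
    "geo":    {"lat": 31.23, "lon": 121.47, "network": "wifi",
               "ip": "203.0.113.7"},                                         # action geography
    "result": {"status": "success"},                                         # action result
}
```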
2.2 Real-time Data Acquisition Service
Nginx is a high-performance HTTP server with the advantages of low memory consumption and high stability compared with Apache, lighttpd, and other HTTP servers. OpenResty is a high-performance web platform based on the Nginx server and the Lua interpreter; it makes full use of Nginx's non-blocking I/O model and uses the Lua scripting language to call the various C and Lua modules supported by Nginx. It not only handles HTTP client connection requests but also provides consistent, high-performance responses backed by remote backends such as MySQL, Redis, and Kafka.
The OpenResty-based real-time data acquisition service checks the format of reported logs against the defined log record format via a Lua module, preventing malformed entries from entering and polluting the system. At the same time, the user agent and IP address information in each log is supplemented and processed. Through the Lua Kafka producer client, the processed logs are published to different Kafka partitions for data collection.
2.3 Real-time Data Cache Queue
Kafka is an easily extensible, topic-based publish/subscribe message queuing system consisting mainly of Producers, Brokers, and Consumers [5]. A Producer collects messages and sends them to a Broker. The Broker receives messages from Producers and persists them. A Consumer is a user of messages and fetches them from the Broker. As a buffer for data aggregation, Kafka is the core of the whole data architecture: it can provide data to multiple consumers, enabling real-time data to serve multiple scenarios and services.
The Kafka cluster decouples the real-time data flow and enables asynchronous processing between the producers and consumers of the data. As a producer of real-time data, the data acquisition server OpenResty publishes to Kafka cluster topics according to event type. Spark Streaming, as a consumer, reads data from the Kafka cluster, forming a real-time data processing pipeline.
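The following is a minimal sketch of this pipeline, assuming Spark 2.x with the spark-streaming-kafka-0-8 integration on the consumer side and the kafka-python client standing in for the OpenResty Lua producer; topic and broker names are illustrative.

```python
# Producer side (stands in for the OpenResty Lua producer): publish logs
# to a per-event-type topic.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("events.play", {"event": "video_start", "ip": "203.0.113.7"})
producer.flush()

# Consumer side: Spark Streaming reads the same topic, forming the
# real-time processing pipeline (Spark 2.x DStream Kafka integration).
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="realtime-pipeline")
ssc = StreamingContext(sc, batchDuration=300)  # 5-minute batches, as in Sec. 3

stream = KafkaUtils.createDirectStream(
    ssc, topics=["events.play"],
    kafkaParams={"metadata.broker.list": "kafka:9092"},
)
logs = stream.map(lambda kv: json.loads(kv[1]))  # parsed log records
logs.pprint()

ssc.start()
ssc.awaitTermination()
```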
2.4 Real-time Data Analysis System Based on Spark Streaming
Spark Streaming is the framework used for real-time computation in the Spark big data analysis system. Spark Streaming splits real-time data streams into DStreams (Discretized Streams). As the basic abstraction in Spark Streaming, a DStream internally maintains a set of Resilient Distributed Datasets (RDDs) keyed by a discrete time axis [6]. These RDD sequences represent the datasets of different time periods, and operations on a DStream are ultimately mapped to the internal RDDs, achieving a seamless connection with Spark.
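A minimal, self-contained sketch of the DStream abstraction, assuming a text stream on a local socket (host and port are illustrative):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="dstream-demo")
ssc = StreamingContext(sc, batchDuration=5)  # each 5-second interval yields one RDD

lines = ssc.socketTextStream("localhost", 9999)   # a DStream: RDDs keyed by time
words = lines.flatMap(lambda line: line.split())  # applied to each underlying RDD
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()  # prints the result RDD of every batch

ssc.start()
ssc.awaitTermination()
```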
2.5 Real-time Data Analysis and Classification
Data analysis based on Spark Streaming usually involves two kinds of computation: stateless and stateful. For example, consider counting page views (PV) and unique IPs per 5 minutes. If the batch interval of Spark Streaming is 5 minutes, the daily PV count can be obtained by direct summation, whereas counting unique IPs requires saving all IP addresses within the 5 minutes before a unified computation can be carried out. Stateful computation not only costs more resources but also responds slowly to multi-granularity statistics such as daily and weekly counts. Therefore, only by classifying the computations and adopting different calculation methods for different classes can the analysis be more effective and more timely.
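As a minimal sketch of the two styles, assuming `logs` is the parsed-log DStream and `ssc` the StreamingContext from the earlier Kafka sketch:

```python
# Stateless: page views per batch need no history; a daily PV total is a
# running sum whose state is a single number per key.
# (updateStateByKey requires ssc.checkpoint() to be set.)
pv_per_batch = logs.map(lambda log: ("pv", 1)).reduceByKey(lambda a, b: a + b)

def add_running_total(new_counts, total):
    return (total or 0) + sum(new_counts)

daily_pv = pv_per_batch.updateStateByKey(add_running_total)

# Stateful in the costly sense: counting unique IPs requires remembering
# every IP seen so far before a deduplicated count is possible (see Sec. 3).
```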
2.6 Intelligent Agent Spark Streaming Analysis
As shown in Figure 1, this study implements a Spark Streaming real-time analysis system architecture based on large language model agents and applies it to sentiment analysis of user review data.
Fig. 1. Spark Streaming and Large Model Intelligent Agent Architecture Diagram.
This system primarily employs the following key technologies to achieve efficient data processing and analysis:
- Sockets: Serving as the initial data access point, sockets enable network communication over TCP/IP for data transmission between clients and servers. In this study, a restaurant review dataset and the Yelp dataset are used as experimental data.
- Spark Streaming: Apache Spark is an efficient distributed computing framework that offers fast and versatile data processing capabilities. The Spark Streaming module is specifically designed for handling real-time data streams. This experiment configures and deploys Spark-related services and containers using Docker, setting up a cluster architecture with a Spark master and workers.
- Spark Engine: Spark Streaming receives real-time data streams and processes them in batches. The Spark engine handles these batched data and generates result streams; it also interacts with the large model agents.
- Large Model Agents: These agents, built on large language models, provide sentiment analysis capabilities, enhancing data processing and analysis so that the agents can more accurately understand and process natural language data. Models such as GPT-4, Qwen2.5, ERNIE 4.0, and GLM-4 can be used.
- Kafka: Apache Kafka is a distributed streaming platform that, integrated with Spark Streaming, can efficiently handle real-time data streams. The processed data is output to Elasticsearch, which stores and indexes it, enabling rapid retrieval and visualization with Elasticsearch and Kibana.
This system constitutes a powerful real-time data processing and analysis application, capable of handling large-scale datasets and providing real-time sentiment analysis capabilities.
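The following is a minimal, hypothetical sketch of the agent step in this pipeline, assuming an OpenAI-compatible chat API (the openai Python client), the kafka-python producer, and the `ssc` StreamingContext from the earlier sketches; `classify_sentiment` is an illustrative helper, not part of any library:

```python
import json
from kafka import KafkaProducer
from openai import OpenAI

def classify_sentiment(client, review_text):
    """Illustrative agent step: ask the LLM for a one-word sentiment label."""
    resp = client.chat.completions.create(
        model="gpt-4",  # Qwen2.5, ERNIE 4.0, or GLM-4 behind a compatible endpoint
        messages=[
            {"role": "system",
             "content": "Classify the review as positive, negative, or neutral. "
                        "Reply with a single word."},
            {"role": "user", "content": review_text},
        ],
    )
    return resp.choices[0].message.content.strip().lower()

def process_partition(reviews):
    # One client and one producer per partition, created on the worker;
    # the client reads its API key from the environment.
    client = OpenAI()
    producer = KafkaProducer(
        bootstrap_servers="kafka:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for review in reviews:
        producer.send("sentiment.results",
                      {"review": review,
                       "sentiment": classify_sentiment(client, review)})
    producer.flush()

reviews = ssc.socketTextStream("localhost", 9999)  # restaurant / Yelp reviews, one per line
reviews.foreachRDD(lambda rdd: rdd.foreachPartition(process_partition))
```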
3 Exact Calculation Algorithms and Approximate Estimation Algorithms
The cardinality calculation method determines the number of distinct elements in a data stream and comprises exact calculation algorithms and approximate estimation algorithms. Exact calculation can usually be done with a simple algorithm of linear space complexity O(N), but it often requires a lot of memory and a long running time, so it frequently cannot meet the needs of processing massive data. Therefore, cardinality estimation methods, which handle massive data with fewer resources, have emerged. Generally speaking, the main cardinality estimation algorithms are Linear Counting, LogLog Counting, HyperLogLog Counting [2], and Adaptive Counting [1]. HyperLogLog++ [7] is a low-error algorithm based on HyperLogLog Counting. This paper estimates the number of unique IPs per day based on the HyperLogLog++ algorithm.
3.1 Exact Calculation Algorithm
The exact cardinality calculation method based on Spark Streaming uses historical state to count, in each batch, the IPs that appear for the first time that day, and then sums these per-batch counts to obtain the number of unique IPs per day. The specific process and pseudocode are shown in Figure 2 and Figure 3, respectively:
Fig. 2. Flow of the exact method for counting the number of unique IPs per day based on Spark Streaming.
Fig. 3. Pseudocode based on Spark Streaming.
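The pseudocode of Fig. 3 is not reproduced here; the following is a minimal PySpark sketch of the same idea, assuming each record in the `logs` DStream from the earlier sketches carries `day` and `ip` fields:

```python
ssc.checkpoint("/tmp/spark-ckpt")  # required for stateful operations

day_ip = logs.map(lambda log: (log["day"], log["ip"]))

def update_seen_ips(new_ips, seen):
    # State per day: the full set of IPs already seen -- exact but memory-hungry.
    seen = seen or set()
    seen.update(new_ips)
    return seen

seen_per_day = day_ip.updateStateByKey(update_seen_ips)
unique_ips_per_day = seen_per_day.mapValues(len)
unique_ips_per_day.pprint()
```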
3.2 Cardinality Estimation Method
The Spark Streaming-based cardinality estimation method saves one HyperLogLog++ object per day in the historical state, and the IPs of each batch are added to the corresponding HyperLogLog++ object. At query time, the HyperLogLog++ objects are called to obtain the number of unique IPs per day. The specific process and pseudocode are shown in Figure 4 and Figure 5, respectively:
Fig. 4. HyperLogLog++ estimation method for counting the number of unique IPs per day.
Fig. 5. Pseudocode based on HyperLogLog++.
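By analogy with the exact sketch above, the estimation variant can be sketched with the third-party datasketch package standing in for the HyperLogLog++ object (`day_ip` is from the previous sketch); the per-day state is now a fixed-size sketch instead of a growing IP set:

```python
from datasketch import HyperLogLogPlusPlus

def update_hll(new_ips, hll):
    # State per day: one fixed-size HyperLogLog++ sketch instead of all IPs.
    hll = hll or HyperLogLogPlusPlus(p=14)
    for ip in new_ips:
        hll.update(ip.encode("utf-8"))
    return hll

hll_per_day = day_ip.updateStateByKey(update_hll)
approx_unique_ips = hll_per_day.mapValues(lambda h: int(h.count()))
approx_unique_ips.pprint()
```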
4 Analysis of Spark Machine Learning Algorithms
Apache Spark's machine learning library (MLlib) is a key component of the Spark ecosystem, covering machine learning tasks such as classification, regression, clustering, and collaborative filtering. Leveraging Spark's distributed computing architecture, MLlib can scale out to more computing nodes as data volume grows, handling larger datasets. This not only enhances data processing capability but also accelerates model training and inference on large datasets.
Spark MLlib provides a comprehensive set of algorithms, including linear regression, logistic regression, decision trees, random forests, and k-means clustering, among others. These can be adapted to different business scenarios and data characteristics, meeting a diverse range of analytical needs. Additionally, MLlib supports key aspects of feature engineering, enabling the construction of complex pipelines that combine multiple techniques such as feature extraction, feature transformation, and feature selection.
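For instance, a minimal sentiment-classification pipeline sketch using the DataFrame-based spark.ml API; the column names `review` and `label` and the DataFrames `train_df` and `test_df` are illustrative:

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="review", outputCol="words")      # feature extraction
hashing_tf = HashingTF(inputCol="words", outputCol="features")   # feature transformation
lr = LogisticRegression(maxIter=10, labelCol="label")            # classifier

pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
model = pipeline.fit(train_df)        # train_df: DataFrame with "review", "label"
predictions = model.transform(test_df)
```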
Spark MLlib can be seamlessly integrated with Spark Streaming, enabling machine learning analysis on real-time data streams. This is of significant practical value for business scenarios requiring real-time feedback. It not only improves the efficiency of data processing but also provides strong support for real-time data analysis.
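One common integration pattern, sketched below under the same assumptions, is to reuse a trained pipeline model to score each micro-batch; `spark` is an active SparkSession, `model` comes from the previous sketch, and `reviews` is the review DStream from Section 2.6:

```python
def score_batch(rdd):
    # Skip empty micro-batches, then score the rest with the trained pipeline.
    if rdd.isEmpty():
        return
    batch_df = spark.createDataFrame(rdd.map(lambda text: (text,)), ["review"])
    scored = model.transform(batch_df)
    scored.select("review", "prediction").show()

reviews.foreachRDD(score_batch)
```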
5 Analysis of Experimental Results
Three experiments were carried out, with 3000, 5000, and 10000 log records produced per batch, respectively. Each experiment ran for 12 batches, 36 batches in total. In each batch, half of the IPs were the same. In the experiments, the batch processing interval of Spark Streaming was 5 minutes.