[Paper Translation] Research on the Application of Spark Streaming Real-Time Data Analysis System and Large Language Model Intelligent Agents


Original paper: https://arxiv.org/pdf/2501.14734


Research on the Application of Spark Streaming Real-Time Data Analysis System and Large Language Model Intelligent Agents

Jialin Wang$^{1}$ and Zhihua Duan$^{2}$

$^{1}$ Executive Vice President, Ferret Relationship Intelligence, Burlingame, CA 94010, USA, jialinwangspace@gmail.com, https://www.linkedin.com/in/starspacenlp/

$^{2}$ Intelligent Cloud Network Monitoring Department, China Telecom Shanghai Company, Shanghai, China, duanzh.sh@chinatelecom.cn

Abstract. This study explores the integration of Agent AI with LangGraph to enhance real-time data analysis systems in big data environments. The proposed framework overcomes limitations of static workflows, inefficient stateful computations, and lack of human intervention by leveraging LangGraph’s graph-based workflow construction and dynamic decision-making capabilities. LangGraph allows large language models (LLMs) to dynamically determine control flows, invoke tools, and assess the necessity of further actions, improving flexibility and efficiency. The system architecture incorporates Apache Spark Streaming, Kafka, and LangGraph to create a high-performance sentiment analysis system. LangGraph’s capabilities include precise state management, dynamic workflow construction, and robust memory checkpointing, enabling seamless multi-turn interactions and context retention. Human-in-the-loop mechanisms are integrated to refine sentiment analysis, particularly in ambiguous or high-stakes scenarios, ensuring greater reliability and contextual relevance.

Key features such as real-time state streaming, debugging via LangGraph Studio, and efficient handling of large-scale data streams make this framework ideal for adaptive decision-making. Experimental results confirm the system’s ability to classify inquiries, detect sentiment trends, and escalate complex issues for manual review, demonstrating a synergistic blend of LLM capabilities and human oversight.

This work presents a scalable, adaptable, and reliable solution for real-time sentiment analysis and decision-making, advancing the use of Agent AI and LangGraph in big data applications.

Keywords: Large Language Model · Agent · Langchain · ChatGPT · ERNIE-4 · GLM-4 · Qwen2.5 · Big Data · Spark Streaming · Real-time data analysis system · Sentiment Analysis.

1 Introduction

The internet era places higher demands on the timeliness and granularity of data analysis. Social, e-commerce, and new-media applications all generate massive volumes of data. To mine and analyze the information value in a data stream in real time, it is necessary to design a real-time data processing system for big data analysis.

Big data processing frameworks and technologies have developed continuously, from MapReduce, the parallel distributed processing framework on the Hadoop platform, to Storm stream processing and the Spark ecosystem. These frameworks provide distributed processing methods and programming interfaces that greatly ease the development of massive-data analysis programs.

Cardinality calculation, that is, counting the number of distinct elements in the data, is often used in data analysis [1]. For example, consider counting the number of independent IP addresses visiting a website within 5 minutes. An exact cardinality algorithm can solve this problem, but it must cache all IP addresses seen within those 5 minutes, which consumes too many resources. A cardinality estimation algorithm, by contrast, estimates the number of distinct elements in a given data set based on probabilistic statistics, improving computational efficiency by sacrificing some accuracy [2].

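
To make this trade-off concrete, here is a minimal, self-contained sketch of the idea behind HyperLogLog-style estimation (a simplified illustration only, not the HyperLogLog++ implementation used later in the paper): exact counting must keep every IP address in memory, while the estimator keeps only a small array of registers.

```python
import hashlib
import math

class TinyHLL:
    """Simplified HyperLogLog-style cardinality estimator (illustration only)."""
    def __init__(self, p=12):
        self.p = p                 # 2^p registers
        self.m = 1 << p
        self.registers = [0] * self.m

    def add(self, item: str) -> None:
        # 64-bit hash: the first p bits pick a register, the rest give the rank
        h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")
        idx = h >> (64 - self.p)
        rest = h & ((1 << (64 - self.p)) - 1)
        rank = (64 - self.p) - rest.bit_length() + 1   # leading zeros + 1
        if rank > self.registers[idx]:
            self.registers[idx] = rank

    def count(self) -> int:
        alpha = 0.7213 / (1 + 1.079 / self.m)
        estimate = alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if estimate <= 2.5 * self.m and zeros:          # small-range correction
            estimate = self.m * math.log(self.m / zeros)
        return round(estimate)

# Exact counting keeps every element; the sketch keeps only m small registers.
exact = set()
hll = TinyHLL()
for i in range(20000):
    ip = f"10.0.{i // 256}.{i % 256}"
    exact.add(ip)
    hll.add(ip)
```

With 4096 registers the expected relative error is roughly 1.6%, in line with the sub-1.5% errors reported in Section 5.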
The main contents of this paper are as follows: Section 2 designs a real-time data acquisition and analysis system based on Spark Streaming; Section 3 compares exact calculation and estimation methods for the cardinality of stateful computing operations, and verifies that the HyperLogLog++-based estimation method is more suitable for real-time cardinality statistics over big data; Section 4 analyzes Spark's machine learning library; Section 5 presents the experimental results; Section 6 discusses limitations and LangGraph-based improvements; Section 7 concludes the paper.

With the rapid development of large-model technology, real-time data analysis based on intelligent agents is becoming increasingly important. This paper explores how to build an efficient real-time data analysis system with advanced technologies such as Apache Spark, Spark Streaming, large language model agents, and Apache Kafka. Apache Spark, a powerful distributed computing framework, not only handles large-scale datasets but also supports real-time stream processing; combined with real-time analysis by large-model agents, it can provide dynamic decision support.

2 Real-time Data Acquisition and Analysis System

A real-time data acquisition and analysis system must not only meet concurrency requirements but also ensure real-time processing and a degree of disaster tolerance for the data. Based on open-source components, this paper designs a real-time data acquisition and analysis system comprising a data acquisition service, a data queue service, and a data analysis service [3,4]. The overall architecture includes the data acquisition client side, the data acquisition server OpenResty, a Kafka (distributed publish/subscribe messaging system) cluster, and a data analysis and computing program based on Spark Streaming.

2.1 Client-Side of Data Acquisition and its Format Definition

To make it easier for third-party applications to implement data integration and uniform data processing, the client side of data acquisition (OpenResty) is designed and the format of the collected data is defined. Log information is defined according to the subject of the event, its category, the attributes involved, its time, its location, and its result, and mainly comprises the following domains: application identification, device information, user identification, action event, action object, action time, action geography, and action result, as shown in Table 1.

Table 1. Definition of Log Information Domain.

| Log information domain | Description |
| --- | --- |
| Application identification domain | Defines the application's identifier, version, and type. |
| Device information domain | Defines the device's operating system, model, resolution, and user agent. |
| User identification domain | Defines the device identifier and the user identifier. |
| Action event domain | Defines common browsing events, exception events, playback-related events, search, comment, share, and other events, and also supports custom events. |
| Action object domain | Defines the business attributes related to the corresponding event domain, such as node identifier, URL address, stream address, and playback time point. |
| Action time domain | Defines the start and end times of the event. |
| Action geography domain | Defines the latitude and longitude, network type, IP address, etc., of the event. |
| Action result domain | Defines the result of the event. |
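
As an illustration of the eight domains, a single reported log entry might look like the following JSON object. The field names and values here are hypothetical; the paper does not specify the exact schema.

```python
import json

# Hypothetical log record covering the eight domains of Table 1.
log_record = {
    "app": {"app_id": "news-app", "version": "3.2.1", "type": "android"},
    "device": {"os": "Android 12", "resolution": "1080x2340",
               "model": "XYZ-100", "user_agent": "Mozilla/5.0 (...)"},
    "user": {"device_id": "d-8f3a", "user_id": "u-1024"},
    "event": {"category": "play", "name": "video_start"},
    "object": {"node_id": "n-77", "url": "/video/42",
               "stream_url": "rtmp://example/v42", "position_s": 0},
    "time": {"start": "2024-05-01T10:00:00Z", "end": "2024-05-01T10:03:10Z"},
    "geo": {"lat": 31.23, "lon": 121.47, "network": "wifi", "ip": "203.0.113.9"},
    "result": {"status": "success"},
}

DOMAINS = {"app", "device", "user", "event", "object", "time", "geo", "result"}
payload = json.dumps(log_record)   # what a client would report over the wire
```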

2.2 Real-time Data Acquisition Service

Nginx is a high-performance HTTP server with low memory usage and high stability compared with Apache, Lighttpd, and other HTTP servers. OpenResty is a high-performance web platform built on the Nginx server and a Lua interpreter; it makes full use of Nginx's non-blocking I/O model and uses the Lua scripting language to call the various C and Lua modules supported by Nginx. It not only handles HTTP client connection requests but also provides consistent, high-performance responses against remote backends such as MySQL, Redis, and Kafka.

In the OpenResty-based real-time data acquisition service, a Lua module checks reported logs against the defined log record format, preventing abnormal entries from polluting the system. At the same time, the user-agent and IP address information in the log is supplemented and processed. The processed logs are then published to different Kafka partitions through a Lua Kafka producer client.

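
The validation-and-dispatch step can be sketched as follows (in Python rather than Lua, purely for illustration; the domain names and the per-device partitioning rule are assumptions, not the paper's code):

```python
import zlib

REQUIRED_DOMAINS = {"app", "device", "user", "event", "object",
                    "time", "geo", "result"}
NUM_PARTITIONS = 8

def validate(record: dict) -> bool:
    """Reject logs that do not carry all defined domains."""
    return REQUIRED_DOMAINS.issubset(record)

def enrich(record: dict, client_ip: str, user_agent: str) -> dict:
    """Supplement IP and user-agent information on the server side."""
    record.setdefault("geo", {})["ip"] = client_ip
    record.setdefault("device", {})["user_agent"] = user_agent
    return record

def choose_partition(record: dict) -> int:
    """Stable partition per device, so one device's events stay ordered."""
    key = record["user"]["device_id"].encode()
    return zlib.crc32(key) % NUM_PARTITIONS
```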
2.3 Real-time Data Cache Queue

Kafka is an easily extensible, topic-based publish/subscribe message queuing system consisting mainly of Producers, Brokers, and Consumers [5]. A Producer collects messages and sends them to a Broker; the Broker receives messages from Producers and persists them; a Consumer uses messages, fetching them from the Broker. As the buffer for data aggregation, Kafka is the core of the whole data architecture: it can feed multiple consumers, enabling real-time data to serve multiple scenarios and services.

The Kafka cluster decouples the real-time data flow and enables asynchronous processing between the producers and consumers of the data. As a producer of real-time data, the acquisition server OpenResty publishes to Kafka topics according to event type; Spark Streaming, as a consumer, reads data from the Kafka cluster, forming a real-time data processing pipeline.

2.4 Real-time Data Analysis System Based on Spark Streaming

Spark Streaming is the framework used for real-time computation in the Spark big data analysis system. It splits real-time data streams into DStreams (Discretized Streams), the basic abstraction in Spark Streaming. Internally, a DStream maintains a set of Resilient Distributed Datasets (RDDs) keyed by a discrete time axis [6]. These RDD sequences represent the datasets of different time periods, and every operation on a DStream is ultimately mapped onto the internal RDDs, achieving a seamless connection with Spark.

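
Conceptually, a DStream is a sequence of datasets keyed by batch interval. The time-based slicing can be simulated in plain Python (a toy model of the idea, not the Spark implementation):

```python
from collections import defaultdict

BATCH_INTERVAL = 300  # seconds, i.e. a 5-minute batch interval

def discretize(events):
    """Group (timestamp, payload) events into per-interval micro-batches,
    mimicking how a DStream maps each time interval to one RDD."""
    batches = defaultdict(list)
    for ts, payload in events:
        batch_start = ts - ts % BATCH_INTERVAL   # floor to interval start
        batches[batch_start].append(payload)
    return dict(batches)

events = [(0, "a"), (10, "b"), (299, "c"), (300, "d"), (650, "e")]
batches = discretize(events)
```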
2.5 Real-time Data Analysis and Classification

Data analysis based on Spark Streaming usually involves two kinds of computation: stateless and stateful. For example, consider counting the number of page views (PVs) and the number of independent IPs every 5 minutes. If the batch interval of Spark Streaming is 5 minutes, the daily PV count can be obtained by simply adding up the per-batch counts, whereas counting independent IPs requires saving all IP addresses within the 5 minutes before a unified computation can be carried out. Stateful computation not only costs more resources but also responds slowly to daily and weekly multi-granularity statistics. Therefore, only by classifying the computations and adopting different calculation methods for the different classes can the analysis be both effective and real-time.

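
The distinction can be made concrete with a toy batch sequence: page views are stateless (per-batch counts simply add up), while unique-IP counting is stateful (the set of already-seen IPs must survive across batches):

```python
batches = [
    ["1.1.1.1", "1.1.1.2", "1.1.1.1"],   # batch 1: 3 PVs, 2 unique IPs
    ["1.1.1.2", "1.1.1.3"],              # batch 2: 2 PVs, 1 new IP
]

# Stateless: per-batch counts can be added with no history at all.
pv_total = sum(len(b) for b in batches)

# Stateful: unique-IP counting must carry the seen-set between batches.
seen = set()
daily_unique = []
for b in batches:
    seen.update(b)
    daily_unique.append(len(seen))
```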
2.6 Intelligent Agent Spark Streaming Analysis

As shown in Figure 1, this study implements a Spark Streaming real-time analysis system architecture based on large language model agents and applies it to sentiment analysis of user review data.


Fig. 1. Spark Streaming and Large Model Intelligent Agent Architecture Diagram.

This system primarily employs the following key technologies to achieve efficient data processing and analysis:

  1. Sockets: Serving as the initial data access point, it facilitates network communication through the TCP/IP protocol for data transmission between clients and servers. In this study, restaurant review datasets and Yelp datasets are used as experimental data.

  2. Spark Streaming: Apache Spark is an efficient distributed computing framework that offers fast and versatile data processing capabilities. The Spark Streaming module is specifically designed for handling real-time data streams. This experiment configures and deploys Spark-related services and containers using Docker, setting up a cluster architecture with a Spark master and workers.
  3. Spark Engine: Spark Streaming receives real-time data streams and processes them in batches. The Spark engine is responsible for handling these batched data and generating result streams. The Spark engine also interacts with large model agents.

  4. Large Model Agents: Providing sentiment analysis capabilities, these agents are based on large language models, enhancing the data processing and analysis capabilities for sentiment analysis and allowing the agents to more accurately understand and process natural language data. Models such as GPT-4, Qwen2.5, ERNIE 4.0, and GLM-4 can be utilized.

  5. Kafka: Apache Kafka is a distributed streaming platform that, when integrated with Spark Streaming, can efficiently handle real-time data streams. The processed data is output to Elasticsearch, which stores and indexes it, enabling rapid retrieval and visualization with Elasticsearch and Kibana.

This system constitutes a powerful real-time data processing and analysis application, capable of handling large-scale datasets and providing real-time sentiment analysis capabilities.

3 Accurate Calculation Algorithm and Approximate Estimation Algorithm

The cardinality calculation method determines the number of distinct elements in a data stream and comprises exact calculation algorithms and approximate estimation algorithms. An exact calculation can usually be implemented with a simple algorithm of linear space complexity O(N), but it often requires large amounts of memory and time, so it frequently cannot meet the needs of processing massive data. Cardinality estimation methods, which handle massive data with fewer resources, have therefore emerged. Generally speaking, the main cardinality estimation algorithms are Linear Counting, LogLog Counting, HyperLogLog Counting [2], and Adaptive Counting [1]. HyperLogLog++ [7] is a low-error algorithm based on HyperLogLog Counting. This paper estimates the number of daily independent IPs based on the HyperLogLog++ algorithm.

3.1 Accurate Computing Algorithms

The exact cardinality calculation method based on Spark Streaming uses the historical state to count, in each batch, the IPs that appear for the first time that day, and then sums the per-batch counts to obtain the number of daily independent IPs. The process and pseudocode are shown in Fig. 2 and Fig. 3, respectively:


Fig. 2. Flow of the exact calculation method for counting the number of daily independent IPs based on Spark Streaming.

Fig. 3. Pseudocode based on Spark Streaming.

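
The per-batch state update behind Fig. 2 and Fig. 3 can be paraphrased in plain Python (an illustrative sketch; the original runs as a Spark Streaming job with checkpointed state, not as the loop below):

```python
# History state: all IPs already seen today, plus the running daily total
# (what Spark Streaming would keep under its checkpointed state).
state = {"seen_ips": set(), "daily_unique": 0}

def process_batch(batch_ips, state):
    """Count the IPs appearing for the first time today and fold them in."""
    new_ips = set(batch_ips) - state["seen_ips"]
    state["seen_ips"] |= new_ips
    state["daily_unique"] += len(new_ips)
    return state["daily_unique"]

counts = [process_batch(b, state)
          for b in (["a", "b"], ["b", "c"], ["c", "d", "e"])]
```

Note that the state grows with every distinct IP, which is exactly the memory cost the estimation method below avoids.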
3.2 Cardinality Estimation Method

The Spark Streaming-based cardinality estimation method saves one HyperLogLog++ object per day in the historical state, and the IPs of each batch are added to the corresponding HyperLogLog++ object. At calculation time, the HyperLogLog++ objects are queried to obtain the number of daily independent IPs. The process and pseudocode are shown in Fig. 4 and Fig. 5, respectively:


Fig. 4. HyperLogLog++ estimation method for counting the number of daily independent IPs.

Fig. 5. Pseudocode based on HyperLogLog++.

4 Analysis of Spark Machine Learning Algorithms

The machine learning library of Apache Spark (MLlib) is a key component of the Spark ecosystem, covering various machine learning tasks such as classification, regression, clustering, and collaborative filtering. Leveraging the distributed computing architecture of Apache Spark, MLlib can scale to more computing nodes with the increase of data volume, processing larger datasets. This not only enhances the capability of data processing but also accelerates the model training and inference process on large datasets.

Spark MLlib provides a comprehensive library of algorithms, supporting a variety of machine learning algorithms, including linear regression, logistic regression, decision trees, random forests, and k-means clustering, among others. These algorithms can adapt to different business scenarios and data characteristics, meeting a diverse range of analytical needs. Additionally, MLlib supports key aspects of feature engineering, enabling the construction of complex pipelines involving multiple machine learning techniques, such as feature extraction, feature transformation, and feature selection.

Spark MLlib can be seamlessly integrated with Spark Streaming, enabling machine learning analysis on real-time data streams. This is of significant practical value for business scenarios requiring real-time feedback. It not only improves the efficiency of data processing but also provides strong support for real-time data analysis.

5 Analysis of experimental results

Three experiments were carried out, with 3000, 5000, and 10000 log records produced per batch, respectively. Each experiment ran for 12 batches, for a total of 36 batches, and in each batch half of the IP addresses were duplicates. The batch interval of Spark Streaming was 5 minutes, i.e. 3000, 5000, or 10000 log records were generated every 5 minutes. Each log record was 1,296 bytes, and the IP address in the log was 15 bytes.

The exact cardinality calculation method and the HyperLogLog++-based estimation method were each run over the logs. The program was deployed and submitted in local mode, and the number of independent IPs, the processing time of each batch, and the file size under the checkpoint path were recorded.

The error percentages of the number of independent IPs counted per batch by the estimation method are shown in Table 2. Compared with the exact calculation method, the error rate of the estimation method is below 1.5%; for the daily independent IP count in each batch, the error is negligible. Moreover, the more log records processed per batch, the smaller the relative error.

Table 2. Error percentage of the estimated number of independent IPs in each batch.

| Batch | Estimated | Accurate | Error (%) | Batch | Estimated | Accurate | Error (%) | Batch | Estimated | Accurate | Error (%) |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 1 | 1500 | 1500 | 0 | 13 | 2500 | 2500 | 0 | 25 | 5000 | 5000 | 0 |
| 2 | 3000 | 3000 | 0 | 14 | 5000 | 5000 | 0 | 26 | 10001 | 10000 | 0.01 |
| 3 | 4500 | 4500 | 0 | 15 | 7501 | 7500 | 0.01 | 27 | 15019 | 15000 | 0.13 |
| 4 | 6001 | 6000 | 0.02 | 16 | 9999 | 10000 | 0.01 | 28 | 20158 | 20000 | 0.79 |
| 5 | 7501 | 7500 | 0.01 | 17 | 12437 | 12500 | 0.5 | 29 | 25054 | 25000 | 0.22 |
| 6 | 9001 | 9000 | 0.01 | 18 | 14987 | 15000 | 0.09 | 30 | 30232 | 30000 | 0.77 |
| 7 | 10502 | 10500 | 0.02 | 19 | 17536 | 17500 | 0.21 | 31 | 35171 | 35000 | 0.49 |
| 8 | 12002 | 12000 | 0.02 | 20 | 19973 | 20000 | 0.14 | 32 | 40197 | 40000 | 0.49 |
| 9 | 13652 | 13500 | 1.11 | 21 | 22416 | 22500 | 0.37 | 33 | 45222 | 45000 | 0.49 |
| 10 | 15142 | 15000 | 0.94 | 22 | 24862 | 25000 | 0.55 | 34 | 50427 | 50000 | 0.85 |
| 11 | 16651 | 16500 | 0.91 | 23 | 27556 | 27500 | 0.2 | 35 | 55419 | 55000 | 0.76 |
| 12 | 18224 | 18000 | 1.23 | 24 | 30082 | 30000 | 0.27 | 36 | 60537 | 60000 | 0.9 |

The processing-time statistics for each batch are shown in Fig. 6. With the same number of logs, the average per-batch processing time of the exact calculation method is twice that of the HyperLogLog++-based cardinality estimation method. Furthermore, as the number of log records grows, the average per-batch processing time of the exact method increases faster than that of the estimation method. In the third experiment, the batch processing time of the exact method even exceeds the batch interval of Spark Streaming, resulting in a growing scheduling delay.


Fig. 6. Batch processing time statistics.


Figure 7 shows how checkpoint space usage changes with the batches. The storage usage of the HyperLogLog++-based method stays stable with small fluctuations, and its absolute space usage is far lower than that of the exact calculation method, whose usage grows dramatically as the number of log records increases.

According to the above analysis, the HyperLogLog++-based cardinality estimation method has obvious advantages over the exact calculation method in both processing time and checkpoint space, with an error rate below 1.5% that can essentially be neglected. Therefore, the HyperLogLog++ cardinality estimation method is more suitable for real-time cardinality statistics over big data.


Fig. 7. Checkpoint occupancy statistics.

6 Discussion

6.1 Limitations of Real-Time Systems

The real-time system proposed in the paper has limitations in dynamic workflow management, stateful computing, and human intervention, but these can be improved by using LangGraph to implement workflows with conditions and loops, state saving, and mechanisms for human review and feedback, thereby enhancing system efficiency.

  1. Dynamic Workflow Management. Current limitation: The paper lacks an explicit mechanism for handling dynamic and adaptive workflows. It discusses static pipelines for data processing, sentiment analysis, and cardinality estimation, but does not address flexibility in changing tasks or branching logic in workflows.

Improvement using LangGraph: Cycles and branching: use LangGraph to implement workflows with conditionals and loops. For example: if data is incomplete, trigger preprocessing workflows; if errors occur in sentiment analysis, retry or escalate to a human operator. Controllability: enable fine-grained control over the execution flow, ensuring robust handling of diverse data conditions or model requirements. Rationale: this improves system resilience, allowing it to handle edge cases and dynamic data streams efficiently.

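
The branching behaviour described above can be sketched with a minimal graph of node functions and runtime-chosen edges. This is a hand-rolled stand-in for illustration only, not the LangGraph API; the node names, sentiment rule, and confidence values are invented:

```python
def preprocess(state):
    state["text"] = (state.get("text") or "").strip()
    return "analyze" if state["text"] else "end"   # incomplete data -> stop

def analyze(state):
    # Stand-in for an LLM call; a real system would return model output
    # together with a confidence score.
    state["sentiment"] = "negative" if "bad" in state["text"] else "positive"
    state["confidence"] = 0.4 if "??" in state["text"] else 0.9
    return "escalate" if state["confidence"] < 0.5 else "end"

def escalate(state):
    state["needs_human"] = True                    # hand off to an operator
    return "end"

NODES = {"preprocess": preprocess, "analyze": analyze, "escalate": escalate}

def run(state, entry="preprocess"):
    node = entry
    while node != "end":
        node = NODES[node](state)                  # edge chosen at runtime
    return state
```

Each node returns the name of the next node, so the control flow is decided per item rather than fixed in advance.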
  2. Persistence for Stateful Computations. Current limitation: The paper's architecture does not address state persistence explicitly. Stateful computations like IP tracking rely on in-memory or ephemeral mechanisms, which could lead to inefficiencies or data loss in distributed environments.

Improvement using LangGraph: Built-in persistence: implement state-saving after each step of the graph. For example: save intermediate results of HyperLogLog++ estimations; persist checkpoint data in case of failures, enabling seamless recovery. Error recovery and time travel: add the ability to rewind workflows for debugging or replaying historical data. Rationale: enhances reliability, particularly for long-running processes, while reducing the risk of data loss.

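
State-saving after each step can be sketched with a JSON checkpoint file: an interrupted run then resumes from the last completed step instead of starting over. This is an illustrative pattern under assumed semantics, not LangGraph's persistence layer:

```python
import json
import os
import tempfile

CKPT = os.path.join(tempfile.gettempdir(), "wf_ckpt.json")

def save(step, state):
    with open(CKPT, "w") as f:
        json.dump({"step": step, "state": state}, f)

def load():
    if os.path.exists(CKPT):
        with open(CKPT) as f:
            return json.load(f)
    return {"step": 0, "state": {"sum": 0}}

def run(steps, fail_at=None):
    ckpt = load()
    for i in range(ckpt["step"], len(steps)):
        if i == fail_at:
            raise RuntimeError("simulated crash")
        ckpt["state"]["sum"] += steps[i]
        save(i + 1, ckpt["state"])        # checkpoint after every step
    return ckpt["state"]

# Start from a clean slate for the demonstration below.
if os.path.exists(CKPT):
    os.remove(CKPT)
```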
  3. Human-in-the-Loop for Enhanced Sentiment Analysis. Current limitation: Sentiment analysis in the paper relies entirely on LLMs without any mechanism for human oversight or correction, which may lead to inaccuracies in edge cases or domain-specific contexts.

Improvement using LangGraph: Human-in-the-loop: incorporate LangGraph's feature to pause workflows for human review and input. For example: when sentiment analysis confidence is low or results are ambiguous, notify a human operator for validation or refinement; use human feedback to improve model performance iteratively. Rationale: improves accuracy and contextual relevance of sentiment analysis, especially in high-stakes applications.

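
A minimal version of this pause-for-review pattern: low-confidence results are queued for a human, and the returned labels override the model's output. The threshold and sample data are illustrative assumptions:

```python
REVIEW_THRESHOLD = 0.7

def triage(predictions):
    """Split model outputs into auto-accepted results and a human-review queue."""
    auto, review = [], []
    for item in predictions:
        (review if item["confidence"] < REVIEW_THRESHOLD else auto).append(item)
    return auto, review

def apply_feedback(review, human_labels):
    """Human labels override the model for every queued item."""
    for item in review:
        item["sentiment"] = human_labels[item["id"]]
        item["reviewed"] = True
    return review

preds = [
    {"id": 1, "sentiment": "positive", "confidence": 0.95},
    {"id": 2, "sentiment": "negative", "confidence": 0.55},  # ambiguous case
]
auto, review = triage(preds)
final = auto + apply_feedback(review, {2: "neutral"})
```

The reviewed items could also be fed back as training or few-shot examples, which is the iterative improvement loop the paragraph above describes.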
6.2 Improvement Solutions Based on LangGraph

As shown in Figure 8, this study employs LangGraph technology to optimize and upgrade the big data analysis system. LangGraph, an advanced tool for constructing large language model workflows, integrates LLMs with graph-based workflows to implement a sentiment analysis agent system. This system not only categorizes customer inquiries efficiently but also provides manual responses or escalates issues to higher-level procedures when it detects negative customer sentiment.

The key technical features of the system include:

  1. State Management: precisely define and manage the state of customer interactions.

  2. Graph Construction: utilize StateGraph to build workflows, designing nodes, edges, and conditional edges to represent complex support processes.
  3. Memory Checkpointing: save state and context information across multi-turn conversations to achieve dialogue coherence and context persistence.
  4. Human in the Loop: in cases requiring human intervention, the system allows responses to be set and updated manually, improving response quality.

By applying these technologies, the LangGraph system can handle complex customer interactions more flexibly while ensuring that manual intervention can be introduced in a timely manner when necessary, thereby improving the efficiency and quality of customer service.


Fig. 8. Sentiment Analysis Based on LangGraph.

7 Conclusion

This study demonstrates how Agent AI, empowered by LangGraph, transforms real-time data streaming systems by introducing dynamic control flow, persistent state management, and human-in-the-loop workflows. LangGraph’s ability to implement cycles and branching in workflows allows streaming systems to adapt to complex and evolving data processing requirements, ensuring reliable and efficient performance. By integrating LangGraph’s persistence features, the system supports seamless error recovery, memory retention, and iterative processing essential for high-stakes applications.

In conjunction with Apache Spark Streaming and Kafka, Agent AI leverages LangGraph’s streaming support and debugging tools to enhance sentiment analysis workflows. This integration enables real-time decision-making, adaptive task execution, and the refinement of insights through human collaboration. Experimental validations highlight the system’s capacity to dynamically manage workflows, process large-scale data efficiently, and deliver actionable intelligence.

By blending LangGraph’s advanced capabilities with Spark Streaming, this framework establishes a robust foundation for scalable, intelligent, and adaptive streaming systems, advancing the application of Agent AI in big data environments.

References

  1. Wei, J.L.: Research and Implementation of Distributed OLAP Engine for Massive Data [D]. Northeastern University, 2015.
