无代理系统监控:使用Elixir Broadway实现
在当今的云计算环境中,系统监控是一个不可或缺的部分。它帮助我们了解系统性能、检测问题并进行优化。然而,传统的系统监控方法往往需要在每台机器上安装代理程序,这不仅增加了维护成本,还可能引入额外的复杂性。为了解决这个问题,我们正在开发一种全新的方式——通过Opsmaru平台内置的功能来实现无代理的系统监控。
背景
Opsmaru平台已经具备了对每个集群的基本健康监控功能,但这些功能仅能提供表面级别的信息,无法深入洞察系统的具体指标(如内存、磁盘、CPU和负载等)。为了弥补这一不足,我们需要一个解决方案,能够在无需安装额外代理的情况下收集和分析这些关键数据。
架构概览
Opsmaru的核心模块之一是Uplink
,这是一个由我们开发并开源的组件,负责管理集群内的所有容器编排任务。通过LXD虚拟化API,Uplink可以轻松获取到运行中的实例状态及其相关指标。
基于此,我们的思路是扩展Uplink的功能,使其能够同时处理系统监控任务。由于我们熟悉Elixir语言的Broadway库,因此利用它来构建监控管道显得顺理成章。
数据获取
LXD提供了两个主要接口用于访问系统指标:
/1.0/metrics
:返回关于容器CPU时间使用的详细信息。/1.0/instances
:包含更多实时数据,如CPU、磁盘、内存和网络使用情况。
例如,调用LXD.list_instances
可以得到类似以下的结果:
[
%Uplink.Clients.LXD.Instance{
state: %{
"cpu" => %{"usage" => 136630751000},
"disk" => %{"root" => %{"total" => 0, "usage" => 40831488}},
"memory" => %{
"swap_usage" => 0,
"swap_usage_peak" => 0,
"total" => 0,
"usage" => 1998848,
"usage_peak" => 0
},
"network" => %{
"eth0" => %{
"counters" => %{
"bytes_received" => 215421737,
"bytes_sent" => 3562213,
"packets_received" => 53456,
"packets_sent" => 32510
}
}
}
}
}
]
而对于/1.0/metrics
接口,则会返回以Prometheus格式编码的数据。我们可以借助prometheus_parser
库将其解析为易用的结构。
时间序列数据收集
接下来,我们需要决定将收集到的数据存储在哪里。经过评估,我们选择了Elastic Stack,因为它不仅擅长处理时间序列数据,还能支持其他功能(如AIOps、搜索等)。
为了避免单次采集点导致的信息偏差,我们会定期多次采集数据,并在发送至Elastic之前进行必要的预处理。
实现Broadway生产者
Broadway生产者的职责是以固定的时间间隔收集数据,并将其传递给后续的管道组件。以下是其核心代码片段:
初始化状态
def init(opts) do
poll_interval = Keyword.get(opts, :poll_interval, 15_000)
state = %{
demand: 0,
poll_interval: poll_interval,
previous_cpu_metrics: [],
cpu_60_metrics: [],
cpu_300_metrics: [],
cpu_900_metrics: []
}
Process.send_after(self(), :poll, poll_interval)
{:producer, state}
end
这里定义了一些关键变量,用于记录上一次采集的数据以及不同时间段的CPU负载值(load 1、load 5和load 15)。
处理需求
def handle_demand(demand, state) when demand <= 0 do
{:noreply, [], state}
end
def handle_demand(demand, state) do
if ready_to_fetch?(state) do
{messages, state} = load_metrics(demand, state)
{:noreply, messages, state}
else
{:noreply, [], state}
end
end
ready_to_fetch?
函数确保我们不会过于频繁地采集数据,从而遵守设定的轮询间隔。
数据加载与转换
defp load_metrics(demand, state) do
metrics = Metrics.for_instances()
# 计算变化量并生成消息...