[博客翻译]Elixir Broadway的无代理系统监控


原文地址:https://opsmaru.com/blog/agent-less-system-monitoring-with-elixir-broadway


无代理系统监控:使用Elixir Broadway实现

在当今的云计算环境中,系统监控是一个不可或缺的部分。它帮助我们了解系统性能、检测问题并进行优化。然而,传统的系统监控方法往往需要在每台机器上安装代理程序,这不仅增加了维护成本,还可能引入额外的复杂性。为了解决这个问题,我们正在开发一种全新的方式——通过Opsmaru平台内置的功能来实现无代理的系统监控。


背景

Opsmaru平台已经具备了对每个集群的基本健康监控功能,但这些功能仅能提供表面级别的信息,无法深入洞察系统的具体指标(如内存、磁盘、CPU和负载等)。为了弥补这一不足,我们需要一个解决方案,能够在无需安装额外代理的情况下收集和分析这些关键数据。


架构概览

Opsmaru的核心模块之一是Uplink,这是一个由我们开发并开源的组件,负责管理集群内的所有容器编排任务。通过LXD虚拟化API,Uplink可以轻松获取到运行中的实例状态及其相关指标。

基于此,我们的思路是扩展Uplink的功能,使其能够同时处理系统监控任务。由于我们熟悉Elixir语言的Broadway库,因此利用它来构建监控管道显得顺理成章。


数据获取

LXD提供了两个主要接口用于访问系统指标:

  1. /1.0/metrics:返回关于容器CPU时间使用的详细信息。
  2. /1.0/instances:包含更多实时数据,如CPU、磁盘、内存和网络使用情况。

例如,调用LXD.list_instances可以得到类似以下的结果:

[
  %Uplink.Clients.LXD.Instance{
    state: %{
      "cpu" => %{"usage" => 136630751000},
      "disk" => %{"root" => %{"total" => 0, "usage" => 40831488}},
      "memory" => %{
        "swap_usage" => 0,
        "swap_usage_peak" => 0,
        "total" => 0,
        "usage" => 1998848,
        "usage_peak" => 0
      },
      "network" => %{
        "eth0" => %{
          "counters" => %{
            "bytes_received" => 215421737,
            "bytes_sent" => 3562213,
            "packets_received" => 53456,
            "packets_sent" => 32510
          }
        }
      }
    }
  }
]

而对于/1.0/metrics接口,则会返回以Prometheus格式编码的数据。我们可以借助prometheus_parser库将其解析为易用的结构。


时间序列数据收集

接下来,我们需要决定将收集到的数据存储在哪里。经过评估,我们选择了Elastic Stack,因为它不仅擅长处理时间序列数据,还能支持其他功能(如AIOps、搜索等)。

为了避免单次采集点导致的信息偏差,我们会定期多次采集数据,并在发送至Elastic之前进行必要的预处理。


实现Broadway生产者

Broadway生产者的职责是以固定的时间间隔收集数据,并将其传递给后续的管道组件。以下是其核心代码片段:

初始化状态

def init(opts) do
  poll_interval = Keyword.get(opts, :poll_interval, 15_000)
  state = %{
    demand: 0,
    poll_interval: poll_interval,
    previous_cpu_metrics: [],
    cpu_60_metrics: [],
    cpu_300_metrics: [],
    cpu_900_metrics: []
  }
  Process.send_after(self(), :poll, poll_interval)
  {:producer, state}
end

这里定义了一些关键变量,用于记录上一次采集的数据以及不同时间段的CPU负载值(load 1、load 5和load 15)。

处理需求

def handle_demand(demand, state) when demand <= 0 do
  {:noreply, [], state}
end

def handle_demand(demand, state) do
  if ready_to_fetch?(state) do
    {messages, state} = load_metrics(demand, state)
    {:noreply, messages, state}
  else
    {:noreply, [], state}
  end
end

ready_to_fetch?函数确保我们不会过于频繁地采集数据,从而遵守设定的轮询间隔。

数据加载与转换

defp load_metrics(demand, state) do
  metrics = Metrics.for_instances()
  # 计算变化量并生成消息...