[博客翻译]Elixir Broadway的无代理系统监控


原文地址:https://opsmaru.com/blog/agent-less-system-monitoring-with-elixir-broadway


无代理系统监控:使用Elixir Broadway实现

在当今的云计算环境中,系统监控是一个不可或缺的部分。它帮助我们了解系统性能、检测问题并进行优化。然而,传统的系统监控方法往往需要在每台机器上安装代理程序,这不仅增加了维护成本,还可能引入额外的复杂性。为了解决这个问题,我们正在开发一种全新的方式——通过Opsmaru平台内置的功能来实现无代理的系统监控。


背景

Opsmaru平台已经具备了对每个集群的基本健康监控功能,但这些功能仅能提供表面级别的信息,无法深入洞察系统的具体指标(如内存、磁盘、CPU和负载等)。为了弥补这一不足,我们需要一个解决方案,能够在无需安装额外代理的情况下收集和分析这些关键数据。


架构概览

Opsmaru的核心模块之一是Uplink,这是一个由我们开发并开源的组件,负责管理集群内的所有容器编排任务。通过LXD虚拟化API,Uplink可以轻松获取到运行中的实例状态及其相关指标。

基于此,我们的思路是扩展Uplink的功能,使其能够同时处理系统监控任务。由于我们熟悉Elixir语言的Broadway库,因此利用它来构建监控管道显得顺理成章。


数据获取

LXD提供了两个主要接口用于访问系统指标:

  1. /1.0/metrics:返回关于容器CPU时间使用的详细信息。
  2. /1.0/instances:包含更多实时数据,如CPU、磁盘、内存和网络使用情况。

例如,调用LXD.list_instances可以得到类似以下的结果:

[
  %Uplink.Clients.LXD.Instance{
    state: %{
      "cpu" => %{"usage" => 136630751000},
      "disk" => %{"root" => %{"total" => 0, "usage" => 40831488}},
      "memory" => %{
        "swap_usage" => 0,
        "swap_usage_peak" => 0,
        "total" => 0,
        "usage" => 1998848,
        "usage_peak" => 0
      },
      "network" => %{
        "eth0" => %{
          "counters" => %{
            "bytes_received" => 215421737,
            "bytes_sent" => 3562213,
            "packets_received" => 53456,
            "packets_sent" => 32510
          }
        }
      }
    }
  }
]

而对于/1.0/metrics接口,则会返回以Prometheus格式编码的数据。我们可以借助prometheus_parser库将其解析为易用的结构。


时间序列数据收集

接下来,我们需要决定将收集到的数据存储在哪里。经过评估,我们选择了Elastic Stack,因为它不仅擅长处理时间序列数据,还能支持其他功能(如AIOps、搜索等)。

为了避免单次采集点导致的信息偏差,我们会定期多次采集数据,并在发送至Elastic之前进行必要的预处理。


实现Broadway生产者

Broadway生产者的职责是以固定的时间间隔收集数据,并将其传递给后续的管道组件。以下是其核心代码片段:

初始化状态

def init(opts) do
  poll_interval = Keyword.get(opts, :poll_interval, 15_000)
  state = %{
    demand: 0,
    poll_interval: poll_interval,
    previous_cpu_metrics: [],
    cpu_60_metrics: [],
    cpu_300_metrics: [],
    cpu_900_metrics: []
  }
  Process.send_after(self(), :poll, poll_interval)
  {:producer, state}
end

这里定义了一些关键变量,用于记录上一次采集的数据以及不同时间段的CPU负载值(load 1、load 5和load 15)。

处理需求

def handle_demand(demand, state) when demand <= 0 do
  {:noreply, [], state}
end

def handle_demand(demand, state) do
  if ready_to_fetch?(state) do
    {messages, state} = load_metrics(demand, state)
    {:noreply, messages, state}
  else
    {:noreply, [], state}
  end
end

ready_to_fetch?函数确保我们不会过于频繁地采集数据,从而遵守设定的轮询间隔。

数据加载与转换

defp load_metrics(demand, state) do
  metrics = Metrics.for_instances()
  # 计算变化量并生成消息...
  messages = transform_metrics(metrics, state)
  {messages, updated_state}
end

通过调用Metrics.for_instances()获取当前实例的所有指标,并结合历史数据计算增量。


转换指标

在实际发送之前,我们需要对原始数据进行一系列转换操作,以便生成符合Elasticsearch要求的文档格式。

转换逻辑

defp transform_metrics(metrics, state) do
  metrics
  |> Enum.map(fn metric ->
    # 匹配对应的上一时刻数据...
    cpu_percentage = calculate_cpu_percentage(metric, state.previous_cpu_metrics)
    network_diff = calculate_network_diff(metric, state.previous_network_metrics)
    %{metric: metric, cpu: cpu_percentage, network: network_diff}
  end)
  |> Enum.map(&transform_message/1)
end

defp transform_message(data) do
  %Broadway.Message{
    data: data,
    acknowledger: Broadway.NoopAcknowledger.init()
  }
end

每条消息最终会被封装为Broadway.Message对象,其中包含完整的指标数据。


实现Broadway管道

最后一步是将处理好的数据发送到Elasticsearch。以下是管道模块的主要逻辑:

消息处理

def handle_message(_, %Message{data: data} = message, _) do
  memory = Document.memory(data)
  cpu = Document.cpu(data)
  load = Document.load(data)
  data = %{memory: memory, cpu: cpu, load: load}
  Message.put_data(message, data)
end

批量发送

def handle_batch(_, messages, _, context) do
  documents = to_ndjson(messages)
  monitors = Pipelines.get_monitors(context)
  Logger.info("Pushing metrics at #{DateTime.utc_now()}")
  Enum.each(monitors, fn monitor -> Metrics.push!(monitor, documents) end)
  messages
end

在这里,我们将多条消息组合为NDJSON格式字符串,并通过批量API提交到Elasticsearch。


总结

通过这种方式,我们成功实现了无代理的系统监控方案,避免了在客户集群中部署额外软件带来的复杂性和风险。未来,我们还可以进一步拓展这种技术,应用于Kubernetes集群或其他云原生环境。

如果你感兴趣,不妨亲自尝试一下!只需注册一个账户并创建集群,即可体验这套强大的监控工具。点击这里开始吧!