Vector Aggregate
Official site: https://vector.dev/
Aggregate transform docs: https://vector.dev/docs/reference/configuration/transforms/aggregate/
Explanation:
[transforms.my_transform_id]
type = "aggregate" # the transform type used for this step
inputs = [ "my-source-or-transform-id" ] # IDs of the upstream sources/transforms
interval_ms = 10_000 # aggregation window in milliseconds
Used on its own you won't see any aggregation effect, because the transform operates on, and emits, Metric events; for any other log type you have to write the conversion logic in the TOML yourself.
Metrics: https://vector.dev/docs/reference/configuration/transforms/aggregate/#telemetry-metrics
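If you just want to see the transform in action before preparing real metric data, here is a minimal self-contained sketch, assuming Vector's internal_metrics source (all IDs are illustrative):
[sources.in_metrics]
type = "internal_metrics" # emits Vector's own telemetry as Metric events
[transforms.agg]
type = "aggregate"
inputs = [ "in_metrics" ]
interval_ms = 10_000 # flush aggregated metrics every 10 seconds
[sinks.out]
type = "console"
inputs = [ "agg" ]
encoding.codec = "json"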
Example from the official docs (for reference):
Prerequisite: your log events are already in the Metric format
[{"metric":{"counter":{"value":1.1},"kind":"incremental","name":"counter.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:44.223543Z"}},{"metric":{"counter":{"value":2.2},"kind":"incremental","name":"counter.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":1.1},"kind":"incremental","name":"counter.1","tags":{"host":"different.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":22.33},"kind":"absolute","name":"gauge.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:47.223543Z"}},{"metric":{"counter":{"value":44.55},"kind":"absolute","name":"gauge.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}}]
Pipeline used to run them through the aggregate transform:
[sources.my_sources_id]
type = "file" # any source works; see the official sources reference
include = ["D:\\az\\Vector\\json\\srcInfo.json"] # backslashes in Windows paths must be escaped as \\
read_from = "beginning"
[transforms.my_transform_id]
type = "aggregate"
inputs = [ "my_sources_id" ]
interval_ms = 5_000
[sinks.my_sink_id]
type = "console"
inputs = [ "my_transform_id" ] # note: the key is "inputs", not "input"
encoding.codec = "json"
Result: the two incremental counter.1 events for my.host.com are summed (1.1 + 2.2 = 3.3), while for the absolute gauge.1 only the most recently received value (44.55) is kept:
[{"metric":{"counter":{"value":3.3},"kind":"incremental","name":"counter.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":1.1},"kind":"incremental","name":"counter.1","tags":{"host":"different.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}},{"metric":{"counter":{"value":44.55},"kind":"absolute","name":"gauge.1","tags":{"host":"my.host.com"},"timestamp":"2021-07-12T07:58:45.223543Z"}}]
Logs of any other type cannot use the approach above directly; here is a worked example:
data_dir = "D:\\az\\Vector\\json" # the configured working directory (not actually used in this example)
[log_schema]
timestamp_key = "@appname" # which field Vector treats as the event timestamp; most setups point this at an actual time field
[sources.file_input]
type = "file" # required
include = ["D:\\az\\Vector\\json\\srcInfo.json"] # required
read_from = "beginning"
[transforms.parse_log_json]
type = "remap"
inputs = ["file_input"]
source = '''
. = parse_json!(string!(.message)) # parse the raw message string into a JSON object
'''
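For reference, the parse step assumes every line of srcInfo.json is a JSON object; a hypothetical input line carrying the fields used further down might look like:
{"appname":"myapp","procid":"1234","facility":"user","hostname":"my.host.com","message":"hello world","msgid":"ID47","severity":"info","version":"1"}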
[transforms.log_to_met]
type = "log_to_metric" # 日志转为 metric 类型
inputs = [ "parse_log_json" ]
[[transforms.log_to_met.metrics]]
type = "counter"
field = "procid" # 必须指定一个字段,生成后是name = "procid"
#procid = "procid"
#name = "procid"
[transforms.log_to_met.metrics.tags]
tags = "{{.}}" # 将所有属性放入tags对象里面 如果没有这行,下面几行注释需要放开
#procid = "{{procid}}"
#appname = "{{appname}}"
#facility = "{{facility}}"
#hostname = "{{hostname}}"
#message = "{{message}}"
#msgid = "{{msgid}}"
#severity = "{{severity}}"
#version = "{{version}}"
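With this setup every parsed log line becomes an incremental counter named procid with value 1. A sketch of the expected shape (tag values are hypothetical and depend on the input; shown here as if the per-field tag lines were used):
{"metric":{"counter":{"value":1.0},"kind":"incremental","name":"procid","tags":{"appname":"myapp","hostname":"my.host.com","procid":"1234"},"timestamp":"..."}}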
[transforms.my_transform_agg]
type = "aggregate"
inputs = [ "log_to_met" ]
interval_ms = 2000
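Within each 2-second window the transform merges metrics that share the same name and tags, summing incremental counters. A hypothetical before/after:
{"metric":{"counter":{"value":1.0},"kind":"incremental","name":"procid","tags":{"hostname":"my.host.com"}, ...}}
{"metric":{"counter":{"value":1.0},"kind":"incremental","name":"procid","tags":{"hostname":"my.host.com"}, ...}}
becomes
{"metric":{"counter":{"value":2.0},"kind":"incremental","name":"procid","tags":{"hostname":"my.host.com"}, ...}}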
# this step post-processes the aggregated results using VRL functions
[transforms.my_transform_convert]
type = "remap"
inputs = [ "my_transform_agg" ]
source = '''
# del() returns the removed value; on Metric events VRL can only write
# metric fields such as .tags, so the results land in local variables
# and these tags are simply dropped from the output.
appname = del(.tags.appname)
facility = del(.tags.facility)
hostname = del(.tags.hostname)
message = del(.tags.message)
msgid = del(.tags.msgid)
'''
[sinks.console]
type = "console"
inputs = [ "my_transform_convert" ]
target = "stdout"
[sinks.console.encoding]
codec = "json"