万字详解!搜狐智能媒体基于 Zipkin 和 StarRocks 的微服务链路追踪实践

本文发表于: &{ new Date(1648396800000).toLocaleDateString() }

作者:翟东波 叶书俊

在微服务体系架构下,搜狐智能媒体使用 Zipkin 进行服务链路追踪(Tracing)的埋点采集,将采集的 Trace 信息存储到 StarRocks 中。通过 StarRocks 强大的 SQL 计算能力,对 Tracing 信息进行多维度的统计、分析等操作,提升了微服务监控能力,从简单统计的 Monitoring 上升到更多维度探索分析的 Observability。

全文主要分为三个部分:第一节主要介绍微服务下的常用监控方式,其中链路追踪技术,可以串联整个服务调用链路,获得整体服务的关键信息,对微服务的监控有非常重要的意义。第二节主要介绍搜狐智能媒体是如何构建链路追踪分析体系的,主要包括 Zipkin 的数据采集,StarRocks 的数据存储,以及根据应用场景对 StarRocks 进行分析计算等三个部分。第三节主要介绍搜狐智能媒体通过引入 Zipkin 和 StarRocks 进行链路追踪分析取得的一些实践效果。
 

#01 微服务架构中的链路追踪

近年来,企业 IT 应用架构逐步向微服务、云原生等分布式应用架构演进,在搜狐智能媒体内部,应用服务按照微服务、Docker、Kubernetes、Spring Cloud 等架构思想和技术方案进行研发运维,提升部门整体工程效率

微服务架构提升工程效率的同时,也带来了一些新的问题。微服务是一个分布式架构,它按业务划分服务单元,用户的每次请求不再是由某一个服务独立完成了,而是变成了多个服务一起配合完成。这些服务可能是由不同的团队、使用不同的编程语言实现,可能布在了不同的服务器、甚至不同的数据中心。如果用户请求出现了错误和异常,微服务分布式调用的特性决定了这些故障难以定位,相对于传统的单体架构,微服务监控面临着新的难题。

Logging、Metrics、Tracing

微服务监控可以包含很多方式,按照监测的数据类型主要划分为 Logging、Metrics 和Tracing 三大领域:

Logging

用户主动记录的离散事件,记录的信息一般是非结构化的文本内容,在用户进行问题分析判断时可以提供更为详尽的线索。

Metrics

具有聚合属性的采集数据,旨在为用户展示某个指标在某个时段的运行状态,用于查看一些指标和趋势。

Tracing

记录一次请求调用的生命周期全过程,其中包括服务调用和处理时长等信息,含有请求上下文环境,由一个全局唯一的 Trace ID 来进行标识和串联整个调用链路,非常适合微服务架构的监控场景。

图1

三者的关系如上图所示,这三者之间也是有重叠的,比如 Logging 可以聚合相关字段生成 Metrics 信息,关联相关字段生成 Tracing 信息;Tracing 可以聚合查询次数生成 Metrics 信息,可以记录业务日志生成 Logging 信息。一般情况下要在 Metrics 和 Logging 中增加字段串联微服务请求调用生命周期比较困难,通过 Tracing 获取 Metrics 和 Logging 则相对容易很多。

另外,这三者对存储资源有着不同的需求,Metrics 是天然的压缩数据,最节省资源;Logging 倾向于无限增加的,甚至会超出预期的容量;Tracing 的存储容量,一般介于 Metrics 和 Logging 两者之间,另外还可通过采样率进一步控制容量需求。

从 Monitoring 到 Observability

Monitoring tells you whether the system works. Observability lets you ask why it's not working. 

– Baron Schwarz微服务监控从数据分析层次,可以简单分为 Monitoring 和 Observability。

Monitoring

告诉你系统是否在工作,对已知场景的预定义计算,对各种监控问题的事前假设。对应上图 Known Knowns 和 Known Unknowns,都是事先假设可能会发生的事件,包括已经明白和不明白的事件。

Observability

可以让你询问系统为什么不工作,对未知场景的探索式分析,对任意监控问题的事后分析。对应上图 Unknown Knowns 和 Unknown Unknowns,都是事未察觉可能会发生的事件,包括已经明白和不明白的事件。

很显然,通过预先假设所有可能发生事件进行 Monitoring 的方式,已经不能满足微服务复杂的监控场景,我们需要能够提供探索式分析的 Observability 监控方式。在 Logging、Metrics 和 Tracing,Tracing 是目前能提供多维度监控分析能力的最有效方式。

 

Tracing

链路追踪 Tracing Analysis 为分布式应用的开发者提供了完整的调用链路还原、调用请求量统计、链路拓扑、应用依赖分析等工具,可以帮助开发者快速分析和诊断分布式应用架构下的性能瓶颈,提高微服务时代下的开发诊断效率。

Tracing 可以串联微服务中分布式请求的调用链路,在微服务监控体系中有着重要的作用。另外,Tracing 介于 Metrics 和 Logging 之间,既可以完成 Monitoring 的工作,也可以进行 Observability 的分析,提升监控体系建设效率。

系统模型

链路追踪(Tracing)系统,需要记录一次特定请求经过的上下游服务调用链路,以及各服务所完成的相关工作信息。如下图所示的微服务系统,用户向服务 A 发起一个请求,服务 A 会生成一个全局唯一的 Trace ID,服务 A 内部 Messaging 方式调用相关处理模块(比如跨线程异步调用等),服务 A 模块再通过 RPC 方式并行调用服务 B 和服务 C;服务 B 会即刻返回响应,但服务 C 会采用串行方式,先用 RPC 调用服务 D,再用 RPC 调用服务 E,然后再响应服务 A 的调用请求;服务 A 在内部两个模块调用处理完后,会响应最初的用户请求。最开始生成的 Trace ID 会在这一系列的服务内部或服务之间的请求调用中传递,从而将这些请求调用连接起来。另外,Tracing 系统还会记录每一个请求调用处理的 Timestamp、服务名等等相关信息。

注:服务内部串行调用对系统性能有影响,一般采用并行调用方式,后续章节将只考虑并行调用场景。

在 Tracing 系统中,主要包含 Trace 和 Span 两个基础概念,下图展示了一个由 Span 构成的 Trace。

  • Trace 指一个外部请求经过的所有服务的调用链路,可以理解为一个有服务调用组成的树状结构,每条链路都有一个全局唯一的 ID 来标识。
  • Span 指服务内部或服务之间的一次调用,即 Trace 树中的节点,如下图所示的由 Span 构成的 Trace 树,树中的 Span 节点之间存在父子关系。Span 主要包含 Span名称、Span ID、父 ID,以及 Timestamp、Dration(包含子节点调用处理的 duration)、业务数据等其他 log 信息。

Span 根据调用方式可以分为 RPC Span 和 Messaging Span:

RPC Span

由 RPC Tracing 生成,分为 Client 和 Server 两类 Span,分别由 RPC 服务调用的 Client 节点和 Server 节点记录生成,两者共享 Span ID、Parent Span ID 等信息,但要注意,这两个 Span 记录的时间是有偏差,这个偏差是服务间的调用开销,一般是由网络传输开销、代理服务或服务接口消息排队等情况引起的。

Messaging Span

由 Messaging Tracing 生成,一般用于 Tracing 服务内部调用,不同于 RPC Span,Messaging Span 之间不会共享 Span ID 等信息。

应用场景

根据 Tracing 的系统模型,可获得服务响应等各类 Metric 信息,用于 Alerting、DashBoard 查询等;也可根据 Span 组成的链路,分析单个或整体服务情况,发现服务性能瓶颈、网络传输开销、服务内异步调用设计等各种问题。如下图所示,相比于 Metrics 和 Logging,Tracing 可以同时涵盖监控的 Monitoring 和 Observability 场景,在监控体系中占据重要位置,Opentracing、Opencensus、Opentelemetry 等协会和组织都包含对 Tracing 的支持。

从微服务的角度,Tracing 记录的 Span 信息可以进行各种维度的统计和分析。下图基于 HTTP API 设计的微服务系统为例,用户查询 Service1的 /1/api 接口,Service1 再请求 Service2 的 /2/api,Service2 内部异步并发调用 msg2.1 和 msg2.2msg2.1 请求 Service3的 /3/api接口,msg2.2 请求 Service4 的 /4/api接口,Service3 内部调用 msg3Service4 再请求 Service5 的 /5/api,其中 Service5 没有进行 Tracing 埋点,无法采集 Service5 的信息。

图 6

针对上图的微服务系统,可以进行如下两大类的统计分析操作:

服务内分析

关注单个服务运行情况,比如对外服务接口和上游接口查询的性能指标等,分析场景主要有:

1、上游服务请求

如 Service1 提供的 /1/api ,Service4 提供的 /4/api等,统计获得次数、QPS、耗时百分位数、出错率、超时率等等 metric 信息。

2、下游服务响应

如 Service1 请求的 /2/api 、Service4 请求的 /5/api等,统计查询次数、QPS、耗时百分位数、出错率、超时率等等 Metric 信息。

3、服务内部处理

服务对外接口在内部可能会被分拆为多个 Span,可以按照 Span Name 进行分组聚合统计,发现耗时最长的 Span 等,如 Service2 接口 /2/api ,接口服务内部 Span 包括 /2/api 的 Server Span,call2.1 对应的 Span 和 call2.2 对应的 Span,通过 Span 之间的依赖关系可以算出这些 Span 自身的耗时 Duraion,进行各类统计分析。

服务间分析

在进行微服务整体分析时,我们将单个服务看作黑盒,关注服务间的依赖、调用链路上的服务热点等,分析场景主要有:

1、服务拓扑统计

可以根据服务间调用的 Client Span 和 Server Span,获得整个服务系统的拓扑结构,以及服务之间调用请求次数、Duration 等统计信息。

2、调用链路性能瓶颈分析

分析某个对外请求接口的调用链路上的性能瓶颈,这个瓶颈可能是某个服务内部处理开销造成的,也可能是某两个服务间的网络调用开销等等原因造成的。

对于一次调用涉及到数十个以上微服务的复杂调用请求,每次出现的性能瓶颈很可能都会不一样,此时就需要进行聚合统计,算出性能瓶颈出现频次的排名,分析出针对性能瓶颈热点的服务或服务间调用。

以上仅仅是列举的部分分析场景,Tracing 提供的信息其实可以支持更多的 Metric 统计和探索式分析场景,本文不再一一例举。

 

#02 基于 Zipkin 和 StarRocks 构建链路追踪分析系统

链路追踪系统主要分为数据采集、数据存储和分析计算三大部分,目前使用最广泛的开源链路追踪系统是 Zipkin,它主要包括数据采集和分析计算两大部分,底层的存储依赖其他存储系统。搜狐智能媒体在构建链路追踪系统时,最初采用 Zipkin + ElasticSearch 得方式进行构建,后增加 StarRocks 作为底层存储系统,并基于 StarRocks 进行分析统计,系统总体架构如下图。

 

数据采集

Zipkin 支持客户端全自动埋点,只需将相关库引入应用程序中并简单配置,就可以实现 Span 信息自动生成,Span 信息通过 HTTP 或 Kafka 等方式自动进行上传。Zipkin 目前提供了绝大部分语言的埋点采集库,如 Java 语言的 Spring Cloud 提供了 Sleuth 与 Zipkin 进行深度绑定,对开发人员基本做到透明使用。为了解决存储空间,在使用时一般要设置 1/100 左右的采样率,Dapper 的论文中提到即便是 1/1000 的采样率,对于跟踪数据的通用使用层面上,也可以提供足够多的信息。

数据模型

对应 图 6,下面给出了 Zipkin Span 埋点采集示意图 (图 8),具体流程如下:

图 8 
  1. 用户发送给 Service1 的 Request 中,不含有 Trace 和 Span 信息,Service1 会创建一个 Server Span,随机生成全局唯一的 TraceID(如图中的 X)和 SpanId(如图中的 A,此处的 和 会使用相同的值),记录 Timestamp 等信息;Service1 在给用户返回 Response 时,Service1 会统计 Server Span 的处理耗时 Duration,会将包含 TraceID、SpanID、Timestamp、Duration 等信息的 Server Span 完整信息进行上报。
  2. Service1 向 Service2 发送的请求,会创建一个 Client Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 B),记录 Timestamp 等信息,同时 Service1 会将 Trace ID(X)和 SpanID(B)传递给 Service2(如在 HTTP 协议的 HEADER 中添加 TraceID 和 SpanID 等相关字段);Service1 在收到 Service2 的响应后,Service1 会处理 Client Span 相关信息,并将 Client Span 进行上报
  3. Service2 收到 Service1 的 Request 中,包含 Trace(X)和 Span(B)等信息,Service2 会创建一个 Server Span,使用 X 作为 Trace ID,B 作为 SpanID,内部调用msg2.1 和 msg2.2 同时,将 Trace ID(X)和 SpanID(B)传递给它们;Service2 在收到 msg2.1 和 msg2.2 的返回后,Service1 会处理 Server Span 相关信息,并将此 Server Span 进行上报
  4. Service2 的 msg2.1 和 msg2.2 会分别创建一个 Messaging Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 和 F),记录 Timestamp 等信息,分别向 Service3 和 Service4 发送请求;msg2.1 和 msg2.2 收到响应后,会分别处理 Messaging Span 相关信息,并将两个 Messaging Span 进行上报
  5. Service2 向 Service3 和 Service4 发送的请求,会各创建一个 Client Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 和 G),记录 Timestamp 等信息,同时 Service2 会将 Trace ID(X)和 SpanID(或 G)传递给 Service3 和 Service4Service12 在收到 Service3 和 Service3 的响应后,Service2 会分别处理 Client Span 相关信息,并将两个 Client Span 进行上报
  6. Service3 收到 Service2 的Request中,包含 Trace(X)和Span(D)等信息,Service3 会创建一个 Server Span,使用 作为 Trace ID,作为 SpanID,内部调用 msg3Service3 在收到 msg3 的返回后,Service3 会处理此 Server Span 相关信息,并将此 Server Span 进行上报
  7. Service3 的 msg3 会分别创建一个 Messaging Span,使用 X 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 E),记录 Timestamp 等信息,msg3 处理完成后,处理此 Messaging Span 相关信息,并将此 Messaging Span 进行上报
  8. Service4 收到 Service2 的 Request 中,包含 Trace(X)和 Span(G)等信息,Service4 会创建一个 Server Span,使用 X 作为 Trace ID,作为 SpanID,再向 Service5 发送请求;Service4 在收到 Service5 的响应后,Service4 会处理此 Server Span 相关信息,并将此 Server Span 进行上报
  9. Service4 向 Service5 发送的请求,会创建一个 Client Span,使用 作为 Trace ID,随机生成全局唯一的 SpanID(如图中的 H),记录 Timestamp 等信息,同时 Service4 会将 Trace ID(X)和 SpanID(H)传递给 Service5Service4 在收到 Service5 的响应后,Service4 会处理 Client Span 相关信息,并将此 Client Span 进行上报

上面整个 Trace X 调用链路会生成的 Span 记录如下图,每个 Span 主要会记录 Span Id、Parent Id、Kind(CLIENT 表示 RPC CLIENT 端 Span,SERVER 表示 RPC SERVER 端 SPAN,NULL 表示 Messaging SPAN),SN(Service Name),还会包含 Trace ID,时间戳、Duration 等信息。Service5 没有进行 Zipkin 埋点采集,因此不会有 Service5 的 Span 记录。

数据格式

设置了 Zipkin 埋点的应用服务,默认会使用 Json 格式向 Kafka 上报 Span 信息,上报的信息主要有如下几个注意点:

  1. 每个应用服务每次会上报一组 Span,组成一个 Json 数组上报
  2. Json 数组里包含不同 Trace的Span,即不是所有的 Trace ID都 相同
  3. 不同形式的接口(如 Http、Grpc、Dubbo 等),除了主要字段相同外,在 tags 中会各自记录一些不同的字段
[
  {
    "traceId": "3112dd04c3112036",
    "id": "3112dd04c3112036",
    "kind": "SERVER",
    "name": "get /2/api",
    "timestamp": 1618480662355011,
    "duration": 12769,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "remoteEndpoint": {
      "ipv4": "111.25.140.166",
      "port": 50214
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/2/api",
      "mvc.controller.class": "Controller",
      "mvc.controller.method": "get2Api"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "3112dd04c3112036",
    "id": "b4bd9859c690160a",
    "name": "msg2.1",
    "timestamp": 1618480662357211,
    "duration": 11069,
    "localEndpoint": {
      "serviceName": "SERVICE2"
    },
    "tags": {
      "class": "MSG",
      "method": "msg2.1"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "3112dd04c3112036",
    "id": "c31d9859c69a2b21",
    "name": "msg2.2",
    "timestamp": 1618480662357201,
    "duration": 10768,
    "localEndpoint": {
      "serviceName": "SERVICE2"
    },
    "tags": {
      "class": "MSG",
      "method": "msg2.2"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "b4bd9859c690160a",
    "id": "f1659c981c0f4744",
    "kind": "CLIENT",
    "name": "get /3/api",
    "timestamp": 1618480662358201,
    "duration": 9206,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/3/api"
    }
  },
  {
    "traceId": "3112dd04c3112036",
    "parentId": "c31d9859c69a2b21",
    "id": "73cd1cab1d72a971",
    "kind": "CLIENT",
    "name": "get /4/api",
    "timestamp": 1618480662358211,
    "duration": 9349,
    "localEndpoint": {
      "serviceName": "SERVICE2",
      "ipv4": "172.24.132.32"
    },
    "tags": {
      "http.method": "GET",
      "http.path": "/4/api"
    }
  }
]

数据存储

Zipkin 支持 MySQL、Cassandra 和 ElasticSearch 三种数据存储,这三者都存在各自的缺点:

  • MySQL:采集的 Tracing 信息基本都在每天上亿行甚至百亿行以上,MySQL 无法支撑这么大数据量。
  • Cassandra:能支持对单个 Trace 的 Span 信息分析,但对聚合查询等数据统计分析场景支持不好
  • ElasticSearch:能支持单个 Trace 的分析和简单的聚合查询分析,但对于一些较复杂的数据分析计算不能很好的支持,比如涉及到 Join、窗口函数等等的计算需求,尤其是任务间依赖计算,Zipkin 目前还不能实时计算,需要通过离线跑 Spark 任务计算任务间依赖信息

我们在实践中也是首先使用 ElasticSearch,发现了上面提到的问题,比如 Zipkin 的服务依赖拓扑必须使用离线方式计算,便新增了 StarRocks 作为底层数据存储。将 Zipkin 的 trace 数据导入到StarRocks很方便,基本步骤只需要两步,CREATE TABLE + CREATE ROUTINE LOAD。

另外,在调用链路性能瓶颈分析场景中,要将单个服务看作黑盒,只关注 RPC SPAN,屏蔽掉服务内部的 Messaging Span,使用了 Flink 对服务内部 span 进行 ParentID 溯源,即从 RPC Client SPAN,一直追溯到同一服务同一 Trace ID 的 RPC Server SPAN,用 RPC Server SPAN 的 ID 替换 RPC Client SPAN 的parentId,最后通过Flink-Connector-StarRocks将转换后的数据实时写入StarRocks。

基于 StarRocks 的数据存储架构流程如下图所示。

CREATE TABLE

建表语句示例参考如下,有如下几点注意点:

  • 包括 Zipkin 和 zipkin_trace_perf 两张表,zipkin_trace_perf 表只用于调用链路性能瓶颈分析场景,其他统计分析都适用 Zipkin 表
  • 通过采集信息中的 Timestamp 字段,生成 dt、hr、min 时间字段,便于后续统计分析
  • 采用 DUPLICATE 模型、Bitmap 索引等设置,加快查询速度
  • Zipkin 表使用id作为分桶字段,在查询服务拓扑时,查询计划会优化为 Colocate Join,提升查询性能。

Zipkin

CREATE TABLE `zipkin` (
  `traceId` varchar(24) NULL COMMENT "",
  `id` varchar(24) NULL COMMENT "Span ID",
  `localEndpoint_serviceName` varchar(512) NULL COMMENT "",
  `dt` int(11) NULL COMMENT "",
  `parentId` varchar(24) NULL COMMENT "",
  `timestamp` bigint(20) NULL COMMENT "",
  `hr` int(11) NULL COMMENT "",
  `min` bigint(20) NULL COMMENT "",
  `kind` varchar(16) NULL COMMENT "",
  `duration` int(11) NULL COMMENT "",
  `name` varchar(300) NULL COMMENT "",
  `localEndpoint_ipv4` varchar(16) NULL COMMENT "",
  `remoteEndpoint_ipv4` varchar(16) NULL COMMENT "",
  `remoteEndpoint_port` varchar(16) NULL COMMENT "",
  `shared` int(11) NULL COMMENT "",
  `tag_error` int(11) NULL DEFAULT "0" COMMENT "",
  `error_msg` varchar(1024) NULL COMMENT "",
  `tags_http_path` varchar(2048) NULL COMMENT "",
  `tags_http_method` varchar(1024) NULL COMMENT "",
  `tags_controller_class` varchar(100) NULL COMMENT "",
  `tags_controller_method` varchar(1024) NULL COMMENT "",
  INDEX service_name_idx (`localEndpoint_serviceName`) USING BITMAP COMMENT ''
) ENGINE=OLAP 
DUPLICATE KEY(`traceId`, `parentId`, `id`, `timestamp`, `localEndpoint_serviceName`, `dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
 PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`id`) BUCKETS 100 
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "100",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

zipkin_trace_perf

CREATE TABLE `zipkin_trace_perf` (
  `traceId` varchar(24) NULL COMMENT "",
  `id` varchar(24) NULL COMMENT "",
  `dt` int(11) NULL COMMENT "",
  `parentId` varchar(24) NULL COMMENT "",
  `localEndpoint_serviceName` varchar(512) NULL COMMENT "",
  `timestamp` bigint(20) NULL COMMENT "",
  `hr` int(11) NULL COMMENT "",
  `min` bigint(20) NULL COMMENT "",
  `kind` varchar(16) NULL COMMENT "",
  `duration` int(11) NULL COMMENT "",
  `name` varchar(300) NULL COMMENT "",
  `tag_error` int(11) NULL DEFAULT "0" COMMENT ""
) ENGINE=OLAP 
DUPLICATE KEY(`traceId`, `id`, `dt`, `parentId`, `localEndpoint_serviceName`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(PARTITION p20220104 VALUES [("20220104"), ("20220105")),
 PARTITION p20220105 VALUES [("20220105"), ("20220106")))
DISTRIBUTED BY HASH(`traceId`) BUCKETS 32 
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-60",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "12",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);

ROUTINE LOAD

ROUTINE LOAD 创建语句示例如下:

CREATE ROUTINE LOAD zipkin_routine_load ON zipkin COLUMNS(
  id,
  kind,
  localEndpoint_serviceName,
  traceId,
  `name`,
  `timestamp`,
  `duration`,
  `localEndpoint_ipv4`,
  `remoteEndpoint_ipv4`,
  `remoteEndpoint_port`,
  `shared`,
  `parentId`,
  `tags_http_path`,
  `tags_http_method`,
  `tags_controller_class`,
  `tags_controller_method`,
  tmp_tag_error,
  tag_error = if(`tmp_tag_error` IS NULL, 0, 1),
  error_msg = tmp_tag_error,
  dt = from_unixtime(`timestamp` / 1000000, '%Y%m%d'),
  hr = from_unixtime(`timestamp` / 1000000, '%H'),
  `min` = from_unixtime(`timestamp` / 1000000, '%i')
) PROPERTIES (
  "desired_concurrent_number" = "3",
  "max_batch_interval" = "50",
  "max_batch_rows" = "300000",
  "max_batch_size" = "209715200",
  "max_error_number" = "1000000",
  "strict_mode" = "false",
  "format" = "json",
  "strip_outer_array" = "true",
  "jsonpaths" = "[\"$.id\",\"$.kind\",\"$.localEndpoint.serviceName\",\"$.traceId\",\"$.name\",\"$.timestamp\",\"$.duration\",\"$.localEndpoint.ipv4\",\"$.remoteEndpoint.ipv4\",\"$.remoteEndpoint.port\",\"$.shared\",\"$.parentId\",\"$.tags.\\\"http.path\\\"\",\"$.tags.\\\"http.method\\\"\",\"$.tags.\\\"mvc.controller.class\\\"\",\"$.tags.\\\"mvc.controller.method\\\"\",\"$.tags.error\"]"
)
FROM
  KAFKA (
    "kafka_broker_list" = "IP1:PORT1,IP2:PORT2,IP3:PORT3",
    "kafka_topic" = "XXXXXXXXX"
  );

Flink 溯源 Parent ID

针对调用链路性能瓶颈分析场景中,使用 Flink 进行 Parent ID 溯源,代码示例如下:

env
  // 添加kafka数据源
  .addSource(getKafkaSource())
  // 将采集到的Json字符串转换为JSONArray,
  // 这个JSONArray是从单个服务采集的信息,里面会包含多个Trace的Span信息
  .map(JSON.parseArray(_))
  // 将JSONArray转换为JSONObject,每个JSONObejct就是一个Span
  .flatMap(_.asScala.map(_.asInstanceOf[JSONObject]))
  // 将Span的JSONObject对象转换为Bean对象
  .map(jsonToBean(_))
  // 以traceID+localEndpoint_serviceName作为key对span进行分区生成keyed stream
  .keyBy(span => keyOfTrace(span))
  // 使用会话窗口,将同一个Trace的不同服务上的所有Span,分发到同一个固定间隔的processing-time窗口
  // 这里为了实现简单,使用了processing-time session窗口,后续我们会使用starrocks的UDAF函数进行优化,去掉对Flink的依赖
  .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  // 使用Aggregate窗口函数
  .aggregate(new TraceAggregateFunction)
  // 将经过溯源的span集合展开,便于调用flink-connector-starrocks
  .flatMap(spans => spans)
  // 使用flink-connector-starrocks sink,将数据写入starrocks中
  .addSink(
    StarRocksSink.sink(
      StarRocksSinkOptions.builder().withProperty("XXX", "XXX").build()))

分析计算

以 图 6 作为一个微服务系统用例,给出各个统计分析场景对应的 StarRocks SQL 语句。

服务内分析

上游服务请求指标统计

下面的 SQL 使用 Zipkin 表数据,计算服务 Service2 请求上游服务 Service3 和上游服务 Service4 的查询统计信息,按小时和接口分组统计查询指标

select
  hr,
  name,
  req_count,
  timeout / req_count * 100 as timeout_rate,
  error_count / req_count * 100 as error_rate,
  avg_duration,
  tp95,
  tp99
from
  (
    select
      hr,
      name,
      count(1) as req_count,
      AVG(duration) / 1000 as avg_duration,
      sum(if(duration > 200000, 1, 0)) as timeout,
      sum(tag_error) as error_count,
      percentile_approx(duration, 0.95) / 1000 AS tp95,
      percentile_approx(duration, 0.99) / 1000 AS tp99
    from
      zipkin
    where
      localEndpoint_serviceName = 'Service2'
      and kind = 'CLIENT'
      and dt = 20220105
    group by
      hr,
      name
  ) tmp
order by
  hr

下游服务响应指标统计

下面的 SQL 使用 Zipkin 表数据,计算服务 Service2 响应下游服务 Service1 的查询统计信息,按小时和接口分组统计查询指标。

select
  hr,
  name,
  req_count,
  timeout / req_count * 100 as timeout_rate,
  error_count / req_count * 100 as error_rate,
  avg_duration,
  tp95,
  tp99
from
  (
    select
      hr,
      name,
      count(1) as req_count,
      AVG(duration) / 1000 as avg_duration,
      sum(if(duration > 200000, 1, 0)) as timeout,
      sum(tag_error) as error_count,
      percentile_approx(duration, 0.95) / 1000 AS tp95,
      percentile_approx(duration, 0.99) / 1000 AS tp99
    from
      zipkin
    where
      localEndpoint_serviceName = 'Service2'
      and kind = 'SERVER'
      and dt = 20220105
    group by
      hr, 
      name
  ) tmp
order by
  hr

服务内部处理分析

下面的 SQL 使用 Zipkin 表数据,查询服务 Service2 的接口 /2/api,按 Span Name 分组统计 Duration 等信息。

with 
spans as (
  select * from zipkin where dt = 20220105 and localEndpoint_serviceName = "Service2"
),
api_spans as (
  select
    spans.id as id,
    spans.parentId as parentId,
    spans.name as name,
    spans.duration as duration
  from
    spans
    inner JOIN 
    (select * from spans where kind = "SERVER" and name = "/2/api") tmp 
    on spans.traceId = tmp.traceId
)
SELECT
  name,
  AVG(inner_duration) / 1000 as avg_duration,
  percentile_approx(inner_duration, 0.95) / 1000 AS tp95,
  percentile_approx(inner_duration, 0.99) / 1000 AS tp99
from
  (
    select
      l.name as name,
      (l.duration - ifnull(r.duration, 0)) as inner_duration
    from
      api_spans l
      left JOIN 
      api_spans r 
      on l.parentId = r.id
  ) tmp
GROUP BY
  name

服务间分析

服务拓扑统计

下面的 SQL 使用 Zipkin 表数据,计算服务间的拓扑关系,以及服务间接口 Duration 的统计信息。

with tbl as (select * from zipkin where dt = 20220105)
select 
  client, 
  server, 
  name,
  AVG(duration) / 1000 as avg_duration,
  percentile_approx(duration, 0.95) / 1000 AS tp95,
  percentile_approx(duration, 0.99) / 1000 AS tp99
from
  (
    select
      c.localEndpoint_serviceName as client,
      s.localEndpoint_serviceName as server,
      c.name as name,
      c.duration as duration
    from
    (select * from tbl where kind = "CLIENT") c
    left JOIN 
    (select * from tbl where kind = "SERVER") s 
    on c.id = s.id and c.traceId = s.traceId
  ) as tmp
group by 
  client,  
  server,
  name

调用链路性能瓶颈分析

下面的 SQL 使用 zipkin_trace_perf 表数据,针对某个服务接口响应超时的查询请求,统计出每次请求的调用链路中处理耗时最长的服务或服务间调用,进而分析出性能热点是在某个服务或服务间调用。

select
  service,
  ROUND(count(1) * 100 / sum(count(1)) over(), 2) as percent
from
  (
    select
      traceId,
      service,
      duration,
      ROW_NUMBER() over(partition by traceId order by duration desc) as rank4
    from
      (
        with tbl as (
          SELECT
            l.traceId as traceId,
            l.id as id,
            l.parentId as parentId,
            l.kind as kind,
            l.duration as duration,
            l.localEndpoint_serviceName as localEndpoint_serviceName
          FROM
            zipkin_trace_perf l
            INNER JOIN 
            zipkin_trace_perf r 
            on l.traceId = r.traceId
              and l.dt = 20220105
              and r.dt = 20220105
              and r.tag_error = 0     -- 过滤掉出错的trace
              and r.localEndpoint_serviceName = "Service1"
              and r.name = "/1/api"
              and r.kind = "SERVER"
              and r.duration > 200000  -- 过滤掉未超时的trace
        )
        select
          traceId,
          id,
          service,
          duration
        from
          (
            select
              traceId,
              id,
              service,
              (c_duration - s_duration) as duration,
              ROW_NUMBER() over(partition by traceId order by (c_duration - s_duration) desc) as rank2
            from
              (
                select
                  c.traceId as traceId,
                  c.id as id,
                  concat(c.localEndpoint_serviceName, "=>", ifnull(s.localEndpoint_serviceName, "?")) as service,
                  c.duration as c_duration,
                  ifnull(s.duration, 0) as s_duration
                from
                  (select * from tbl where kind = "CLIENT") c
                  left JOIN 
                  (select * from tbl where kind = "SERVER") s 
                  on c.id = s.id and c.traceId = s.traceId
              ) tmp1
          ) tmp2
        where
          rank2 = 1
        union ALL
        select
          traceId,
          id,
          service,
          duration
        from
          (
            select
              traceId,
              id,
              service,
              (s_duration - c_duration) as duration,
              ROW_NUMBER() over(partition by traceId order by (s_duration - c_duration) desc) as rank2
            from
              (
                select
                  s.traceId as traceId,
                  s.id as id,
                  s.localEndpoint_serviceName as service,
                  s.duration as s_duration,
                  ifnull(c.duration, 0) as c_duration,
                  ROW_NUMBER() over(partition by s.traceId, s.id order by ifnull(c.duration, 0) desc) as rank
                from
                  (select * from tbl where kind = "SERVER") s
                  left JOIN 
                  (select * from tbl where kind = "CLIENT") c 
                  on s.id = c.parentId and s.traceId = c.traceId
              ) tmp1
            where
              rank = 1
          ) tmp2
        where
          rank2 = 1
      ) tmp3
  ) tmp4
where
  rank4 = 1
GROUP BY
  service
order by
  percent desc

SQL 查询的结果如下图所示,在超时的 Trace 请求中,性能瓶颈服务或服务间调用的比例分布。

 

#03 实践效果

目前搜狐智能媒体已在 30+ 个服务中接入 Zipkin,涵盖上百个线上服务实例,1%  的采样率每天产生近 10亿 多行的日志。通过 Zipkin Server 查询 StarRocks,获取的 Trace 信息如下图所示:

通过 Zipkin Server 查询 StarRocks,获取的服务拓扑信息如下图所示:

基于 Zipkin  StarRocks 的链路追踪体系实践过程中,明显提升了微服务监控分析能力和工程效率:

提升微服务监控分析能力

  • 在监控报警方面,可以基于 StarRocks 查询统计线上服务当前时刻的响应延迟百分位数、错误率等指标,根据这些指标及时产生各类告警;
  • 在指标统计方面,可以基于 StarRocks 按天、小时、分钟等粒度统计服务响应延迟的各项指标,更好的了解服务运行状况;
  • 在故障分析方面,基于 StarRocks 强大的 SQL 计算能力,可以进行服务、时间、接口等多个维度的探索式分析查询,定位故障原因。

提升微服务监控工程效率

Metric 和 Logging 数据采集,很多需要用户手动埋点和安装各种采集器 Agent,数据采集后存储到 ElasticSearch 等存储系统,每上一个业务,这些流程都要操作一遍,非常繁琐,且资源分散不易管理。

而使用 Zipkin + StarRocks 的方式,只需在代码中引入对应库 SDK,设置上报的 Kafka 地址和采样率等少量配置信息,Tracing 便可自动埋点采集,通过 zikpin server 界面进行查询分析,非常简便。


#04 总结与展望

基于 Zipkin+StarRocks 构建链路追踪系统,能够提供微服务监控的 Monitoring 和 Observability 能力,提升微服务监控的分析能力和工程效率。

后续有几个优化点,可以进一步提升链路追踪系统的分析能力和易用性:

  1. 使用 StarRocks 的 UDAF、窗口函数等功能,将 Parent ID 溯源下沉到 StarRocks计算,通过计算后置的方式,取消对 Flink 的依赖,进一步简化整个系统架构。
  2. 目前对原始日志中的 tag s等字段,并没有完全采集,StarRocks 正在实现 Json 数据类型,能够更好的支持 tags 等嵌套数据类型。
  3. Zipkin Server 目前的界面还稍显简陋,我们已经打通了 Zipkin Server 查询 StarRokcs,后续会对 Zipkin Server 进行 U I等优化,通过 StarRocks 强大的计算能力实现更多的指标查询,进一步提升用户体验。

参考文档

《云原生计算重塑企业IT架构 - 分布式应用架构》:
https://developer.aliyun.com/article/717072

What is Upstream and Downstream in Software Development?
https://reflectoring.io/upstream-downstream/

Metrics, tracing, and logging:
https://peter.bourgon.org/blog/2017/02/21/metrics-tracing-and-logging.html

The 3 pillars of system observability:logs, metrics and tracing:
https://iamondemand.com/blog/the-3-pillars-of-system-observability-logs-metrics-and-tracing/

observability 3 ways: logging, metrics and tracing:
https://speakerdeck.com/adriancole/observability-3-ways-logging-metrics-and-tracing

Dapper, a Large-Scale Distributed Systems Tracing Infrastructure:
https://static.googleusercontent.com/media/research.google.com/en//archive/papers/dapper-2010-1.pdf

Jaeger:www.jaegertracing.io

Zipkin:https://zipkin.io/

opentracing.io:
https://opentracing.io/docs/

opencensus.io:
https://opencensus.io/

opentelemetry.io:
https://opentelemetry.io/docs/

Microservice Observability, Part 1: Disambiguating Observability and Monitoring:
https://bravenewgeek.com/microservice-observability-part-1-disambiguating-observability-and-monitoring/

How to Build Observable Distributed Systems:
https://www.infoq.com/presentations/observable-distributed-ststems/

Monitoring and Observability:
https://copyconstruct.medium.com/monitoring-and-observability-8417d1952e1c

Monitoring Isn't Observability:
https://orangematter.solarwinds.com/2017/09/14/monitoring-isnt-observability/

Spring Cloud Sleuth Documentation:
https://docs.spring.io/spring-cloud-sleuth/docs/current-SNAPSHOT/reference/html/getting-started.html#getting-started