上一章我们介绍了分布式链路追踪的原理、opentracing 规范并对 jaeger 进行了简单介绍,这一章我们将介绍使用 jaeger 来进行分布式链路追踪的具体实现。

一、运行效果

由于很多同学没有接触过分布式链路追踪,所以我们这一次改变方式,先给大家看一下我们需要实现的效果,然后一一讲解怎么去实现。

代码有一个 examples 目录,里面放了一些简单的 demo。jaeger 就放了使用 jaeger 实现链路追踪的 demo,注意,在运行 demo 之前,我们需要先运行 jaeger(具体怎么运行可以参考上章,jaeger run 起来后默认在 localhost:16686 监听),我们先运行起来看看效果,如下:

cd examples/jaeger
go run server.go
### 另起终端
go run client.go

在 Jaeger UI 的最左边可以看到有 gorpc-client-jaeger、gorpc-server-jaeger 两个 Service,点击右边的 Trace 可以看到具体的调用链,我们不妨拿 client 举例,如下:

img

点击 /helloworld.Greeter/SayHello 可以看到 client 的具体的调用链,如下:

img

二、实现思路

上面的调用链路其实非常简单,要展示出上图,只需要分别为 client 和 server 上报一个 span 即可。那么我们如何为 client 和 server 上报 span 呢?这其实就是我们的核心问题。

我们知道,分布式链路追踪其实是类似 ”埋点“,既然是 ”埋点“,我们希望对框架代码来说,尽量做到 “无侵入”。要做到这一点,我们考虑上报使用拦截器实现,同时,将 jaeger 和框架解耦,用插件化的思想,将 jaeger 初始化这一步统一放到插件初始化里面去做。

三、插件化实现

上面说到了,jaeger 的初始化过程是使用插件化实现的。

tracing 类插件的初始化过程比较特殊,需要返回一个 Tracer,所以这里单独定义一类 TracingPlugin 用来实现 tracing 插件的初始化。

// TracingPlugin defines the standard for all tracing plug-ins
type TracingPlugin interface {
   Init(...Option) (opentracing.Tracer, error)
}

这里我们定义一个结构 Jaeger 来实现 TracingPlugin 接口

// Jaeger implements the opentracing specification
type Jaeger struct {
   opts *plugin.Options
}

实现 Init(…Option) (opentracing.Tracer, error) 接口,如下:

func (j *Jaeger) Init(opts ...plugin.Option) (opentracing.Tracer, error) {

	for _, o := range opts {
		o(j.opts)
	}

	if j.opts.TracingSvrAddr == "" {
		return nil, errors.New("jaeger init error, traingSvrAddr is empty")
	}

	return initJaeger(j.opts.TracingSvrAddr, JaegerServerName, opts ...)

}

func initJaeger(tracingSvrAddr string, jaegerServiceName string, opts ... plugin.Option) (opentracing.Tracer, error) {
   cfg := &config.Configuration{
      Sampler : &config.SamplerConfig{
         Type : "const",  // Fixed sampling
         Param : 1,       // 1= full sampling, 0= no sampling
      },
      Reporter : &config.ReporterConfig{
         LogSpans: true,
         LocalAgentHostPort: tracingSvrAddr,
      },
      ServiceName : jaegerServiceName,
   }

   tracer, _, err := cfg.NewTracer()
   if err != nil {
      return nil, err
   }

   opentracing.SetGlobalTracer(tracer)

   return tracer, err
}

上面一段代码核心其实主要做了两件事,第一是初始化 jaeger 的配置,并且通过这些初始化配置来创建一个 tracer 实例,第二是将这个 tracer 实例作为 opentracing 规范的实现。之前我们说到了 opentracing 只是一套规范,并没有进行链路追踪的具体实现,所以无论你是使用 jaeger 还是 zipkin 等其他链路追踪系统,你都需要进行 opentracing.SetGlobalTracer(tracer) ,将 tracer 设为 opentracing 的真正实现。

四、 jaeger 初始化

因为 client 和 server 都要生成 span 并且进行上报,所以 client 和 server 都需要对 jaeger 进行初始化。

server 初始化

server 侧 jaeger 的初始化过程,也是在插件初始化的步骤中完成的,之前我们说了插件初始化过程的 consul 初始化,server 的 Serve 方法中,在所有的服务 service 进行初始化之前,会去进行插件的初始化,如下:

func (s *Server) Serve() {

   err := s.InitPlugins()
   if err != nil {
      panic(err)
   }

   ...
   for _, service := range s.services {
      go service.Serve(s.opts)
   }  
}

InitPlugins 这个方法完成了所有的插件初始化操作,consul 和 jaeger 的初始化都是在这个方法中完成的。这里我们只专注于 jaeger 的初始化,如下:

func (s *Server) InitPlugins() error {
   // init plugins
   for _, p := range s.plugins {

      switch val := p.(type) {

      case plugin.ResolverPlugin :
        ...

      case plugin.TracingPlugin :

         pluginOpts := []plugin.Option {
            plugin.WithTracingSvrAddr(s.opts.tracingSvrAddr),
         }

         tracer, err := val.Init(pluginOpts ...)
         if err != nil {
            log.Errorf("tracing init error, %v", err)
            return err
         }

         s.opts.interceptors = append(s.opts.interceptors, jaeger.OpenTracingServerInterceptor(tracer, s.opts.tracingSpanName))

      default :

      }

   }

   return nil
}

上面这段代码在插件初始化过程中,假如发现是 tracing 类插件,则会调用插件的 Init 方法去初始化,然后使用 jaeger.OpenTracingServerInterceptor 将 tracer 封装为拦截器,并添加到 server 的拦截器列表中。

client 初始化

client 的初始化,是在往下游发起调用之前实现的,如下:

func main() {

	tracer, err := jaeger.Init("localhost:6831")
	if err != nil {
		panic(err)
	}

	opts := []client.Option {
		client.WithTarget("127.0.0.1:8000"),
		client.WithNetwork("tcp"),
		client.WithTimeout(2000 * time.Millisecond),
		client.WithInterceptor(jaeger.OpenTracingClientInterceptor(tracer, "/helloworld.Greeter/SayHello")),
	}
	c := client.DefaultClient
	req := &helloworld.HelloRequest{
		Msg: "hello",
	}
	rsp := &helloworld.HelloReply{}

	for i:= 1; i< 200; i ++ {
		err = c.Call(context.Background(), "/helloworld.Greeter/SayHello", req, rsp, opts ...)
		fmt.Println(rsp.Msg, err)
		time.Sleep(100 * time.Millisecond)
	}

}

上面一大段代码主要是有关键的两步操作,第一,进行 jaeger 初始化 ,如下:

tracer, err := jaeger.Init("localhost:6831")

第二,在 client 的 Option 中,使用 jaeger.OpenTracingClientInterceptor 将 tracer 封装为一个拦截器,并且添加到 client 的拦截器列表中。

五、拦截器方式实现 jaeger 的 span 生成

上面我们说到了,为了减少对框架代码的侵入性,我们采用拦截器的方式实现 span 的生成。

server 拦截器

server 端上报 span,主要的步骤是,先调用 tracer.Extract 解析 Span 的上下文信息,获得一个 SpanContext,接着调用 tracer.StartSpan 进行创建一个 server span,并且把 server span 放到上下文 context 中进行透传,jeager 会自动对 span 进行上报。代码如下:

// OpenTracingServerInterceptor packaging jaeger tracer as a server interceptor
func OpenTracingServerInterceptor(tracer opentracing.Tracer, spanName string) interceptor.ServerInterceptor {

   return func(ctx context.Context, req interface{}, handler interceptor.Handler) (interface{}, error) {

      mdCarrier := &jaegerCarrier{}

      spanContext, err := tracer.Extract(opentracing.HTTPHeaders, mdCarrier)
      if err != nil && err != opentracing.ErrSpanContextNotFound {
         return nil, errors.New(fmt.Sprintf("tracer extract error : %v", err))
      }
      serverSpan := tracer.StartSpan(spanName, ext.RPCServerOption(spanContext),ext.SpanKindRPCServer)
      defer serverSpan.Finish()

      ctx = opentracing.ContextWithSpan(ctx, serverSpan)

      serverSpan.LogFields(log.String("spanName", spanName))

      return handler(ctx, req)
   }

}

client 拦截器

client 端上报 span,主要的步骤是,先通过 opentracing.SpanFromContext 获取上游带下来的 span 上下文信息,接着调用 tracer.StartSpan 创建一个 client span,通过调用 tracer.Inject,将所需要透传给下游的一些信息塞到 Span 里面。jaegerCarrier 是一种 map[string] []byte 结构,用来作为传输一些 key-value 数据的载体。具体代码实现如下:

type jaegerCarrier map[string][]byte

// OpenTracingClientInterceptor packaging jaeger tracer as a client interceptor
func OpenTracingClientInterceptor(tracer opentracing.Tracer, spanName string) interceptor.ClientInterceptor {

   return func (ctx context.Context, req, rsp interface{}, ivk interceptor.Invoker) error {

      var parentCtx opentracing.SpanContext
  
      if parent := opentracing.SpanFromContext(ctx); parent != nil {
        parentCtx = parent.Context()
      }

      clientSpan := tracer.StartSpan(spanName, ext.SpanKindRPCClient, opentracing.ChildOf(parentCtx))
      defer clientSpan.Finish()

      mdCarrier := &jaegerCarrier{}

      if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdCarrier); err != nil {
         clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err))
      }

      clientSpan.LogFields(log.String("spanName", spanName))

      return ivk(ctx, req, rsp)

   }
}