pulumi/sdk/go/common/util/rpcutil/interceptor.go

141 lines
5.1 KiB
Go

// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcutil
import (
"os"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil"
"google.golang.org/grpc"
)
// Configures interceptors to propagate OpenTracing metadata through headers. If parentSpan is non-nil, it becomes the
// default parent for orphan spans.
func OpenTracingServerInterceptorOptions(parentSpan opentracing.Span, options ...otgrpc.Option) []grpc.ServerOption {
return []grpc.ServerOption{
grpc.ChainUnaryInterceptor(OpenTracingServerInterceptor(parentSpan, options...)),
grpc.ChainStreamInterceptor(OpenTracingStreamServerInterceptor(parentSpan, options...)),
}
}
// OpenTracingServerInterceptor provides a default gRPC server
// interceptor for emitting tracing to the global OpenTracing tracer.
func OpenTracingServerInterceptor(parentSpan opentracing.Span, options ...otgrpc.Option) grpc.UnaryServerInterceptor {
options = append(options, logPayloads()...)
tracer := opentracing.GlobalTracer()
if parentSpan != nil {
tracer = &reparentingTracer{parentSpan.Context(), tracer}
}
return otgrpc.OpenTracingServerInterceptor(tracer, options...)
}
// OpenTracingStreamServerInterceptor is OpenTracingServerInterceptor for instrumenting streaming gRPC calls.
func OpenTracingStreamServerInterceptor(parentSpan opentracing.Span,
options ...otgrpc.Option,
) grpc.StreamServerInterceptor {
options = append(options, logPayloads()...)
tracer := opentracing.GlobalTracer()
if parentSpan != nil {
tracer = &reparentingTracer{parentSpan.Context(), tracer}
}
return otgrpc.OpenTracingStreamServerInterceptor(tracer, options...)
}
// OpenTracingClientInterceptor provides a default gRPC client interceptor for emitting tracing to the global
// OpenTracing tracer.
func OpenTracingClientInterceptor(options ...otgrpc.Option) grpc.UnaryClientInterceptor {
options = append(append(options,
// Do not trace calls to the empty method
otgrpc.IncludingSpans(func(_ opentracing.SpanContext, method string, _, _ interface{}) bool {
return method != ""
})), logPayloads()...)
return otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer(), options...)
}
// OpenTracingStreamClientInterceptor is OpenTracingClientInterceptor for streaming gRPC calls.
func OpenTracingStreamClientInterceptor(options ...otgrpc.Option) grpc.StreamClientInterceptor {
options = append(append(options,
// Do not trace calls to the empty method
otgrpc.IncludingSpans(func(_ opentracing.SpanContext, method string, _, _ interface{}) bool {
return method != ""
})), logPayloads()...)
return otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer(), options...)
}
// Configures gRPC clients with OpenTracing interceptors.
func OpenTracingInterceptorDialOptions(opts ...otgrpc.Option) []grpc.DialOption {
return []grpc.DialOption{
grpc.WithChainUnaryInterceptor(OpenTracingClientInterceptor(opts...)),
grpc.WithChainStreamInterceptor(OpenTracingStreamClientInterceptor(opts...)),
}
}
// Wraps an opentracing.Tracer to reparent orphan traces with a given
// default parent span.
type reparentingTracer struct {
parentSpanContext opentracing.SpanContext
underlying opentracing.Tracer
}
func (t *reparentingTracer) StartSpan(operationName string, opts ...opentracing.StartSpanOption) opentracing.Span {
if !t.hasChildOf(opts...) {
opts = append(opts, opentracing.ChildOf(t.parentSpanContext))
}
return t.underlying.StartSpan(operationName, opts...)
}
func (t *reparentingTracer) Inject(sm opentracing.SpanContext, format interface{}, carrier interface{}) error {
return t.underlying.Inject(sm, format, carrier)
}
func (t *reparentingTracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {
return t.underlying.Extract(format, carrier)
}
func (t *reparentingTracer) packOptions(opts ...opentracing.StartSpanOption) opentracing.StartSpanOptions {
sso := opentracing.StartSpanOptions{}
for _, o := range opts {
o.Apply(&sso)
}
return sso
}
func (t *reparentingTracer) hasChildOf(opts ...opentracing.StartSpanOption) bool {
for _, ref := range t.packOptions(opts...).References {
if ref.Type == opentracing.ChildOfRef {
return true
}
}
return false
}
var _ opentracing.Tracer = &reparentingTracer{}
// Option to log payloads in trace spans. Default is on. Can be
// disabled by setting an env var to reduce tracing overhead.
func logPayloads() []otgrpc.Option {
res := []otgrpc.Option{}
if !cmdutil.IsTruthy(os.Getenv("PULUMI_TRACING_NO_PAYLOADS")) {
res = append(res, otgrpc.LogPayloads())
}
return res
}