pulumi/sdk/go/common/util/cmdutil/trace.go

325 lines
8.9 KiB
Go

// Copyright 2016-2021, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmdutil
import (
"fmt"
"io"
"log"
"net"
"net/url"
"os"
"runtime"
"strings"
"time"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pulumi/appdash"
appdash_opentracing "github.com/pulumi/appdash/opentracing"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
jaeger "github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/transport/zipkin"
)
// TracingEndpoint is the Zipkin-compatible tracing endpoint where tracing data will be sent.
var TracingEndpoint string
// TracingToFile indicates if pulumi was called with a file:// scheme URL (--tracing=file:///...).
//
// Deprecated: Even in this case TracingEndpoint will now have the tcp:// scheme and will point to a
// proxy server that will append traces to the user-specified file. Plugins should respect
// TracingEndpoint and ignore TracingToFile.
var TracingToFile bool
var TracingRootSpan opentracing.Span
var traceCloser io.Closer
type localStore struct {
path string
store *appdash.MemoryStore
}
func (s *localStore) Close() error {
f, err := os.Create(s.path)
if err != nil {
return err
}
defer contract.IgnoreClose(f)
return s.store.Write(f)
}
func IsTracingEnabled() bool {
return TracingEndpoint != ""
}
// InitTracing initializes tracing
func InitTracing(name, rootSpanName, tracingEndpoint string) {
// If no tracing endpoint was provided, just return. The default global tracer is already a no-op tracer.
if tracingEndpoint == "" {
return
}
endpointURL, err := url.Parse(tracingEndpoint)
if err != nil {
log.Fatalf("invalid tracing endpoint: %v", err)
}
var tracer opentracing.Tracer
switch {
case endpointURL.Scheme == "file":
// If the endpoint is a file:// URL, use a local tracer.
TracingToFile = true
path := endpointURL.Path
if path == "" {
path = endpointURL.Opaque
}
if path == "" {
log.Fatalf("invalid tracing endpoint: %v", err)
}
store := &localStore{
path: path,
store: appdash.NewMemoryStore(),
}
traceCloser = store
collector := appdash.NewLocalCollector(store.store)
tracer = appdash_opentracing.NewTracer(collector)
proxyEndpoint, err := startProxyAppDashServer(collector)
if err != nil {
log.Fatal(err)
}
// Instead of storing the original endpoint, store the
// proxy endpoint. The TracingEndpoint global var is
// consumed by code forking off sub-processes, and we
// want those sending data to the proxy endpoint, so
// it cleanly lands in the file managed by the parent
// process.
TracingEndpoint = proxyEndpoint
case endpointURL.Scheme == "tcp":
// Store the tracing endpoint
TracingEndpoint = tracingEndpoint
// If the endpoint scheme is tcp, use an Appdash endpoint.
collector := appdash.NewRemoteCollector(endpointURL.Host)
traceCloser = collector
tracer = appdash_opentracing.NewTracer(collector)
default:
// Store the tracing endpoint
TracingEndpoint = tracingEndpoint
// Jaeger tracer can be initialized with a transport that will
// report tracing Spans to a Zipkin backend
transport, err := zipkin.NewHTTPTransport(
tracingEndpoint,
zipkin.HTTPBatchSize(1),
zipkin.HTTPLogger(jaeger.StdLogger),
)
if err != nil {
log.Fatalf("Cannot initialize HTTP transport: %v", err)
}
// create Jaeger tracer
t, closer := jaeger.NewTracer(
name,
jaeger.NewConstSampler(true), // sample all traces
jaeger.NewRemoteReporter(transport))
tracer, traceCloser = t, closer
}
// Set the ambient tracer
opentracing.SetGlobalTracer(tracer)
// If a root span was requested, start it now.
if rootSpanName != "" {
var options []opentracing.StartSpanOption
for _, tag := range rootSpanTags() {
options = append(options, tag)
}
TracingRootSpan = tracer.StartSpan(rootSpanName, options...)
go collectMemStats(rootSpanName)
}
}
// CloseTracing ensures that all pending spans have been flushed. It should be called before process exit.
func CloseTracing() {
if !IsTracingEnabled() {
return
}
if TracingRootSpan != nil {
TracingRootSpan.Finish()
}
contract.IgnoreClose(traceCloser)
}
// Starts an AppDash server listening on any available TCP port
// locally and sends the spans and annotations to the given collector.
// Returns a Pulumi-formatted tracing endpoint pointing to this
// server.
//
// See https://github.com/sourcegraph/appdash/blob/master/cmd/appdash/example_app.go
func startProxyAppDashServer(collector appdash.Collector) (string, error) {
l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0})
if err != nil {
return "", err
}
collectorPort := l.Addr().(*net.TCPAddr).Port
cs := appdash.NewServer(l, collector)
cs.Debug = true
cs.Trace = true
go cs.Start()
// The default sends to stderr, which is unfortunate for
// end-users. Discard for now.
cs.Log = log.New(io.Discard, "appdash", 0)
return fmt.Sprintf("tcp://127.0.0.1:%d", collectorPort), nil
}
// Computes initial tags to write to the `TracingRootSpan`, which can
// be useful for aggregating trace data in benchmarks.
func rootSpanTags() []opentracing.Tag {
tags := []opentracing.Tag{
{
Key: "os.Args",
Value: os.Args,
},
{
Key: "runtime.GOOS",
Value: runtime.GOOS,
},
{
Key: "runtime.GOARCH",
Value: runtime.GOARCH,
},
{
Key: "runtime.NumCPU",
Value: runtime.NumCPU(),
},
}
// Promote all env vars `pulumi_tracing_tag_foo=bar` into tags `foo: bar`.
envPrefix := "pulumi_tracing_tag_"
for _, e := range os.Environ() {
pair := strings.SplitN(e, "=", 2)
envVarName := strings.ToLower(pair[0])
envVarValue := pair[1]
if strings.HasPrefix(envVarName, envPrefix) {
tags = append(tags, opentracing.Tag{
Key: strings.TrimPrefix(envVarName, envPrefix),
Value: envVarValue,
})
}
}
return tags
}
// Samples memory stats in the background at 1s intervals, and creates
// spans for the data. This is currently opt-in via
// `PULUMI_TRACING_MEMSTATS_POLL_INTERVAL=1s` or similar. Consider
// collecting this by default later whenever tracing is enabled as we
// calibrate that the overhead is low enough.
func collectMemStats(spanPrefix string) {
memStats := runtime.MemStats{}
maxStats := runtime.MemStats{}
poll := func() {
if TracingRootSpan == nil {
return
}
runtime.ReadMemStats(&memStats)
// report cumulative metrics as is
TracingRootSpan.SetTag("runtime.NumCgoCall", runtime.NumCgoCall())
TracingRootSpan.SetTag("MemStats.TotalAlloc", memStats.TotalAlloc)
TracingRootSpan.SetTag("MemStats.Mallocs", memStats.Mallocs)
TracingRootSpan.SetTag("MemStats.Frees", memStats.Frees)
TracingRootSpan.SetTag("MemStats.PauseTotalNs", memStats.PauseTotalNs)
TracingRootSpan.SetTag("MemStats.NumGC", memStats.NumGC)
// for other metrics report the max
if memStats.Sys > maxStats.Sys {
maxStats.Sys = memStats.Sys
TracingRootSpan.SetTag("MemStats.Sys.Max", maxStats.Sys)
}
if memStats.HeapAlloc > maxStats.HeapAlloc {
maxStats.HeapAlloc = memStats.HeapAlloc
TracingRootSpan.SetTag("MemStats.HeapAlloc.Max", maxStats.HeapAlloc)
}
if memStats.HeapSys > maxStats.HeapSys {
maxStats.HeapSys = memStats.HeapSys
TracingRootSpan.SetTag("MemStats.HeapSys.Max", maxStats.HeapSys)
}
if memStats.HeapIdle > maxStats.HeapIdle {
maxStats.HeapIdle = memStats.HeapIdle
TracingRootSpan.SetTag("MemStats.HeapIdle.Max", maxStats.HeapIdle)
}
if memStats.HeapInuse > maxStats.HeapInuse {
maxStats.HeapInuse = memStats.HeapInuse
TracingRootSpan.SetTag("MemStats.HeapInuse.Max", maxStats.HeapInuse)
}
if memStats.HeapReleased > maxStats.HeapReleased {
maxStats.HeapReleased = memStats.HeapReleased
TracingRootSpan.SetTag("MemStats.HeapReleased.Max", maxStats.HeapReleased)
}
if memStats.HeapObjects > maxStats.HeapObjects {
maxStats.HeapObjects = memStats.HeapObjects
TracingRootSpan.SetTag("MemStats.HeapObjects.Max", maxStats.HeapObjects)
}
if memStats.StackInuse > maxStats.StackInuse {
maxStats.StackInuse = memStats.StackInuse
TracingRootSpan.SetTag("MemStats.StackInuse.Max", maxStats.StackInuse)
}
if memStats.StackSys > maxStats.StackSys {
maxStats.StackSys = memStats.StackSys
TracingRootSpan.SetTag("MemStats.StackSys.Max", maxStats.StackSys)
}
}
interval := os.Getenv("PULUMI_TRACING_MEMSTATS_POLL_INTERVAL")
if interval != "" {
intervalDuration, err := time.ParseDuration(interval)
if err == nil {
for {
poll()
time.Sleep(intervalDuration)
}
}
}
}