pulumi/pkg/util/rpcdebug/interceptors.go

389 lines
9.7 KiB
Go
Raw Permalink Normal View History

2022-11-01 15:15:09 +00:00
// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcdebug
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"reflect"
"sync"
"google.golang.org/grpc"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/metadata"
)
type DebugInterceptor struct {
logFile string
mutex *sync.Mutex
}
type DebugInterceptorOptions struct {
LogFile string
Mutex *sync.Mutex
}
type LogOptions struct {
Metadata interface{}
}
// Each LogFile should have a unique instance of DebugInterceptor for proper locking.
func NewDebugInterceptor(opts DebugInterceptorOptions) (*DebugInterceptor, error) {
if opts.LogFile == "" {
return nil, fmt.Errorf("logFile cannot be empty")
}
i := &DebugInterceptor{logFile: opts.LogFile}
if opts.Mutex != nil {
i.mutex = opts.Mutex
} else {
i.mutex = &sync.Mutex{}
}
return i, nil
}
func (i *DebugInterceptor) ServerOptions(opts LogOptions) []grpc.ServerOption {
return []grpc.ServerOption{
grpc.ChainUnaryInterceptor(i.DebugServerInterceptor(opts)),
grpc.ChainStreamInterceptor(i.DebugStreamServerInterceptor(opts)),
}
}
func (i *DebugInterceptor) DialOptions(opts LogOptions) []grpc.DialOption {
return []grpc.DialOption{
grpc.WithChainUnaryInterceptor(i.DebugClientInterceptor(opts)),
grpc.WithChainStreamInterceptor(i.DebugStreamClientInterceptor(opts)),
}
}
// Logs all gRPC converations in JSON format.
//
// To enable, call InitDebugInterceptors first in your process main to
// configure the location of the Go file.
func (i *DebugInterceptor) DebugServerInterceptor(opts LogOptions) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{},
all: Reformat with gofumpt Per team discussion, switching to gofumpt. [gofumpt][1] is an alternative, stricter alternative to gofmt. It addresses other stylistic concerns that gofmt doesn't yet cover. [1]: https://github.com/mvdan/gofumpt See the full list of [Added rules][2], but it includes: - Dropping empty lines around function bodies - Dropping unnecessary variable grouping when there's only one variable - Ensuring an empty line between multi-line functions - simplification (`-s` in gofmt) is always enabled - Ensuring multi-line function signatures end with `) {` on a separate line. [2]: https://github.com/mvdan/gofumpt#Added-rules gofumpt is stricter, but there's no lock-in. All gofumpt output is valid gofmt output, so if we decide we don't like it, it's easy to switch back without any code changes. gofumpt support is built into the tooling we use for development so this won't change development workflows. - golangci-lint includes a gofumpt check (enabled in this PR) - gopls, the LSP for Go, includes a gofumpt option (see [installation instrutions][3]) [3]: https://github.com/mvdan/gofumpt#installation This change was generated by running: ```bash gofumpt -w $(rg --files -g '*.go' | rg -v testdata | rg -v compilation_error) ``` The following files were manually tweaked afterwards: - pkg/cmd/pulumi/stack_change_secrets_provider.go: one of the lines overflowed and had comments in an inconvenient place - pkg/cmd/pulumi/destroy.go: `var x T = y` where `T` wasn't necessary - pkg/cmd/pulumi/policy_new.go: long line because of error message - pkg/backend/snapshot_test.go: long line trying to assign three variables in the same assignment I have included mention of gofumpt in the CONTRIBUTING.md.
2023-03-03 16:36:39 +00:00
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (interface{}, error) {
2022-11-01 15:15:09 +00:00
log := debugInterceptorLogEntry{
Method: info.FullMethod,
Metadata: opts.Metadata,
}
i.trackRequest(&log, req)
resp, err := handler(ctx, req)
i.trackResponse(&log, resp)
if e := i.record(log); e != nil {
return resp, e
}
return resp, err
}
}
// Like debugServerInterceptor but for streaming calls.
func (i *DebugInterceptor) DebugStreamServerInterceptor(opts LogOptions) grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
ssWrapped := &debugServerStream{
interceptor: i,
method: info.FullMethod,
innerServerStream: ss,
metadata: opts.Metadata,
}
err := handler(srv, ssWrapped)
return err
}
}
// Like debugServerInterceptor but for GRPC client connections.
func (i *DebugInterceptor) DebugClientInterceptor(opts LogOptions) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{},
all: Reformat with gofumpt Per team discussion, switching to gofumpt. [gofumpt][1] is an alternative, stricter alternative to gofmt. It addresses other stylistic concerns that gofmt doesn't yet cover. [1]: https://github.com/mvdan/gofumpt See the full list of [Added rules][2], but it includes: - Dropping empty lines around function bodies - Dropping unnecessary variable grouping when there's only one variable - Ensuring an empty line between multi-line functions - simplification (`-s` in gofmt) is always enabled - Ensuring multi-line function signatures end with `) {` on a separate line. [2]: https://github.com/mvdan/gofumpt#Added-rules gofumpt is stricter, but there's no lock-in. All gofumpt output is valid gofmt output, so if we decide we don't like it, it's easy to switch back without any code changes. gofumpt support is built into the tooling we use for development so this won't change development workflows. - golangci-lint includes a gofumpt check (enabled in this PR) - gopls, the LSP for Go, includes a gofumpt option (see [installation instrutions][3]) [3]: https://github.com/mvdan/gofumpt#installation This change was generated by running: ```bash gofumpt -w $(rg --files -g '*.go' | rg -v testdata | rg -v compilation_error) ``` The following files were manually tweaked afterwards: - pkg/cmd/pulumi/stack_change_secrets_provider.go: one of the lines overflowed and had comments in an inconvenient place - pkg/cmd/pulumi/destroy.go: `var x T = y` where `T` wasn't necessary - pkg/cmd/pulumi/policy_new.go: long line because of error message - pkg/backend/snapshot_test.go: long line trying to assign three variables in the same assignment I have included mention of gofumpt in the CONTRIBUTING.md.
2023-03-03 16:36:39 +00:00
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, gopts ...grpc.CallOption,
) error {
2022-11-01 15:15:09 +00:00
// Ignoring weird entries with empty method and nil req and reply.
if method == "" {
return invoker(ctx, method, req, reply, cc, gopts...)
}
log := debugInterceptorLogEntry{
Method: method,
Metadata: opts.Metadata,
}
i.trackRequest(&log, req)
err := invoker(ctx, method, req, reply, cc, gopts...)
if err != nil {
i.track(&log, err)
} else {
i.trackResponse(&log, reply)
}
2022-11-01 15:15:09 +00:00
if e := i.record(log); e != nil {
return e
}
return err
}
}
// Like debugClientInterceptor but for streaming calls.
func (i *DebugInterceptor) DebugStreamClientInterceptor(opts LogOptions) grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string,
all: Reformat with gofumpt Per team discussion, switching to gofumpt. [gofumpt][1] is an alternative, stricter alternative to gofmt. It addresses other stylistic concerns that gofmt doesn't yet cover. [1]: https://github.com/mvdan/gofumpt See the full list of [Added rules][2], but it includes: - Dropping empty lines around function bodies - Dropping unnecessary variable grouping when there's only one variable - Ensuring an empty line between multi-line functions - simplification (`-s` in gofmt) is always enabled - Ensuring multi-line function signatures end with `) {` on a separate line. [2]: https://github.com/mvdan/gofumpt#Added-rules gofumpt is stricter, but there's no lock-in. All gofumpt output is valid gofmt output, so if we decide we don't like it, it's easy to switch back without any code changes. gofumpt support is built into the tooling we use for development so this won't change development workflows. - golangci-lint includes a gofumpt check (enabled in this PR) - gopls, the LSP for Go, includes a gofumpt option (see [installation instrutions][3]) [3]: https://github.com/mvdan/gofumpt#installation This change was generated by running: ```bash gofumpt -w $(rg --files -g '*.go' | rg -v testdata | rg -v compilation_error) ``` The following files were manually tweaked afterwards: - pkg/cmd/pulumi/stack_change_secrets_provider.go: one of the lines overflowed and had comments in an inconvenient place - pkg/cmd/pulumi/destroy.go: `var x T = y` where `T` wasn't necessary - pkg/cmd/pulumi/policy_new.go: long line because of error message - pkg/backend/snapshot_test.go: long line trying to assign three variables in the same assignment I have included mention of gofumpt in the CONTRIBUTING.md.
2023-03-03 16:36:39 +00:00
streamer grpc.Streamer, gopts ...grpc.CallOption,
) (grpc.ClientStream, error) {
2022-11-01 15:15:09 +00:00
stream, err := streamer(ctx, desc, cc, method, gopts...)
wrappedStream := &debugClientStream{
innerClientStream: stream,
interceptor: i,
method: method,
metadata: opts.Metadata,
}
return wrappedStream, err
}
}
func (i *DebugInterceptor) record(log debugInterceptorLogEntry) error {
i.mutex.Lock()
defer i.mutex.Unlock()
all: Reformat with gofumpt Per team discussion, switching to gofumpt. [gofumpt][1] is an alternative, stricter alternative to gofmt. It addresses other stylistic concerns that gofmt doesn't yet cover. [1]: https://github.com/mvdan/gofumpt See the full list of [Added rules][2], but it includes: - Dropping empty lines around function bodies - Dropping unnecessary variable grouping when there's only one variable - Ensuring an empty line between multi-line functions - simplification (`-s` in gofmt) is always enabled - Ensuring multi-line function signatures end with `) {` on a separate line. [2]: https://github.com/mvdan/gofumpt#Added-rules gofumpt is stricter, but there's no lock-in. All gofumpt output is valid gofmt output, so if we decide we don't like it, it's easy to switch back without any code changes. gofumpt support is built into the tooling we use for development so this won't change development workflows. - golangci-lint includes a gofumpt check (enabled in this PR) - gopls, the LSP for Go, includes a gofumpt option (see [installation instrutions][3]) [3]: https://github.com/mvdan/gofumpt#installation This change was generated by running: ```bash gofumpt -w $(rg --files -g '*.go' | rg -v testdata | rg -v compilation_error) ``` The following files were manually tweaked afterwards: - pkg/cmd/pulumi/stack_change_secrets_provider.go: one of the lines overflowed and had comments in an inconvenient place - pkg/cmd/pulumi/destroy.go: `var x T = y` where `T` wasn't necessary - pkg/cmd/pulumi/policy_new.go: long line because of error message - pkg/backend/snapshot_test.go: long line trying to assign three variables in the same assignment I have included mention of gofumpt in the CONTRIBUTING.md.
2023-03-03 16:36:39 +00:00
f, err := os.OpenFile(i.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
2022-11-01 15:15:09 +00:00
if err != nil {
return fmt.Errorf("Failed to append GRPC debug logs to file %s: %v", i.logFile, err)
}
defer f.Close()
if err := json.NewEncoder(f).Encode(log); err != nil {
return fmt.Errorf("Failed to encode GRPC debug logs: %v", err)
}
return nil
}
func (*DebugInterceptor) track(log *debugInterceptorLogEntry, err error) {
log.Errors = append(log.Errors, err.Error())
}
func (i *DebugInterceptor) trackRequest(log *debugInterceptorLogEntry, req interface{}) {
j, err := i.transcode(req)
if err != nil {
i.track(log, err)
} else {
log.Request = j
}
}
func (i *DebugInterceptor) trackResponse(log *debugInterceptorLogEntry, resp interface{}) {
j, err := i.transcode(resp)
if err != nil {
i.track(log, err)
} else {
log.Response = j
}
}
func (*DebugInterceptor) transcode(obj interface{}) (json.RawMessage, error) {
if obj == nil {
return json.RawMessage("null"), nil
}
m, ok := obj.(proto.Message)
if !ok {
return json.RawMessage("null"),
fmt.Errorf("Failed to decode, expecting proto.Message, got %v",
reflect.TypeOf(obj))
}
jsonSer := jsonpb.Marshaler{}
buf := bytes.Buffer{}
if err := jsonSer.Marshal(&buf, m); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Wraps grpc.ServerStream with interceptor hooks for SendMsg, RecvMsg.
type debugServerStream struct {
innerServerStream grpc.ServerStream
interceptor *DebugInterceptor
method string
metadata interface{}
}
func (dss *debugServerStream) errorEntry(err error) debugInterceptorLogEntry {
return debugInterceptorLogEntry{
Metadata: dss.metadata,
Method: dss.method,
Errors: []string{err.Error()},
}
}
func (dss *debugServerStream) SetHeader(md metadata.MD) error {
return dss.innerServerStream.SetHeader(md)
}
func (dss *debugServerStream) SendHeader(md metadata.MD) error {
return dss.innerServerStream.SendHeader(md)
}
func (dss *debugServerStream) SetTrailer(md metadata.MD) {
dss.innerServerStream.SetTrailer(md)
}
func (dss *debugServerStream) Context() context.Context {
return dss.innerServerStream.Context()
}
func (dss *debugServerStream) SendMsg(m interface{}) error {
err := dss.innerServerStream.SendMsg(m)
if err != nil {
if e := dss.interceptor.record(dss.errorEntry(err)); e != nil {
return e
}
} else {
req, err := dss.interceptor.transcode(m)
if err != nil {
if e := dss.interceptor.record(dss.errorEntry(err)); e != nil {
return e
}
} else {
if e := dss.interceptor.record(debugInterceptorLogEntry{
Metadata: dss.metadata,
Method: dss.method,
Request: req,
}); e != nil {
return e
}
}
}
return err
}
func (dss *debugServerStream) RecvMsg(m interface{}) error {
err := dss.innerServerStream.RecvMsg(m)
if err == io.EOF {
return err
} else if err != nil {
if e := dss.interceptor.record(dss.errorEntry(err)); e != nil {
return e
}
} else {
resp, err := dss.interceptor.transcode(m)
if err != nil {
if e := dss.interceptor.record(dss.errorEntry(err)); e != nil {
return e
}
} else {
if e := dss.interceptor.record(debugInterceptorLogEntry{
Method: dss.method,
Metadata: dss.metadata,
Response: resp,
}); e != nil {
return e
}
}
}
return err
}
var _ grpc.ServerStream = &debugServerStream{}
// Wraps grpc.ClientStream with interceptor hooks for SendMsg, RecvMsg.
type debugClientStream struct {
innerClientStream grpc.ClientStream
interceptor *DebugInterceptor
method string
metadata interface{}
}
func (d *debugClientStream) errorEntry(err error) debugInterceptorLogEntry {
return debugInterceptorLogEntry{
Method: d.method,
Metadata: d.metadata,
Errors: []string{err.Error()},
}
}
func (d *debugClientStream) Header() (metadata.MD, error) {
return d.innerClientStream.Header()
}
func (d *debugClientStream) Trailer() metadata.MD {
return d.innerClientStream.Trailer()
}
func (d *debugClientStream) CloseSend() error {
return d.innerClientStream.CloseSend()
}
func (d *debugClientStream) Context() context.Context {
return d.innerClientStream.Context()
}
func (d *debugClientStream) SendMsg(m interface{}) error {
err := d.innerClientStream.SendMsg(m)
if err != nil {
if e := d.interceptor.record(d.errorEntry(err)); e != nil {
return e
}
} else {
req, err := d.interceptor.transcode(m)
if err != nil {
if e := d.interceptor.record(d.errorEntry(err)); e != nil {
return e
}
} else {
if e := d.interceptor.record(debugInterceptorLogEntry{
Method: d.method,
Metadata: d.metadata,
Request: req,
}); e != nil {
return e
}
}
}
return err
}
func (d *debugClientStream) RecvMsg(m interface{}) error {
err := d.innerClientStream.RecvMsg(m)
if err == io.EOF {
return err
} else if err != nil {
if e := d.interceptor.record(d.errorEntry(err)); e != nil {
return e
}
} else {
resp, err := d.interceptor.transcode(m)
if err != nil {
if e := d.interceptor.record(d.errorEntry(err)); e != nil {
return e
}
} else {
if e := d.interceptor.record(debugInterceptorLogEntry{
Method: d.method,
Metadata: d.metadata,
Response: resp,
}); e != nil {
return e
}
}
}
return err
}
var _ grpc.ClientStream = &debugClientStream{}