// Copyright 2016-2022, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package rpcutil import ( "net" "strconv" "strings" "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/health" healthgrpc "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" ) // maxRPCMessageSize raises the gRPC Max Message size from `4194304` (4mb) to `419430400` (400mb) var maxRPCMessageSize = 1024 * 1024 * 400 // GrpcChannelOptions returns the defaultCallOptions with the max_receive_message_length increased to 400mb // We want to increase the default message size as per pulumi/pulumi#2319 func GrpcChannelOptions() grpc.DialOption { return grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRPCMessageSize)) } // IsBenignCloseErr returns true if the error is "expected" upon shutdown of the server. func IsBenignCloseErr(err error) bool { msg := err.Error() return strings.HasSuffix(msg, "use of closed network connection") || strings.HasSuffix(msg, "grpc: the server has been stopped") } type ServeOptions struct { // Port to listen on. Passing 0 makes the system choose a port automatically. Port int // Initializer for the server. A typical Init registers handlers. Init func(*grpc.Server) error // If non-nil, Serve will gracefully terminate the server when Cancel is closed or receives true. Cancel chan bool // Options for serving gRPC. Options []grpc.ServerOption } type ServeHandle struct { // Port the server is listening on. Port int // The channel is non-nil and is closed when the server stops serving. The server will pass a non-nil error on // this channel if something went wrong in the background and it did not terminate gracefully. Done <-chan error } // ServeWithOptions creates a new gRPC server, calls opts.Init and listens on a TCP port. func ServeWithOptions(opts ServeOptions) (ServeHandle, error) { h, _, err := serveWithOptions(opts) return h, err } func serveWithOptions(opts ServeOptions) (ServeHandle, chan error, error) { port := opts.Port // Listen on a TCP port, but let the kernel choose a free port for us. lis, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(port)) if err != nil { return ServeHandle{Port: port}, nil, errors.Errorf("failed to listen on TCP port ':%v': %v", port, err) } health := health.NewServer() // Now new up a gRPC server and register any RPC interfaces the caller wants. srv := grpc.NewServer(append(opts.Options, grpc.MaxRecvMsgSize(maxRPCMessageSize))...) if opts.Init != nil { if err := opts.Init(srv); err != nil { return ServeHandle{Port: port}, nil, errors.Errorf("failed to Init GRPC to register RPC handlers: %v", err) } } healthgrpc.RegisterHealthServer(srv, health) // enable health checks reflection.Register(srv) // enable reflection. // Set health checks for all the services that they are being served services := srv.GetServiceInfo() for serviceName := range services { health.SetServingStatus(serviceName, healthgrpc.HealthCheckResponse_SERVING) } // If the port was 0, look up what port the kernel chosen, by accessing the underlying TCP listener/address. if port == 0 { tcpl := lis.(*net.TCPListener) tcpa := tcpl.Addr().(*net.TCPAddr) port = tcpa.Port } cancel := opts.Cancel if cancel != nil { go func() { for v, ok := <-cancel; !v && ok; v, ok = <-cancel { } srv.GracefulStop() }() } // Finally, serve; this returns only once the server shuts down (e.g., due to a signal). done := make(chan error) go func() { if err := srv.Serve(lis); err != nil && !IsBenignCloseErr(err) { done <- errors.Errorf("stopped serving: %v", err) } else { done <- nil // send a signal so caller knows we're done, even though it's nil. } close(done) }() return ServeHandle{Port: port, Done: done}, done, nil } // Deprecated. Please use ServeWithOptions and OpenTracingServerInterceptorOptions. func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error, parentSpan opentracing.Span, options ...otgrpc.Option, ) (int, chan error, error) { opts := ServeOptions{ Port: port, Cancel: cancel, Init: func(s *grpc.Server) error { for _, r := range registers { if err := r(s); err != nil { return err } } return nil }, Options: OpenTracingServerInterceptorOptions(parentSpan, options...), } handle, done, err := serveWithOptions(opts) if err != nil { return 0, nil, err } return handle.Port, done, nil }