pulumi/sdk/go/common/util/rpcutil/serve.go

164 lines
5.0 KiB
Go

// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpcutil
import (
"net"
"strconv"
"strings"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
opentracing "github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
)
// maxRPCMessageSize raises the gRPC Max Message size from `4194304` (4mb) to `419430400` (400mb)
var maxRPCMessageSize = 1024 * 1024 * 400
// GrpcChannelOptions returns the defaultCallOptions with the max_receive_message_length increased to 400mb
// We want to increase the default message size as per pulumi/pulumi#2319
func GrpcChannelOptions() grpc.DialOption {
return grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRPCMessageSize))
}
// IsBenignCloseErr returns true if the error is "expected" upon shutdown of the server.
func IsBenignCloseErr(err error) bool {
msg := err.Error()
return strings.HasSuffix(msg, "use of closed network connection") ||
strings.HasSuffix(msg, "grpc: the server has been stopped")
}
type ServeOptions struct {
// Port to listen on. Passing 0 makes the system choose a port automatically.
Port int
// Initializer for the server. A typical Init registers handlers.
Init func(*grpc.Server) error
// If non-nil, Serve will gracefully terminate the server when Cancel is closed or receives true.
Cancel chan bool
// Options for serving gRPC.
Options []grpc.ServerOption
}
type ServeHandle struct {
// Port the server is listening on.
Port int
// The channel is non-nil and is closed when the server stops serving. The server will pass a non-nil error on
// this channel if something went wrong in the background and it did not terminate gracefully.
Done <-chan error
}
// ServeWithOptions creates a new gRPC server, calls opts.Init and listens on a TCP port.
func ServeWithOptions(opts ServeOptions) (ServeHandle, error) {
h, _, err := serveWithOptions(opts)
return h, err
}
func serveWithOptions(opts ServeOptions) (ServeHandle, chan error, error) {
port := opts.Port
// Listen on a TCP port, but let the kernel choose a free port for us.
lis, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(port))
if err != nil {
return ServeHandle{Port: port}, nil,
errors.Errorf("failed to listen on TCP port ':%v': %v", port, err)
}
health := health.NewServer()
// Now new up a gRPC server and register any RPC interfaces the caller wants.
srv := grpc.NewServer(append(opts.Options, grpc.MaxRecvMsgSize(maxRPCMessageSize))...)
if opts.Init != nil {
if err := opts.Init(srv); err != nil {
return ServeHandle{Port: port}, nil,
errors.Errorf("failed to Init GRPC to register RPC handlers: %v", err)
}
}
healthgrpc.RegisterHealthServer(srv, health) // enable health checks
reflection.Register(srv) // enable reflection.
// Set health checks for all the services that they are being served
services := srv.GetServiceInfo()
for serviceName := range services {
health.SetServingStatus(serviceName, healthgrpc.HealthCheckResponse_SERVING)
}
// If the port was 0, look up what port the kernel chosen, by accessing the underlying TCP listener/address.
if port == 0 {
tcpl := lis.(*net.TCPListener)
tcpa := tcpl.Addr().(*net.TCPAddr)
port = tcpa.Port
}
cancel := opts.Cancel
if cancel != nil {
go func() {
for v, ok := <-cancel; !v && ok; v, ok = <-cancel {
}
srv.GracefulStop()
}()
}
// Finally, serve; this returns only once the server shuts down (e.g., due to a signal).
done := make(chan error)
go func() {
if err := srv.Serve(lis); err != nil && !IsBenignCloseErr(err) {
done <- errors.Errorf("stopped serving: %v", err)
} else {
done <- nil // send a signal so caller knows we're done, even though it's nil.
}
close(done)
}()
return ServeHandle{Port: port, Done: done}, done, nil
}
// Deprecated. Please use ServeWithOptions and OpenTracingServerInterceptorOptions.
func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error,
parentSpan opentracing.Span, options ...otgrpc.Option,
) (int, chan error, error) {
opts := ServeOptions{
Port: port,
Cancel: cancel,
Init: func(s *grpc.Server) error {
for _, r := range registers {
if err := r(s); err != nil {
return err
}
}
return nil
},
Options: OpenTracingServerInterceptorOptions(parentSpan, options...),
}
handle, done, err := serveWithOptions(opts)
if err != nil {
return 0, nil, err
}
return handle.Port, done, nil
}