mirror of https://github.com/pulumi/pulumi.git
164 lines
5.0 KiB
Go
164 lines
5.0 KiB
Go
// Copyright 2016-2022, Pulumi Corporation.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package rpcutil
|
|
|
|
import (
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
|
|
opentracing "github.com/opentracing/opentracing-go"
|
|
"github.com/pkg/errors"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/health"
|
|
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
|
|
"google.golang.org/grpc/reflection"
|
|
)
|
|
|
|
// maxRPCMessageSize raises the gRPC Max Message size from `4194304` (4mb) to `419430400` (400mb)
|
|
var maxRPCMessageSize = 1024 * 1024 * 400
|
|
|
|
// GrpcChannelOptions returns the defaultCallOptions with the max_receive_message_length increased to 400mb
|
|
// We want to increase the default message size as per pulumi/pulumi#2319
|
|
func GrpcChannelOptions() grpc.DialOption {
|
|
return grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxRPCMessageSize))
|
|
}
|
|
|
|
// IsBenignCloseErr returns true if the error is "expected" upon shutdown of the server.
|
|
func IsBenignCloseErr(err error) bool {
|
|
msg := err.Error()
|
|
return strings.HasSuffix(msg, "use of closed network connection") ||
|
|
strings.HasSuffix(msg, "grpc: the server has been stopped")
|
|
}
|
|
|
|
type ServeOptions struct {
|
|
// Port to listen on. Passing 0 makes the system choose a port automatically.
|
|
Port int
|
|
|
|
// Initializer for the server. A typical Init registers handlers.
|
|
Init func(*grpc.Server) error
|
|
|
|
// If non-nil, Serve will gracefully terminate the server when Cancel is closed or receives true.
|
|
Cancel chan bool
|
|
|
|
// Options for serving gRPC.
|
|
Options []grpc.ServerOption
|
|
}
|
|
|
|
type ServeHandle struct {
|
|
// Port the server is listening on.
|
|
Port int
|
|
|
|
// The channel is non-nil and is closed when the server stops serving. The server will pass a non-nil error on
|
|
// this channel if something went wrong in the background and it did not terminate gracefully.
|
|
Done <-chan error
|
|
}
|
|
|
|
// ServeWithOptions creates a new gRPC server, calls opts.Init and listens on a TCP port.
|
|
func ServeWithOptions(opts ServeOptions) (ServeHandle, error) {
|
|
h, _, err := serveWithOptions(opts)
|
|
return h, err
|
|
}
|
|
|
|
func serveWithOptions(opts ServeOptions) (ServeHandle, chan error, error) {
|
|
port := opts.Port
|
|
|
|
// Listen on a TCP port, but let the kernel choose a free port for us.
|
|
lis, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(port))
|
|
if err != nil {
|
|
return ServeHandle{Port: port}, nil,
|
|
errors.Errorf("failed to listen on TCP port ':%v': %v", port, err)
|
|
}
|
|
|
|
health := health.NewServer()
|
|
|
|
// Now new up a gRPC server and register any RPC interfaces the caller wants.
|
|
|
|
srv := grpc.NewServer(append(opts.Options, grpc.MaxRecvMsgSize(maxRPCMessageSize))...)
|
|
|
|
if opts.Init != nil {
|
|
if err := opts.Init(srv); err != nil {
|
|
return ServeHandle{Port: port}, nil,
|
|
errors.Errorf("failed to Init GRPC to register RPC handlers: %v", err)
|
|
}
|
|
}
|
|
|
|
healthgrpc.RegisterHealthServer(srv, health) // enable health checks
|
|
reflection.Register(srv) // enable reflection.
|
|
|
|
// Set health checks for all the services that they are being served
|
|
services := srv.GetServiceInfo()
|
|
for serviceName := range services {
|
|
health.SetServingStatus(serviceName, healthgrpc.HealthCheckResponse_SERVING)
|
|
}
|
|
|
|
// If the port was 0, look up what port the kernel chosen, by accessing the underlying TCP listener/address.
|
|
if port == 0 {
|
|
tcpl := lis.(*net.TCPListener)
|
|
tcpa := tcpl.Addr().(*net.TCPAddr)
|
|
port = tcpa.Port
|
|
}
|
|
|
|
cancel := opts.Cancel
|
|
if cancel != nil {
|
|
go func() {
|
|
for v, ok := <-cancel; !v && ok; v, ok = <-cancel {
|
|
}
|
|
|
|
srv.GracefulStop()
|
|
}()
|
|
}
|
|
|
|
// Finally, serve; this returns only once the server shuts down (e.g., due to a signal).
|
|
done := make(chan error)
|
|
go func() {
|
|
if err := srv.Serve(lis); err != nil && !IsBenignCloseErr(err) {
|
|
done <- errors.Errorf("stopped serving: %v", err)
|
|
} else {
|
|
done <- nil // send a signal so caller knows we're done, even though it's nil.
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
return ServeHandle{Port: port, Done: done}, done, nil
|
|
}
|
|
|
|
// Deprecated. Please use ServeWithOptions and OpenTracingServerInterceptorOptions.
|
|
func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error,
|
|
parentSpan opentracing.Span, options ...otgrpc.Option,
|
|
) (int, chan error, error) {
|
|
opts := ServeOptions{
|
|
Port: port,
|
|
Cancel: cancel,
|
|
Init: func(s *grpc.Server) error {
|
|
for _, r := range registers {
|
|
if err := r(s); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
},
|
|
Options: OpenTracingServerInterceptorOptions(parentSpan, options...),
|
|
}
|
|
|
|
handle, done, err := serveWithOptions(opts)
|
|
if err != nil {
|
|
return 0, nil, err
|
|
}
|
|
|
|
return handle.Port, done, nil
|
|
}
|