// Copyright 2016-2018, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"encoding/binary"
	"io"

	"google.golang.org/protobuf/types/known/emptypb"

	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/encoding/proto"
	"google.golang.org/grpc/metadata"

	"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
	pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go"
)

// pipes is the platform agnostic abstraction over a pair of channels we can read and write binary
// data over. It is provided through the `createPipes` functions provided in `proxy_unix.go` (where
// it is implemented on top of fifo files), and in `proxy_windows.go` (where it is implemented on
// top of named pipes).
type pipes interface {
	// The directory containing the two streams to read and write from.  This will be passed to the
	// nodejs process so it can connect to our read and writes streams for communication.
	directory() string

	// Attempt to create and connect to the read and write streams.
	connect() error

	// The stream that we will use to read in requests send to us by the nodejs process.
	reader() io.Reader

	// The stream we will write responses back to the nodejs process with.
	writer() io.Writer

	// called when we're done with the pipes and want to clean up any os resources we may have
	// allocated (for example, actual files and directories on disk).
	shutdown()
}

func createAndServePipes(ctx context.Context, target pulumirpc.ResourceMonitorClient) (pipes, chan error, error) {
	pipes, err := createPipes()
	if err != nil {
		return nil, nil, err
	}

	pipesDone := servePipes(ctx, pipes, target)
	return pipes, pipesDone, nil
}

func servePipes(ctx context.Context, pipes pipes, target pulumirpc.ResourceMonitorClient) chan error {
	done := make(chan error)

	go func() {
		// Keep reading and writing from the pipes until we run into an error or are canceled.
		err := func() error {
			pbcodec := encoding.GetCodec(proto.Name)

			err := pipes.connect()
			if err != nil {
				logging.V(10).Infof("Sync invoke: Error connecting to pipes: %s\n", err)
				return err
			}

			for {
				// read a 4-byte request length
				logging.V(10).Infoln("Sync invoke: Reading length from request pipe")
				var reqLen uint32
				if err := binary.Read(pipes.reader(), binary.BigEndian, &reqLen); err != nil {
					// This is benign on shutdown.
					if err == io.EOF {
						// We were asked to gracefully cancel.  Just exit now.
						logging.V(10).Infof("Sync invoke: Gracefully shutting down")
						return nil
					}

					logging.V(10).Infof("Sync invoke: Received error reading length from pipe: %s\n", err)
					return err
				}

				// read the request in full
				logging.V(10).Infoln("Sync invoke: Reading message from request pipe")
				reqBytes := make([]byte, reqLen)
				if _, err := io.ReadFull(pipes.reader(), reqBytes); err != nil {
					logging.V(10).Infof("Sync invoke: Received error reading message from pipe: %s\n", err)
					return err
				}

				// decode and dispatch the request
				logging.V(10).Infof("Sync invoke: Unmarshalling request")
				var req pulumirpc.ResourceInvokeRequest
				if err := pbcodec.Unmarshal(reqBytes, &req); err != nil {
					logging.V(10).Infof("Sync invoke: Received error reading full from pipe: %s\n", err)
					return err
				}

				logging.V(10).Infof("Sync invoke: Invoking: %s", req.GetTok())
				res, err := target.Invoke(ctx, &req)
				// Unfortunately, `monitor.Invoke` can return errors for non-exceptional cases. This
				// can happen, for example, if the underlying provider call ends up returning an
				// error itself.  A common case of this is the aws-provider which will often return
				// errors like:
				//
				//  aws:ec2/getRouteTable:getRouteTable: Your query returned no results.
				//
				// This is not an error in the sense that something has gone terribly wrong and we
				// need to shutdown.  Rather, it's just a validation issue that needs to be passed
				// back to the user program to handle.
				//
				// In the async codepath, this just returns as a grpc error which is thrown in the
				// sdk code (and can be handled by the user). Unfortunately, our sync-rpc pipes have
				// no way to send either an err or a success result.  They can only send success,
				// and only *terminate* on error.
				//
				// So, to workaround this issue, we abuse the 'success result' slightly.
				// Specifically, we take advantage of the fact that the success-result contains a
				// list of 'failures' to indicate if 'Check' reported any problems.  We just stash
				// this error into that array, knowing that the SDK will see it and immediately
				// throw, just like it would have done for an RPC error in the normal async case.
				if err != nil {
					logging.V(10).Infof("Sync invoke: Received error invoking: %s\n", err)
					logging.V(10).Infof("Sync invoke: Converting error to response.\n")
					if res == nil {
						res = &pulumirpc.InvokeResponse{}
					}

					res.Failures = append(res.Failures, &pulumirpc.CheckFailure{
						Property: "",
						Reason:   err.Error(),
					})
				}

				// encode the response
				logging.V(10).Infof("Sync invoke: Marshalling response")
				resBytes, err := pbcodec.Marshal(res)
				if err != nil {
					logging.V(10).Infof("Sync invoke: Received error marshalling: %s\n", err)
					return err
				}

				// write the 4-byte response length
				logging.V(10).Infoln("Sync invoke: Writing length to request pipe")
				//nolint:gosec // Max message size for protobuf is 2GB, so the int -> uint32 conversion is safe.
				if err := binary.Write(pipes.writer(), binary.BigEndian, uint32(len(resBytes))); err != nil {
					logging.V(10).Infof("Sync invoke: Error writing length to pipe: %s\n", err)
					return err
				}

				// write the response in full
				logging.V(10).Infoln("Sync invoke: Writing message to request pipe")
				if _, err := pipes.writer().Write(resBytes); err != nil {
					logging.V(10).Infof("Sync invoke: Error writing message to pipe: %s\n", err)
					return err
				}
			}
		}()

		// Signal our caller that we're done.
		done <- err
		close(done)

		// cleanup any resources the pipes were holding onto.
		pipes.shutdown()
	}()

	return done
}

// Forward all resource monitor calls that we're serving to nodejs back to the engine to actually
// perform.

type monitorProxy struct {
	pulumirpc.UnsafeResourceMonitorServer

	target pulumirpc.ResourceMonitorClient
}

func (p *monitorProxy) Invoke(
	ctx context.Context, req *pulumirpc.ResourceInvokeRequest,
) (*pulumirpc.InvokeResponse, error) {
	return p.target.Invoke(ctx, req)
}

func (p *monitorProxy) StreamInvoke(
	req *pulumirpc.ResourceInvokeRequest, server pulumirpc.ResourceMonitor_StreamInvokeServer,
) error {
	client, err := p.target.StreamInvoke(context.Background(), req)
	if err != nil {
		return err
	}

	for {
		in, err := client.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}

		if err := server.Send(in); err != nil {
			return err
		}
	}
}

func (p *monitorProxy) Call(
	ctx context.Context, req *pulumirpc.ResourceCallRequest,
) (*pulumirpc.CallResponse, error) {
	return p.target.Call(ctx, req)
}

func (p *monitorProxy) ReadResource(
	ctx context.Context, req *pulumirpc.ReadResourceRequest,
) (*pulumirpc.ReadResourceResponse, error) {
	return p.target.ReadResource(ctx, req)
}

func (p *monitorProxy) RegisterResource(
	ctx context.Context, req *pulumirpc.RegisterResourceRequest,
) (*pulumirpc.RegisterResourceResponse, error) {
	// Add the "pulumi-runtime" header to the context so the engine can detect this request
	// is coming from the nodejs language plugin.
	// Setting this header is not required for other SDKs. We do it for nodejs so the engine
	// can workaround a bug in older Node.js SDKs where alias specs weren't specified correctly.
	ctx = metadata.AppendToOutgoingContext(ctx, "pulumi-runtime", "nodejs")
	return p.target.RegisterResource(ctx, req)
}

func (p *monitorProxy) RegisterResourceOutputs(
	ctx context.Context, req *pulumirpc.RegisterResourceOutputsRequest,
) (*emptypb.Empty, error) {
	return p.target.RegisterResourceOutputs(ctx, req)
}

func (p *monitorProxy) SupportsFeature(
	ctx context.Context, req *pulumirpc.SupportsFeatureRequest,
) (*pulumirpc.SupportsFeatureResponse, error) {
	return p.target.SupportsFeature(ctx, req)
}

func (p *monitorProxy) RegisterStackTransform(
	ctx context.Context, req *pulumirpc.Callback,
) (*emptypb.Empty, error) {
	return p.target.RegisterStackTransform(ctx, req)
}

func (p *monitorProxy) RegisterStackInvokeTransform(
	ctx context.Context, req *pulumirpc.Callback,
) (*emptypb.Empty, error) {
	return p.target.RegisterStackInvokeTransform(ctx, req)
}

func (p *monitorProxy) RegisterPackage(
	ctx context.Context, req *pulumirpc.RegisterPackageRequest,
) (*pulumirpc.RegisterPackageResponse, error) {
	return p.target.RegisterPackage(ctx, req)
}