pulumi/pkg/backend/httpstate/client/api.go

549 lines
16 KiB
Go

// Copyright 2016-2023, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"runtime"
"strings"
"sync"
"time"
"github.com/google/go-querystring/query"
"github.com/opentracing/opentracing-go"
"github.com/pulumi/pulumi/pkg/v3/util/tracing"
"github.com/pulumi/pulumi/pkg/v3/version"
"github.com/pulumi/pulumi/sdk/v3/go/common/apitype"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/tokens"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/httputil"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
)
// Verbosity levels passed to logging.V when tracing traffic to the Pulumi API.
const (
// apiRequestLogLevel is the log level for logging API requests and responses.
apiRequestLogLevel = 10
// apiRequestDetailLogLevel is the log level for logging extra details about API requests and
// responses, such as request headers and response bodies.
apiRequestDetailLogLevel = 11
)
// UserAgent returns the value sent in the User-Agent header of Pulumi API requests. It is built from
// version.Version and the operating system reported by runtime.GOOS.
func UserAgent() string {
	const format = "pulumi-cli/1 (%s; %s)"
	return fmt.Sprintf(format, version.Version, runtime.GOOS)
}
// StackIdentifier is the set of data needed to identify a Pulumi Cloud stack.
type StackIdentifier struct {
	Owner   string
	Project string
	Stack   tokens.StackName
}

// String renders the identifier in the form "<owner>/<project>/<stack>".
func (s StackIdentifier) String() string {
	owner, project, stack := s.Owner, s.Project, s.Stack
	return fmt.Sprintf("%s/%s/%s", owner, project, stack)
}
// UpdateIdentifier is the set of data needed to identify an update to a Pulumi Cloud stack.
type UpdateIdentifier struct {
// StackIdentifier is embedded, so its Owner, Project and Stack fields are promoted onto UpdateIdentifier.
StackIdentifier
// UpdateKind is the kind of the update, expressed as an apitype.UpdateKind.
UpdateKind apitype.UpdateKind
// UpdateID is the identifier of the update itself.
UpdateID string
}
// accessTokenKind enumerates the various types of access token used with the Pulumi API. These kinds correspond
// directly to the "method" piece of an HTTP `Authorization` header.
type accessTokenKind string
const (
// accessTokenKindAPIToken denotes a standard Pulumi API token.
accessTokenKindAPIToken accessTokenKind = "token"
// accessTokenKindUpdateToken denotes an update lease token.
accessTokenKindUpdateToken accessTokenKind = "update-token"
)
// accessToken is an abstraction over the two different kinds of access tokens used by the Pulumi API.
type accessToken interface {
// Kind returns the kind of the token. pulumiAPICall writes it as the first word of the
// Authorization header value, ahead of the token itself.
Kind() accessTokenKind
// Get returns the token value to send with a request. pulumiAPICall omits the Authorization
// header when the value is empty and fails the call when an error is returned.
Get(ctx context.Context) (string, error)
}
// httpCallOptions carries the per-call options honored by pulumiAPICall.
type httpCallOptions struct {
// RetryPolicy defines the policy for retrying requests by httpClient.Do.
//
// By default, only GET requests are retried.
RetryPolicy retryPolicy
// GzipCompress compresses the request using gzip before sending it.
GzipCompress bool
// Header is any additional headers to add to the request.
Header http.Header
// ErrorResponse is an optional destination for the body of a 4xx/5xx response. When it is non-nil
// and the body unmarshals into it as JSON, it is returned as the call's error, so the value should
// implement the error interface.
ErrorResponse any
// StreamRequest streams the request body instead of marshaling and compressing it in-memory.
StreamRequest bool
}
// apiAccessToken is an implementation of accessToken for Pulumi API tokens (i.e. tokens of kind
// accessTokenKindAPIToken). The string value is the token itself.
type apiAccessToken string

// Kind always reports accessTokenKindAPIToken.
func (t apiAccessToken) Kind() accessTokenKind {
	const kind = accessTokenKindAPIToken
	return kind
}

// Get returns the token verbatim; it ignores the context and never fails.
func (t apiAccessToken) Get(context.Context) (string, error) {
	value := string(t)
	return value, nil
}
// UpdateTokenSource allows the API client to request tokens for an in-progress update as near as possible to the
// actual API call (e.g. after marshaling, etc.).
type UpdateTokenSource interface {
// GetToken returns the update token to use for the next API call, or an error if no token can be
// provided.
GetToken(ctx context.Context) (string, error)
}
// updateTokenStaticSource is a token source backed by a fixed string: every GetToken call yields the
// same value.
type updateTokenStaticSource string

// GetToken returns the static token; it ignores the context and never fails.
func (t updateTokenStaticSource) GetToken(context.Context) (string, error) {
	value := string(t)
	return value, nil
}
// updateToken is an implementation of accessToken for update lease tokens (i.e. tokens of kind
// accessTokenKindUpdateToken). The token value is obtained from source each time it is needed.
type updateToken struct {
	source UpdateTokenSource
}

// updateAccessToken wraps source in an updateToken.
func updateAccessToken(source UpdateTokenSource) updateToken {
	var tok updateToken
	tok.source = source
	return tok
}

// Kind always reports accessTokenKindUpdateToken.
func (t updateToken) Kind() accessTokenKind {
	const kind = accessTokenKindUpdateToken
	return kind
}

// Get asks the underlying source for the current token and passes its result through unchanged.
func (t updateToken) Get(ctx context.Context) (string, error) {
	src := t.source
	return src.GetToken(ctx)
}
// float64Ptr returns a pointer to a fresh copy of f.
func float64Ptr(f float64) *float64 {
	p := new(float64)
	*p = f
	return p
}

// durationPtr returns a pointer to a fresh copy of d.
func durationPtr(d time.Duration) *time.Duration {
	p := new(time.Duration)
	*p = d
	return p
}

// intPtr returns a pointer to a fresh copy of i.
func intPtr(i int) *int {
	p := new(int)
	*p = i
	return p
}
// retryPolicy defines the policy for retrying requests by httpClient.Do.
type retryPolicy int

const (
	// retryNone indicates that no retry should be attempted.
	retryNone retryPolicy = -1

	// retryGetMethod indicates that only GET requests should be retried.
	//
	// This is the default retry policy: it is the zero value of retryPolicy.
	retryGetMethod retryPolicy = 0

	// retryAllMethods indicates that all requests should be retried.
	retryAllMethods retryPolicy = 1
)

// String returns a short name for the policy ("none", "get" or "all"). Values outside the known set
// are rendered as "retryPolicy(<n>)".
func (p retryPolicy) String() string {
	if p == retryNone {
		return "none"
	}
	if p == retryGetMethod {
		return "get"
	}
	if p == retryAllMethods {
		return "all"
	}
	return fmt.Sprintf("retryPolicy(%d)", p)
}
// shouldRetry reports whether req may be retried under this policy: never for retryNone, only when the
// method is GET for retryGetMethod, and always for retryAllMethods. Any other value is reported through
// contract.Failf.
func (p retryPolicy) shouldRetry(req *http.Request) bool {
	if p == retryNone {
		return false
	}
	if p == retryGetMethod {
		return req.Method == http.MethodGet
	}
	if p == retryAllMethods {
		return true
	}
	contract.Failf("unknown retry policy: %v", p)
	return false // unreachable
}
// httpClient is an HTTP client abstraction, used by defaultRESTClient.
type httpClient interface {
// Do sends req and returns the response or an error. policy selects whether the request may be
// retried.
Do(req *http.Request, policy retryPolicy) (*http.Response, error)
}
// defaultHTTPClient is an implementation of httpClient that provides a basic implementation of Do
// using the specified *http.Client, with retry support.
type defaultHTTPClient struct {
	client *http.Client
}

// Do sends req with the wrapped *http.Client. Requests the policy does not allow to be retried are sent
// exactly once; all others go through httputil.DoWithRetryOpts.
func (c *defaultHTTPClient) Do(req *http.Request, policy retryPolicy) (*http.Response, error) {
	if !policy.shouldRetry(req) {
		return c.client.Do(req)
	}

	// Wait 1s before retrying on failure. Then increase by 2x until the
	// maximum delay is reached. Stop after maxRetryCount requests have
	// been made.
	retryOpts := httputil.RetryOpts{
		Delay:         durationPtr(time.Second),
		Backoff:       float64Ptr(2.0),
		MaxDelay:      durationPtr(30 * time.Second),
		MaxRetryCount: intPtr(4),
	}
	return httputil.DoWithRetryOpts(req, c.client, retryOpts)
}
// nopCloseWriter adapts an io.Writer to io.WriteCloser by pairing it with a Close method that does
// nothing.
type nopCloseWriter struct {
	io.Writer
}

// Close is a no-op and always returns nil; the wrapped writer is left untouched.
func (nopCloseWriter) Close() error {
	var noErr error
	return noErr
}
// gzipPool recycles gzip writers across requests. New writers are created pointing at io.Discard, so a
// writer taken from the pool must be Reset onto its real destination before use.
var gzipPool = sync.Pool{
	New: func() any {
		fresh := gzip.NewWriter(io.Discard)
		return fresh
	},
}
// pulumiAPICall makes an HTTP request to the Pulumi API.
//
// The request URL is cloudAPI (minus any trailing slash) followed by the cleaned path. body is written
// into the request either through an in-memory buffer or, when opts.StreamRequest is set, through a pipe
// fed by a background goroutine; opts.GzipCompress gzips it on the way. Credentials are requested from
// tok after body writing has been started and, when non-empty, are sent in the Authorization header.
// Warnings the service sends in X-Pulumi-Warning headers are reported to d.
//
// On success it returns the request URL and the response; the caller owns resp.Body and must close it.
// On every error path the response body (if any) is released here and a nil response is returned:
// a 401 without credentials and a 429 get dedicated messages, other 4xx/5xx responses are decoded by
// decodeError.
func pulumiAPICall(ctx context.Context,
	requestSpan opentracing.Span,
	d diag.Sink, client httpClient, cloudAPI, method, path string, body io.WriterTo,
	tok accessToken, opts httpCallOptions,
) (string, *http.Response, error) {
	// Normalize URL components
	cloudAPI = strings.TrimSuffix(cloudAPI, "/")
	path = cleanPath(path)

	url := fmt.Sprintf("%s%s", cloudAPI, path)

	var bodyReader io.Reader
	var bodyWriter io.WriteCloser
	// abortBody is called when the request is abandoned. For a streamed body it closes the read side of
	// the pipe so the writer goroutine's pending Write fails instead of blocking forever; for a buffered
	// body there is nothing to release.
	abortBody := func() {}
	if !opts.StreamRequest {
		b := &bytes.Buffer{}
		bodyReader, bodyWriter = b, nopCloseWriter{Writer: b}
	} else {
		pr, pw := io.Pipe()
		bodyReader, bodyWriter = pr, pw
		abortBody = func() { contract.IgnoreClose(pr) }
	}

	writeBody := func() error {
		if opts.GzipCompress {
			// Deferred calls run last-in-first-out: the gzip writer is closed (flushing its trailer)
			// and returned to the pool before the underlying writer is closed.
			defer contract.IgnoreClose(bodyWriter)

			gz := gzipPool.Get().(*gzip.Writer)
			defer gzipPool.Put(gz)

			gz.Reset(bodyWriter)
			bodyWriter = gz
		}
		defer contract.IgnoreClose(bodyWriter)

		_, err := body.WriteTo(bodyWriter)
		return err
	}

	if opts.StreamRequest {
		go func() {
			err := writeBody()
			contract.IgnoreError(err)
		}()
	} else {
		if err := writeBody(); err != nil {
			return "", nil, fmt.Errorf("writing body: %w", err)
		}
	}

	req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
	if err != nil {
		abortBody()
		return "", nil, fmt.Errorf("creating new HTTP request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")

	// Set headers from the incoming options.
	for k, v := range opts.Header {
		req.Header[k] = v
	}

	// Add a User-Agent header to allow for the backend to make breaking API changes while preserving
	// backwards compatibility.
	req.Header.Set("User-Agent", UserAgent())
	// Specify the specific API version we accept.
	req.Header.Set("Accept", "application/vnd.pulumi+8")

	// Apply credentials if provided.
	creds, err := tok.Get(ctx)
	if err != nil {
		abortBody()
		return "", nil, fmt.Errorf("fetching credentials: %w", err)
	}
	if creds != "" {
		req.Header.Set("Authorization", fmt.Sprintf("%s %s", tok.Kind(), creds))
	}

	tracingOptions := tracing.OptionsFromContext(ctx)
	if tracingOptions.PropagateSpans {
		carrier := opentracing.HTTPHeadersCarrier(req.Header)
		if err = requestSpan.Tracer().Inject(requestSpan.Context(), opentracing.HTTPHeaders, carrier); err != nil {
			logging.Errorf("injecting tracing headers: %v", err)
		}
	}
	if tracingOptions.TracingHeader != "" {
		req.Header.Set("X-Pulumi-Tracing", tracingOptions.TracingHeader)
	}

	// Opt-in to accepting gzip-encoded responses from the service.
	req.Header.Set("Accept-Encoding", "gzip")
	if opts.GzipCompress {
		// If we're sending something that's gzipped, set that header too.
		req.Header.Set("Content-Encoding", "gzip")
	}

	logging.V(apiRequestLogLevel).Infof("Making Pulumi API call: %s", url)
	if logging.V(apiRequestDetailLogLevel) {
		logging.V(apiRequestDetailLogLevel).Infof(
			"Pulumi API call details (%s): headers=%v", url, req.Header)
	}

	resp, err := client.Do(req, opts.RetryPolicy)
	if err != nil {
		// The request is over; make sure a streaming writer is not left blocked even if the client
		// implementation did not close the request body.
		abortBody()
		// Don't wrap *apitype.ErrorResponse.
		if _, ok := err.(*apitype.ErrorResponse); ok {
			return "", nil, err
		}
		return "", nil, fmt.Errorf("performing HTTP request: %w", err)
	}
	logging.V(apiRequestLogLevel).Infof("Pulumi API call response code (%s): %v", url, resp.Status)

	requestSpan.SetTag("responseCode", resp.Status)

	if warningHeader, ok := resp.Header["X-Pulumi-Warning"]; ok {
		for _, warning := range warningHeader {
			d.Warningf(diag.RawMessage("", warning))
		}
	}

	// Provide a better error if using an authenticated call without having logged in first.
	if resp.StatusCode == 401 && tok.Kind() == accessTokenKindAPIToken && creds == "" {
		// The response is not handed to the caller, so its body must be closed here.
		contract.IgnoreClose(resp.Body)
		return "", nil, errors.New("this command requires logging in; try running `pulumi login` first")
	}

	// Provide a better error if rate-limit is exceeded(429: Too Many Requests)
	if resp.StatusCode == 429 {
		// The response is not handed to the caller, so its body must be closed here.
		contract.IgnoreClose(resp.Body)
		return "", nil, errors.New("pulumi service: request rate-limit exceeded")
	}

	// For 4xx and 5xx failures, attempt to provide better diagnostics about what may have gone wrong.
	if resp.StatusCode >= 400 && resp.StatusCode <= 599 {
		// 4xx and 5xx responses should be of type ErrorResponse. See if we can unmarshal as that
		// type, and if not just return the raw response text. readBody closes the response body.
		respBody, err := readBody(resp)
		if err != nil {
			return "", nil, fmt.Errorf("API call failed (%s), could not read response: %w", resp.Status, err)
		}

		return "", nil, decodeError(respBody, resp.StatusCode, opts)
	}

	return url, resp, nil
}
// decodeError converts the body of a failed (4xx/5xx) response into an error.
//
// When opts.ErrorResponse is set, the body unmarshals into it as JSON, and the value implements error,
// that value is returned. Otherwise the body is decoded as an apitype.ErrorResponse; if that fails too,
// an ErrorResponse carrying statusCode and the trimmed raw body text is returned. The result is never
// nil.
func decodeError(respBody []byte, statusCode int, opts httpCallOptions) error {
	if opts.ErrorResponse != nil {
		if err := json.Unmarshal(respBody, opts.ErrorResponse); err == nil {
			// ErrorResponse is declared as `any`, so check the assertion instead of panicking when a
			// caller passes a type that does not implement error; in that case fall through to the
			// generic decoding below.
			if typedErr, ok := opts.ErrorResponse.(error); ok {
				return typedErr
			}
		}
	}

	var errResp apitype.ErrorResponse
	if err := json.Unmarshal(respBody, &errResp); err != nil {
		// Not JSON (or not the expected shape): surface the raw text with the HTTP status code.
		errResp.Code = statusCode
		errResp.Message = strings.TrimSpace(string(respBody))
	}
	return &errResp
}
// restClient is an abstraction for calling the Pulumi REST API.
type restClient interface {
// Call issues a request with the given method against path on cloudAPI. queryObj, when non-nil, is
// encoded into the query string; reqObj supplies the request body; respObj, when non-nil, receives
// the response; tok supplies the credentials and opts the per-call options. Warnings reported by
// the service are written to diag.
Call(ctx context.Context, diag diag.Sink, cloudAPI, method, path string, queryObj, reqObj,
respObj interface{}, tok accessToken, opts httpCallOptions) error
}
// defaultRESTClient is the default implementation for calling the Pulumi REST API.
type defaultRESTClient struct {
// client performs the underlying HTTP requests, applying the retry policy of each call.
client httpClient
}
// countingWriter forwards writes to w and keeps a running total of the bytes w accepted in n.
type countingWriter struct {
	w io.Writer
	n int64
}

// Write passes p to the wrapped writer, adds the number of bytes it reports as written to the running
// total, and returns the wrapped writer's result unchanged.
func (w *countingWriter) Write(p []byte) (int, error) {
	written, err := w.w.Write(p)
	w.n += int64(written)
	return written, err
}

// marshalerTo adapts an arbitrary value to io.WriterTo by JSON-encoding it on demand.
type marshalerTo struct {
	req any
}

// WriteTo JSON-encodes the wrapped value to w and returns the number of bytes written. A nil value
// writes nothing and returns (0, nil).
func (m marshalerTo) WriteTo(w io.Writer) (int64, error) {
	if m.req == nil {
		return 0, nil
	}

	counter := &countingWriter{w: w}
	encoder := json.NewEncoder(counter)
	if err := encoder.Encode(m.req); err != nil {
		return counter.n, err
	}
	return counter.n, nil
}
// Call calls the Pulumi REST API. reqObj is marshalled to JSON and used as the request body (use nil
// for GETs); a json.RawMessage is sent verbatim. queryObj, when non-nil, is encoded into the query
// string. On success the response is stored in respObj: a **http.Response receives the raw response
// without its body being read, a *[]byte receives the raw body bytes, any other non-nil value is
// filled by unmarshalling the body as JSON, and nil discards the body (use nil for NoContent). The
// returned error might be an instance of apitype.ErrorResponse, in which case it carries the response
// code.
func (c *defaultRESTClient) Call(ctx context.Context, diag diag.Sink, cloudAPI, method, path string, queryObj, reqObj,
	respObj interface{}, tok accessToken, opts httpCallOptions,
) error {
	span, ctx := opentracing.StartSpanFromContext(ctx, getEndpointName(method, path),
		opentracing.Tag{Key: "method", Value: method},
		opentracing.Tag{Key: "path", Value: path},
		opentracing.Tag{Key: "api", Value: cloudAPI},
		opentracing.Tag{Key: "retry", Value: opts.RetryPolicy.String()})
	defer span.Finish()

	// Append the encoded query object, if it produces anything, to the request path.
	fullPath := path
	if queryObj != nil {
		values, err := query.Values(queryObj)
		if err != nil {
			return fmt.Errorf("marshalling query object as JSON: %w", err)
		}
		if encoded := values.Encode(); encoded != "" {
			fullPath += "?" + encoded
		}
	}

	// By default the request is marshalled as JSON. A json.RawMessage is already marshalled and is
	// sent verbatim, which is important when sending indented JSON is needed.
	var payload io.WriterTo = marshalerTo{req: reqObj}
	if raw, ok := reqObj.(json.RawMessage); ok {
		payload = bytes.NewReader([]byte(raw))
	}

	// Make API call
	url, resp, err := pulumiAPICall(
		ctx, span, diag, c.client, cloudAPI, method, fullPath, payload, tok, opts)
	if err != nil {
		return err
	}

	// Callers asking for the raw response get it as-is and take over its body.
	if respPtr, ok := respObj.(**http.Response); ok {
		*respPtr = resp
		return nil
	}

	// Read API response
	respBody, err := readBody(resp)
	if err != nil {
		return fmt.Errorf("reading response from API: %w", err)
	}
	if logging.V(apiRequestDetailLogLevel) {
		logging.V(apiRequestDetailLogLevel).Infof("Pulumi API call response body (%s): %v", url, string(respBody))
	}

	switch out := respObj.(type) {
	case nil:
		// The caller does not want the response body.
	case *[]byte:
		// Return the raw bytes of the response body.
		*out = respBody
	case []byte:
		return errors.New("Can't unmarshal response body to []byte. Try *[]byte")
	default:
		// Else, unmarshal as JSON.
		if err := json.Unmarshal(respBody, out); err != nil {
			return fmt.Errorf("unmarshalling response object: %w", err)
		}
	}

	return nil
}
// readBody reads the contents of an http.Response into a byte array, returning an error if one occurred while in the
// process of doing so. readBody uses the Content-Encoding of the response to pick the correct reader to use.
//
// A missing or empty Content-Encoding header means the body is read as-is. The only supported codings
// are gzip and its alias x-gzip, matched case-insensitively; several header values or any other coding
// result in an error. The response body is always closed before readBody returns.
func readBody(resp *http.Response) ([]byte, error) {
	defer contract.IgnoreClose(resp.Body)

	contentEncoding := resp.Header["Content-Encoding"]
	if len(contentEncoding) == 0 {
		// No header (or a header without values) implies that there's no additional encoding on this
		// response. Checking the length rather than mere presence avoids indexing an empty slice below.
		return io.ReadAll(resp.Body)
	}

	if len(contentEncoding) > 1 {
		// We only know how to deal with gzip. We can't handle additional encodings layered on top of it.
		return nil, fmt.Errorf("can't handle content encodings %v", contentEncoding)
	}

	// Content-coding names are case-insensitive, so normalize before comparing.
	switch strings.ToLower(contentEncoding[0]) {
	case "gzip", "x-gzip":
		// The HTTP/1.1 spec recommends we treat x-gzip as an alias of gzip.
		logging.V(apiRequestDetailLogLevel).Infoln("decompressing gzipped response from service")
		reader, err := gzip.NewReader(resp.Body)
		if err != nil {
			return nil, fmt.Errorf("reading gzip-compressed body: %w", err)
		}
		defer contract.IgnoreClose(reader)
		return io.ReadAll(reader)
	default:
		return nil, fmt.Errorf("unrecognized encoding %s", contentEncoding[0])
	}
}