Unverified Commit 97a91b52 authored by bryanl
Browse files

Retry apply if conflict is detected



When applying objects to a cluster, there is a small chance that a
conflict could arise. Instead of failing instantly, sleep for one
second, and retry up to five times.

Fixes #619
Signed-off-by: bryanl <bryanliles@gmail.com>
parent befbc540
// Copyright 2018 The ksonnet authors
//
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
......
......@@ -16,8 +16,6 @@
package clicmd
import (
"fmt"
"github.com/ksonnet/ksonnet/pkg/actions"
"github.com/ksonnet/ksonnet/pkg/app"
"github.com/ksonnet/ksonnet/pkg/client"
......@@ -107,7 +105,6 @@ func newApplyCmd(a app.App) *cobra.Command {
actions.OptionSkipGc: viper.GetBool(vApplySkipGc),
}
fmt.Println("extract jsonnet flag")
if err := extractJsonnetFlags(a, "apply"); err != nil {
return errors.Wrap(err, "handle jsonnet flags")
}
......
......@@ -16,12 +16,9 @@
package cluster
import (
"bytes"
"compress/gzip"
"encoding/base64"
"encoding/json"
"fmt"
"sort"
"time"
"github.com/ksonnet/ksonnet/pkg/app"
"github.com/ksonnet/ksonnet/pkg/client"
......@@ -34,58 +31,24 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
kdiff "k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/sets"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)
const (
appKsonnet = "ksonnet"
)
// managedMetadata is the JSON-serializable record that carries an encoded
// snapshot of an object.
type managedMetadata struct {
	// Pristine is the base64 (standard encoding) text of the gzip-compressed
	// JSON form of the object.
	Pristine string `json:"pristine,omitempty"`
}

// EncodePristine serializes m as JSON, gzip-compresses it, and stores the
// base64-encoded result in mm.Pristine. The first error from encoding,
// flushing, or closing the compressor is returned and mm.Pristine is left
// untouched in that case.
func (mm *managedMetadata) EncodePristine(m map[string]interface{}) error {
	compressed := &bytes.Buffer{}
	zw := gzip.NewWriter(compressed)

	// Encode, then flush, then close; each step only runs if the previous
	// one succeeded. The explicit flush is kept because it is part of the
	// byte stream that ends up in the encoded value.
	err := json.NewEncoder(zw).Encode(m)
	if err == nil {
		err = zw.Flush()
	}
	if err == nil {
		err = zw.Close()
	}
	if err != nil {
		return err
	}

	mm.Pristine = base64.StdEncoding.EncodeToString(compressed.Bytes())
	return nil
}
func (mm *managedMetadata) DecodePristine() (map[string]interface{}, error) {
b, err := base64.StdEncoding.DecodeString(mm.Pristine)
if err != nil {
return nil, err
}
r := bytes.NewReader(b)
// applyConflictRetryCount sets how many times an apply is retried before giving up
// after a conflict error is detected.
applyConflictRetryCount = 5
zr, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
defer zr.Close()
// defaultConflictTimeout sets the wait time before retrying after a conflict is detected.
defaultConflictTimeout = 1 * time.Second
var m map[string]interface{}
if err := json.NewDecoder(zr).Decode(&m); err != nil {
return nil, err
}
appKsonnet = "ksonnet"
)
return m, nil
}
var (
// errApplyConflict is the error reported once an apply has hit a conflict on
// every one of its applyConflictRetryCount attempts.
errApplyConflict = errors.Errorf("apply conflict detected; retried %d times", applyConflictRetryCount)
)
// ApplyConfig is configuration for Apply.
type ApplyConfig struct {
......@@ -109,24 +72,54 @@ type Apply struct {
// these make it easier to test Apply.
findObjectsFn findObjectsFn
resourceClientFactory resourceClientFactoryFn
genClientOptsFn genClientOptsFn
clientOpts *clientOpts
objectInfo ObjectInfo
ksonnetObjectFactory func() ksonnetObject
upserterFactory func() Upserter
conflictTimeout time.Duration
}
// RunApply runs apply against a cluster given a configuration.
//
// config.ClientConfig must be non-nil. An Apply is built with default
// collaborators, then every opt is invoked on it so callers (mainly tests)
// can replace them. Client options and the upserter are only constructed
// here when no option supplied them. The result of Apply is returned.
func RunApply(config ApplyConfig, opts ...ApplyOpts) error {
	if config.ClientConfig == nil {
		return errors.New("ksonnet client config is required")
	}

	a := &Apply{
		ApplyConfig:           config,
		findObjectsFn:         findObjects,
		resourceClientFactory: resourceClientFactory,
		genClientOptsFn:       genClientOpts,
		objectInfo:            &objectInfo{},
		ksonnetObjectFactory: func() ksonnetObject {
			factory := cmdutil.NewFactory(config.ClientConfig.Config)
			return newDefaultKsonnetObject(factory)
		},
		// Use the package-level default rather than repeating the literal so
		// the wait time is defined in exactly one place.
		conflictTimeout: defaultConflictTimeout,
	}

	for _, opt := range opts {
		opt(a)
	}

	// Only talk to the cluster configuration when an option did not already
	// provide client options.
	if a.clientOpts == nil {
		co, err := genClientOpts(a.App, a.ClientConfig, a.EnvName)
		if err != nil {
			return err
		}

		a.clientOpts = &co
	}

	// Build the default upserter once and hand out the same instance on
	// every factory call.
	if a.upserterFactory == nil {
		u, err := newDefaultUpserter(a.ApplyConfig, a.objectInfo, *a.clientOpts, a.resourceClientFactory)
		if err != nil {
			return errors.Wrap(err, "creating upserter")
		}

		a.upserterFactory = func() Upserter {
			return u
		}
	}

	return a.Apply()
}
......@@ -141,14 +134,9 @@ func (a *Apply) Apply() error {
seenUids := sets.NewString()
co, err := a.genClientOptsFn(a.App, a.ClientConfig, a.EnvName)
if err != nil {
return err
}
for _, obj := range apiObjects {
var uid string
uid, err = a.handleObject(co, obj)
uid, err = a.handleObject(obj)
if err != nil {
return errors.Wrap(err, "handle object")
}
......@@ -162,7 +150,7 @@ func (a *Apply) Apply() error {
}
if a.GcTag != "" && !a.SkipGc {
if err = a.runGc(co, seenUids); err != nil {
if err = a.runGc(seenUids); err != nil {
return errors.Wrap(err, "run gc")
}
}
......@@ -170,95 +158,69 @@ func (a *Apply) Apply() error {
return nil
}
func (a *Apply) handleObject(co clientOpts, obj *unstructured.Unstructured) (string, error) {
if err := tagManaged(obj); err != nil {
return "", errors.Wrap(err, "tagging ksonnet managed object")
func (a *Apply) handleObject(obj *unstructured.Unstructured) (string, error) {
if err := a.preprocessObject(obj); err != nil {
return "", errors.Wrap(err, "preprocessing object before apply")
}
factory := cmdutil.NewFactory(a.ClientConfig.Config)
m := newObjectMerger(factory)
mergedObject, err := m.merge(co.namespace, obj)
mergedObject, err := a.patchFromCluster(obj)
if err != nil {
cause := errors.Cause(err)
if !kerrors.IsNotFound(cause) {
return "", errors.Wrap(cause, "merging object with existing state")
}
mergedObject = obj
}
if a.GcTag != "" {
SetMetaDataAnnotation(mergedObject, metadata.AnnotationGcTag, a.GcTag)
return "", errors.Wrap(err, "patching object from cluster")
}
desc := fmt.Sprintf("%s %s", a.objectInfo.ResourceName(co.discovery, mergedObject), utils.FqName(obj))
log.Info("Updating ", desc, a.dryRunText())
a.setupGC(mergedObject)
rc, err := a.resourceClientFactory(co, mergedObject)
if err != nil {
return "", err
}
return a.upsert(mergedObject)
}
asPatch, err := json.Marshal(mergedObject)
if err != nil {
return "", err
}
// preprocessObject prepares an object before it is applied to the cluster by
// running it through the default managed tagger. A tagging failure is
// returned wrapped with context; success yields nil.
func (a *Apply) preprocessObject(obj *unstructured.Unstructured) error {
	if err := newDefaultManaged().Tag(obj); err != nil {
		return errors.Wrap(err, "tagging ksonnet managed object")
	}
	return nil
}
var newobj metav1.Object
if !a.DryRun {
newobj, err = rc.Patch(types.MergePatchType, asPatch)
log.Debugf("Patch(%s) returned (%v, %v)", obj.GetName(), newobj, err)
} else {
newobj, err = rc.Get(metav1.GetOptions{})
}
// patchFromCluster merges obj with whatever state the cluster may already
// hold for it, using a ksonnetObject obtained from the configured factory and
// the Apply's client options.
func (a *Apply) patchFromCluster(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	ko := a.ksonnetObjectFactory()
	return ko.MergeFromCluster(*a.clientOpts, obj)
}
if a.Create && kerrors.IsNotFound(err) {
log.Info(" Creating non-existent ", desc, a.dryRunText())
if !a.DryRun {
newobj, err = rc.Create()
log.Debugf("Create(%s) returned (%v, %v)", obj.GetName(), newobj, err)
} else {
newobj = mergedObject
err = nil
}
}
if err != nil {
// TODO: retry
return "", errors.Wrapf(err, "can't update %s", desc)
}
func (a *Apply) upsert(obj *unstructured.Unstructured) (string, error) {
u := a.upserterFactory()
log.Debug("Updated object: ", kdiff.ObjectDiff(obj, newobj))
for i := applyConflictRetryCount; i > 0; i-- {
uid, err := u.Upsert(obj)
if err != nil {
cause := errors.Cause(err)
if !kerrors.IsConflict(cause) {
return "", err
}
return string(newobj.GetUID()), nil
}
time.Sleep(a.conflictTimeout)
continue
}
func tagManaged(obj *unstructured.Unstructured) error {
if obj == nil {
return errors.New("object is nil")
return uid, nil
}
mm := &managedMetadata{}
if err := mm.EncodePristine(obj.Object); err != nil {
return err
}
return "", errApplyConflict
}
mmEncoded, err := json.Marshal(mm)
if err != nil {
return err
// setupGC sets up ksonnet's garbage collection process for objects.
func (a *Apply) setupGC(obj *unstructured.Unstructured) {
if a.GcTag != "" {
SetMetaDataAnnotation(obj, metadata.AnnotationGcTag, a.GcTag)
}
SetMetaDataLabel(obj, metadata.LabelDeployManager, appKsonnet)
SetMetaDataAnnotation(obj, metadata.AnnotationManaged, string(mmEncoded))
return nil
}
func (a *Apply) runGc(co clientOpts, seenUids sets.String) error {
func (a *Apply) runGc(seenUids sets.String) error {
co := a.clientOpts
version, err := utils.FetchVersion(co.discovery)
if err != nil {
return err
}
err = walkObjects(co, metav1.ListOptions{}, func(o runtime.Object) error {
err = walkObjects(*co, metav1.ListOptions{}, func(o runtime.Object) error {
var metav1Object metav1.Object
metav1Object, err = meta.Accessor(o)
if err != nil {
......@@ -271,7 +233,7 @@ func (a *Apply) runGc(co clientOpts, seenUids sets.String) error {
if eligibleForGc(metav1Object, a.GcTag) && !seenUids.Has(string(metav1Object.GetUID())) {
log.Info("Garbage collecting ", desc, a.dryRunText())
if !a.DryRun {
err = gcDelete(co, a.resourceClientFactory, &version, o)
err = gcDelete(*co, a.resourceClientFactory, &version, o)
if err != nil {
return err
}
......
......@@ -18,29 +18,107 @@ package cluster
import (
"testing"
"github.com/stretchr/testify/require"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/ksonnet/ksonnet/pkg/app"
amocks "github.com/ksonnet/ksonnet/pkg/app/mocks"
"github.com/ksonnet/ksonnet/pkg/client"
"github.com/ksonnet/ksonnet/pkg/util/test"
"github.com/pkg/errors"
"github.com/spf13/afero"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func Test_tagManaged(t *testing.T) {
obj := &unstructured.Unstructured{
Object: genObject(),
// conflictError is a test error whose Status reports
// metav1.StatusReasonConflict; it is used to drive the conflict path in tests.
type conflictError struct{}

// Compile-time checks that *conflictError satisfies both interfaces it is
// used through. The second check previously named *notFoundError, so this
// type's error implementation was never actually verified.
var _ kerrors.APIStatus = (*conflictError)(nil)
var _ error = (*conflictError)(nil)

// Status implements kerrors.APIStatus with a conflict reason.
func (e *conflictError) Status() metav1.Status {
	return metav1.Status{
		Reason: metav1.StatusReasonConflict,
	}
}

// Error implements the error interface.
func (e *conflictError) Error() string {
	return "conflict"
}
// Test_Apply checks that RunApply succeeds when object discovery, cluster
// merging, and upserting are all replaced with fakes that do not fail.
func Test_Apply(t *testing.T) {
	test.WithApp(t, "/app", func(a *amocks.App, fs afero.Fs) {
		// withFakes swaps every cluster-facing collaborator for an in-memory
		// fake so the run never leaves the process.
		withFakes := func(apply *Apply) {
			sample := &unstructured.Unstructured{Object: genObject()}

			apply.clientOpts = &clientOpts{}

			apply.findObjectsFn = func(app.App, string, []string) ([]*unstructured.Unstructured, error) {
				return []*unstructured.Unstructured{sample}, nil
			}

			apply.ksonnetObjectFactory = func() ksonnetObject {
				return &fakeKsonnetObject{obj: sample}
			}

			apply.upserterFactory = func() Upserter {
				return &fakeUpserter{upsertID: "12345"}
			}
		}

		cfg := ApplyConfig{
			App:          a,
			ClientConfig: &client.Config{},
		}

		require.NoError(t, RunApply(cfg, withFakes))
	})
}
func Test_Apply_retry_on_conflict(t *testing.T) {
test.WithApp(t, "/app", func(a *amocks.App, fs afero.Fs) {
applyConfig := ApplyConfig{
App: a,
ClientConfig: &client.Config{},
}
setupApp := func(apply *Apply) {
obj := &unstructured.Unstructured{Object: genObject()}
apply.clientOpts = &clientOpts{}
apply.findObjectsFn = func(a app.App, envName string, componentNames []string) ([]*unstructured.Unstructured, error) {
objects := []*unstructured.Unstructured{obj}
err := tagManaged(obj)
require.NoError(t, err)
return objects, nil
}
metadata, ok := obj.Object["metadata"].(map[string]interface{})
require.True(t, ok)
apply.ksonnetObjectFactory = func() ksonnetObject {
return &fakeKsonnetObject{
obj: obj,
}
}
annotations, ok := metadata["annotations"].(map[string]interface{})
require.True(t, ok)
apply.upserterFactory = func() Upserter {
return &fakeUpserter{
upsertErr: &conflictError{},
}
}
managed, ok := annotations["ksonnet.io/managed"].(string)
require.True(t, ok)
apply.conflictTimeout = 0
}
expected := "{\"pristine\":\"H4sIAAAAAAAA/1yPzWrsMAyF9/cxzjrzk93F6z5AV92UWSiJSE1iSdhKYQh+9+IMlNCV8RH6vqMdZPGDc4kqCCCzcvvuB3bq0WGJMiHgjW3VZ2JxdEjsNJETwg4SUSePKqV9l6Ii7Neot2lL6YmA11s7CCVGwLzFrOotKcZj28psaxypIPQdnJOt5NwGZ9NKA6+HhMzOnBNoVHGKwrkgfO6IieZDOebW6IvNo16OtNyWcpk3Lj6oLpeJk4b7tV38p2YH0+wv3i/+XbMj/L/XR33UWuu/HwAAAP//AQAA///Dx6kERQEAAA==\"}"
require.Equal(t, expected, managed)
err := RunApply(applyConfig, setupApp)
cause := errors.Cause(err)
require.Equal(t, errApplyConflict, cause)
})
}
func genObject() map[string]interface{} {
......
package cluster
import (
"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)
// ksonnetObject can merge an object with its cluster state. This is required because
// some fields will be overwritten if applied again (e.g. a Service's NodePort).
type ksonnetObject interface {
// MergeFromCluster takes client options and an object and returns the object
// merged with its cluster state, or an error.
MergeFromCluster(co clientOpts, obj *unstructured.Unstructured) (*unstructured.Unstructured, error)
}
// defaultKsonnetObject is the ksonnetObject implementation that delegates the
// actual merge to an objectMerger.
type defaultKsonnetObject struct {
// objectMerger performs the merge; it is a field so it can be replaced.
objectMerger objectMerger
}
// Compile-time check that *defaultKsonnetObject satisfies ksonnetObject.
var _ ksonnetObject = (*defaultKsonnetObject)(nil)
// newDefaultKsonnetObject returns a defaultKsonnetObject whose merger is the
// default object merger built from factory.
func newDefaultKsonnetObject(factory cmdutil.Factory) *defaultKsonnetObject {
	return &defaultKsonnetObject{objectMerger: newDefaultObjectMerger(factory)}
}
// MergeFromCluster merges obj with the state found in the namespace given by
// co. When the merger reports that the object does not exist, obj itself is
// returned unchanged. Any other merge failure is returned with its root cause
// wrapped in context.
func (ko *defaultKsonnetObject) MergeFromCluster(co clientOpts, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	merged, err := ko.objectMerger.Merge(co.namespace, obj)
	if err == nil {
		return merged, nil
	}

	// A missing object is not a failure: there is nothing to merge with, so
	// the caller's object is used as-is.
	rootCause := errors.Cause(err)
	if kerrors.IsNotFound(rootCause) {
		return obj, nil
	}

	return nil, errors.Wrap(rootCause, "merging object with existing state")
}
package cluster
import (
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
)
// notFoundError is a test error whose Status reports
// metav1.StatusReasonNotFound.
type notFoundError struct{}

// Compile-time checks that *notFoundError is both an API status and an error.
var (
	_ kerrors.APIStatus = (*notFoundError)(nil)
	_ error             = (*notFoundError)(nil)
)

// Status implements kerrors.APIStatus with a not-found reason.
func (e *notFoundError) Status() metav1.Status {
	var status metav1.Status
	status.Reason = metav1.StatusReasonNotFound
	return status
}

// Error implements the error interface.
func (e *notFoundError) Error() string {
	const msg = "not found"
	return msg
}
// Test_defaultKsonnetObject_MergeFromCluster exercises MergeFromCluster with a
// fake merger: a successful merge, an unexpected merge error, and a not-found
// error that should fall back to the input object.
func Test_defaultKsonnetObject_MergeFromCluster(t *testing.T) {
	sample := &unstructured.Unstructured{Object: genObject()}

	tests := []struct {
		name         string
		obj          *unstructured.Unstructured
		expected     *unstructured.Unstructured
		objectMerger *fakeObjectMerger
		isErr        bool
	}{
		{
			name:         "merge object",
			obj:          sample,
			objectMerger: &fakeObjectMerger{mergeObj: sample},
			expected:     sample,
		},
		{
			name:         "unexpected error",
			obj:          sample,
			objectMerger: &fakeObjectMerger{mergeErr: errors.Errorf("failed")},
			isErr:        true,
		},
		{
			name:         "object not found",
			obj:          sample,
			objectMerger: &fakeObjectMerger{mergeErr: &notFoundError{}},
			expected:     sample,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			factory := cmdtesting.NewTestFactory()
			defer factory.Cleanup()

			// Build the real object, then swap in the fake merger for this case.
			ko := newDefaultKsonnetObject(factory)
			ko.objectMerger = tc.objectMerger

			got, err := ko.MergeFromCluster(clientOpts{}, tc.obj)
			if tc.isErr {
				require.Error(t, err)
				return
			}

			require.NoError(t, err)
			require.Equal(t, tc.expected, got)
		})
	}
}
// fakeKsonnetObject is a ksonnetObject test double that returns canned values.
type fakeKsonnetObject struct {
	// obj and err are handed back verbatim by MergeFromCluster.
	obj *unstructured.Unstructured
	err error
}

// Compile-time check that *fakeKsonnetObject satisfies ksonnetObject.
var _ ksonnetObject = (*fakeKsonnetObject)(nil)

// MergeFromCluster ignores its arguments and returns the configured object
// and error.
func (f *fakeKsonnetObject) MergeFromCluster(co clientOpts, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
	merged, mergeErr := f.obj, f.err
	return merged, mergeErr
}