Unverified Commit 8ad57968 authored by bryanl's avatar bryanl
Browse files

Fix up for merging objects



* Process was using wrong modified document
* Added support for using open api schema
* Merger now can retrieve original object
Signed-off-by: default avatarbryanl <bryanliles@gmail.com>
parent 55e2c310
......@@ -16,50 +16,57 @@
package cluster
import (
"encoding/json"
"github.com/ksonnet/ksonnet/pkg/metadata"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)
type annotationEncoder interface {
EncodePristine(map[string]interface{}) error
// annotationCodec manages the contents of the annotation
// ksonnet adds to objects.
type annotationCodec interface {
Encode(map[string]interface{}) error
Decode() (map[string]interface{}, error)
Marshal() ([]byte, error)
}
// tagger tags an object with ksonnet metadata. Currently, the
// annotationApplier tags an object with ksonnet metadata. Currently, the
// metadata includes a label and an annotation with the object's
// state as generated by ksonnet.
type tagger interface {
Tag(*unstructured.Unstructured) error
type annotationApplier interface {
SetOriginalConfiguration(*unstructured.Unstructured) error
}
// defaultTagger is the default implementation of managed.
type defaultTagger struct {
annotationEncoder annotationEncoder
// defaultAnnotationApplier is the default implementation of annotationApplier.
type defaultAnnotationApplier struct {
codec annotationCodec
}
var _ tagger = (*defaultTagger)(nil)
var _ annotationApplier = (*defaultAnnotationApplier)(nil)
func newDefaultManaged() *defaultTagger {
return &defaultTagger{
annotationEncoder: &managedAnnotation{},
func newDefaultAnnotationApplier() *defaultAnnotationApplier {
return &defaultAnnotationApplier{
codec: &managedAnnotation{},
}
}
func (m *defaultTagger) Tag(obj *unstructured.Unstructured) error {
func (m *defaultAnnotationApplier) SetOriginalConfiguration(obj *unstructured.Unstructured) error {
if obj == nil {
return errors.New("object is nil")
}
if m.annotationEncoder == nil {
if m.codec == nil {
return errors.New("encoder is nil")
}
if err := m.annotationEncoder.EncodePristine(obj.Object); err != nil {
if err := m.codec.Encode(obj.Object); err != nil {
return err
}
mmEncoded, err := m.annotationEncoder.Marshal()
mmEncoded, err := m.codec.Marshal()
if err != nil {
return err
}
......@@ -69,3 +76,38 @@ func (m *defaultTagger) Tag(obj *unstructured.Unstructured) error {
return nil
}
func (m *defaultAnnotationApplier) GetOriginalConfiguration(mapping *meta.RESTMapping, obj runtime.Object) ([]byte, error) {
annotations, err := mapping.MetadataAccessor.Annotations(obj)
if err != nil {
return nil, err
}
if annotations == nil {
// This object might not have any annotations. This should
// not be an error.
return nil, nil
}
data, ok := annotations[metadata.AnnotationManaged]
if !ok {
return nil, nil
}
var annotation managedAnnotation
if err = json.Unmarshal([]byte(data), &annotation); err != nil {
return nil, errors.Wrap(err, "decoding ksonnet managed annotation")
}
pristineObject, err := annotation.Decode()
if err != nil {
return nil, err
}
b, err := json.Marshal(pristineObject)
if err != nil {
return nil, errors.Wrap(err, "encoding JSON")
}
return b, nil
}
......@@ -23,11 +23,11 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
func Test_defaultTagger_Tag(t *testing.T) {
func Test_defaultAnnoationApplier_SetOriginalConfiguration(t *testing.T) {
cases := []struct {
name string
obj *unstructured.Unstructured
setup func(m *defaultTagger)
setup func(m *defaultAnnotationApplier)
isErr bool
}{
{
......@@ -45,8 +45,8 @@ func Test_defaultTagger_Tag(t *testing.T) {
obj: &unstructured.Unstructured{
Object: genObject(),
},
setup: func(m *defaultTagger) {
m.annotationEncoder = nil
setup: func(m *defaultAnnotationApplier) {
m.codec = nil
},
isErr: true,
},
......@@ -55,11 +55,11 @@ func Test_defaultTagger_Tag(t *testing.T) {
obj: &unstructured.Unstructured{
Object: genObject(),
},
setup: func(m *defaultTagger) {
fm := &fakeAnnotationEncoder{
err: errors.New("failure"),
setup: func(m *defaultAnnotationApplier) {
fm := &fakeAnnotationCodec{
encodeError: errors.New("failure"),
}
m.annotationEncoder = fm
m.codec = fm
},
isErr: true,
},
......@@ -68,11 +68,11 @@ func Test_defaultTagger_Tag(t *testing.T) {
obj: &unstructured.Unstructured{
Object: genObject(),
},
setup: func(m *defaultTagger) {
fm := &fakeAnnotationEncoder{
setup: func(m *defaultAnnotationApplier) {
fm := &fakeAnnotationCodec{
marshalError: errors.New("failure"),
}
m.annotationEncoder = fm
m.codec = fm
},
isErr: true,
},
......@@ -80,12 +80,12 @@ func Test_defaultTagger_Tag(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
m := newDefaultManaged()
m := newDefaultAnnotationApplier()
if tc.setup != nil {
tc.setup(m)
}
err := m.Tag(tc.obj)
err := m.SetOriginalConfiguration(tc.obj)
if tc.isErr {
require.Error(t, err)
return
......@@ -108,17 +108,24 @@ func Test_defaultTagger_Tag(t *testing.T) {
}
}
type fakeAnnotationEncoder struct {
err error
type fakeAnnotationCodec struct {
encodeError error
marshalBytes []byte
marshalError error
decodeObject map[string]interface{}
decodeError error
}
func (m *fakeAnnotationEncoder) EncodePristine(in map[string]interface{}) error {
return m.err
func (m *fakeAnnotationCodec) Encode(in map[string]interface{}) error {
return m.encodeError
}
func (m *fakeAnnotationEncoder) Marshal() ([]byte, error) {
func (m *fakeAnnotationCodec) Marshal() ([]byte, error) {
return m.marshalBytes, m.marshalError
}
func (m *fakeAnnotationCodec) Decode() (map[string]interface{}, error) {
return m.decodeObject, m.decodeError
}
......@@ -175,9 +175,9 @@ func (a *Apply) handleObject(obj *unstructured.Unstructured) (string, error) {
// preprocessObject preprocesses an object for it is applied to the cluster.
func (a *Apply) preprocessObject(obj *unstructured.Unstructured) error {
dm := newDefaultManaged()
aa := newDefaultAnnotationApplier()
if !a.DryRun {
return errors.Wrap(dm.Tag(obj), "tagging ksonnet managed object")
return errors.Wrap(aa.SetOriginalConfiguration(obj), "tagging ksonnet managed object")
}
log.Info("tagging ksonnet managed object", a.dryRunText())
......
......@@ -34,8 +34,8 @@ func (mm *managedAnnotation) Marshal() ([]byte, error) {
return json.Marshal(mm)
}
// EncodePristine encodes a pristine copy of the object.
func (mm *managedAnnotation) EncodePristine(m map[string]interface{}) error {
// Encode encodes a pristine copy of the object.
func (mm *managedAnnotation) Encode(m map[string]interface{}) error {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
......@@ -53,8 +53,8 @@ func (mm *managedAnnotation) EncodePristine(m map[string]interface{}) error {
return nil
}
// DecodePristine decodes a pristone copy of the object.
func (mm *managedAnnotation) DecodePristine() (map[string]interface{}, error) {
// Decode decodes a pristine copy of the object.
func (mm *managedAnnotation) Decode() (map[string]interface{}, error) {
b, err := base64.StdEncoding.DecodeString(mm.Pristine)
if err != nil {
return nil, err
......
......@@ -118,17 +118,17 @@ func (p *defaultObjectMerger) Merge(namespace string, obj *unstructured.Unstruct
info := infos[0]
modified, err := runtime.Encode(encoder, obj)
if err != nil {
return nil, errors.Wrap(err, "encode modified object")
}
if err = info.Get(); err != nil {
if !kerrors.IsNotFound(err) {
return nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving current configuration of:\n%v\nfrom server for:", info), info.Source, err)
}
}
modified, err := runtime.Encode(encoder, info.Object)
if err != nil {
return nil, errors.Wrap(err, "encode modified object")
}
helper := resource.NewHelper(info.Client, info.Mapping)
patcher := &patcher{
encoder: encoder,
......@@ -145,6 +145,15 @@ func (p *defaultObjectMerger) Merge(namespace string, obj *unstructured.Unstruct
gracePeriod: 0,
}
discoveryClient, err := p.factory.DiscoveryClient()
if err == nil {
openAPIGetter := openapi.NewOpenAPIGetter(discoveryClient)
resources, err := openAPIGetter.Get()
if err == nil {
patcher.openapiSchema = resources
}
}
patchBytes, patchedObject, err := patcher.patch(info.Object, modified, info.Source, info.Namespace, info.Name, os.Stderr)
if err != nil {
logrus.Debug("applying patch:\n%s\nto:\n%v\nfor:\n", patchBytes, info)
......@@ -160,7 +169,7 @@ func (p *defaultObjectMerger) Merge(namespace string, obj *unstructured.Unstruct
}
// stageInTempFile stages an object in a temp file. The file will have to be
// manaully removed once it is no longer needed.
// manually removed once it is no longer needed.
func (p *defaultObjectMerger) stageInTempFile(obj *unstructured.Unstructured) (*os.File, error) {
encoded, err := runtime.Encode(scheme.DefaultJSONEncoder(), obj)
if err != nil {
......@@ -169,11 +178,11 @@ func (p *defaultObjectMerger) stageInTempFile(obj *unstructured.Unstructured) (*
tmpfile, err := ioutil.TempFile("", "ksonnet-mergepatch")
if err != nil {
return nil, errors.Wrap(err, "creating tempfile")
return nil, errors.Wrap(err, "creating temporary file")
}
if _, err = tmpfile.Write(encoded); err != nil {
return nil, errors.Wrap(err, "writing tempfile")
return nil, errors.Wrap(err, "writing temporary file")
}
return tmpfile, nil
......@@ -207,8 +216,10 @@ func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, names
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("serializing current configuration from:\n%v\nfor:", obj), source, err)
}
t := newDefaultAnnotationApplier()
// Retrieve the original configuration of the object from the annotation.
original, err := kubectl.GetOriginalConfiguration(p.mapping, obj)
original, err := t.GetOriginalConfiguration(p.mapping, obj)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf("retrieving original configuration from:\n%v\nfor:", obj), source, err)
}
......@@ -260,6 +271,7 @@ func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, names
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
}
patch, err = strategicpatch.CreateThreeWayMergePatch(original, modified, current, lookupPatchMeta, p.overwrite)
if err != nil {
return nil, nil, cmdutil.AddSourceToErr(fmt.Sprintf(createPatchErrFormat, original, modified, current), source, err)
......@@ -272,6 +284,10 @@ func (p *patcher) patchSimple(obj runtime.Object, modified []byte, source, names
}
patchedObj, err := p.helper.Patch(namespace, name, patchType, patch)
if err != nil {
return nil, nil, errors.Wrap(err, "patching existing object")
}
return patch, patchedObj, err
}
......
......@@ -21,16 +21,21 @@ import (
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"testing"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/rest/fake"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
"k8s.io/kubernetes/pkg/kubectl/cmd/util/openapi"
"k8s.io/kubernetes/pkg/kubectl/scheme"
)
......@@ -62,7 +67,7 @@ func Test_merger_merge(t *testing.T) {
codec := legacyscheme.Codecs.LegacyCodec(scheme.Versions...)
servicePath := "/namespaces/test/services/service"
servicePath := "/namespaces/testing/services/service"
clusterService := &api.Service{
Spec: api.ServiceSpec{
......@@ -72,8 +77,6 @@ func Test_merger_merge(t *testing.T) {
},
}
var isPatched bool
tf.UnstructuredClient = &fake.RESTClient{
NegotiatedSerializer: unstructuredSerializer,
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
......@@ -85,16 +88,36 @@ func Test_merger_merge(t *testing.T) {
_, err := convertToObject(req.Body)
require.NoError(t, err)
isPatched = true
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(codec, clusterService)}, nil
default:
t.Fatalf("unexpected request: %#v\n%#v", req.URL, req)
t.Fatalf("unexpected request using unstructured client: %#v\n%#v", req.URL, req)
return nil, nil
}
}),
}
tf.Client = &fake.RESTClient{
Client: fake.CreateHTTPClient(func(req *http.Request) (*http.Response, error) {
switch p, m := req.URL.Path, req.Method; {
case p == "/openapi/v2" && m == "GET":
schemaPath := filepath.Join("testdata", "swagger.json")
f, err := os.Open(schemaPath)
require.NoError(t, err)
return &http.Response{StatusCode: 200, Body: f}, nil
default:
t.Fatalf("unexpected request using client: %#v\n%#v", req.URL, req)
return nil, errors.New("not found")
}
}),
}
tf.OpenAPISchemaFunc = func() (openapi.Resources, error) {
return nil, errors.New("not found")
}
tf.ClientConfigVal = &restclient.Config{}
om := newDefaultObjectMerger(tf)
obj := &unstructured.Unstructured{
......@@ -123,10 +146,8 @@ func Test_merger_merge(t *testing.T) {
},
}
_, err := om.Merge("test", obj)
_, err := om.Merge("testing", obj)
require.NoError(t, err)
require.True(t, isPatched)
}
type fakeObjectMerger struct {
......
......@@ -133,7 +133,7 @@ func RebuildObject(m map[string]interface{}) (map[string]interface{}, error) {
return nil, errors.WithStack(err)
}
return mm.DecodePristine()
return mm.Decode()
}
// CollectObjects collects objects in a cluster namespace.
......
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment