From d03276298550467deca4ea67e42a0aad3191e94e Mon Sep 17 00:00:00 2001 From: Angus Lees <gus@inodes.org> Date: Thu, 18 May 2017 13:53:39 +1000 Subject: [PATCH] Implement "update" subcommand - Support the standard client-go/kubectl flags - Switch object types to use client-go rather than apimachinery - Implement update subcommand --- cmd/root.go | 103 +++++++++++++++++++++++++++++++++++++++++++--- cmd/show.go | 14 +++---- cmd/update.go | 85 ++++++++++++++++++++++++++++++++++++++ utils/acquire.go | 34 ++++++++-------- utils/client.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 308 insertions(+), 32 deletions(-) create mode 100644 cmd/update.go create mode 100644 utils/client.go diff --git a/cmd/root.go b/cmd/root.go index 619f5d88..1cf55ed9 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,6 +1,8 @@ package cmd import ( + "bytes" + "encoding/json" goflag "flag" "fmt" "os" @@ -9,14 +11,34 @@ import ( "github.com/golang/glog" "github.com/spf13/cobra" jsonnet "github.com/strickyak/jsonnet_cgo" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/runtime" + "k8s.io/client-go/tools/clientcmd" "github.com/backsplice/kubecfg/utils" ) +const ( + flagJpath = "jpath" +) + +var clientConfig clientcmd.ClientConfig + func init() { - RootCmd.PersistentFlags().String("context", "", "The name of the kubeconfig context to use") - RootCmd.PersistentFlags().StringP("jpath", "J", "", "Additional jsonnet library search path") + RootCmd.PersistentFlags().StringP(flagJpath, "J", "", "Additional jsonnet library search path") + + // The "usual" clientcmd/kubectl flags + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig + overrides := clientcmd.ConfigOverrides{} + kflags := clientcmd.RecommendedConfigOverrideFlags("") + RootCmd.PersistentFlags().StringVar(&loadingRules.ExplicitPath, "kubeconfig", "", "Path to a kube config. Only required if out-of-cluster") + clientcmd.BindOverrideFlags(&overrides, RootCmd.PersistentFlags(), kflags) + clientConfig = clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin) + + // Standard goflags (glog in particular) RootCmd.PersistentFlags().AddGoFlagSet(goflag.CommandLine) RootCmd.PersistentFlags().Set("logtostderr", "true") } @@ -45,7 +67,7 @@ func JsonnetVM(cmd *cobra.Command) (*jsonnet.VM, error) { vm.JpathAdd(p) } - jpath, err := flags.GetString("jpath") + jpath, err := flags.GetString(flagJpath) if err != nil { return nil, err } @@ -57,14 +79,14 @@ func JsonnetVM(cmd *cobra.Command) (*jsonnet.VM, error) { return vm, nil } -func readObjs(cmd *cobra.Command, paths []string) ([]metav1.Object, error) { +func readObjs(cmd *cobra.Command, paths []string) ([]*runtime.Unstructured, error) { vm, err := JsonnetVM(cmd) if err != nil { return nil, err } defer vm.Destroy() - res := []metav1.Object{} + res := []*runtime.Unstructured{} for _, path := range paths { objs, err := utils.Read(vm, path) if err != nil { @@ -74,3 +96,72 @@ func readObjs(cmd *cobra.Command, paths []string) ([]metav1.Object, error) { } return res, nil } + +// For debugging +func dumpJSON(v interface{}) string { + buf := bytes.NewBuffer(nil) + enc := json.NewEncoder(buf) + enc.SetIndent("", " ") + if err := enc.Encode(v); err != nil { + return err.Error() + } + return string(buf.Bytes()) +} + +func restClientPool(cmd *cobra.Command) (dynamic.ClientPool, discovery.DiscoveryInterface, error) { + conf, err := clientConfig.ClientConfig() + if err != nil { + return nil, nil, err + } + + disco, err := discovery.NewDiscoveryClientForConfig(conf) + if err != nil { + return nil, nil, err + } + + discoCache := utils.NewMemcachedDiscoveryClient(disco) + mapper := discovery.NewDeferredDiscoveryRESTMapper(discoCache, dynamic.VersionInterfaces) + pathresolver := dynamic.LegacyAPIPathResolverFunc + + pool := dynamic.NewClientPool(conf, mapper, pathresolver) + return pool, discoCache, nil +} + +func serverResourceForGroupVersionKind(disco discovery.DiscoveryInterface, gvk unversioned.GroupVersionKind) (*unversioned.APIResource, error) { + resources, err := disco.ServerResourcesForGroupVersion(gvk.GroupVersion().String()) + if err != nil { + return nil, err + } + + for _, r := range resources.APIResources { + if r.Kind == gvk.Kind { + glog.V(4).Infof("Chose API '%s' for %s", r.Name, gvk) + return &r, nil + } + } + + return nil, fmt.Errorf("Server is unable to handle %s", gvk) +} + +func clientForResource(pool dynamic.ClientPool, disco discovery.DiscoveryInterface, obj *runtime.Unstructured, defNs string) (*dynamic.ResourceClient, error) { + gvk := obj.GroupVersionKind() + + client, err := pool.ClientForGroupVersionKind(gvk) + if err != nil { + return nil, err + } + + resource, err := serverResourceForGroupVersionKind(disco, gvk) + if err != nil { + return nil, err + } + + namespace := obj.GetNamespace() + if namespace == "" { + namespace = defNs + } + + glog.V(4).Infof("Fetching client for %s namespace=%s", resource, namespace) + rc := client.Resource(resource, namespace) + return rc, nil +} diff --git a/cmd/show.go b/cmd/show.go index 9735cb41..b99fb106 100644 --- a/cmd/show.go +++ b/cmd/show.go @@ -8,9 +8,13 @@ import ( "gopkg.in/yaml.v2" ) +const ( + flagFormat = "format" +) + func init() { RootCmd.AddCommand(showCmd) - showCmd.PersistentFlags().StringP("format", "o", "yaml", "Output format. Supported values are: json, yaml") + showCmd.PersistentFlags().StringP(flagFormat, "o", "yaml", "Output format. Supported values are: json, yaml") } var showCmd = &cobra.Command{ @@ -20,18 +24,12 @@ var showCmd = &cobra.Command{ flags := cmd.Flags() out := cmd.OutOrStdout() - vm, err := JsonnetVM(cmd) - if err != nil { - return err - } - defer vm.Destroy() - objs, err := readObjs(cmd, args) if err != nil { return err } - format, err := flags.GetString("format") + format, err := flags.GetString(flagFormat) if err != nil { return err } diff --git a/cmd/update.go b/cmd/update.go new file mode 100644 index 00000000..8ffc485d --- /dev/null +++ b/cmd/update.go @@ -0,0 +1,85 @@ +package cmd + +import ( + "encoding/json" + "fmt" + + "github.com/golang/glog" + "github.com/spf13/cobra" + "k8s.io/client-go/pkg/api" + "k8s.io/client-go/pkg/api/errors" + "k8s.io/client-go/pkg/runtime" + "k8s.io/client-go/pkg/util/diff" +) + +const ( + flagCreate = "create" +) + +func init() { + RootCmd.AddCommand(updateCmd) + updateCmd.PersistentFlags().Bool(flagCreate, true, "Create missing resources") +} + +var updateCmd = &cobra.Command{ + Use: "update", + Short: "Update Kubernetes resources with local config", + RunE: func(cmd *cobra.Command, args []string) error { + flags := cmd.Flags() + + create, err := flags.GetBool(flagCreate) + if err != nil { + return err + } + + objs, err := readObjs(cmd, args) + if err != nil { + return err + } + + clientpool, disco, err := restClientPool(cmd) + if err != nil { + return err + } + + defaultNs, _, err := clientConfig.Namespace() + if err != nil { + return err + } + + for _, obj := range objs { + desc := fmt.Sprintf("%s/%s", obj.GetKind(), fqName(obj)) + glog.Info("Updating ", desc) + + c, err := clientForResource(clientpool, disco, obj, defaultNs) + if err != nil { + return err + } + + asPatch, err := json.Marshal(obj) + if err != nil { + return err + } + newobj, err := c.Patch(obj.GetName(), api.MergePatchType, asPatch) + if create && errors.IsNotFound(err) { + glog.Info(" Creating non-existent ", desc) + newobj, err = c.Create(obj) + } + if err != nil { + // TODO: retry + return fmt.Errorf("Error updating %s: %s", desc, err) + } + + glog.V(2).Info("Updated object: ", diff.ObjectDiff(obj, newobj)) + } + + return nil + }, +} + +func fqName(o *runtime.Unstructured) string { + if o.GetNamespace() == "" { + return o.GetName() + } + return fmt.Sprintf("%s.%s", o.GetNamespace(), o.GetName()) +} diff --git a/utils/acquire.go b/utils/acquire.go index 67677bb3..1d564a11 100644 --- a/utils/acquire.go +++ b/utils/acquire.go @@ -10,16 +10,14 @@ import ( "github.com/golang/glog" jsonnet "github.com/strickyak/jsonnet_cgo" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - unstructuredv1 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/pkg/runtime" ) // Read fetches and decodes K8s objects by path. // TODO: Replace this with something supporting more sophisticated // content negotiation. -func Read(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error) { +func Read(vm *jsonnet.VM, path string) ([]runtime.Object, error) { ext := filepath.Ext(path) if ext == ".json" { f, err := os.Open(path) @@ -42,21 +40,21 @@ func Read(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error) { return nil, fmt.Errorf("Unknown file extension: %s", path) } -func jsonReader(r io.Reader) ([]runtime.Unstructured, error) { +func jsonReader(r io.Reader) ([]runtime.Object, error) { data, err := ioutil.ReadAll(r) if err != nil { return nil, err } - obj, _, err := unstructuredv1.UnstructuredJSONScheme.Decode(data, nil, nil) + obj, _, err := runtime.UnstructuredJSONScheme.Decode(data, nil, nil) if err != nil { return nil, err } - return []runtime.Unstructured{obj.(runtime.Unstructured)}, nil + return []runtime.Object{obj}, nil } -func yamlReader(r io.ReadCloser) ([]runtime.Unstructured, error) { +func yamlReader(r io.ReadCloser) ([]runtime.Object, error) { decoder := yaml.NewDocumentDecoder(r) - ret := []runtime.Unstructured{} + ret := []runtime.Object{} buf := []byte{} for { _, err := decoder.Read(buf) @@ -69,16 +67,16 @@ func yamlReader(r io.ReadCloser) ([]runtime.Unstructured, error) { if err != nil { return nil, err } - obj, _, err := unstructuredv1.UnstructuredJSONScheme.Decode(jsondata, nil, nil) + obj, _, err := runtime.UnstructuredJSONScheme.Decode(jsondata, nil, nil) if err != nil { return nil, err } - ret = append(ret, obj.(runtime.Unstructured)) + ret = append(ret, obj) } return ret, nil } -func jsonnetReader(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error) { +func jsonnetReader(vm *jsonnet.VM, path string) ([]runtime.Object, error) { jsonstr, err := vm.EvaluateFile(path) if err != nil { return nil, err @@ -90,17 +88,17 @@ func jsonnetReader(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error) } // FlattenToV1 expands any List-type objects into their members, and -// cooerces everything to metav1.Objects. Panics if coercion +// cooerces everything to v1.Unstructured. Panics if coercion // encounters an unexpected object type. -func FlattenToV1(objs []runtime.Unstructured) []metav1.Object { - ret := make([]metav1.Object, 0, len(objs)) +func FlattenToV1(objs []runtime.Object) []*runtime.Unstructured { + ret := make([]*runtime.Unstructured, 0, len(objs)) for _, obj := range objs { switch o := obj.(type) { - case *unstructuredv1.UnstructuredList: + case *runtime.UnstructuredList: for _, item := range o.Items { - ret = append(ret, &item) + ret = append(ret, item) } - case *unstructuredv1.Unstructured: + case *runtime.Unstructured: ret = append(ret, o) default: panic("Unexpected unstructured object type") diff --git a/utils/client.go b/utils/client.go new file mode 100644 index 00000000..e718f13f --- /dev/null +++ b/utils/client.go @@ -0,0 +1,104 @@ +package utils + +import ( + "sync" + + "github.com/emicklei/go-restful/swagger" + "k8s.io/client-go/discovery" + "k8s.io/client-go/pkg/api/unversioned" + "k8s.io/client-go/pkg/version" + "k8s.io/client-go/rest" +) + +type memcachedDiscoveryClient struct { + cl discovery.DiscoveryInterface + lock sync.RWMutex + servergroups *unversioned.APIGroupList + serverresources map[string]*unversioned.APIResourceList + serverresourcesIsComplete bool +} + +// NewMemcachedDiscoveryClient creates a new DiscoveryClient that +// caches results in memory +func NewMemcachedDiscoveryClient(cl discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface { + c := &memcachedDiscoveryClient{cl: cl} + c.Invalidate() + return c +} + +func (c *memcachedDiscoveryClient) Fresh() bool { + return true +} + +func (c *memcachedDiscoveryClient) Invalidate() { + c.lock.Lock() + defer c.lock.Unlock() + + c.servergroups = nil + c.serverresources = make(map[string]*unversioned.APIResourceList) + c.serverresourcesIsComplete = false +} + +func (c *memcachedDiscoveryClient) RESTClient() rest.Interface { + return c.cl.RESTClient() +} + +func (c *memcachedDiscoveryClient) ServerGroups() (*unversioned.APIGroupList, error) { + c.lock.Lock() + defer c.lock.Unlock() + + var err error + if c.servergroups != nil { + return c.servergroups, nil + } + c.servergroups, err = c.cl.ServerGroups() + return c.servergroups, err +} + +func (c *memcachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) { + c.lock.Lock() + defer c.lock.Unlock() + + var err error + if v := c.serverresources[groupVersion]; v != nil { + return v, nil + } + if c.serverresourcesIsComplete { + return &unversioned.APIResourceList{}, nil + } + c.serverresources[groupVersion], err = c.cl.ServerResourcesForGroupVersion(groupVersion) + return c.serverresources[groupVersion], err +} + +func (c *memcachedDiscoveryClient) ServerResources() (map[string]*unversioned.APIResourceList, error) { + c.lock.Lock() + defer c.lock.Unlock() + + var err error + if c.serverresourcesIsComplete { + return c.serverresources, nil + } + c.serverresources, err = c.cl.ServerResources() + if err == nil { + c.serverresourcesIsComplete = true + } + return c.serverresources, err +} + +func (c *memcachedDiscoveryClient) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) { + return c.cl.ServerPreferredResources() +} + +func (c *memcachedDiscoveryClient) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) { + return c.cl.ServerPreferredNamespacedResources() +} + +func (c *memcachedDiscoveryClient) ServerVersion() (*version.Info, error) { + return c.cl.ServerVersion() +} + +func (c *memcachedDiscoveryClient) SwaggerSchema(version unversioned.GroupVersion) (*swagger.ApiDeclaration, error) { + return c.cl.SwaggerSchema(version) +} + +var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{} -- GitLab