Skip to content
Snippets Groups Projects
Commit d0327629 authored by Angus Lees's avatar Angus Lees
Browse files

Implement "update" subcommand

- Support the standard client-go/kubectl flags
- Switch object types to use client-go rather than apimachinery
- Implement update subcommand
parent 85fd9f92
No related branches found
No related tags found
No related merge requests found
package cmd
import (
"bytes"
"encoding/json"
goflag "flag"
"fmt"
"os"
......@@ -9,14 +11,34 @@ import (
"github.com/golang/glog"
"github.com/spf13/cobra"
jsonnet "github.com/strickyak/jsonnet_cgo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/pkg/api/unversioned"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/tools/clientcmd"
"github.com/backsplice/kubecfg/utils"
)
const (
flagJpath = "jpath"
)
var clientConfig clientcmd.ClientConfig
func init() {
RootCmd.PersistentFlags().String("context", "", "The name of the kubeconfig context to use")
RootCmd.PersistentFlags().StringP("jpath", "J", "", "Additional jsonnet library search path")
RootCmd.PersistentFlags().StringP(flagJpath, "J", "", "Additional jsonnet library search path")
// The "usual" clientcmd/kubectl flags
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
overrides := clientcmd.ConfigOverrides{}
kflags := clientcmd.RecommendedConfigOverrideFlags("")
RootCmd.PersistentFlags().StringVar(&loadingRules.ExplicitPath, "kubeconfig", "", "Path to a kube config. Only required if out-of-cluster")
clientcmd.BindOverrideFlags(&overrides, RootCmd.PersistentFlags(), kflags)
clientConfig = clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
// Standard goflags (glog in particular)
RootCmd.PersistentFlags().AddGoFlagSet(goflag.CommandLine)
RootCmd.PersistentFlags().Set("logtostderr", "true")
}
......@@ -45,7 +67,7 @@ func JsonnetVM(cmd *cobra.Command) (*jsonnet.VM, error) {
vm.JpathAdd(p)
}
jpath, err := flags.GetString("jpath")
jpath, err := flags.GetString(flagJpath)
if err != nil {
return nil, err
}
......@@ -57,14 +79,14 @@ func JsonnetVM(cmd *cobra.Command) (*jsonnet.VM, error) {
return vm, nil
}
func readObjs(cmd *cobra.Command, paths []string) ([]metav1.Object, error) {
func readObjs(cmd *cobra.Command, paths []string) ([]*runtime.Unstructured, error) {
vm, err := JsonnetVM(cmd)
if err != nil {
return nil, err
}
defer vm.Destroy()
res := []metav1.Object{}
res := []*runtime.Unstructured{}
for _, path := range paths {
objs, err := utils.Read(vm, path)
if err != nil {
......@@ -74,3 +96,72 @@ func readObjs(cmd *cobra.Command, paths []string) ([]metav1.Object, error) {
}
return res, nil
}
// For debugging
func dumpJSON(v interface{}) string {
buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
enc.SetIndent("", " ")
if err := enc.Encode(v); err != nil {
return err.Error()
}
return string(buf.Bytes())
}
func restClientPool(cmd *cobra.Command) (dynamic.ClientPool, discovery.DiscoveryInterface, error) {
conf, err := clientConfig.ClientConfig()
if err != nil {
return nil, nil, err
}
disco, err := discovery.NewDiscoveryClientForConfig(conf)
if err != nil {
return nil, nil, err
}
discoCache := utils.NewMemcachedDiscoveryClient(disco)
mapper := discovery.NewDeferredDiscoveryRESTMapper(discoCache, dynamic.VersionInterfaces)
pathresolver := dynamic.LegacyAPIPathResolverFunc
pool := dynamic.NewClientPool(conf, mapper, pathresolver)
return pool, discoCache, nil
}
func serverResourceForGroupVersionKind(disco discovery.DiscoveryInterface, gvk unversioned.GroupVersionKind) (*unversioned.APIResource, error) {
resources, err := disco.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
return nil, err
}
for _, r := range resources.APIResources {
if r.Kind == gvk.Kind {
glog.V(4).Infof("Chose API '%s' for %s", r.Name, gvk)
return &r, nil
}
}
return nil, fmt.Errorf("Server is unable to handle %s", gvk)
}
func clientForResource(pool dynamic.ClientPool, disco discovery.DiscoveryInterface, obj *runtime.Unstructured, defNs string) (*dynamic.ResourceClient, error) {
gvk := obj.GroupVersionKind()
client, err := pool.ClientForGroupVersionKind(gvk)
if err != nil {
return nil, err
}
resource, err := serverResourceForGroupVersionKind(disco, gvk)
if err != nil {
return nil, err
}
namespace := obj.GetNamespace()
if namespace == "" {
namespace = defNs
}
glog.V(4).Infof("Fetching client for %s namespace=%s", resource, namespace)
rc := client.Resource(resource, namespace)
return rc, nil
}
......@@ -8,9 +8,13 @@ import (
"gopkg.in/yaml.v2"
)
const (
flagFormat = "format"
)
func init() {
RootCmd.AddCommand(showCmd)
showCmd.PersistentFlags().StringP("format", "o", "yaml", "Output format. Supported values are: json, yaml")
showCmd.PersistentFlags().StringP(flagFormat, "o", "yaml", "Output format. Supported values are: json, yaml")
}
var showCmd = &cobra.Command{
......@@ -20,18 +24,12 @@ var showCmd = &cobra.Command{
flags := cmd.Flags()
out := cmd.OutOrStdout()
vm, err := JsonnetVM(cmd)
if err != nil {
return err
}
defer vm.Destroy()
objs, err := readObjs(cmd, args)
if err != nil {
return err
}
format, err := flags.GetString("format")
format, err := flags.GetString(flagFormat)
if err != nil {
return err
}
......
package cmd
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"github.com/spf13/cobra"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/errors"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/util/diff"
)
const (
flagCreate = "create"
)
func init() {
RootCmd.AddCommand(updateCmd)
updateCmd.PersistentFlags().Bool(flagCreate, true, "Create missing resources")
}
var updateCmd = &cobra.Command{
Use: "update",
Short: "Update Kubernetes resources with local config",
RunE: func(cmd *cobra.Command, args []string) error {
flags := cmd.Flags()
create, err := flags.GetBool(flagCreate)
if err != nil {
return err
}
objs, err := readObjs(cmd, args)
if err != nil {
return err
}
clientpool, disco, err := restClientPool(cmd)
if err != nil {
return err
}
defaultNs, _, err := clientConfig.Namespace()
if err != nil {
return err
}
for _, obj := range objs {
desc := fmt.Sprintf("%s/%s", obj.GetKind(), fqName(obj))
glog.Info("Updating ", desc)
c, err := clientForResource(clientpool, disco, obj, defaultNs)
if err != nil {
return err
}
asPatch, err := json.Marshal(obj)
if err != nil {
return err
}
newobj, err := c.Patch(obj.GetName(), api.MergePatchType, asPatch)
if create && errors.IsNotFound(err) {
glog.Info(" Creating non-existent ", desc)
newobj, err = c.Create(obj)
}
if err != nil {
// TODO: retry
return fmt.Errorf("Error updating %s: %s", desc, err)
}
glog.V(2).Info("Updated object: ", diff.ObjectDiff(obj, newobj))
}
return nil
},
}
func fqName(o *runtime.Unstructured) string {
if o.GetNamespace() == "" {
return o.GetName()
}
return fmt.Sprintf("%s.%s", o.GetNamespace(), o.GetName())
}
......@@ -10,16 +10,14 @@ import (
"github.com/golang/glog"
jsonnet "github.com/strickyak/jsonnet_cgo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
unstructuredv1 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/pkg/runtime"
)
// Read fetches and decodes K8s objects by path.
// TODO: Replace this with something supporting more sophisticated
// content negotiation.
func Read(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error) {
func Read(vm *jsonnet.VM, path string) ([]runtime.Object, error) {
ext := filepath.Ext(path)
if ext == ".json" {
f, err := os.Open(path)
......@@ -42,21 +40,21 @@ func Read(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error) {
return nil, fmt.Errorf("Unknown file extension: %s", path)
}
func jsonReader(r io.Reader) ([]runtime.Unstructured, error) {
func jsonReader(r io.Reader) ([]runtime.Object, error) {
data, err := ioutil.ReadAll(r)
if err != nil {
return nil, err
}
obj, _, err := unstructuredv1.UnstructuredJSONScheme.Decode(data, nil, nil)
obj, _, err := runtime.UnstructuredJSONScheme.Decode(data, nil, nil)
if err != nil {
return nil, err
}
return []runtime.Unstructured{obj.(runtime.Unstructured)}, nil
return []runtime.Object{obj}, nil
}
func yamlReader(r io.ReadCloser) ([]runtime.Unstructured, error) {
func yamlReader(r io.ReadCloser) ([]runtime.Object, error) {
decoder := yaml.NewDocumentDecoder(r)
ret := []runtime.Unstructured{}
ret := []runtime.Object{}
buf := []byte{}
for {
_, err := decoder.Read(buf)
......@@ -69,16 +67,16 @@ func yamlReader(r io.ReadCloser) ([]runtime.Unstructured, error) {
if err != nil {
return nil, err
}
obj, _, err := unstructuredv1.UnstructuredJSONScheme.Decode(jsondata, nil, nil)
obj, _, err := runtime.UnstructuredJSONScheme.Decode(jsondata, nil, nil)
if err != nil {
return nil, err
}
ret = append(ret, obj.(runtime.Unstructured))
ret = append(ret, obj)
}
return ret, nil
}
func jsonnetReader(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error) {
func jsonnetReader(vm *jsonnet.VM, path string) ([]runtime.Object, error) {
jsonstr, err := vm.EvaluateFile(path)
if err != nil {
return nil, err
......@@ -90,17 +88,17 @@ func jsonnetReader(vm *jsonnet.VM, path string) ([]runtime.Unstructured, error)
}
// FlattenToV1 expands any List-type objects into their members, and
// cooerces everything to metav1.Objects. Panics if coercion
// cooerces everything to v1.Unstructured. Panics if coercion
// encounters an unexpected object type.
func FlattenToV1(objs []runtime.Unstructured) []metav1.Object {
ret := make([]metav1.Object, 0, len(objs))
func FlattenToV1(objs []runtime.Object) []*runtime.Unstructured {
ret := make([]*runtime.Unstructured, 0, len(objs))
for _, obj := range objs {
switch o := obj.(type) {
case *unstructuredv1.UnstructuredList:
case *runtime.UnstructuredList:
for _, item := range o.Items {
ret = append(ret, &item)
ret = append(ret, item)
}
case *unstructuredv1.Unstructured:
case *runtime.Unstructured:
ret = append(ret, o)
default:
panic("Unexpected unstructured object type")
......
package utils
import (
"sync"
"github.com/emicklei/go-restful/swagger"
"k8s.io/client-go/discovery"
"k8s.io/client-go/pkg/api/unversioned"
"k8s.io/client-go/pkg/version"
"k8s.io/client-go/rest"
)
type memcachedDiscoveryClient struct {
cl discovery.DiscoveryInterface
lock sync.RWMutex
servergroups *unversioned.APIGroupList
serverresources map[string]*unversioned.APIResourceList
serverresourcesIsComplete bool
}
// NewMemcachedDiscoveryClient creates a new DiscoveryClient that
// caches results in memory
func NewMemcachedDiscoveryClient(cl discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface {
c := &memcachedDiscoveryClient{cl: cl}
c.Invalidate()
return c
}
func (c *memcachedDiscoveryClient) Fresh() bool {
return true
}
func (c *memcachedDiscoveryClient) Invalidate() {
c.lock.Lock()
defer c.lock.Unlock()
c.servergroups = nil
c.serverresources = make(map[string]*unversioned.APIResourceList)
c.serverresourcesIsComplete = false
}
func (c *memcachedDiscoveryClient) RESTClient() rest.Interface {
return c.cl.RESTClient()
}
func (c *memcachedDiscoveryClient) ServerGroups() (*unversioned.APIGroupList, error) {
c.lock.Lock()
defer c.lock.Unlock()
var err error
if c.servergroups != nil {
return c.servergroups, nil
}
c.servergroups, err = c.cl.ServerGroups()
return c.servergroups, err
}
func (c *memcachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*unversioned.APIResourceList, error) {
c.lock.Lock()
defer c.lock.Unlock()
var err error
if v := c.serverresources[groupVersion]; v != nil {
return v, nil
}
if c.serverresourcesIsComplete {
return &unversioned.APIResourceList{}, nil
}
c.serverresources[groupVersion], err = c.cl.ServerResourcesForGroupVersion(groupVersion)
return c.serverresources[groupVersion], err
}
func (c *memcachedDiscoveryClient) ServerResources() (map[string]*unversioned.APIResourceList, error) {
c.lock.Lock()
defer c.lock.Unlock()
var err error
if c.serverresourcesIsComplete {
return c.serverresources, nil
}
c.serverresources, err = c.cl.ServerResources()
if err == nil {
c.serverresourcesIsComplete = true
}
return c.serverresources, err
}
func (c *memcachedDiscoveryClient) ServerPreferredResources() ([]unversioned.GroupVersionResource, error) {
return c.cl.ServerPreferredResources()
}
func (c *memcachedDiscoveryClient) ServerPreferredNamespacedResources() ([]unversioned.GroupVersionResource, error) {
return c.cl.ServerPreferredNamespacedResources()
}
func (c *memcachedDiscoveryClient) ServerVersion() (*version.Info, error) {
return c.cl.ServerVersion()
}
func (c *memcachedDiscoveryClient) SwaggerSchema(version unversioned.GroupVersion) (*swagger.ApiDeclaration, error) {
return c.cl.SwaggerSchema(version)
}
var _ discovery.CachedDiscoveryInterface = &memcachedDiscoveryClient{}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment