Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Matheus Cruz <[email protected]>
  • Loading branch information
mcruzdev committed Sep 14, 2024
1 parent 1b14679 commit bef381e
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 239 deletions.
243 changes: 7 additions & 236 deletions workspaces/controller/internal/controller/workspace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package controller
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"os"
"github.com/kubeflow/notebooks/workspaces/controller/internal/istio"
"reflect"
"strings"

Expand Down Expand Up @@ -55,8 +54,6 @@ const (
workspaceSelectorLabel = "statefulset"

// lengths for resource names
generateNameSuffixLength = 6
maxServiceNameLength = 63
maxStatefulSetNameLength = 52 // https://github.com/kubernetes/kubernetes/issues/64023

// state message formats for Workspace status
Expand All @@ -78,10 +75,6 @@ const (
stateMsgRunning = "Workspace is running"
stateMsgTerminating = "Workspace is terminating"
stateMsgUnknown = "Workspace is in an unknown state"

IstioHost = "ISTIO_HOST"
IstioGateway = "ISTIO_GATEWAY"
ClusterDomain = "CLUSTER_DOMAIN"
)

// WorkspaceReconciler reconciles a Workspace object
Expand Down Expand Up @@ -352,7 +345,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// TODO: reconcile the Istio VirtualService to expose the Workspace
// and implement the `spec.podTemplate.httpProxy` options
//
virtualService, err := GenerateIstioVirtualService(workspace, workspaceKind, currentImageConfig, serviceName, log)
virtualService, err := istio.GenerateIstioVirtualService(workspace, workspaceKind, currentImageConfig, serviceName, log)
if err != nil {
log.Error(err, "unable to generate Istio Virtual Service")
}
Expand All @@ -362,7 +355,7 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, err
}

if err := ReconcileVirtualService(ctx, r.Client, virtualService.GetName(), virtualService.GetNamespace(), virtualService, log); err != nil {
if err := istio.ReconcileVirtualService(ctx, r.Client, virtualService.GetName(), virtualService.GetNamespace(), virtualService, log); err != nil {
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -574,25 +567,10 @@ func getPodConfig(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefl
}
}

// generateNamePrefix generates a name prefix for a Workspace
// the format is "ws-{WORKSPACE_NAME}-" the workspace name is truncated to fit within the max length
func generateNamePrefix(workspaceName string, maxLength int) string {
namePrefix := fmt.Sprintf("ws-%s", workspaceName)
maxLength = maxLength - generateNameSuffixLength // subtract 6 for the `metadata.generateName` suffix
maxLength = maxLength - 1 // subtract 1 for the trailing "-"
if len(namePrefix) > maxLength {
namePrefix = namePrefix[:min(len(namePrefix), maxLength)]
}
if namePrefix[len(namePrefix)-1] != '-' {
namePrefix = namePrefix + "-"
}
return namePrefix
}

// generateStatefulSet generates a StatefulSet for a Workspace
func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefloworgv1beta1.WorkspaceKind, imageConfigSpec kubefloworgv1beta1.ImageConfigSpec, podConfigSpec kubefloworgv1beta1.PodConfigSpec) (*appsv1.StatefulSet, error) {
// generate name prefix
namePrefix := generateNamePrefix(workspace.Name, maxStatefulSetNameLength)
namePrefix := helper.GenerateNamePrefix(workspace.Name, maxStatefulSetNameLength)

// generate replica count
replicas := int32(1)
Expand All @@ -613,7 +591,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
// define go string template functions
// NOTE: these are used in places like the `extraEnv` values
containerPortsIdMap := make(map[string]kubefloworgv1beta1.ImagePort)
httpPathPrefixFunc := generateHttpPathPrefixFunc(workspace, containerPortsIdMap)
httpPathPrefixFunc := helper.GenerateHttpPathPrefixFunc(workspace, containerPortsIdMap)

// generate container ports
containerPorts := make([]corev1.ContainerPort, len(imageConfigSpec.Ports))
Expand All @@ -639,7 +617,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
env := env.DeepCopy() // copy to avoid modifying the original
if env.Value != "" {
rawValue := env.Value
outValue, err := helper.RenderExtraEnvValueTemplate(rawValue, httpPathPrefixFunc)
outValue, err := helper.RenderValueUsingFunc(rawValue, httpPathPrefixFunc)
if err != nil {
return nil, fmt.Errorf("failed to render extraEnv %q: %w", env.Name, err)
}
Expand Down Expand Up @@ -815,21 +793,10 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
return statefulSet, nil
}

func generateHttpPathPrefixFunc(workspace *kubefloworgv1beta1.Workspace, containerPortsIdMap map[string]kubefloworgv1beta1.ImagePort) func(portId string) string {
return func(portId string) string {
port, ok := containerPortsIdMap[portId]
if ok {
return fmt.Sprintf("/workspace/%s/%s/%s/", workspace.Namespace, workspace.Name, port.Id)
} else {
return ""
}
}
}

// generateService generates a Service for a Workspace
func generateService(workspace *kubefloworgv1beta1.Workspace, imageConfigSpec kubefloworgv1beta1.ImageConfigSpec) (*corev1.Service, error) {
// generate name prefix
namePrefix := generateNamePrefix(workspace.Name, maxServiceNameLength)
namePrefix := helper.GenerateNamePrefix(workspace.Name, helper.MaxServiceNameLength)

// generate service ports
servicePorts := make([]corev1.ServicePort, len(imageConfigSpec.Ports))
Expand Down Expand Up @@ -1026,199 +993,3 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
status.StateMessage = stateMsgUnknown
return status, nil
}

const istioApiVersion = "networking.istio.io/v1"
const virtualServiceKind = "VirtualService"

func GenerateIstioVirtualService(workspace *kubefloworgv1beta1.Workspace, workspaceKind *kubefloworgv1beta1.WorkspaceKind, imageConfig *kubefloworgv1beta1.ImageConfigValue, serviceName string, _ logr.Logger) (*unstructured.Unstructured, error) {

virtualService := &unstructured.Unstructured{}
virtualService.SetAPIVersion(istioApiVersion)
virtualService.SetKind(virtualServiceKind)

prefix := generateNamePrefix(workspace.Name, maxServiceNameLength)
virtualService.SetName(removeTrailingDash(prefix))
virtualService.SetNamespace(workspace.Namespace)

// .spec.gateways
istioGateway := getEnvOrDefault(IstioGateway, "kubeflow/kubeflow-gateway")
if err := unstructured.SetNestedStringSlice(virtualService.Object, []string{istioGateway},
"spec", "gateways"); err != nil {
return nil, fmt.Errorf("set .spec.gateways error: %v", err)
}

istioHost := getEnvOrDefault(IstioHost, "*")
if err := unstructured.SetNestedStringSlice(virtualService.Object, []string{istioHost},
"spec", "gateways"); err != nil {
return nil, fmt.Errorf("set .spec.hosts error: %v", err)
}

var prefixes []string
for _, imagePort := range imageConfig.Spec.Ports {
prefix := fmt.Sprintf("/workspace/%s/%s/%s", workspace.Namespace, workspace.Name, imagePort.Id)
prefixes = append(prefixes, prefix)
}

var httpRoutes []interface{}

host := fmt.Sprintf("%s.%s.svc.%s", serviceName, workspace.Namespace, getEnvOrDefault(ClusterDomain, "cluster.local"))

// generate container ports
// TODO: It can be better
containerPortsIdMap, err := generateContainerPortsIdMap(imageConfig)
if errContainerPorts := unstructured.SetNestedStringSlice(virtualService.Object, []string{istioHost},
"spec", "gateways"); err != nil {
return nil, fmt.Errorf("set .spec.hosts error: %v", errContainerPorts)
}
httpPathPrefixFunc := generateHttpPathPrefixFunc(workspace, containerPortsIdMap)

for _, imagePort := range imageConfig.Spec.Ports {

httpRoute := map[string]interface{}{
"match": []map[string]interface{}{
{
"uri": map[string]interface{}{
"prefix": fmt.Sprintf("/workspace/%s/%s/%s", workspace.Namespace, workspace.Name, imagePort.Id),
},
},
},
"route": []map[string]interface{}{
{
"destination": map[string]interface{}{
"host": host,
"port": map[string]interface{}{
"number": imagePort.Port,
},
},
},
},
}

if *workspaceKind.Spec.PodTemplate.HTTPProxy.RemovePathPrefix {
httpRoute["rewrite"] = map[string]interface{}{"uri": "/"}
}

// templating.spec.http[].math.headers
setHeaders := templateHeaders(workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Set, httpPathPrefixFunc)
addHeaders := templateHeaders(workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Add, httpPathPrefixFunc)

removeHeaders := make([]string, len(workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Remove))
for i, header := range workspaceKind.Spec.PodTemplate.HTTPProxy.RequestHeaders.Remove {
if header != "" {
out, err := helper.RenderExtraEnvValueTemplate(header, httpPathPrefixFunc)
if err != nil {
return nil, fmt.Errorf("failed to render header %q: %w", header, err)
}
header = out
}
removeHeaders[i] = header
}

httpRoute["headers"] = map[string]interface{}{
"request": map[string]interface{}{
"add": setHeaders,
"set": addHeaders,
"remove": removeHeaders,
},
}

httpRoutes = append(httpRoutes, httpRoute)
}

virtualService.Object["spec"] = map[string]interface{}{
"gateways": []string{
istioGateway,
},
"hosts": []string{
istioHost,
},
"http": httpRoutes,
}

return virtualService, nil
}

func templateHeaders(requestHeaders map[string]string, httpPathPrefixFunc func(portId string) string) map[string]string {

if len(requestHeaders) == 0 {
return make(map[string]string, 0)
}

headers := make(map[string]string, len(requestHeaders))
for _, header := range requestHeaders {
value := headers[header]
if value != "" {
out, err := helper.RenderExtraEnvValueTemplate(header, httpPathPrefixFunc)
if err != nil {
return make(map[string]string, 0)
}
value = out
}
headers[header] = value
}
return headers
}

func generateContainerPortsIdMap(imageConfig *kubefloworgv1beta1.ImageConfigValue) (map[string]kubefloworgv1beta1.ImagePort, error) {
containerPortsIdMap := make(map[string]kubefloworgv1beta1.ImagePort)

containerPorts := make([]corev1.ContainerPort, len(imageConfig.Spec.Ports))
seenPorts := make(map[int32]bool)
for i, port := range imageConfig.Spec.Ports {
if seenPorts[port.Port] {
return nil, fmt.Errorf("duplicate port number %d in imageConfig", port.Port)
}
containerPorts[i] = corev1.ContainerPort{
Name: fmt.Sprintf("http-%d", port.Port),
ContainerPort: port.Port,
Protocol: corev1.ProtocolTCP,
}
seenPorts[port.Port] = true
containerPortsIdMap[port.Id] = port
}
return containerPortsIdMap, nil
}

func getEnvOrDefault(name, defaultValue string) string {
if lookupEnv, exists := os.LookupEnv(name); exists {
return lookupEnv
} else {
return defaultValue
}
}

func ReconcileVirtualService(ctx context.Context, r client.Client, virtualServiceName, namespace string, virtualService *unstructured.Unstructured, log logr.Logger) error {
foundVirtualService := &unstructured.Unstructured{}
foundVirtualService.SetAPIVersion(istioApiVersion)
foundVirtualService.SetKind(virtualServiceKind)
justCreated := false
if err := r.Get(ctx, types.NamespacedName{Name: virtualServiceName, Namespace: namespace}, foundVirtualService); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Creating virtual service", "namespace", namespace, "name", virtualServiceName)
if err := r.Create(ctx, virtualService); err != nil {
log.Error(err, "unable to create virtual service")
return err
}
justCreated = true
} else {
log.Error(err, "error getting virtual service")
return err
}
}
if !justCreated {
log.Info("Updating virtual service", "namespace", namespace, "name", virtualServiceName)
if err := r.Update(ctx, foundVirtualService); err != nil {
log.Error(err, "unable to update virtual service")
return err
}
}

return nil
}

func removeTrailingDash(s string) string {
if len(s) > 0 && s[len(s)-1] == '-' {
return s[:len(s)-1]
}
return s
}
Loading

0 comments on commit bef381e

Please sign in to comment.