diff --git a/charts/vela-workflow/templates/workflow-controller.yaml b/charts/vela-workflow/templates/workflow-controller.yaml index 0dab479..a8c1c89 100644 --- a/charts/vela-workflow/templates/workflow-controller.yaml +++ b/charts/vela-workflow/templates/workflow-controller.yaml @@ -142,6 +142,8 @@ spec: - "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}" - "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}" - "--group-by-label={{ .Values.workflow.groupByLabel }}" + - "--enable-external-package-for-default-compiler={{- .Values.workflow.enableExternalPackageForDefaultCompiler | toString -}}" + - "--enable-external-package-watch-for-default-compiler={{- .Values.workflow.enableExternalPackageWatchForDefaultCompiler | toString -}}" {{ if .Values.backup.enable }} - "--backup-strategy={{ .Values.backup.strategy }}" - "--backup-ignore-strategy={{ .Values.backup.ignoreStrategy }}" diff --git a/charts/vela-workflow/values.yaml b/charts/vela-workflow/values.yaml index de62443..a14c82c 100644 --- a/charts/vela-workflow/values.yaml +++ b/charts/vela-workflow/values.yaml @@ -25,6 +25,8 @@ workflow: enableSuspendOnFailure: false enablePatchStatusAtOnce: false enableWatchEventListener: false + enableExternalPackageForDefaultCompiler: true + enableExternalPackageWatchForDefaultCompiler: false backoff: maxTime: waitState: 60 diff --git a/cmd/main.go b/cmd/main.go index 31c7459..d8ddfd1 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -55,6 +55,7 @@ import ( "github.com/kubevela/workflow/pkg/common" "github.com/kubevela/workflow/pkg/features" "github.com/kubevela/workflow/pkg/monitor/watcher" + "github.com/kubevela/workflow/pkg/providers" "github.com/kubevela/workflow/pkg/types" "github.com/kubevela/workflow/pkg/utils" "github.com/kubevela/workflow/pkg/webhook" @@ -122,6 +123,8 @@ func main() { flag.BoolVar(&backupCleanOnBackup, "backup-clean-on-backup", false, "Set the auto clean for backup workflow records, default is false") flag.StringVar(&backupConfigSecretName, "backup-config-secret-name", "backup-config", "Set the secret name for backup workflow configs, default is backup-config") flag.StringVar(&backupConfigSecretNamespace, "backup-config-secret-namespace", "vela-system", "Set the secret namespace for backup workflow configs, default is backup-config") + flag.BoolVar(&providers.EnableExternalPackageForDefaultCompiler, "enable-external-package-for-default-compiler", true, "Enable external package for default compiler") + flag.BoolVar(&providers.EnableExternalPackageWatchForDefaultCompiler, "enable-external-package-watch-for-default-compiler", false, "Enable external package watch for default compiler") multicluster.AddClusterGatewayClientFlags(flag.CommandLine) feature.DefaultMutableFeatureGate.AddFlag(flag.CommandLine) sharding.AddControllerFlags(flag.CommandLine) diff --git a/controllers/testdata/multi-suspend.yaml b/controllers/testdata/multi-suspend.yaml index ef99b69..2319172 100644 --- a/controllers/testdata/multi-suspend.yaml +++ b/controllers/testdata/multi-suspend.yaml @@ -9,7 +9,7 @@ spec: cue: template: | import ( - "vela/op" + "vela/builtin" ) - suspend1: op.#Suspend & {} - suspend2: op.#Suspend & {} + suspend1: builtin.#Suspend & {} + suspend2: builtin.#Suspend & {} diff --git a/controllers/testdata/suspend-and-deploy.yaml b/controllers/testdata/suspend-and-deploy.yaml index c5ef591..d712857 100644 --- a/controllers/testdata/suspend-and-deploy.yaml +++ b/controllers/testdata/suspend-and-deploy.yaml @@ -10,10 +10,10 @@ spec: template: | import ( "vela/kube" - "vela/op" + "vela/builtin" ) - suspend: op.#Suspend & {duration: "1s"} + suspend: builtin.#Suspend & {$params: duration: "1s"} output: kube.#Apply & { $params: { cluster: parameter.cluster @@ -41,8 +41,8 @@ spec: } } } - wait: op.#ConditionalWait & { - continue: output.$returns.value.status.readyReplicas == parameter.replicas + wait: builtin.#ConditionalWait & { + $params: continue: output.$returns.value.status.readyReplicas == parameter.replicas } parameter: { image: string diff --git a/controllers/testdata/test-apply.yaml b/controllers/testdata/test-apply.yaml index d9f890d..1622bfc 100644 --- a/controllers/testdata/test-apply.yaml +++ b/controllers/testdata/test-apply.yaml @@ -9,7 +9,7 @@ spec: template: | import ( "vela/kube" - "vela/op" + "vela/builtin" ) output: kube.#Apply & { @@ -41,9 +41,9 @@ spec: } } } - wait: op.#ConditionalWait & { + wait: builtin.#ConditionalWait & { if len(output.$returns.value.status) > 0 if output.$returns.value.status.readyReplicas == 1 { - continue: true + $params: continue: true } } parameter: { diff --git a/pkg/providers/builtin/workspace.cue b/pkg/providers/builtin/workspace.cue new file mode 100644 index 0000000..70dcf13 --- /dev/null +++ b/pkg/providers/builtin/workspace.cue @@ -0,0 +1,79 @@ +// workspace.cue + +#DoVar: { + #do: "var" + #provider: "builtin" + + $params: { + // +usage=The method to call on the variable + method: *"Get" | "Put" + // +usage=The path to the variable + path: string + // +usage=The value of the variable + value?: _ + } + + $returns?: { + // +usage=The value of the variable + value: _ + } +} + +#ConditionalWait: { + #do: "wait" + #provider: "builtin" + + $params: { + // +usage=If continue is false, the step will wait for continue to be true. + continue: *false | bool + // +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions. + message?: string + } +} + +#Suspend: { + #do: "suspend" + #provider: "builtin" + + $params: { + // +usage=Specify the wait duration time to resume automaticlly such as "30s", "1min" or "2m15s" + duration?: string + // +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions. + message?: string + } +} + +#Break: { + #do: "break" + #provider: "builtin" + + $params: { + // +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions. + message?: string + } +} + +#Fail: { + #do: "fail" + #provider: "builtin" + + $params: { + // +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions. + message?: string + } +} + +#Message: { + #do: "message" + #provider: "builtin" + + $params: { + // +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions. + message?: string + } +} + +#Steps: { + ... +} + diff --git a/pkg/providers/builtin/workspace.go b/pkg/providers/builtin/workspace.go new file mode 100644 index 0000000..702dd85 --- /dev/null +++ b/pkg/providers/builtin/workspace.go @@ -0,0 +1,215 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builtin + +import ( + "context" + _ "embed" + "encoding/json" + "fmt" + "strings" + "time" + + "cuelang.org/go/cue/cuecontext" + + cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime" + + "github.com/kubevela/workflow/api/v1alpha1" + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/errors" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +const ( + // ProviderName is provider name. + ProviderName = "builtin" + // ResumeTimeStamp is resume time stamp. + ResumeTimeStamp = "resumeTimeStamp" + // SuspendTimeStamp is suspend time stamp. + SuspendTimeStamp = "suspendTimeStamp" +) + +// VarVars . +type VarVars struct { + Method string `json:"method"` + Path string `json:"path"` + Value any `json:"value"` +} + +// VarReturnVars . +type VarReturnVars struct { + Value any `json:"value"` +} + +// VarReturns . +type VarReturns = providertypes.Returns[VarReturnVars] + +// VarParams . +type VarParams = providertypes.Params[VarVars] + +// DoVar get & put variable from context. +func DoVar(_ context.Context, params *VarParams) (*VarReturns, error) { + wfCtx := params.RuntimeParams.WorkflowContext + path := params.Params.Path + + switch params.Params.Method { + case "Get": + value, err := wfCtx.GetVar(strings.Split(path, ".")...) + if err != nil { + return nil, err + } + b, err := value.MarshalJSON() + if err != nil { + return nil, err + } + var v any + if err := json.Unmarshal(b, &v); err != nil { + return nil, err + } + return &VarReturns{ + Returns: VarReturnVars{ + Value: v, + }, + }, nil + case "Put": + b, err := json.Marshal(params.Params.Value) + if err != nil { + return nil, err + } + if err := wfCtx.SetVar(cuecontext.New().CompileBytes(b), strings.Split(path, ".")...); err != nil { + return nil, err + } + return nil, nil + } + return nil, nil +} + +// ActionVars . +type ActionVars struct { + Message string `json:"message,omitempty"` +} + +// ActionParams . +type ActionParams = providertypes.Params[ActionVars] + +// WaitVars . +type WaitVars struct { + Continue bool `json:"continue"` + ActionVars +} + +// WaitParams . +type WaitParams = providertypes.Params[WaitVars] + +// Wait let workflow wait. +func Wait(_ context.Context, params *WaitParams) (*any, error) { + if params.Params.Continue { + return nil, nil + } + params.Action.Wait(params.Params.Message) + return nil, errors.GenericActionError(errors.ActionWait) +} + +// Break let workflow terminate. +func Break(_ context.Context, params *ActionParams) (*any, error) { + params.Action.Terminate(params.Params.Message) + return nil, errors.GenericActionError(errors.ActionTerminate) +} + +// Fail let the step fail, its status is failed and reason is Action +func Fail(_ context.Context, params *ActionParams) (*any, error) { + params.Action.Fail(params.Params.Message) + return nil, errors.GenericActionError(errors.ActionTerminate) +} + +// SuspendVars . +type SuspendVars struct { + Duration string `json:"duration,omitempty"` + ActionVars +} + +// SuspendParams . +type SuspendParams = providertypes.Params[SuspendVars] + +// Suspend let the step suspend, its status is suspending and reason is Suspend +func Suspend(_ context.Context, params *SuspendParams) (*any, error) { + pCtx := params.ProcessContext + wfCtx := params.WorkflowContext + act := params.Action + stepID := fmt.Sprint(pCtx.GetData(model.ContextStepSessionID)) + timestamp := wfCtx.GetMutableValue(stepID, ResumeTimeStamp) + + var msg string + if msg == "" { + msg = fmt.Sprintf("Suspended by field %s", params.FieldLabel) + } + if timestamp != "" { + t, err := time.Parse(time.RFC3339, timestamp) + if err != nil { + return nil, fmt.Errorf("failed to parse timestamp %s: %w", timestamp, err) + } + if time.Now().After(t) { + act.Resume("") + return nil, nil + } + act.Suspend(msg) + return nil, errors.GenericActionError(errors.ActionSuspend) + } + if params.Params.Duration != "" { + d, err := time.ParseDuration(params.Params.Duration) + if err != nil { + return nil, fmt.Errorf("failed to parse duration %s: %w", params.Params.Duration, err) + } + wfCtx.SetMutableValue(time.Now().Add(d).Format(time.RFC3339), stepID, ResumeTimeStamp) + } + if ts := wfCtx.GetMutableValue(stepID, params.FieldLabel, SuspendTimeStamp); ts != "" { + if act.GetStatus().Phase == v1alpha1.WorkflowStepPhaseRunning { + // if it is already suspended before and has been resumed, we should not suspend it again. + return nil, nil + } + } else { + wfCtx.SetMutableValue(time.Now().Format(time.RFC3339), stepID, params.FieldLabel, SuspendTimeStamp) + } + act.Suspend(msg) + return nil, errors.GenericActionError(errors.ActionSuspend) +} + +// Message writes message to step status, note that the message will be overwritten by the next message. +func Message(_ context.Context, params *ActionParams) (*any, error) { + params.Action.Message(params.Params.Message) + return nil, nil +} + +//go:embed workspace.cue +var template string + +// GetTemplate returns the cue template. +func GetTemplate() string { + return template +} + +// GetProviders returns the cue providers. +func GetProviders() map[string]cuexruntime.ProviderFn { + return map[string]cuexruntime.ProviderFn{ + "wait": providertypes.GenericProviderFn[WaitVars, any](Wait), + "break": providertypes.GenericProviderFn[ActionVars, any](Break), + "fail": providertypes.GenericProviderFn[ActionVars, any](Fail), + "message": providertypes.GenericProviderFn[ActionVars, any](Message), + "var": providertypes.GenericProviderFn[VarVars, VarReturns](DoVar), + "suspend": providertypes.GenericProviderFn[SuspendVars, any](Suspend), + } +} diff --git a/pkg/providers/builtin/workspace_test.go b/pkg/providers/builtin/workspace_test.go new file mode 100644 index 0000000..99c13f6 --- /dev/null +++ b/pkg/providers/builtin/workspace_test.go @@ -0,0 +1,288 @@ +/* +Copyright 2022 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package builtin + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/yaml" + + "github.com/kubevela/workflow/api/v1alpha1" + wfContext "github.com/kubevela/workflow/pkg/context" + "github.com/kubevela/workflow/pkg/cue/model" + "github.com/kubevela/workflow/pkg/cue/process" + "github.com/kubevela/workflow/pkg/errors" + providertypes "github.com/kubevela/workflow/pkg/providers/types" +) + +func TestProvider_DoVar(t *testing.T) { + wfCtx := newWorkflowContextForTest(t) + r := require.New(t) + ctx := context.Background() + + _, err := DoVar(ctx, &VarParams{ + Params: VarVars{ + Method: "Put", + Path: "clusterIP", + Value: "1.1.1.1", + }, + RuntimeParams: providertypes.RuntimeParams{ + WorkflowContext: wfCtx, + }, + }) + r.NoError(err) + varV, err := wfCtx.GetVar("clusterIP") + r.NoError(err) + s, err := varV.String() + r.NoError(err) + r.Equal(s, "1.1.1.1") + + res, err := DoVar(ctx, &VarParams{ + Params: VarVars{ + Method: "Get", + Path: "clusterIP", + }, + RuntimeParams: providertypes.RuntimeParams{ + WorkflowContext: wfCtx, + }, + }) + r.NoError(err) + r.Equal(res.Returns.Value, "1.1.1.1") +} + +func TestProvider_Wait(t *testing.T) { + ctx := context.Background() + r := require.New(t) + act := &mockAction{} + + _, err := Wait(ctx, &WaitParams{ + Params: WaitVars{ + Continue: false, + ActionVars: ActionVars{ + Message: "test log", + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + }, + }) + _, ok := err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.wait, true) + r.Equal(act.msg, "test log") + + act = &mockAction{} + _, err = Wait(ctx, &WaitParams{ + Params: WaitVars{ + Continue: true, + ActionVars: ActionVars{ + Message: "omit msg", + }, + }, + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + }, + }) + r.NoError(err) + r.Equal(act.wait, false) + r.Equal(act.msg, "") +} + +func TestProvider_Break(t *testing.T) { + ctx := context.Background() + r := require.New(t) + act := &mockAction{} + _, err := Break(ctx, &ActionParams{ + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + }, + }) + _, ok := err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.terminate, true) + + act = &mockAction{} + _, err = Break(ctx, &ActionParams{ + Params: ActionVars{ + Message: "terminate", + }, + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + }, + }) + _, ok = err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.terminate, true) + r.Equal(act.msg, "terminate") +} + +func TestProvider_Suspend(t *testing.T) { + wfCtx := newWorkflowContextForTest(t) + ctx := context.Background() + pCtx := process.NewContext(process.ContextData{}) + pCtx.PushData(model.ContextStepSessionID, "test-id") + r := require.New(t) + act := &mockAction{} + + params := &SuspendParams{ + Params: SuspendVars{ + Duration: "1s", + }, + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + WorkflowContext: wfCtx, + ProcessContext: pCtx, + }, + } + _, err := Suspend(ctx, params) + _, ok := err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.suspend, true) + r.Equal(act.msg, "Suspended by field ") + // test second time to check if the suspend is resumed in 1s + _, err = Suspend(ctx, params) + _, ok = err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.suspend, true) + time.Sleep(time.Second) + _, err = Suspend(ctx, params) + r.NoError(err) + r.Equal(act.suspend, false) +} + +func TestProvider_Fail(t *testing.T) { + ctx := context.Background() + r := require.New(t) + act := &mockAction{} + _, err := Fail(ctx, &ActionParams{ + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + }, + }) + _, ok := err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.terminate, true) + + act = &mockAction{} + _, err = Fail(ctx, &ActionParams{ + Params: ActionVars{ + Message: "fail", + }, + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + }, + }) + _, ok = err.(errors.GenericActionError) + r.Equal(ok, true) + r.Equal(act.terminate, true) + r.Equal(act.msg, "fail") +} + +func TestProvider_Message(t *testing.T) { + ctx := context.Background() + r := require.New(t) + act := &mockAction{} + _, err := Message(ctx, &ActionParams{ + Params: ActionVars{ + Message: "test", + }, + RuntimeParams: providertypes.RuntimeParams{ + Action: act, + }, + }) + r.NoError(err) + r.Equal(act.msg, "test") +} + +type mockAction struct { + suspend bool + terminate bool + wait bool + msg string +} + +func (act *mockAction) GetStatus() v1alpha1.StepStatus { + return v1alpha1.StepStatus{} +} + +func (act *mockAction) Suspend(msg string) { + act.suspend = true + if msg != "" { + act.msg = msg + } +} + +func (act *mockAction) Resume(msg string) { + act.suspend = false + if msg != "" { + act.msg = msg + } +} + +func (act *mockAction) Terminate(msg string) { + act.terminate = true + act.msg = msg +} + +func (act *mockAction) Wait(msg string) { + act.wait = true + if msg != "" { + act.msg = msg + } +} + +func (act *mockAction) Fail(msg string) { + act.terminate = true + if msg != "" { + act.msg = msg + } +} + +func (act *mockAction) Message(msg string) { + if msg != "" { + act.msg = msg + } +} + +func newWorkflowContextForTest(t *testing.T) wfContext.Context { + cm := corev1.ConfigMap{} + r := require.New(t) + testCaseJson, err := yaml.YAMLToJSON([]byte(testCaseYaml)) + r.NoError(err) + err = json.Unmarshal(testCaseJson, &cm) + r.NoError(err) + + wfCtx := new(wfContext.WorkflowContext) + err = wfCtx.LoadFromConfigMap(context.Background(), cm) + r.NoError(err) + return wfCtx +} + +var ( + testCaseYaml = `apiVersion: v1 +data: + test: "" +kind: ConfigMap +metadata: + name: app-v1 +` +) diff --git a/pkg/providers/compiler.go b/pkg/providers/compiler.go index 42a5219..5bb19ef 100644 --- a/pkg/providers/compiler.go +++ b/pkg/providers/compiler.go @@ -26,6 +26,7 @@ import ( kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/klog/v2" + "github.com/kubevela/workflow/pkg/providers/builtin" "github.com/kubevela/workflow/pkg/providers/email" "github.com/kubevela/workflow/pkg/providers/http" "github.com/kubevela/workflow/pkg/providers/kube" @@ -59,6 +60,7 @@ var compiler = singleton.NewSingletonE[*cuex.Compiler](func() (*cuex.Compiler, e runtime.Must(cuexruntime.NewInternalPackage("metrics", metrics.GetTemplate(), metrics.GetProviders())), runtime.Must(cuexruntime.NewInternalPackage("time", time.GetTemplate(), time.GetProviders())), runtime.Must(cuexruntime.NewInternalPackage("util", util.GetTemplate(), util.GetProviders())), + runtime.Must(cuexruntime.NewInternalPackage("builtin", builtin.GetTemplate(), builtin.GetProviders())), ), nil })