Skip to content

Commit

Permalink
Feat: add builtin providers and fix helm (#192)
Browse files Browse the repository at this point in the history
* feat: add builtin providers and fix helm

Signed-off-by: FogDong <[email protected]>

* fix: fix lint

Signed-off-by: FogDong <[email protected]>

---------

Signed-off-by: FogDong <[email protected]>
  • Loading branch information
FogDong committed Aug 14, 2024
1 parent 9d55737 commit 1fa0042
Show file tree
Hide file tree
Showing 10 changed files with 601 additions and 10 deletions.
2 changes: 2 additions & 0 deletions charts/vela-workflow/templates/workflow-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ spec:
- "--feature-gates=EnableSuspendOnFailure={{- .Values.workflow.enableSuspendOnFailure | toString -}}"
- "--feature-gates=EnableBackupWorkflowRecord={{- .Values.backup.enabled | toString -}}"
- "--group-by-label={{ .Values.workflow.groupByLabel }}"
- "--enable-external-package-for-default-compiler={{- .Values.workflow.enableExternalPackageForDefaultCompiler | toString -}}"
- "--enable-external-package-watch-for-default-compiler={{- .Values.workflow.enableExternalPackageWatchForDefaultCompiler | toString -}}"
{{ if .Values.backup.enable }}
- "--backup-strategy={{ .Values.backup.strategy }}"
- "--backup-ignore-strategy={{ .Values.backup.ignoreStrategy }}"
Expand Down
2 changes: 2 additions & 0 deletions charts/vela-workflow/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ workflow:
enableSuspendOnFailure: false
enablePatchStatusAtOnce: false
enableWatchEventListener: false
enableExternalPackageForDefaultCompiler: true
enableExternalPackageWatchForDefaultCompiler: false
backoff:
maxTime:
waitState: 60
Expand Down
3 changes: 3 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/kubevela/workflow/pkg/common"
"github.com/kubevela/workflow/pkg/features"
"github.com/kubevela/workflow/pkg/monitor/watcher"
"github.com/kubevela/workflow/pkg/providers"
"github.com/kubevela/workflow/pkg/types"
"github.com/kubevela/workflow/pkg/utils"
"github.com/kubevela/workflow/pkg/webhook"
Expand Down Expand Up @@ -122,6 +123,8 @@ func main() {
flag.BoolVar(&backupCleanOnBackup, "backup-clean-on-backup", false, "Set the auto clean for backup workflow records, default is false")
flag.StringVar(&backupConfigSecretName, "backup-config-secret-name", "backup-config", "Set the secret name for backup workflow configs, default is backup-config")
flag.StringVar(&backupConfigSecretNamespace, "backup-config-secret-namespace", "vela-system", "Set the secret namespace for backup workflow configs, default is backup-config")
flag.BoolVar(&providers.EnableExternalPackageForDefaultCompiler, "enable-external-package-for-default-compiler", true, "Enable external package for default compiler")
flag.BoolVar(&providers.EnableExternalPackageWatchForDefaultCompiler, "enable-external-package-watch-for-default-compiler", false, "Enable external package watch for default compiler")
multicluster.AddClusterGatewayClientFlags(flag.CommandLine)
feature.DefaultMutableFeatureGate.AddFlag(flag.CommandLine)
sharding.AddControllerFlags(flag.CommandLine)
Expand Down
6 changes: 3 additions & 3 deletions controllers/testdata/multi-suspend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
cue:
template: |
import (
"vela/op"
"vela/builtin"
)
suspend1: op.#Suspend & {}
suspend2: op.#Suspend & {}
suspend1: builtin.#Suspend & {}
suspend2: builtin.#Suspend & {}
8 changes: 4 additions & 4 deletions controllers/testdata/suspend-and-deploy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ spec:
template: |
import (
"vela/kube"
"vela/op"
"vela/builtin"
)
suspend: op.#Suspend & {duration: "1s"}
suspend: builtin.#Suspend & {$params: duration: "1s"}
output: kube.#Apply & {
$params: {
cluster: parameter.cluster
Expand Down Expand Up @@ -41,8 +41,8 @@ spec:
}
}
}
wait: op.#ConditionalWait & {
continue: output.$returns.value.status.readyReplicas == parameter.replicas
wait: builtin.#ConditionalWait & {
$params: continue: output.$returns.value.status.readyReplicas == parameter.replicas
}
parameter: {
image: string
Expand Down
6 changes: 3 additions & 3 deletions controllers/testdata/test-apply.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ spec:
template: |
import (
"vela/kube"
"vela/op"
"vela/builtin"
)
output: kube.#Apply & {
Expand Down Expand Up @@ -41,9 +41,9 @@ spec:
}
}
}
wait: op.#ConditionalWait & {
wait: builtin.#ConditionalWait & {
if len(output.$returns.value.status) > 0 if output.$returns.value.status.readyReplicas == 1 {
continue: true
$params: continue: true
}
}
parameter: {
Expand Down
79 changes: 79 additions & 0 deletions pkg/providers/builtin/workspace.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// workspace.cue

#DoVar: {
#do: "var"
#provider: "builtin"

$params: {
// +usage=The method to call on the variable
method: *"Get" | "Put"
// +usage=The path to the variable
path: string
// +usage=The value of the variable
value?: _
}

$returns?: {
// +usage=The value of the variable
value: _
}
}

#ConditionalWait: {
#do: "wait"
#provider: "builtin"

$params: {
// +usage=If continue is false, the step will wait for continue to be true.
continue: *false | bool
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Suspend: {
#do: "suspend"
#provider: "builtin"

$params: {
// +usage=Specify the wait duration time to resume automaticlly such as "30s", "1min" or "2m15s"
duration?: string
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Break: {
#do: "break"
#provider: "builtin"

$params: {
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Fail: {
#do: "fail"
#provider: "builtin"

$params: {
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Message: {
#do: "message"
#provider: "builtin"

$params: {
// +usage=Optional message that will be shown in workflow step status, note that the message might be override by other actions.
message?: string
}
}

#Steps: {
...
}

215 changes: 215 additions & 0 deletions pkg/providers/builtin/workspace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
Copyright 2022 The KubeVela Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package builtin

import (
"context"
_ "embed"
"encoding/json"
"fmt"
"strings"
"time"

"cuelang.org/go/cue/cuecontext"

cuexruntime "github.com/kubevela/pkg/cue/cuex/runtime"

"github.com/kubevela/workflow/api/v1alpha1"
"github.com/kubevela/workflow/pkg/cue/model"
"github.com/kubevela/workflow/pkg/errors"
providertypes "github.com/kubevela/workflow/pkg/providers/types"
)

const (
// ProviderName is provider name.
ProviderName = "builtin"
// ResumeTimeStamp is resume time stamp.
ResumeTimeStamp = "resumeTimeStamp"
// SuspendTimeStamp is suspend time stamp.
SuspendTimeStamp = "suspendTimeStamp"
)

// VarVars .
type VarVars struct {
Method string `json:"method"`
Path string `json:"path"`
Value any `json:"value"`
}

// VarReturnVars .
type VarReturnVars struct {
Value any `json:"value"`
}

// VarReturns .
type VarReturns = providertypes.Returns[VarReturnVars]

// VarParams .
type VarParams = providertypes.Params[VarVars]

// DoVar get & put variable from context.
func DoVar(_ context.Context, params *VarParams) (*VarReturns, error) {
wfCtx := params.RuntimeParams.WorkflowContext
path := params.Params.Path

switch params.Params.Method {
case "Get":
value, err := wfCtx.GetVar(strings.Split(path, ".")...)
if err != nil {
return nil, err
}
b, err := value.MarshalJSON()
if err != nil {
return nil, err
}
var v any
if err := json.Unmarshal(b, &v); err != nil {
return nil, err
}
return &VarReturns{
Returns: VarReturnVars{
Value: v,
},
}, nil
case "Put":
b, err := json.Marshal(params.Params.Value)
if err != nil {
return nil, err
}
if err := wfCtx.SetVar(cuecontext.New().CompileBytes(b), strings.Split(path, ".")...); err != nil {
return nil, err
}
return nil, nil
}
return nil, nil
}

// ActionVars .
type ActionVars struct {
Message string `json:"message,omitempty"`
}

// ActionParams .
type ActionParams = providertypes.Params[ActionVars]

// WaitVars .
type WaitVars struct {
Continue bool `json:"continue"`
ActionVars
}

// WaitParams .
type WaitParams = providertypes.Params[WaitVars]

// Wait let workflow wait.
func Wait(_ context.Context, params *WaitParams) (*any, error) {
if params.Params.Continue {
return nil, nil
}
params.Action.Wait(params.Params.Message)
return nil, errors.GenericActionError(errors.ActionWait)
}

// Break let workflow terminate.
func Break(_ context.Context, params *ActionParams) (*any, error) {
params.Action.Terminate(params.Params.Message)
return nil, errors.GenericActionError(errors.ActionTerminate)
}

// Fail let the step fail, its status is failed and reason is Action
func Fail(_ context.Context, params *ActionParams) (*any, error) {
params.Action.Fail(params.Params.Message)
return nil, errors.GenericActionError(errors.ActionTerminate)
}

// SuspendVars .
type SuspendVars struct {
Duration string `json:"duration,omitempty"`
ActionVars
}

// SuspendParams .
type SuspendParams = providertypes.Params[SuspendVars]

// Suspend let the step suspend, its status is suspending and reason is Suspend
func Suspend(_ context.Context, params *SuspendParams) (*any, error) {
pCtx := params.ProcessContext
wfCtx := params.WorkflowContext
act := params.Action
stepID := fmt.Sprint(pCtx.GetData(model.ContextStepSessionID))
timestamp := wfCtx.GetMutableValue(stepID, ResumeTimeStamp)

var msg string
if msg == "" {
msg = fmt.Sprintf("Suspended by field %s", params.FieldLabel)
}
if timestamp != "" {
t, err := time.Parse(time.RFC3339, timestamp)
if err != nil {
return nil, fmt.Errorf("failed to parse timestamp %s: %w", timestamp, err)
}
if time.Now().After(t) {
act.Resume("")
return nil, nil
}
act.Suspend(msg)
return nil, errors.GenericActionError(errors.ActionSuspend)
}
if params.Params.Duration != "" {
d, err := time.ParseDuration(params.Params.Duration)
if err != nil {
return nil, fmt.Errorf("failed to parse duration %s: %w", params.Params.Duration, err)
}
wfCtx.SetMutableValue(time.Now().Add(d).Format(time.RFC3339), stepID, ResumeTimeStamp)
}
if ts := wfCtx.GetMutableValue(stepID, params.FieldLabel, SuspendTimeStamp); ts != "" {
if act.GetStatus().Phase == v1alpha1.WorkflowStepPhaseRunning {
// if it is already suspended before and has been resumed, we should not suspend it again.
return nil, nil
}
} else {
wfCtx.SetMutableValue(time.Now().Format(time.RFC3339), stepID, params.FieldLabel, SuspendTimeStamp)
}
act.Suspend(msg)
return nil, errors.GenericActionError(errors.ActionSuspend)
}

// Message writes message to step status, note that the message will be overwritten by the next message.
func Message(_ context.Context, params *ActionParams) (*any, error) {
params.Action.Message(params.Params.Message)
return nil, nil
}

//go:embed workspace.cue
var template string

// GetTemplate returns the cue template.
func GetTemplate() string {
return template
}

// GetProviders returns the cue providers.
func GetProviders() map[string]cuexruntime.ProviderFn {
return map[string]cuexruntime.ProviderFn{
"wait": providertypes.GenericProviderFn[WaitVars, any](Wait),
"break": providertypes.GenericProviderFn[ActionVars, any](Break),
"fail": providertypes.GenericProviderFn[ActionVars, any](Fail),
"message": providertypes.GenericProviderFn[ActionVars, any](Message),
"var": providertypes.GenericProviderFn[VarVars, VarReturns](DoVar),
"suspend": providertypes.GenericProviderFn[SuspendVars, any](Suspend),
}
}
Loading

0 comments on commit 1fa0042

Please sign in to comment.