diff --git a/api/v1alpha1/wildflyserver_types.go b/api/v1alpha1/wildflyserver_types.go
index c4745004e..a6fac3294 100644
--- a/api/v1alpha1/wildflyserver_types.go
+++ b/api/v1alpha1/wildflyserver_types.go
@@ -38,8 +38,10 @@ type WildFlyServerSpec struct {
 	// SessionAffinity defines if connections from the same client ip are passed to the same WildFlyServer instance/pod each time (false if omitted)
 	SessionAffinity bool `json:"sessionAffinity,omitempty"`
 	// DisableHTTPRoute disables the creation a route to the HTTP port of the application service (false if omitted)
-	DisableHTTPRoute    bool                     `json:"disableHTTPRoute,omitempty"`
-	StandaloneConfigMap *StandaloneConfigMapSpec `json:"standaloneConfigMap,omitempty"`
+	DisableHTTPRoute bool `json:"disableHTTPRoute,omitempty"`
+	// DeactivateTransactionRecovery disables the process of recovering transactions (false if omitted)
+	DeactivateTransactionRecovery bool `json:"deactivateTransactionRecovery,omitempty"`
+	StandaloneConfigMap *StandaloneConfigMapSpec `json:"standaloneConfigMap,omitempty"`
 	// StorageSpec defines specific storage required for the server own data directory. If omitted, an EmptyDir is used (that will not
 	// persist data across pod restart).
 	Storage *StorageSpec `json:"storage,omitempty"`
@@ -109,22 +111,20 @@ type WildFlyServerStatus struct {
 }

 const (
-	// PodStateActive represents PodStatus.State when pod is active to serve requests
-	// it's connected in the Service load balancer
+	// (PodStatus.State) PodStateActive represents an active pod that is connected to the load balancer Service
+	// and that can serve requests
 	PodStateActive = "ACTIVE"
-	// PodStateScalingDownRecoveryInvestigation represents the PodStatus.State when pod is in state of scaling down
-	// and is to be verified if it's dirty and if recovery is needed
-	// as the pod is under recovery verification it can't be immediately removed
-	// and it needs to be wait until it's marked as clean to be removed
+	// (PodStatus.State) PodStateScalingDownRecoveryInvestigation represents a pod that is under investigation
+	// to find out if there are transactions to be recovered. A pod in this state will eventually transition
+	// to one of the following states
 	PodStateScalingDownRecoveryInvestigation = "SCALING_DOWN_RECOVERY_INVESTIGATION"
-	// PodStateScalingDownRecoveryDirty represents the PodStatus.State when the pod was marked as recovery is needed
-	// because there are some in-doubt transactions.
-	// The app server was restarted with the recovery properties to speed-up recovery nad it's needed to wait
-	// until all ind-doubt transactions are processed.
-	PodStateScalingDownRecoveryDirty = "SCALING_DOWN_RECOVERY_DIRTY"
-	// PodStateScalingDownClean represents the PodStatus.State when pod is not active to serve requests
-	// it's in state of scaling down and it's clean
-	// 'clean' means it's ready to be removed from the kubernetes cluster
+	// (PodStatus.State) PodStateScalingDownRecoveryProcessing represents a pod that has transactions to be completed.
+	// The Operator will wait until all transactions are processed
+	PodStateScalingDownRecoveryProcessing = "SCALING_DOWN_RECOVERY_PROCESSING"
+	// (PodStatus.State) PodStateScalingDownRecoveryHeuristic represents a pod that has heuristic transactions.
+	// The Operator will wait until all heuristic transactions are manually resolved
+	PodStateScalingDownRecoveryHeuristic = "SCALING_DOWN_RECOVERY_HEURISTICS"
+	// (PodStatus.State) PodStateScalingDownClean represents a pod that is ready to be scaled down
 	PodStateScalingDownClean = "SCALING_DOWN_CLEAN"
 )
@@ -134,8 +134,10 @@ type PodStatus struct {
 	Name  string `json:"name"`
 	PodIP string `json:"podIP"`
 	// Represent the state of the Pod, it is used especially during scale down.
-	// +kubebuilder:validation:Enum=ACTIVE;SCALING_DOWN_RECOVERY_INVESTIGATION;SCALING_DOWN_RECOVERY_DIRTY;SCALING_DOWN_CLEAN
+	// +kubebuilder:validation:Enum=ACTIVE;SCALING_DOWN_RECOVERY_INVESTIGATION;SCALING_DOWN_RECOVERY_PROCESSING;SCALING_DOWN_RECOVERY_HEURISTICS;SCALING_DOWN_CLEAN
 	State string `json:"state"`
+	// Counts the recovery attempts when there are in-doubt transactions
+	RecoveryCounter int32 `json:"recoveryCounter"`
 }

 // WildFlyServer is the Schema for the wildflyservers API
diff --git a/bundle/manifests/wildfly.org_wildflyservers.yaml b/bundle/manifests/wildfly.org_wildflyservers.yaml
index b00955e29..be9979c97 100644
--- a/bundle/manifests/wildfly.org_wildflyservers.yaml
+++ b/bundle/manifests/wildfly.org_wildflyservers.yaml
@@ -766,18 +766,25 @@ spec:
                   podIP:
                     type: string
                   state:
-                    description: Represent the state of the Pod, it is used especially
-                      during scale down.
+                    description: Represents the state of the Pod; it is used during scale down.
                     enum:
                     - ACTIVE
                     - SCALING_DOWN_RECOVERY_INVESTIGATION
-                    - SCALING_DOWN_RECOVERY_DIRTY
+                    - SCALING_DOWN_RECOVERY_PROCESSING
+                    - SCALING_DOWN_RECOVERY_HEURISTICS
                     - SCALING_DOWN_CLEAN
                     type: string
+                  recoveryCounter:
+                    description: Counts recovery attempts when there are in-doubt transactions.
+                    format: int32
+                    type: integer
+                    default: 0
+                    minimum: 0
                 required:
                 - name
                 - podIP
                 - state
+                - recoveryCounter
                 type: object
               type: array
               x-kubernetes-list-type: atomic
diff --git a/controllers/transaction_recovery.go b/controllers/transaction_recovery.go
index 9915a4f8e..2208b8799 100644
--- a/controllers/transaction_recovery.go
+++ b/controllers/transaction_recovery.go
@@ -31,136 +31,165 @@ const (
 	wftcDataDirName = "ejb-xa-recovery" // data directory where WFTC stores transaction runtime data
 )

-func (r *WildFlyServerReconciler) checkRecovery(reqLogger logr.Logger, scaleDownPod *corev1.Pod, w *wildflyv1alpha1.WildFlyServer) (bool, string, error) {
+// Defines a new type to map the result of the checkRecovery routine
+type checkResult int
+
+// Declaration of the possible results of checkRecovery
+const (
+	clean checkResult = iota
+	recovery
+	heuristic
+	genericError
+)
+
+func (r *WildFlyServerReconciler) checkRecovery(reqLogger logr.Logger, scaleDownPod *corev1.Pod, w *wildflyv1alpha1.WildFlyServer) (checkResult, string, error) {
+	scaleDownPodName := scaleDownPod.ObjectMeta.Name
 	scaleDownPodIP := scaleDownPod.Status.PodIP
 	scaleDownPodRecoveryPort := defaultRecoveryPort

-	// Reading timestamp for the latest log record
+	// Reading timestamp of the latest log record
 	scaleDownPodLogTimestampAtStart, err := wfly.RemoteOps.ObtainLogLatestTimestamp(scaleDownPod)
 	if err != nil {
-		return false, "", fmt.Errorf("Log of the pod '%s' of WildflyServer '%v' is not ready during scaling down, "+
+		return genericError, "", fmt.Errorf("The pod '%s' (of the WildflyServer '%v') is not ready to be scaled down, "+
Error: %v", scaleDownPodName, w.ObjectMeta.Name, err) } - // If we are in state of recovery is needed the setup of the server has to be already done + // If transactions needs to be recovered, the setup of the server has to be carried out if scaleDownPod.Annotations[markerRecoveryPort] == "" { - reqLogger.Info("Verification the recovery listener is setup to run transaction recovery at " + scaleDownPodName) + reqLogger.Info("Verification that the transaction recovery listener of the pod is ready to recover transactions", "Pod name", scaleDownPodName) // Verify the recovery listener is setup jsonResult, err := wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnCheckRecoveryListener) if err != nil { - return false, "", fmt.Errorf("Cannot check if the transaction recovery listener is enabled for recovery at pod %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Cannot check if the transaction recovery listener of the pod %v is enabled, error: %v", scaleDownPodName, err) } if !wfly.IsMgmtOutcomeSuccesful(jsonResult) { - return false, "", fmt.Errorf("Failed to verify if transaction recovery listener is enabled at pod %v. Scaledown processing cannot trigger recovery. "+ + return genericError, "", fmt.Errorf("Failed to verify if the transaction recovery listener of the pod %v is enabled. Scaledown processing cannot trigger recovery. "+ "Management command: %v, JSON response: %v", scaleDownPodName, wfly.MgmtOpTxnCheckRecoveryListener, jsonResult) } - // When listener is not enabled then the pod will be terminated + // When the transaction recovery listener is not enabled then the pod will be terminated isTxRecoveryListenerEnabledAsInterface := wfly.ReadJSONDataByIndex(jsonResult, "result") isTxRecoveryListenerEnabledAsString, _ := wfly.ConvertToString(isTxRecoveryListenerEnabledAsInterface) if txrecoverydefined, err := strconv.ParseBool(isTxRecoveryListenerEnabledAsString); err == nil && !txrecoverydefined { - reqLogger.Info("Transaction recovery listener is not enabled. Transaction recovery cannot proceed at pod " + scaleDownPodName) + reqLogger.Info("The transaction recovery listener of the pod is not enabled", "Pod name", scaleDownPodName) r.Recorder.Event(w, corev1.EventTypeWarning, "WildFlyServerTransactionRecovery", - "Application server at pod "+scaleDownPodName+" does not define transaction recovery listener and recovery processing can't go forward."+ - " Please consider fixing server configuration. The pod is now going to be terminated.") - return true, "", nil + "The transaction recovery listener of the pod "+scaleDownPodName+" is not defined and the recovery process cannot started."+ + " If this is not intentional, fix the server configuration. The pod will be terminated.") + return clean, "", nil } // Reading recovery port from the app server with management port - reqLogger.Info("Query to find the transaction recovery port to force scan at pod " + scaleDownPodName) + reqLogger.Info("Query the app server to find out the transaction recovery port of the pod", "Pod name", scaleDownPodName) queriedScaleDownPodRecoveryPort, err := wfly.GetTransactionRecoveryPort(scaleDownPod) if err == nil && queriedScaleDownPodRecoveryPort != 0 { scaleDownPodRecoveryPort = queriedScaleDownPodRecoveryPort } if err != nil { - reqLogger.Error(err, "Error on reading transaction recovery port with management command. 
Using default port: "+scaleDownPodName, - "Pod name", scaleDownPodName) + reqLogger.Error(err, "Error reading the transaction recovery port", "Pod name", scaleDownPodName, "Default port", strconv.Itoa(int(scaleDownPodRecoveryPort))) } - // The pod was already searched for the recovery port, marking that into the annotations + // Save the recovery port into the annotations annotations := wfly.MapMerge( scaleDownPod.GetAnnotations(), map[string]string{markerRecoveryPort: strconv.FormatInt(int64(scaleDownPodRecoveryPort), 10)}) patch := client.MergeFrom(scaleDownPod.DeepCopy()) scaleDownPod.SetAnnotations(annotations) if err := resources.Patch(w, r.Client, scaleDownPod, patch); err != nil { - return false, "", fmt.Errorf("Failed to update pod annotations, pod name %v, annotations to be set %v, error: %v", + return genericError, "", fmt.Errorf("Failed to update the annotation of the pod %v, annotations to be set %v, error: %v", scaleDownPodName, scaleDownPod.Annotations, err) } } else { - // pod annotation already contains information on recovery port thus we just use it + // The annotations of the pod already contain information about the recovery port queriedScaleDownPodRecoveryPortString := scaleDownPod.Annotations[markerRecoveryPort] queriedScaleDownPodRecoveryPort, err := strconv.Atoi(queriedScaleDownPodRecoveryPortString) if err != nil { patch := client.MergeFrom(scaleDownPod.DeepCopy()) delete(scaleDownPod.Annotations, markerRecoveryPort) if errUpdate := resources.Patch(w, r.Client, scaleDownPod, patch); errUpdate != nil { - reqLogger.Info("Cannot update scaledown pod while resetting the recovery port annotation", - "Scale down Pod", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) + reqLogger.Info("Cannot update the pod while resetting the recovery port annotation", + "Pod name", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) } - return false, "", fmt.Errorf("Cannot convert recovery port value '%s' to integer for the scaling down pod %v, error: %v", + return genericError, "", fmt.Errorf("Cannot convert recovery port value '%s' to integer for the pod %v, error: %v", queriedScaleDownPodRecoveryPortString, scaleDownPodName, err) } scaleDownPodRecoveryPort = int32(queriedScaleDownPodRecoveryPort) } - // With enabled recovery listener and the port, let's start the recovery scan - reqLogger.Info("Executing recovery scan at "+scaleDownPodName, "Pod IP", scaleDownPodIP, "Recovery port", scaleDownPodRecoveryPort) + // Transaction recovery listener is enabled and the recovery port has been retrieved, the recovery scan can be started + reqLogger.Info("Executing the recovery scan for the pod", "Pod name", scaleDownPodName, "Pod IP", scaleDownPodIP, "Recovery port", scaleDownPodRecoveryPort) _, err = wfly.RemoteOps.SocketConnect(scaleDownPodIP, scaleDownPodRecoveryPort, txnRecoveryScanCommand) if err != nil { patch := client.MergeFrom(scaleDownPod.DeepCopy()) delete(scaleDownPod.Annotations, markerRecoveryPort) if errUpdate := r.Client.Patch(context.TODO(), scaleDownPod, patch); errUpdate != nil { - reqLogger.Info("Cannot update scaledown pod while resetting the recovery port annotation", - "Scale down Pod", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) + reqLogger.Info("Cannot update the pod while resetting the recovery port annotation", + "Pod name", scaleDownPodName, "Annotations", scaleDownPod.Annotations, "Error", errUpdate) } - return false, "", fmt.Errorf("Failed to run transaction recovery scan 
for scaling down pod %v. "+ - "Please, verify the pod log file. Error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Failed to run the transaction recovery scan for the pod %v. "+ + "Please, verify the pod logs. Error: %v", scaleDownPodName, err) } // No error on recovery scan => all the registered resources were available during the recovery processing foundLogLine, err := wfly.RemoteOps.VerifyLogContainsRegexp(scaleDownPod, scaleDownPodLogTimestampAtStart, recoveryErrorRegExp) if err != nil { - return false, "", fmt.Errorf("Cannot parse log from scaling down pod %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Cannot parse logs from the pod %v, error: %v", scaleDownPodName, err) } if foundLogLine != "" { - retString := fmt.Sprintf("Scale down transaction recovery processing contains errors in log. The recovery will be retried."+ - "Pod name: %v, log line with error '%v'", scaleDownPod, foundLogLine) - return false, retString, nil + retString := fmt.Sprintf("The transaction recovery process contains errors in the logs. The recovery will be retried."+ + "Pod: %v, log line with error '%v'", scaleDownPod, foundLogLine) + return genericError, retString, nil } - // Probing transaction log to verify there is not in-doubt transaction in the log + // Probing transaction logs to verify there is not any in-doubt transaction in the log _, err = wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnProbe) if err != nil { - return false, "", fmt.Errorf("Error in probing transaction log for scaling down pod %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Error while probing transaction logs of the pod %v, error: %v", scaleDownPodName, err) } - // Transaction log was probed, now we read the set of transactions which are in-doubt + // Transaction logs were probed, now we read the set of in-doubt transactions jsonResult, err := wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnRead) if err != nil { - return false, "", fmt.Errorf("Cannot read transactions from the transaction log for pod scaling down %v, error: %v", scaleDownPodName, err) + return genericError, "", fmt.Errorf("Cannot read transactions from the log store of the pod %v, error: %v", scaleDownPodName, err) } if !wfly.IsMgmtOutcomeSuccesful(jsonResult) { - return false, "", fmt.Errorf("Cannot get list of the in-doubt transactions at pod %v for transaction scaledown", scaleDownPodName) + return genericError, "", fmt.Errorf("Cannot get the list of in-doubt transactions of the pod %v", scaleDownPodName) } // Is the number of in-doubt transactions equal to zero? transactions := jsonResult["result"] txnMap, isMap := transactions.(map[string]interface{}) // typing the variable to be a map of interfaces if isMap && len(txnMap) > 0 { - retString := fmt.Sprintf("Recovery scan to be invoked as the transaction log storage is not empty for pod scaling down pod %v, "+ + + // Check for HEURISTIC transactions + jsonResult, err := wfly.ExecuteMgmtOp(scaleDownPod, wfly.MgmtOpTxnReadHeuristic) + if err != nil { + return genericError, "", fmt.Errorf("Cannot read HEURISTIC transactions from the log store of the pod %v, error: %v", scaleDownPodName, err) + } + if !wfly.IsMgmtOutcomeSuccesful(jsonResult) { + return genericError, "", fmt.Errorf("Cannot read HEURISTIC transactions from the log store of the pod %v", scaleDownPodName) + } + // Is the number of HEURISTIC transactions equal to zero? 
+ transactions := jsonResult["result"] + heuristicTxnArray, isArray := transactions.([]interface{}) // typing the variable to be an array of interfaces + if isArray && len(heuristicTxnArray) > 0 { + retString := fmt.Sprintf("There are HEURISTIC transactions in the log store of the pod %v. Please, resolve them manually, "+ + "transaction list: %v", scaleDownPodName, heuristicTxnArray) + return heuristic, retString, nil + } + + retString := fmt.Sprintf("A recovery scan is needed as the log store of the pod %v is not empty, "+ "transaction list: %v", scaleDownPodName, txnMap) - return false, retString, nil + return recovery, retString, nil } // Verification of the unfinished data of the WildFly transaction client (verification of the directory content) lsCommand := fmt.Sprintf(`ls ${JBOSS_HOME}/%s/%s/ 2> /dev/null || true`, resources.StandaloneServerDataDirRelativePath, wftcDataDirName) commandResult, err := wfly.RemoteOps.Execute(scaleDownPod, lsCommand) if err != nil { - return false, "", fmt.Errorf("Cannot query filesystem at scaling down pod %v to check existing remote transactions. "+ + return genericError, "", fmt.Errorf("Cannot query the filesystem of the pod %v to check existing remote transactions. "+ "Exec command: %v", scaleDownPodName, lsCommand) } if commandResult != "" { - retString := fmt.Sprintf("WildFly Transaction Client data dir is not empty and scaling down of the pod '%v' will be retried."+ - "Wildfly Transacton Client data dir path '${JBOSS_HOME}/%v/%v', output listing: %v", + retString := fmt.Sprintf("WildFly's data directory is not empty and the scaling down of the pod '%v' will be retried."+ + "Wildfly's data directory path is: '${JBOSS_HOME}/%v/%v', output listing: %v", scaleDownPodName, resources.StandaloneServerDataDirRelativePath, wftcDataDirName, commandResult) - return false, retString, nil + return recovery, retString, nil } - return true, "", nil + return clean, "", nil } func (r *WildFlyServerReconciler) setupRecoveryPropertiesAndRestart(reqLogger logr.Logger, scaleDownPod *corev1.Pod, w *wildflyv1alpha1.WildFlyServer) (mustReconcile int, err error) { @@ -238,28 +267,44 @@ func (r *WildFlyServerReconciler) updatePodLabel(w *wildflyv1alpha1.WildFlyServe return updated, nil } -// processTransactionRecoveryScaleDown runs transaction recovery on provided number of pods -// -// mustReconcile returns int constant; 'requeueNow' if the reconcile requeue loop should be called as soon as possible, -// 'requeueLater' if requeue loop is needed but it could be delayed, 'requeueOff' if requeue loop is not necessary -// err reports error which occurs during method processing +/* + * processTransactionRecoveryScaleDown runs transaction recovery on provided number of pods + * Returns the int contant mustReconcile: + * - 'requeueNow' if the reconcile requeue loop should be called as soon as possible + * - 'requeueLater' if requeue loop is needed but it could be delayed + * - 'requeueOff' if requeue loop is not necessary + * err reports error that occured during the transaction recovery + */ func (r *WildFlyServerReconciler) processTransactionRecoveryScaleDown(reqLogger logr.Logger, w *wildflyv1alpha1.WildFlyServer, numberOfPodsToScaleDown int, podList *corev1.PodList) (mustReconcile int, err error) { + // podList comes from the wildflyserver_controller.Reconcile method, + // where the list of pods specific to a WildFlyServer CR is created + + // Current number of pods wildflyServerNumberOfPods := len(podList.Items) - scaleDownPodsStates := sync.Map{} // map referring to: pod 
-	scaleDownPodsStates := sync.Map{} // map referring to: pod name - pod state
-	scaleDownErrors := sync.Map{}     // errors occurred during processing the scaledown for the pods
+	// sync.Map to store the errors that occurred while processing the scale down of the pods
+	errorsSyncMap := sync.Map{}
+	// Return value for the reconcile cycle
 	mustReconcile = requeueOff
 	if wildflyServerNumberOfPods == 0 || numberOfPodsToScaleDown == 0 {
 		// no active pods to scale down or no pods are requested to scale down
 		return requeueOff, nil
 	}
+	// In case the WildFlyServer custom resource has not been updated yet, wait for the next reconcile cycle
+	if len(w.Status.Pods) != wildflyServerNumberOfPods {
+		return requeueLater, nil
+	}

 	if w.Spec.BootableJar {
 		reqLogger.Info("Transaction scale down recovery is unsupported for Bootable JAR pods. The pods will be removed without checking pending transactions.",
 			"Number of pods to be scaled down", numberOfPodsToScaleDown)
 		return r.skipRecoveryAndForceScaleDown(w, wildflyServerNumberOfPods, numberOfPodsToScaleDown, podList)
 	}
+	if w.Spec.DeactivateTransactionRecovery {
+		reqLogger.Info("The 'DeactivateTransactionRecovery' flag is set to true, thus the transaction recovery process will be skipped.")
+		return r.skipRecoveryAndForceScaleDown(w, wildflyServerNumberOfPods, numberOfPodsToScaleDown, podList)
+	}
 	subsystemsList, err := wfly.ListSubsystems(&podList.Items[0])
 	if err != nil {
 		reqLogger.Info("Cannot get list of subsystems available in application server", "Pod", podList.Items[0].ObjectMeta.Name)
@@ -277,119 +322,129 @@ func (r *WildFlyServerReconciler) processTransactionRecoveryScaleDown(reqLogger
 			return requeueLater, err
 		}
 		if !isJDBCObjectStore {
-			reqLogger.Info("Transaction scale down recovery will be skipped as it's unsupported when WildFlyServer Storage uses 'EmptyDir' "+
-				"and transaction subsystem does not use the JDBC object store. The recovery processing is unsafe. "+
-				"Please configure WildFlyServer.Spec.Storage with a Persistent Volume Claim or use database to store transaction log.",
+			reqLogger.Info("Transaction scale down recovery will be skipped as it is unsupported when WildFlyServer's Storage is 'EmptyDir' "+
+				"and the transaction subsystem does not use the JDBC object store. As a consequence, the transaction recovery processing is unsafe. "+
+				"Please, configure WildFlyServer.Spec.Storage with a Persistent Volume Claim or use a database to store transaction logs.",
 				"Number of pods to be scaled down", numberOfPodsToScaleDown)
 			return r.skipRecoveryAndForceScaleDown(w, wildflyServerNumberOfPods, numberOfPodsToScaleDown, podList)
 		}
 	}
-	// Setting-up the pod status - status is used to decide if the pod could be scaled (aka. removed from the statefulset)
-	updated := abool.New()
-	for scaleDownIndex := 1; scaleDownIndex <= numberOfPodsToScaleDown; scaleDownIndex++ {
-		scaleDownPodName := podList.Items[wildflyServerNumberOfPods-scaleDownIndex].ObjectMeta.Name
-		wildflyServerSpecPodStatus := getWildflyServerPodStatusByName(w, scaleDownPodName)
-		if wildflyServerSpecPodStatus == nil {
-			continue
+	// Flag to signal that there are pods in need of updating
+	podStatusNeedsUpdating := abool.New()
+	// Select pods to be scaled down (starting from the tail of the list)
+	var podsToScaleDown []corev1.Pod
+	for index := wildflyServerNumberOfPods - 1; index >= wildflyServerNumberOfPods-numberOfPodsToScaleDown; index-- {
+		// Only running pods can be considered
+		if podList.Items[index].Status.Phase == "Running" {
+			podsToScaleDown = append(podsToScaleDown, podList.Items[index])
 		}
+	}
+	if len(podsToScaleDown) == 0 {
+		reqLogger.Info("There are no 'Running' pods to scale down")
+		return requeueLater, nil
+	}
+	// PodStatus.State is set to PodStateScalingDownRecoveryInvestigation (SCALING_DOWN_RECOVERY_INVESTIGATION)
+	// to start the scale down process
+	for _, corePod := range podsToScaleDown {
+		// wildflyServerSpecPodStatus points to wildflyv1alpha1.WildFlyServer.Status.Pods[index]
+		wildflyServerSpecPodStatus := getWildflyServerPodStatusByName(w, corePod.ObjectMeta.Name)
 		if wildflyServerSpecPodStatus.State == wildflyv1alpha1.PodStateActive {
 			wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownRecoveryInvestigation
-			scaleDownPodsStates.Store(scaleDownPodName, wildflyv1alpha1.PodStateScalingDownRecoveryInvestigation)
-			updated.Set()
-		} else {
-			scaleDownPodsStates.Store(scaleDownPodName, wildflyServerSpecPodStatus.State)
+			podStatusNeedsUpdating.Set()
 		}
 	}
-	if updated.IsSet() { // updating status of pods as soon as possible
+	// If there are pods in PodStateScalingDownRecoveryInvestigation state, an update cycle must be run
+	if podStatusNeedsUpdating.IsSet() {
 		w.Status.ScalingdownPods = int32(numberOfPodsToScaleDown)
 		err := resources.UpdateWildFlyServerStatus(w, r.Client)
 		if err != nil {
-			return requeueNow, fmt.Errorf("There was trouble to update state of WildflyServer: %v, error: %v", w.Status.Pods, err)
+			return requeueNow, fmt.Errorf("Failed to update the state of the WildflyServer resource: %v, error: %v", w.Status.Pods, err)
 		}
 	}
-	updated.UnSet()
+	// Reset the flag to signal the need to run an update cycle
+	podStatusNeedsUpdating.UnSet()

 	var wg sync.WaitGroup
-	for scaleDownIndex := 1; scaleDownIndex <= numberOfPodsToScaleDown; scaleDownIndex++ {
-		scaleDownPod := podList.Items[wildflyServerNumberOfPods-scaleDownIndex]
+	for _, corePod := range podsToScaleDown {
+		// As the variables `wildflyServerSpecPodStatus` and `corePodTemp` are created for each iteration,
+		// only the goroutine defined in the same iteration can access them. In this way, the situation where
+ wildflyServerSpecPodStatus := getWildflyServerPodStatusByName(w, corePod.ObjectMeta.Name) + corePodTemp := corePod wg.Add(1) go func() { defer wg.Done() - // Scaledown scenario, need to handle transaction recovery - scaleDownPodName := scaleDownPod.ObjectMeta.Name - scaleDownPodIP := scaleDownPod.Status.PodIP - if strings.Contains(scaleDownPodIP, ":") && !strings.HasPrefix(scaleDownPodIP, "[") { - scaleDownPodIP = "[" + scaleDownPodIP + "]" // for IPv6 + podIP := wildflyServerSpecPodStatus.PodIP + if strings.Contains(podIP, ":") && !strings.HasPrefix(podIP, "[") { + podIP = "[" + podIP + "]" // for IPv6 } - podState, ok := scaleDownPodsStates.Load(scaleDownPodName) - if !ok { - scaleDownErrors.Store(scaleDownPodName+"_status-update", - fmt.Errorf("Cannot find pod name '%v' in the list of the active pods for the WildflyServer operator: %v", - scaleDownPodName, w.ObjectMeta.Name)) - _, podsStatus := getPodStatus(podList.Items, w.Status.Pods) - reqLogger.Info("Updating pod status", "Pod Status", podsStatus) - w.Status.Pods = podsStatus - updated.Set() - return - } - - if podState != wildflyv1alpha1.PodStateScalingDownClean { - reqLogger.Info("Transaction recovery scaledown processing", "Pod Name", scaleDownPodName, - "IP Address", scaleDownPodIP, "Pod State", podState, "Pod Phase", scaleDownPod.Status.Phase) + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownClean { + reqLogger.Info("Transaction recovery scaledown processing", "Pod Name", corePodTemp.ObjectMeta.Name, + "IP Address", podIP, "Pod State", wildflyServerSpecPodStatus.State, "Pod's Recovery Counter", wildflyServerSpecPodStatus.RecoveryCounter) - // For full and correct recovery we need to first, run two recovery checks and second, having the orphan detection interval set to minimum - needsReconcile, setupErr := r.setupRecoveryPropertiesAndRestart(reqLogger, &scaleDownPod, w) + // For full and correct recovery we need to run two recovery checks with orphan detection interval set to minimum + needsReconcile, setupErr := r.setupRecoveryPropertiesAndRestart(reqLogger, &corePodTemp, w) if needsReconcile > mustReconcile { mustReconcile = needsReconcile } if setupErr != nil { - scaleDownErrors.Store(scaleDownPodName, setupErr) + errorsSyncMap.Store(corePodTemp.ObjectMeta.Name, setupErr) return } //The futher recovery attempts won't succeed until reconcilation loop is repeated if needsReconcile != requeueOff { return } - // Running recovery twice for orphan detection will be kicked-in - _, _, recoveryErr := r.checkRecovery(reqLogger, &scaleDownPod, w) - if recoveryErr != nil { - scaleDownErrors.Store(scaleDownPodName, recoveryErr) - return - } - success, message, recoveryErr := r.checkRecovery(reqLogger, &scaleDownPod, w) - if recoveryErr != nil { - scaleDownErrors.Store(scaleDownPodName, recoveryErr) - return + + // Running the recovery check twice to discover in-doubt transactions + var ( + outcome checkResult + message string + recoveryErr error + ) + for count := 0; count < 2; count++ { + outcome, message, recoveryErr = r.checkRecovery(reqLogger, &corePodTemp, w) + // This if handles the outcome genericError + if recoveryErr != nil { + errorsSyncMap.Store(corePodTemp.ObjectMeta.Name, recoveryErr) + return + } } - if success { + if outcome == clean { // Recovery was processed with success, the pod is clean to go - scaleDownPodsStates.Store(scaleDownPodName, wildflyv1alpha1.PodStateScalingDownClean) - } else if message != "" { - // Some in-doubt transaction left in store, the pod is still dirty - 
reqLogger.Info("In-doubt transactions in object store", "Pod Name", scaleDownPodName, "Message", message) - scaleDownPodsStates.Store(scaleDownPodName, wildflyv1alpha1.PodStateScalingDownRecoveryDirty) + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownClean { + wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownClean + podStatusNeedsUpdating.Set() + } + } else if outcome == recovery { + reqLogger.Info("The pod is trying to recover unfinished transactions", "Pod name", corePodTemp.ObjectMeta.Name, "Message", message) + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownRecoveryProcessing { + wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownRecoveryProcessing + } + wildflyServerSpecPodStatus.RecoveryCounter++ + reqLogger.Info("The recovery counter of the pod is: "+strconv.Itoa(int(wildflyServerSpecPodStatus.RecoveryCounter)), "Pod name", corePodTemp.ObjectMeta.Name) + // As the RecoveryCounter increases at every recovery attempt, podStatusNeedsUpdating must be set to trigger the update of the status + podStatusNeedsUpdating.Set() + } else if outcome == heuristic { + if wildflyServerSpecPodStatus.State != wildflyv1alpha1.PodStateScalingDownRecoveryHeuristic { + wildflyServerSpecPodStatus.State = wildflyv1alpha1.PodStateScalingDownRecoveryHeuristic + podStatusNeedsUpdating.Set() + } + reqLogger.Info("The pod has heuristic transactions", "Pod name", corePodTemp.ObjectMeta.Name, "Message", message) + r.Recorder.Event(w, corev1.EventTypeWarning, "Transaction Recovery", "There are HEURISTIC transactions in "+corePodTemp.ObjectMeta.Name+"! Please, resolve them manually!") } } }() // execution of the go routine for one pod } wg.Wait() - // Updating the pod state based on the recovery processing when a scale down is in progress - for wildflyServerPodStatusIndex, v := range w.Status.Pods { - if podStateValue, exist := scaleDownPodsStates.Load(v.Name); exist { - if w.Status.Pods[wildflyServerPodStatusIndex].State != podStateValue.(string) { - updated.Set() - } - w.Status.Pods[wildflyServerPodStatusIndex].State = podStateValue.(string) - } - } // Verification if an error happened during the recovery processing var errStrings string numberOfScaleDownErrors := 0 var resultError error - scaleDownErrors.Range(func(k, v interface{}) bool { + errorsSyncMap.Range(func(k, v interface{}) bool { numberOfScaleDownErrors++ errStrings += " [[" + v.(error).Error() + "]]," return true @@ -400,7 +455,7 @@ func (r *WildFlyServerReconciler) processTransactionRecoveryScaleDown(reqLogger "Errors during transaction recovery scaledown processing. 
Consult operator log.") } - if updated.IsSet() { // recovery changed the state of the pods + if podStatusNeedsUpdating.IsSet() { // recovery changed the state of the pods w.Status.ScalingdownPods = int32(numberOfPodsToScaleDown) err := resources.UpdateWildFlyServerStatus(w, r.Client) if err != nil { diff --git a/controllers/transaction_recovery_test.go b/controllers/transaction_recovery_test.go index d2c91d806..a623dad1a 100644 --- a/controllers/transaction_recovery_test.go +++ b/controllers/transaction_recovery_test.go @@ -35,7 +35,7 @@ const ( ) var ( - // variables to be setup and re-used in the method over the file + // Variables to be used in the testing methods assert *testifyAssert.Assertions cl client.Client r *WildFlyServerReconciler @@ -347,6 +347,35 @@ func TestSkipRecoveryScaleDownWhenEmptyDirStorage(t *testing.T) { assert.Empty(remoteOpsMock.ExecuteMockReturn) } +func TestSkipRecoveryScaleDownWhenDeactivateTransactionRecoveryIsTrue(t *testing.T) { + wildflyServer := defaultWildflyServerDefinition.DeepCopy() + // Deactivate the Transaction Recovery feature + wildflyServer.Spec.DeactivateTransactionRecovery = true + setupBeforeScaleDown(t, wildflyServer, 1) + + log := ctrl.Log.WithName("TestSkipRecoveryScaleDownWhenDeactivateTransactionRecoveryIsTrue") + + log.Info("WildFly server was reconciled, let's scale it down.", "WildflyServer", wildflyServer) + wildflyServer.Spec.Replicas = 0 + err := cl.Update(context.TODO(), wildflyServer) + + // Reconcile for the scale down - updating the pod labels + _, err = r.Reconcile(context.TODO(), req) + require.NoError(t, err) + + // Reconcile to start the scale down procesing - recovery skipped + _, err = r.Reconcile(context.TODO(), req) + require.NoError(t, err) + // StatefulSet needs to be updated + statefulSet := &appsv1.StatefulSet{} + err = cl.Get(context.TODO(), req.NamespacedName, statefulSet) + assert.Equal(int32(0), *statefulSet.Spec.Replicas) + // WildFlyServer status needs to be updated in sclaed down manner + err = cl.Get(context.TODO(), req.NamespacedName, wildflyServer) + require.NoError(t, err) + assert.Equal(wildflyv1alpha1.PodStateScalingDownClean, wildflyServer.Status.Pods[0].State) +} + // -- // -- Mocking the remote calls and Kubernetes API --- // -- diff --git a/controllers/wildflyserver_controller.go b/controllers/wildflyserver_controller.go index fd1db0859..0dead1599 100644 --- a/controllers/wildflyserver_controller.go +++ b/controllers/wildflyserver_controller.go @@ -494,13 +494,16 @@ func getPodStatus(pods []corev1.Pod, originalPodStatuses []wildflyv1alpha1.PodSt } for _, pod := range pods { podState := wildflyv1alpha1.PodStateActive + recoveryCounter := int32(0) if value, exists := podStatusesOriginalMap[pod.Name]; exists { podState = value.State + recoveryCounter = value.RecoveryCounter } podStatuses = append(podStatuses, wildflyv1alpha1.PodStatus{ - Name: pod.Name, - PodIP: pod.Status.PodIP, - State: podState, + Name: pod.Name, + PodIP: pod.Status.PodIP, + State: podState, + RecoveryCounter: recoveryCounter, }) if pod.Status.PodIP == "" { requeue = true diff --git a/pkg/util/wildfly_mgmt.go b/pkg/util/wildfly_mgmt.go index 00fdcb014..868954d81 100644 --- a/pkg/util/wildfly_mgmt.go +++ b/pkg/util/wildfly_mgmt.go @@ -30,6 +30,8 @@ var ( MgmtOpTxnProbe = "/subsystem=transactions/log-store=log-store:probe()" // MgmtOpTxnRead is a JBoss CLI command for reading transaction log store MgmtOpTxnRead = 
"/subsystem=transactions/log-store=log-store:read-children-resources(child-type=transactions,recursive=true,include-runtime=true)" + // MgmtOpTxnReadHeuristic is a JBoss CLI command for scanning the log store in search of transactions in HEURISTIC status + MgmtOpTxnReadHeuristic = "/subsystem=transactions/log-store=log-store/transactions=*/participants=*:query(where={\"status\"=\"HEURISTIC\"}" // MgmtOpTxnRecoverySocketBindingRead is a JBoss CLI command for reading name of recovery socket binding MgmtOpTxnRecoverySocketBindingRead = "/subsystem=transactions:read-attribute(name=socket-binding)" // MgmtOpSocketBindingRead is a JBoss CLI command for reading all data on the socket binding group