From a31e2b8b4a7cb7ccaf814497f733b410acf81205 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Wed, 21 Aug 2024 18:24:15 +0800 Subject: [PATCH] add discovery implementation and test --- build.sbt | 2 +- .../src/main/resources/reference.conf | 2 + .../discovery/eureka/EurekaResponse.scala | 32 +++++++ .../eureka/EurekaServiceDiscovery.scala | 64 +++++++++++++- .../discovery/eureka/EurekaSettings.scala | 8 +- .../pekko/discovery/eureka/JsonFormat.scala | 50 +++++++++++ .../scala/EurekaServiceDiscoverySpec.scala | 85 +++++++++++++++++++ .../pekko/discovery/marathon/AppList.scala | 2 - .../MarathonApiServiceDiscovery.scala | 1 - project/Dependencies.scala | 2 + 10 files changed, 237 insertions(+), 11 deletions(-) create mode 100644 discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaResponse.scala create mode 100644 discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/JsonFormat.scala create mode 100644 discovery-eureka/src/test/scala/EurekaServiceDiscoverySpec.scala diff --git a/build.sbt b/build.sbt index ad2e7194..891ce0e0 100644 --- a/build.sbt +++ b/build.sbt @@ -109,7 +109,7 @@ lazy val discoveryEureka = pekkoModule("discovery-eureka") .enablePlugins(AutomateHeaderPlugin, ReproducibleBuildsPlugin) .settings( name := "pekko-discovery-eureka", - libraryDependencies := Dependencies.discoveryMarathonApi, + libraryDependencies := Dependencies.discoveryEureka, mimaPreviousArtifactsSet) // gathers all enabled routes and serves them (HTTP or otherwise) diff --git a/discovery-eureka/src/main/resources/reference.conf b/discovery-eureka/src/main/resources/reference.conf index 394dc169..f1a852f1 100644 --- a/discovery-eureka/src/main/resources/reference.conf +++ b/discovery-eureka/src/main/resources/reference.conf @@ -8,6 +8,8 @@ pekko.discovery { eureka { class = org.apache.pekko.discovery.eureka.EurekaServiceDiscovery + # default eureka schema + eureka-schema = "http" # default eureka host eureka-host = "127.0.0.1" # default eureka port diff --git a/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaResponse.scala b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaResponse.scala new file mode 100644 index 00000000..fc3f52ce --- /dev/null +++ b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaResponse.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pekko.discovery.eureka + +object EurekaResponse { + case class Application(name: String, instance: Seq[Instance]) + case class Instance(hostName: String, app: String, vipAddress: String, secureVipAddress: String, ipAddr: String, status: String, port: PortWrapper, securePort: PortWrapper, healthCheckUrl: String, statusPageUrl: String, homePageUrl: String, appGroupName: String, dataCenterInfo: DataCenterInfo, lastDirtyTimestamp: String) + case class Status() + case class PortWrapper(port: Int, enabled: Boolean) + case class DataCenterInfo(name: String = "MyOwn", clz : String = "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo") +} + +import EurekaResponse._ + +case class EurekaResponse(application: Application, errorCode: Option[String]) diff --git a/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaServiceDiscovery.scala b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaServiceDiscovery.scala index 7461f352..5685758a 100644 --- a/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaServiceDiscovery.scala +++ b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaServiceDiscovery.scala @@ -20,12 +20,70 @@ package org.apache.pekko.discovery.eureka import org.apache.pekko.actor.ActorSystem -import org.apache.pekko.discovery.{ Lookup, ServiceDiscovery } +import org.apache.pekko.discovery.ServiceDiscovery.{Resolved, ResolvedTarget} +import org.apache.pekko.discovery.eureka.JsonFormat._ +import org.apache.pekko.discovery.{Lookup, ServiceDiscovery} +import org.apache.pekko.event.{LogSource, Logging} +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.model.headers._ +import org.apache.pekko.http.scaladsl.model.{HttpRequest, MediaRange, MediaTypes, Uri} +import org.apache.pekko.http.scaladsl.unmarshalling.Unmarshal +import java.net.InetAddress import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration +import scala.util.Try -class EurekaServiceDiscovery(system: ActorSystem) extends ServiceDiscovery { +class EurekaServiceDiscovery(implicit system: ActorSystem) extends ServiceDiscovery { + + import system.dispatcher + + private val log = Logging(system, getClass)(LogSource.fromClass) + private val settings = EurekaSettings(system) + private val (schema, host, port, path, group) = (settings.schema, settings.host, settings.port, settings.path, settings.groupName) + private val http = Http() + + override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = { + + val uriPath = Uri.Path.Empty / path / "apps" / lookup.serviceName + val uri = Uri.from(scheme = schema, host = host, port = port).withPath(uriPath) + val request = HttpRequest(uri = uri, headers = Seq(`Accept-Encoding`(HttpEncodings.gzip), Accept(MediaRange(MediaTypes.`application/json`)))) + + log.info("Requesting seed nodes by: {}", request.uri) + + for { + response <- http.singleRequest(request) + entity <- response.entity.toStrict(resolveTimeout) + response <- { + log.debug("Eureka response: [{}]", entity.data.utf8String) + val unmarshalled = Unmarshal(entity).to[EurekaResponse] + unmarshalled.failed.foreach { _ => + log.error( + "Failed to unmarshal Eureka response status [{}], entity: [{}], uri: [{}]", + response.status.value, + entity.data.utf8String, + uri) + } + unmarshalled + } + instances <- pick(response.application.instance) + } yield Resolved(lookup.serviceName, targets(instances)) + + } + + private[eureka] def pick(instances: Seq[EurekaResponse.Instance]): Future[Seq[EurekaResponse.Instance]] = { + Future.successful(instances.collect({ + case instance if instance.status == "UP" && instance.appGroupName == group => instance + })) + } + + private[eureka] def targets(instances: Seq[EurekaResponse.Instance]): Seq[ResolvedTarget] = { + instances.map { instance => + ResolvedTarget( + host = instance.ipAddr, + port = Some(instance.port.port), + address = Try(InetAddress.getByName(instance.ipAddr)).toOption) + } + } - override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[ServiceDiscovery.Resolved] = ??? } diff --git a/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaSettings.scala b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaSettings.scala index 897b237f..3d6f1f01 100644 --- a/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaSettings.scala +++ b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/EurekaSettings.scala @@ -34,16 +34,16 @@ import pekko.annotation.ApiMayChange final class EurekaSettings(system: ExtendedActorSystem) extends Extension { private val eurekaConfig = system.settings.config.getConfig("pekko.discovery.eureka") - val eurekaHost: String = eurekaConfig.getString("eureka-host") - val eurekaPort: Int = eurekaConfig.getInt("eureka-port") - val eurekaPath: String = eurekaConfig.getString("eureka-path") + val schema: String = eurekaConfig.getString("eureka-schema") + val host: String = eurekaConfig.getString("eureka-host") + val port: Int = eurekaConfig.getInt("eureka-port") + val path: String = eurekaConfig.getString("eureka-path") val groupName: String = eurekaConfig.getString("group-name") val statusPageUrl: String = eurekaConfig.getString("status-page-url") val healthCheckUrl: String = eurekaConfig.getString("health-page-url") val homePageUrl: String = eurekaConfig.getString("home-page-url") val servicePort: Int = eurekaConfig.getInt("service-port") val serviceName: String = system.name - val managementPort: Int = system.settings.config.getInt("pekko.management.http.port"); val renewInterval: Long = eurekaConfig.getLong("renew-interval") } diff --git a/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/JsonFormat.scala b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/JsonFormat.scala new file mode 100644 index 00000000..0c7413a8 --- /dev/null +++ b/discovery-eureka/src/main/scala/org/apache/pekko/discovery/eureka/JsonFormat.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, which was derived from Akka. + */ + +/* + * Copyright (C) 2017-2021 Lightbend Inc. + */ + +package org.apache.pekko.discovery.eureka + +import org.apache.pekko.discovery.eureka.EurekaResponse.{Application, DataCenterInfo, Instance, PortWrapper} +import org.apache.pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json._ + +object JsonFormat extends SprayJsonSupport with DefaultJsonProtocol { + implicit val portFormat: JsonFormat[PortWrapper] = new JsonFormat[PortWrapper] { + + override def read(json: JsValue): PortWrapper = { + json.asJsObject.getFields("$", "@enabled") match { + case Seq(JsNumber(port), JsString(enabled)) => PortWrapper(port.toInt, enabled.toBoolean) + case _ => throw DeserializationException("PortWrapper expected") + } + } + + override def write(obj: PortWrapper): JsValue = JsObject( + "$" -> JsNumber(obj.port), + "@enabled" -> JsString(obj.enabled.toString)) + } + implicit val dataCenterInfoFormat: JsonFormat[DataCenterInfo] = new JsonFormat[DataCenterInfo] { + + override def read(json: JsValue): DataCenterInfo = { + json.asJsObject.getFields("name","@class") match { + case Seq(JsString(name), JsString(clz)) => DataCenterInfo(name, clz) + case _ => throw DeserializationException("DataCenterInfo expected") + } + } + + override def write(obj: DataCenterInfo): JsValue = JsObject( + "name" -> JsString(obj.name), + "@class" -> JsString(obj.clz)) + } + implicit val instanceFormat: JsonFormat[Instance] = jsonFormat14(Instance.apply) + implicit val applicationFormat: JsonFormat[Application] = jsonFormat2(Application.apply) + implicit val rootFormat: RootJsonFormat[EurekaResponse] = jsonFormat2(EurekaResponse.apply) +} diff --git a/discovery-eureka/src/test/scala/EurekaServiceDiscoverySpec.scala b/discovery-eureka/src/test/scala/EurekaServiceDiscoverySpec.scala new file mode 100644 index 00000000..2297a5c6 --- /dev/null +++ b/discovery-eureka/src/test/scala/EurekaServiceDiscoverySpec.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.discovery.ServiceDiscovery.ResolvedTarget +import org.apache.pekko.discovery.eureka.EurekaServiceDiscovery +import org.apache.pekko.testkit.TestKitBase +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.{Millis, Seconds, Span} +import org.scalatest.wordspec.AnyWordSpecLike + +import java.net.InetAddress +import scala.concurrent.duration.DurationInt +import scala.io.Source +import scala.util.Try + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +class EurekaServiceDiscoverySpec + extends AnyWordSpecLike + with Matchers + with BeforeAndAfterAll + with TestKitBase + with ScalaFutures { + "Eureka Discovery" should { + "work for defaults" in { + + val lookupService = new EurekaServiceDiscovery() + val resolved = lookupService.lookup("BANK-ACCOUNT", 10.seconds).futureValue + resolved.addresses should contain( + ResolvedTarget( + host = "127.0.0.1", + port = Some(8558), + address = Try(InetAddress.getByName("127.0.0.1")).toOption)) + + } + } + + private def resourceAsString(name: String): String = + Source.fromInputStream(getClass.getClassLoader.getResourceAsStream(name)).mkString + + override def afterAll(): Unit = { + super.afterAll() + print("clean up \n") + } + + override implicit lazy val system: ActorSystem = ActorSystem("test") + + implicit override val patienceConfig: PatienceConfig = + PatienceConfig(timeout = scaled(Span(30, Seconds)), interval = scaled(Span(50, Millis))) +} diff --git a/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/AppList.scala b/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/AppList.scala index 2e023647..bbe82269 100644 --- a/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/AppList.scala +++ b/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/AppList.scala @@ -13,8 +13,6 @@ package org.apache.pekko.discovery.marathon -import scala.collection.immutable.Seq - object AppList { case class App(container: Option[Container], portDefinitions: Option[Seq[PortDefinition]], tasks: Option[Seq[Task]]) case class Container(portMappings: Option[Seq[PortMapping]], docker: Option[Docker]) diff --git a/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/MarathonApiServiceDiscovery.scala b/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/MarathonApiServiceDiscovery.scala index aa67035a..82397729 100644 --- a/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/MarathonApiServiceDiscovery.scala +++ b/discovery-marathon-api/src/main/scala/org/apache/pekko/discovery/marathon/MarathonApiServiceDiscovery.scala @@ -20,7 +20,6 @@ import pekko.http.scaladsl._ import pekko.http.scaladsl.model._ import pekko.http.scaladsl.unmarshalling.Unmarshal -import scala.collection.immutable.Seq import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.util.Try diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 5ce85c14..e13769d2 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -81,6 +81,8 @@ object Dependencies { "org.apache.pekko" %% "pekko-stream" % pekkoVersion, "org.apache.pekko" %% "pekko-http" % pekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion, + "org.apache.pekko" %% "pekko-testkit" % pekkoVersion % Test, + "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion % Test, "org.scalatest" %% "scalatest" % scalaTestVersion % Test) val discoveryKubernetesApi = Seq(