Skip to content

Commit

Permalink
fix: shutdown executors as job finished
Browse files Browse the repository at this point in the history
  • Loading branch information
betarixm committed Dec 11, 2023
1 parent 49d0b7a commit 5b1a42f
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions master/src/main/scala/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kr.ac.postech.paranode.utils.Progress._
import org.apache.logging.log4j.scala.Logging

import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutor
Expand All @@ -28,10 +29,18 @@ object Master extends Logging {
masterArguments.numberOfWorkers
)

val executor = Executors.newCachedThreadPool()

val executionContext: ExecutionContext =
ExecutionContext.fromExecutor(executor)

Await.result(
master.run()(ExecutionContext.global),
master.run()(executionContext),
scala.concurrent.duration.Duration.Inf
)

executor.shutdown()
executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
}

}
Expand All @@ -54,10 +63,10 @@ class Master(host: String, port: Int, numberOfWorkers: Int) extends Logging {
s"numberOfWorkers: ${numberOfWorkers}\n"
)

val serviceExecutor = Executors.newCachedThreadPool()

val serviceExecutionContext: ExecutionContextExecutor =
ExecutionContext.fromExecutor(
Executors.newCachedThreadPool()
)
ExecutionContext.fromExecutor(serviceExecutor)

val server =
new GrpcServer(
Expand Down Expand Up @@ -87,10 +96,12 @@ class Master(host: String, port: Int, numberOfWorkers: Int) extends Logging {
WorkerClient(worker.host, worker.port)
}

val requestExecutor = Executors
.newFixedThreadPool(registeredWorkers.size)

val requestExecutionContext: ExecutionContextExecutor =
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors
.newFixedThreadPool(registeredWorkers.size)
ExecutionContext.fromExecutor(
requestExecutor
)

logger.info(s"[Master] Clients: $clients")
Expand Down Expand Up @@ -143,6 +154,18 @@ class Master(host: String, port: Int, numberOfWorkers: Int) extends Logging {
server.stop()

serverState.update(_ => Progress.Finished)

serviceExecutor.shutdown()
requestExecutor.shutdown()

serviceExecutor.awaitTermination(
5,
TimeUnit.SECONDS
)
requestExecutor.awaitTermination(
5,
TimeUnit.SECONDS
)
}

def blockUntilRunning(): Unit = {
Expand Down

0 comments on commit 5b1a42f

Please sign in to comment.