Graceful shutdown of a Kubernetes pod when using Istio

Recently I worked on a project where we noticed a lot of database connection errors whenever a new version of the pod was deployed on Kubernetes.

At first we thought that during shutdown the application cleaned up the database connection pool too soon, but it turned out the application already had database connection errors before the connection pool was cleaned up.

Because the shutdown of the application took some time, we had defined a terminationGracePeriodSeconds of 30 seconds in the deployment description. We use an Istio sidecar in the pod, and it turned out that the Istio proxy has its own graceful shutdown setting, independent of that of the application container.

When you don’t define a graceful shutdown period for Istio, the default is 5 seconds. So while the application was still shutting down, all its external connectivity was cut off after 5 seconds, leading to errors.

After setting the graceful shutdown period of the Istio sidecar to the same value as the terminationGracePeriodSeconds of the application container, the issue was solved.

To change the graceful shutdown period of the Istio container to e.g. 30 seconds, add the following annotation to your pod template:

proxy.istio.io/config: |
  terminationDrainDuration: 30s

Note that the value of proxy.istio.io/config must be a string containing YAML. That is why the pipe symbol (|) is needed: it makes the value a literal block string, whereas without it, terminationDrainDuration would be parsed as a nested mapping instead of a string.
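
For context, this is roughly how the annotation fits into a deployment manifest, next to the terminationGracePeriodSeconds it should match (a sketch; the name my-app and the image are placeholders):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
spec:
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
      annotations:
        proxy.istio.io/config: |
          terminationDrainDuration: 30s
    spec:
      terminationGracePeriodSeconds: 30
      containers:
        - name: my-app
          image: my-app:latest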

Combining fixed arguments with variable arguments in Mockito with Scala

When you use Mockito to mock an object and add behavior, you sometimes need to combine fixed arguments with variable arguments. If you do that like this:

when(client.execute("myId1", any[String])

you get the error Invalid use of argument matchers, with the explanation that when you use a matcher for one argument, all arguments have to be provided by matchers, so you should use eq as a matcher for the fixed argument.

However, in Scala eq is already defined on every object and tests referential equality, so you can't use it directly. Use the fully qualified org.mockito.ArgumentMatchers.eq instead, or rename it on import, e.g. import org.mockito.ArgumentMatchers.{eq => eqTo}, and use eqTo instead of eq. The example above then becomes:

import org.mockito.ArgumentMatchers.{eq => eqTo}

when(client.execute(eqTo("myId1"), any[String]))
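
Put together in a minimal, self-contained sketch (the Client trait and the returned values are hypothetical, only there to make the example compile):

import org.mockito.ArgumentMatchers.{any, eq => eqTo}
import org.mockito.Mockito.{mock, when}

// A hypothetical client we want to mock.
trait Client {
  def execute(id: String, query: String): String
}

val client = mock(classOf[Client])

// eqTo matches the fixed first argument, any[String] matches the second.
when(client.execute(eqTo("myId1"), any[String])).thenReturn("result")

assert(client.execute("myId1", "some query") == "result")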

Using eventually in an AsyncWordSpec

This is my second post about the use of eventually in an AsyncWordSpec. The previous post was about using futures in an eventually block in an AsyncWordSpec; this one deals with how to use eventually in an AsyncWordSpec when you don't use a future. The approach described here applies to ScalaTest 3.1.0, and is possibly also necessary for other versions.

Suppose you have the following method

def determineValue: Int = {
  ...
}

that you want to test, and that after a while will return the desired value 1. In an AnyWordSpec, you would test it in the following way:

import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class TestSpec extends AnyWordSpec with Eventually with Matchers {
  "determineValue" should {
    "return the correct value" in {
      eventually {
        determineValue shouldBe 1
      }
    }
  }
}

Suppose you’re using an AsyncWordSpec, perhaps because other methods of the class you’re testing return futures. In that case, the following code will call determineValue only once:

class TestSpec extends AsyncWordSpec with Eventually with Matchers {
  "determineValue" should {
    "return the correct value" in {
      eventually {
        determineValue shouldBe 1
      }
    }
  }
}

You can also see this from the output of the test: normally, when a test using eventually fails, you see the number of times the block was attempted and the result of the last attempt. In this case you don't. The fix is simple: wrap the eventually block in a Future.successful:

class TestSpec extends AsyncWordSpec with Eventually with Matchers {
  "determineValue" should {
    "return the correct value" in {
      Future.successful {
        eventually {
          determineValue shouldBe 1
        }
      }
    }
  }
}

When the test fails, you will now see that the test is retried multiple times.

Using different event journal configurations in one actor system

If you use multiple persistent actors with Akka, you normally configure a single event journal. If you want different event journals for different persistent actors, you need to be careful to configure everything correctly: otherwise the events are persisted, but when a persistent actor is reinstantiated, its state isn't recovered, because the events aren't replayed.

A use case for different configurations is, for example, using Cassandra as the event store with different keyspaces for different persistent actors.

To do this, you can make the configuration as usual, overriding the cassandra-journal and cassandra-query-journal settings in the application.conf if necessary.

In the section of the application.conf for your persistent actor (in this example named user), you make the following definitions:

user {
  event-journal = ${cassandra-journal}
  event-journal.keyspace = "user"
  event-journal.query-plugin = "user.custom-cassandra-query-plugin"

  custom-cassandra-query-plugin = ${cassandra-query-journal}
  custom-cassandra-query-plugin {
    write-plugin = "user.event-journal"
  }
}

The configuration of the query plugin is very important: without it, the events are stored but not replayed. Note that the values of the query-plugin and write-plugin settings must be the fully qualified path of the setting in the application.conf file (e.g. user.event-journal, not just event-journal).

In the cassandra-journal settings in the application.conf, you need to set the keyspace setting to an existing keyspace.

You can now make another configuration with a different keyspace for another persistent actor.

In the persistent actor, you override journalPluginId and give it the fully qualified path of the event-journal settings as its value. In the example above, that would be:

override def journalPluginId = "user.event-journal"
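
A minimal sketch of such a persistent actor (the UserActor and its string-based commands and events are hypothetical):

import akka.persistence.PersistentActor

class UserActor extends PersistentActor {
  override def persistenceId: String = "user-" + self.path.name

  // Use the journal configured under user.event-journal instead of the default journal.
  override def journalPluginId: String = "user.event-journal"

  private var events: List[String] = Nil

  override def receiveRecover: Receive = {
    case event: String => events = event :: events
  }

  override def receiveCommand: Receive = {
    case command: String =>
      persist(command) { event =>
        events = event :: events
      }
  }
}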

With these settings, different persistent actors use different settings within one actor system.

Using futures in eventually with an asynchronous ScalaTest

In ScalaTest you can use asynchronous specs, like AsyncWordSpec. In the tests you can use assertions just as in a non-asynchronous spec like WordSpec, but if a method you want to test returns a Future, you can map the result of the method to an assertion. E.g.:
class AsyncTestSpec extends AsyncWordSpec with Matchers {
  "the method" should {
    "return the expected result" in {
      determineValue.map(_ shouldBe 1)
    }
  }
}
In the example above, determineValue returns a Future[Int]. If the method that you want to test eventually returns the expected value, but not right away, perhaps due to initialization, you would normally use Eventually. Using eventually in an asynchronous test, however, doesn't work as expected, at least in ScalaTest 3.0: the test isn't executed repeatedly. So the following test is tried only once, not repeatedly as you would expect when using eventually:
class AsyncTestSpec extends AsyncWordSpec with Eventually with Matchers {
  "the method" should {
    "return the expected result" in {
      eventually {
        determineValue.map(_ shouldBe 1)
      }
    }
  }
}
The easiest way to solve this is to not use an asynchronous spec with eventually: use a WordSpec instead and use Await.result inside the eventually block. If for some reason you need an asynchronous spec, you must not only use Await.result inside the eventually block, but also explicitly pass an execution context to the method that returns the Future: the asynchronous spec's own execution context runs tasks serially, so blocking on it with Await.result makes the test fail because the execution takes too long. A complete example of using eventually with an asynchronous spec:
import java.util.concurrent.ForkJoinPool

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}

class AsyncTestSpec extends AsyncWordSpec with Eventually with Matchers {
  private val customExecutionContext: ExecutionContext = ExecutionContext.fromExecutor(new ForkJoinPool())

  private val atMost = 1.second

  "the method" should {
    "eventually return the expected result" in {
      eventually {
        implicit val executionContext: ExecutionContext = customExecutionContext
        Await.result(determineValue, atMost) shouldBe 1
      }
    }
  }
}
In the example above, determineValue takes an implicit execution context as a parameter.
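
For reference, the signature of determineValue assumed in this example would be something like:

def determineValue(implicit executionContext: ExecutionContext): Future[Int]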

Using Akka Streams with Kafka and Avro

The easiest way to use Akka Streams with Kafka is by using the Alpakka Kafka Connector.  The documentation page describes in detail how to use Akka Streams both as a Kafka producer and as a consumer.

Even though Kafka is agnostic about the message format, the preferred format is Avro, and the preferred solution is to use it in combination with a Schema Registry.

Avro messages have a schema, so unlike with JSON it is known exactly which fields a message contains. Normally, Avro includes the schema with the data itself. Including the schema with every message can add significant overhead, but when a Schema Registry is used, the schema is registered once in the registry and each message only contains a reference to it.

You can configure backward compatibility for your schemas; a new schema that isn't backward compatible will then be rejected. This gives you the guarantee that consumers can continue to read messages even when they expect an older schema version.

Since I couldn’t find a complete example on how to use Akka Streams with Kafka and a Schema Registry for messages in Avro format, I created a sample project which you can find at https://github.com/jaspervz/akkastreams-kafka-example.

The README.md of the project contains information on how to run the project.
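
To give an idea of what this looks like, here is a rough sketch of a consumer that reads Avro messages via the Schema Registry (assuming a recent Akka where the ActorSystem provides the materializer; the bootstrap servers, registry URL, topic, and group id are placeholders, and the sample project contains a complete, working setup):

import java.util.Collections

import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.Sink
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

object AvroConsumerExample extends App {
  implicit val system: ActorSystem = ActorSystem("avro-consumer")

  // The Confluent Avro deserializer needs the Schema Registry URL to resolve
  // the schema reference contained in each message.
  private val avroDeserializer = new KafkaAvroDeserializer()
  avroDeserializer.configure(
    Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
    false) // false: this deserializer is used for values, not keys

  private val consumerSettings =
    ConsumerSettings(system, new StringDeserializer,
        avroDeserializer.asInstanceOf[Deserializer[GenericRecord]])
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")

  // Print each record's value; a real application would process it instead.
  Consumer
    .plainSource(consumerSettings, Subscriptions.topics("example-topic"))
    .runWith(Sink.foreach(record => println(record.value())))
}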

Loading a configuration in Scala

For reading a configuration in Scala, TypeSafe Config is often used. This blog post discusses the problems with TypeSafe Config and a better alternative, PureConfig.

As an example I’m using the following small configuration file:

database {
  schema = "jdbc:postgresql://localhost:5432/my_database"
  user = "username"
  password = "pw"
}

TypeSafe Config

TypeSafe Config is a well-known and often used library to read configuration files for an application. The disadvantage of this library is that you have to read the individual configuration parameters manually, and each read can fail. If you don't read all configuration parameters at the start of your program, you can get an exception when reading a parameter while your program is already running.

This is how you would read the sample configuration file using TypeSafe Config:

import com.typesafe.config.ConfigFactory

val config = ConfigFactory.load()
val schema = config.getString("database.schema")
val user = config.getString("database.user")
val password = config.getString("database.password")

println(s"schema $schema")
println(s"user $user")
println(s"password $password")

PureConfig

PureConfig reads TypeSafe Config's configuration files, so both libraries use the same configuration file format. PureConfig, however, makes it easier to read your configuration and to detect errors in it: you define case classes matching your configuration and call pureconfig.loadConfig[Config], where Config is your configuration case class.

Loading the configuration this way gives you an Either of configuration failures or your Config case class. By loading the configuration when your program starts, you can ensure that it is as expected before starting the rest of your program.

This is how you would read the sample configuration file using PureConfig:

case class DatabaseConfig(schema: String, user: String, password: String)

case class Config(database: DatabaseConfig)

pureconfig.loadConfig[Config] match {
  case Left(configReaderFailures) =>
    sys.error(s"Encountered the following errors reading the configuration: ${configReaderFailures.toList.mkString("\n")}")
  case Right(config) =>
    println(s"schema ${config.database.schema}")
    println(s"user ${config.database.user}")
    println(s"password ${config.database.password}")
}

Configuration for different environments

You typically have different configurations for different environments. E.g. the host of your database and its username and password usually differ between a development and a production environment. You can customize a configuration per environment in two ways:

  • Use a different configuration file per environment to override default values.
  • Use environment variables to override default values.

Using a different configuration file per environment

Use a reference.conf file in your project and create an application.conf (or a differently named file) per environment. The first line of such a configuration should be include "reference.conf". This loads the default values from the reference.conf file, so you only need to override values where the defaults aren't appropriate.

Here’s an example that overrides the database password:

include "reference.conf"

database.password="?*%b~gA-}HT!b=3'"

To use the configuration file, set the config.file system property to the path of your configuration file when you run the application, e.g.:

sbt -Dconfig.file=application.conf run

Local development

For local development you can create a file with your overrides and use the config.file property to use it. If everybody working on the project uses the same name for this file (e.g. development.conf), you can add it to .gitignore to prevent it from accidentally ending up in git.

Use environment variables to override default values

Again, use a reference.conf file in your project, and for every setting that may need to be overridden, add an optional environment variable; if the variable is present, it overrides the default setting, e.g.:

password = "pw"
password = ${?DATABASE_PASSWORD}

The question mark before DATABASE_PASSWORD makes the presence of the environment variable optional: if the environment variable DATABASE_PASSWORD is present, it is used, otherwise the default value pw is used.
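
Applied to the sample configuration, the complete reference.conf could look like this (a sketch; the environment variable names are chosen for illustration):

database {
  schema = "jdbc:postgresql://localhost:5432/my_database"
  schema = ${?DATABASE_SCHEMA}
  user = "username"
  user = ${?DATABASE_USER}
  password = "pw"
  password = ${?DATABASE_PASSWORD}
}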

Before starting the program, make sure the environment variables have been defined, e.g. (for Linux):

DATABASE_PASSWORD=secret
export DATABASE_PASSWORD
sbt run

Omitting the question mark is generally not a good idea because this forces someone who wants to run the application locally to set environment variables. It is better to always have sensible defaults.

Local development

For local development you can use the sbt plugin sbt-dotenv. This allows you to create a .env file in which you set your environment variables. Adding this file to .gitignore ensures that it doesn't accidentally end up in git. It is good practice to provide a .env.sample file with the environment variables and their default settings, so every developer can use it to create their own .env file.
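
A minimal .env file for the sample configuration could then look like this (the value is, of course, only an example):

DATABASE_PASSWORD=secret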

Combining both methods

Sometimes, when you use a different configuration file per environment, you may also want to use environment variables: for example, when you don't want passwords in plain text in a configuration file, but instead read them from a vault and pass them in through environment variables.

Code example

A project with sample code is available at https://github.com/jaspervz/configexample

Integration tests using databases in Scala

If you use a SQL or NoSQL database, you want an integration test in place that tests the code that uses the database. One way to do this is to require that an actual database is running, but this means a database has to run on your build server, and you need to be sure the database is in the desired state before each test. Furthermore, if multiple builds run simultaneously against the same database, the results are unpredictable.

Sometimes the in-memory SQL database H2 is used for tests, configured in such a way that it mimics the behavior of the actual database. For NoSQL databases this isn't possible, and for SQL databases it is not a good idea, because you're never entirely sure that H2 exactly mimics the behavior of the real database, so testing with an actual database is still necessary.

The easiest way to create integration tests using a database is to have the test start a Docker container running the database. Starting and running a Docker container is lightweight and can be done on the build server. Furthermore, having a fresh container for every test run guarantees that the database is in a known state before the test starts.

There are a few libraries that make starting and stopping a Docker container before and after your tests easier. A well-known one is Testcontainers, for which the Scala wrapper Testcontainers-scala is also available.

Using Testcontainers-scala

If you want to use Testcontainers-scala for your Scala integration tests, add the following library to your build.sbt:

"com.dimafeng" %% "testcontainers-scala" % TestcontainersVersion % "test"

Using ScalaTest, you can now extend ForAllTestContainer if you want the container to be started before all tests and stopped after all tests, or ForEachTestContainer if you want the container to be started and stopped for each test.

You need to override the val container to provide the container to start. E.g. for a test using PostgreSQL:

class MySpec extends FlatSpec with ForAllTestContainer {
  override val container = PostgreSQLContainer()
  ...
}

There are a couple of containers defined for frequently used SQL databases.

To connect to PostgreSQL in a test, you use container.jdbcUrl. This will return a JDBC URL with the correct host and port to connect to the Docker container running PostgreSQL.  Use container.username and container.password for the username and password of the database.

Note: these values are only available after the container has been started, so use a lazy val if you define a class variable using these values.
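
A sketch of a complete test along these lines (the query is only an illustration, and it assumes the PostgreSQL JDBC driver is on the test classpath):

import java.sql.DriverManager

import com.dimafeng.testcontainers.{ForAllTestContainer, PostgreSQLContainer}
import org.scalatest.FlatSpec

class PostgresSpec extends FlatSpec with ForAllTestContainer {
  override val container = PostgreSQLContainer()

  "the database" should "answer a simple query" in {
    // jdbcUrl, username, and password are only known once the container runs,
    // so the connection is created inside the test.
    val connection = DriverManager.getConnection(container.jdbcUrl, container.username, container.password)
    try {
      val resultSet = connection.createStatement().executeQuery("SELECT 1")
      assert(resultSet.next())
      assert(resultSet.getInt(1) == 1)
    } finally {
      connection.close()
    }
  }
}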

Using a generic container

If you use a database for which there is no standard container class available, you need to use the GenericContainer class to define your own. GenericContainer.apply has several parameters you can set; the most important ones are imageName, exposedPorts, and waitStrategy.

imageName obviously defines the Docker image to use. exposedPorts is a Seq[Int] of internal ports that are mapped and exposed to outside the container, and waitStrategy defines how to wait until the container has fully started before running the tests.

There are different wait strategies available and you could also implement your own, but one of the available ones will probably suit your needs. The default strategy is HostPortWaitStrategy, which waits for exposed ports to be available.

Another one is the HttpWaitStrategy which waits for a path to return a defined status code.

Finally, there is LogMessageWaitStrategy, which waits for a message to appear in the output.

As an example, we look at how to define a container for Aerospike, a key/value NoSQL database. Aerospike listens by default on port 3000 for connections. Using the HostPortWaitStrategy, however, doesn't work here: when Aerospike starts listening on this port, it hasn't fully started yet, so connecting at that moment would result in errors. Instead, we use the LogMessageWaitStrategy to wait for the log message migrations: complete to appear in the output.

So we create the Aerospike container in our test class with:

override val container = GenericContainer(
  imageName = "aerospike:latest",
  exposedPorts = Seq(3000),
  waitStrategy = new LogMessageWaitStrategy().withRegEx(".*migrations: complete.*\\s"))

Using container.containerIpAddress we get the host, and with container.mappedPort(3000) we get the port. With the host and port we can now create a connection to the Aerospike container.
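
With the official Aerospike Java client, creating that connection could look like this (a sketch; the val is lazy because host and port are only known after the container has started):

import com.aerospike.client.AerospikeClient

// Inside the test class: lazy, because the mapped host and port
// are only available once the container has been started.
lazy val client: AerospikeClient =
  new AerospikeClient(container.containerIpAddress, container.mappedPort(3000))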

cats’ IO Monad

As functional programmers, we want to push the side effects to the border of our program, to make it easier to reason about. We want to use pure functions, so that when reasoning about a program, we can replace a function call with its result without changing the meaning of the program.

Using standard Scala futures doesn't accomplish this, because a future normally starts executing right away: we don't have any control over when the execution starts, and if it contains a side effect (e.g. accessing a database or reading from a file), that side effect is executed immediately.

To accomplish pureness and to know which parts of our program contain side effects when executed, we can use an IO monad. There are a couple available for Scala; here we look at the new IO monad from cats.

Using this IO monad we have control over when the side effect is executed. Compare the following two code fragments:

val a = Future {
  println("something)
  5
}

And

val a = IO {
  println("something")
  5
}

In the first case, something is printed right away; in the second case, we have only described the side effect and not actually run the code. We can run the code using a.unsafeRunSync().

This is typically done in one place in your program: you compose the side effects until you have the complete program, and in your main function you run them.
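
A minimal sketch of that pattern: the program is composed of IO values, and unsafeRunSync is called once in main (the program itself is just an example):

import cats.effect.IO

object Main {
  // Pure description of the program; nothing happens when this value is created.
  val program: IO[Unit] = for {
    _ <- IO(println("first side effect"))
    _ <- IO(println("second side effect"))
  } yield ()

  def main(args: Array[String]): Unit =
    // The only place where the side effects actually run.
    program.unsafeRunSync()
}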

In code, when you see that a function returns an IO, this means that a side effect is contained in the IO and will be performed when running it. Using an IO monad to contain side effects not only makes the code more functional, it also clearly marks which parts of the code, when executed, will perform side effects.

Composing IO monads is easy: because they are monads, you can use a for comprehension, e.g.:

def getPlayer(playerId: Int): IO[Player] = {
  // Read player from database.
  ...
}

def getHighscore(playerId: Int, gameId: Int): IO[Option[Int]] = {
  // Read high score from database.
  ...
}

val result: IO[Option[Int]] = for {
  player <- getPlayer(playerId)
  highscore <- getHighscore(playerId, player.favoriteGameId)
} yield highscore

We have now composed a side effect that gets the highscore for the player’s favorite game.

When using the IO monad, we must be careful not to block the program: if we use a blocking API inside an IO monad, it will block the current thread. For blocking operations, we need to use a separate execution context and afterwards switch back to the original execution context.

Using IO.shift we can shift to a different execution context:

def blockingOperation(): String = {
  // Read from file
  ...
}

val result: IO[String] = for {
  _ <- IO.shift(executionContextForBlockingOperations)
  result <- IO { blockingOperation() }
  _ <- IO.shift(mainExecutionContext)
} yield result

The execution context for blocking operations will usually not have a fixed number of threads, unlike the main execution context, but will grow and shrink depending upon the number of blocking operations that are being executed.
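
For example, the two execution contexts used above could be defined like this (a sketch; a cached pool for blocking work and a fixed pool for the rest is a common choice):

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

// Cached pool for blocking operations: it creates threads on demand
// and reclaims them when they have been idle for a while.
val executionContextForBlockingOperations: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

// Fixed-size pool, bounded by the number of CPU cores, for the main program.
val mainExecutionContext: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors()))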

Since version 0.9 of Cats Effect, it is possible to perform multiple operations in parallel, provided that the IO contains an asynchronous operation; see also the parallelism section of the Cats Effect site.

Suppose we want to retrieve all the friends of a player in parallel:

// Enable usage of .parSequence
import cats.syntax.all._

def getPlayer(playerId: Int): IO[Player] = {
  // Read player from database.
  ...
}

val friendIds: NonEmptyList[Int] = player.friendIds
val getPlayers: NonEmptyList[IO[Player]] = friendIds.map(getPlayer)
val friends: IO[NonEmptyList[Player]] = getPlayers.parSequence

We now retrieve all the friends in parallel and combine them into one side effect contained in an IO monad. (For simplicity, we assume in this example that players have at least one friend, hence the NonEmptyList; for a normal List, parSequence isn't available.)

There are several libraries that can be used with cats' IO monad. E.g. http4s, which provides functional, streaming HTTP, and doobie, which provides purely functional access to SQL databases. Note that these libraries are generic in which IO monad they use, so you could e.g. also use the one from Scalaz.