Fixing Akka Http "Substream Source cannot be materialized more than once" error

 ( 10 min read ) 

Last week I was mostly heads-down trying to patch a distributed service we use at work that was having problems.

Its a Scala app that uses Akka Streams to transfer large, multi-GB, files from one service to another via download and upload requests. A simple service that would help you archive a compressed zip that you wanted to keep in a storage bucket to extend its retention date.

Akka Streams can easily handle the job and transfer files that are larger than the memory capacity of the container itself. And it also let us scale horizontally to keep up with the constant demand put on the service. But there was just one problem.

I kept seeing an exception thrown that would cause some containers to crash:

[ERROR] Outgoing request stream error
java.lang.IllegalStateException: Substream Source(EntitySource) cannot be materialized more than once
    at akka.stream.impl.fusing.SubSource$$anon$13.createMaterializedTwiceException(StreamOfStreams.scala:830)
    at akka.stream.impl.fusing.SubSource$$anon$13.<init>(StreamOfStreams.scala:801)
    at akka.stream.impl.fusing.SubSource.createLogic(StreamOfStreams.scala:797)
    at akka.stream.stage.GraphStage.createLogicAndMaterializedValue(GraphStage.scala:107)
  # ...
[ERROR] Slot execution failed. That's probably a bug. Please file a bug at https://github.com/akka/akka-http/issues. Slot is restarted.
java.lang.IllegalStateException: Cannot open connection when slot still has an open connection
    at akka.http.impl.engine.client.pool.NewHostConnectionPool$HostConnectionPoolStage$$anon$1$Slot.openConnection(NewHostConnectionPool.scala:389)
    at akka.http.impl.engine.client.pool.SlotState$UnconnectedState.onNewRequest(SlotState.scala:182)
    at akka.http.impl.engine.client.pool.SlotState$UnconnectedState.onNewRequest$(SlotState.scala:181)
    at akka.http.impl.engine.client.pool.SlotState$Unconnected$.onNewRequest(SlotState.scala:192)
    # ...

Unfortunately, the stacktrace here doesn’t show which line of the service’s code is the root cause. So I’d have to check all usages of Source[ByteString, Any] and find any reuses of that object and remove it.

When you Google this exception, most encounters other people have with this exception are from Akka Http server-side code that doesn’t properly unmarshall incoming requests. i.e. They didn’t call toStrict on request entity. But my service wasn’t having that problem. It was coming from the Akka Http client-side calls. Somewhere in the code to upload files it was consuming the Source stream twice.

 

Fixing Retries

My first thought, was because of the retry mechanism.

If a failure or network disconnect occurred during the file transfer stream, it would try the transfer again. The code basically looked like this:

/**
 * Transfers a file from the source URL to the destination URL.
 */
def transferFileStream1(downloadUrl: String,
                       uploadUrl: String,
                       downloadMethod: HttpMethod = HttpMethods.GET,
                       uploadMethod: HttpMethod = HttpMethods.PUT,
                       retryPolicy: retry.Policy = retry.Backoff(2, 1.second)
                      ): Future[Either[String, Done]] = {
  // start download stream, and retry on failure
  retryPolicy { // <-------------------------------------------- retry!
    downloadStream(downloadUrl, downloadMethod)
  }.flatMap {
    case Left(error) =>
      Future.successful(Left(error))

    case Right((contentLength, stream)) =>
      // start upload stream, and retry on failure
      retryPolicy { // <---------------------------------------- retry!
        uploadStream(uploadUrl, uploadMethod, contentLength, stream)
      }
  }
}

private def downloadStream(url: String,
                           method: HttpMethod): Future[Either[String, (Long, Source[ByteString, Any])]] = {
  Http().singleRequest(HttpRequest(
    method = method,
    uri = url,
    entity = HttpEntity.Empty
  )).flatMap {
    case resp@HttpResponse(code, _, _, _) if !code.isSuccess =>
      resp.entity.discardBytes().future.map { _ =>
        Left(s"Error during Download. Code: ${code.intValue}")
      }

    case resp@HttpResponse(code, _, _, _) if code.isSuccess() =>
      // Get the size of the download stream
      val contentLengthOp = resp.getHeader("Content-Length")
      val cl = if (contentLengthOp.isPresent) {
        contentLengthOp.get().value().toLong
      } else {
        0L
      }

      Future.successful(if (cl <= 0) {
        Left(s"Expected download stream Content-Length > 0, but got: $cl")
      } else {
        Right((cl, resp.entity.dataBytes))
      })
  }
}

private def uploadStream(url: String,
                         method: HttpMethod,
                         contentLength: Long,
                         stream: Source[ByteString, Any]): Future[Either[String, Done]] = {
  Http().singleRequest(HttpRequest(
    method = method,
    uri = url,
    entity = HttpEntity(
      contentType = ContentType(MediaTypes.`application/octet-stream`),
      contentLength = contentLength,
      data = stream
    )
  )).flatMap {
    case resp@HttpResponse(code, _, _, _) if !code.isSuccess =>
      resp.entity.discardBytes().future.map { _ =>
        Left(s"Error during Upload. Code: ${code.intValue}")
      }

    case resp@HttpResponse(code, _, _, _) if code.isSuccess() =>
      resp.entity.discardBytes().future.map { _ =>
        Right(Done)
      }
  }
}

Notice the retryPolicy with default retry.Backoff(2, 1.second) settings.

It basically checks the result of the inner function and determines if a retry is needed. So since the result is an Either[String, Done] type, it will retry on Left errors and pass-through on Right objects.

This looks fine at a glance… but its wrong.

I have a retry around the uploadStream method. So if a failure is encountered during upload, like EntityStreamException, TimeoutException, or SocketException, then it would try again. But that means I would be consuming the Source stream more than once, and bam! IllegalStateException: Substream Source(EntitySource) cannot be materialized more than once happens. So I needed to fix this method to re-download if a failure occurs during upload. Rather than try to re-used the same stream again.

This was easy to fix; I just moved the retryPolicy to wrap the overall function rather than individual requests.

/**
 * Transfers a file from the source URL to the destination URL.
 */
def transferFileStream(downloadUrl: String,
                       uploadUrl: String,
                       downloadMethod: HttpMethod = HttpMethods.GET,
                       uploadMethod: HttpMethod = HttpMethods.PUT,
                       retryPolicy: retry.Policy = retry.Backoff(2, 1.second)
                      ): Future[Either[String, Done]] = {
  retryPolicy { // <------------------------------ retry if any part of this function fails
    // start download stream
    downloadStream(downloadUrl, downloadMethod)
      .flatMap {
        case Left(error) =>
          Future.successful(Left(error))

        case Right((contentLength, stream)) =>
          // start upload stream
          uploadStream(uploadUrl, uploadMethod, contentLength, stream)
      }
  }
}

Ok, that looks better.

After a adding a unit-test and a quick code-review by my team, I merged the change and deployed the new service. However, to my surprise the exception occurred again…

WHAT?! How?

I’m clearly not consuming the source stream twice anymore. Where the heck is this trying to reuse the original source stream?

  • The Stacktrace is not useful here since it doesn’t have line numbers to my code
  • The Exception occurred intermittently in the service, maybe once or twice every 10 minutes
  • It didn’t occur on any particular inputs, i.e. really large files, certain file types, etc.

 

Removing Retries

I proceeded to spend the majority of a day tracking down this error. I was tracing requests logs, comparing files and metrics with other instances of the service, hoping to find something. I setup the service on my MacBook and tried to reproduce the exception with fake data at first, then used real data afterwards. Eventually, after repeated attempts of transferring large files, I got the IllegalStateException to occur.

I was able to reproduce the error was by disconnecting the network connection for 17 seconds. I would start an instance at my desk and begin a stream transfer and let it begin uploading for a few seconds, then disconnect from the WiFi, wait, then reconnect.

This caused Akka’s request-timeout = 20 s to be tripped, and a second request was attempted. Only, I had previously disabled my retryPolicy using retry.Directly(0), so there shouldn’t be a second request after a failure like the WiFi getting disconnected, or so I thought.

I missed the fact that Akka Http has its own retry mechanism built-in. Its listed in the Akka Http Configuration as akka.http.host-connection-pool.max-retries.

The docs clearly state:

# The maximum number of times failed requests are attempted again,
# (if the request can be safely retried) before giving up and returning an error.
# Set to zero to completely disable request retries.
max-retries = 5

Ahh! So I wasn’t explicitly retrying, but Akka was. How did I miss this?

After cursing for a bit and getting over the strong impostor syndrome feeling, I flipped it to 0 retries and tried my test again and it threw a different exception:

akka.io.TcpOutgoingConnection$$anon$2: Connect timeout of Some(30 seconds) expired
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
Caused by: akka.io.TcpOutgoingConnection$$anon$2: Connect timeout of Some(30 seconds) expired

This is good! I can work with this.

Now I can re-enable my retryPolicy to recover from this error and try the transfer again. Thus, I fixed the transfer service from crashing anymore. Hoorah!

The reason the original exception occurred intermittently, was simply because the download service was having trouble staying connected during high-traffic/load, and it would just disconnect. Most likely from common “network was lost” type of situations that are unavoidable in distributed systems.

Akka Http will then retry requests in those situations, making life easier for users. However, for my service it was the cause of the Source stream to be consumed twice. Simply because Akka Http was just trying to be helpful.

 

Thoroughly Check your Akka Configs

Admittedly, I should have read the Akka Http Configuration more carefully. There are lots of settings that can make or break your service if used incorrectly. But now this is a mistake I shouldn’t encounter ever again, and hopefully you won’t either.



Published: May 10, 2020
Category: code
Tags: scala, akka