I'm building a new blog site with Ktor, a Kotlin-native HTTP framework. It's an asynchronous framework built on top of Kotlin coroutines. So far I've liked its simple, unopinionated interfaces that allow you to structure your application however you want.
Ktor is configured via a builder pattern, specifically a type-safe builder enabled by Kotlin's lambdas with receivers. The code looks a little peculiar at first glance, but it makes sense once you understand the implicit receiver type declared in all the builders of the library (and associated plugins). It's basically an explicit context declared at compile time, and it determines what is in scope in the code you write inside the builder. Combined with Kotlin's extension function syntax, you can end up with really clean, declarative code.
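To make the receiver idea concrete, here's a minimal sketch of a type-safe builder. It has nothing to do with Ktor's real classes: `RouteTable`, `routing`, `health`, and `buildExample` are all names I made up for illustration.

```kotlin
// Hypothetical mini-DSL for illustration; these are not Ktor's classes.
class RouteTable {
    val routes = mutableListOf<String>()
    fun get(path: String) { routes += "GET $path" }
    fun post(path: String) { routes += "POST $path" }
}

// `RouteTable.() -> Unit` is a function type with receiver: inside the
// lambda, `this` is a RouteTable, so get/post can be called unqualified.
fun routing(configure: RouteTable.() -> Unit): RouteTable =
    RouteTable().apply(configure)

// An extension function adds a new verb to the DSL without editing RouteTable.
fun RouteTable.health() = get("/health")

fun buildExample(): List<String> =
    routing {
        get("/posts")
        post("/upload")
        health()
    }.routes
```

Calling `get` or `health()` outside the `routing { }` block wouldn't compile, because there's no `RouteTable` receiver in scope there. That's the scoping the builders give you.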
Anyway, one issue I was dealing with lately was trying to get multiple files uploaded at once, in a single multipart request. This is how the Ktor documentation shows how to do it:
post("/upload") {
    var fileDescription = ""
    var fileName = ""
    val multipartData = call.receiveMultipart()
    multipartData.forEachPart { part ->
        when (part) {
            is PartData.FormItem -> {
                fileDescription = part.value
            }
            is PartData.FileItem -> {
                fileName = part.originalFileName as String
                val fileBytes = part.provider().readRemaining().readByteArray()
                File("uploads/$fileName").writeBytes(fileBytes)
            }
            else -> {}
        }
        part.dispose()
    }
}
So I'm trying to do the same thing, but the difference in my situation is that I need information from the PartData.FormItem to process each file upload. So in the interim, I store each FileItem in a list to be processed afterwards.
This is my code:
val mediaParts = mutableListOf<PartData.FileItem>()
var postId: String? = null
call.receiveMultipart().forEachPart { part ->
    when (part) {
        is PartData.FileItem -> {
            mediaParts.add(part)
        }
        is PartData.FormItem -> {
            postId = part.value
        }
        else -> {
            // noop
        }
    }
}
mediaParts.forEach { part ->
    val bytes = part.provider().readRemaining().readByteArray()
    mediaRepository.savePostMedia(postId, part.originalFileName!!, bytes)
}
But for some reason, the files keep getting saved with 0 bytes:
j@flow:~/Development/flow2/src/main/resources/media$ ls -l 6769f048aa1cfe128ed672ee/
total 0
-rw-rw-r-- 1 j j 0 Dec 26 17:08 cover.png
-rw-rw-r-- 1 j j 0 Dec 26 17:08 lb.png
-rw-rw-r-- 1 j j 0 Dec 26 17:08 srle_2003.png
What's happening here? I see in the network request from the browser that the file data is being sent correctly with the request, so something funky is happening when trying to save the files to disk.
First step of debugging is always adding some logs. So I add a log in the forEach loop:
mediaParts.forEach { part ->
    val bytes = part.provider().readRemaining().readByteArray()
    log.info("Saving media file ${part.originalFileName} for post $postId with ${bytes.size} bytes")
    mediaRepository.savePostMedia(postId, part.originalFileName!!, bytes)
}
Which spits out:
2024-12-26 17:27:08.822 [eventLoopGroupProxy-4-1] INFO Application - Saving media file srle_2003.png for post 6769f048aa1cfe128ed672ee with 0 bytes
2024-12-26 17:27:08.841 [eventLoopGroupProxy-4-1] INFO Application - Saving media file lb.png for post 6769f048aa1cfe128ed672ee with 0 bytes
2024-12-26 17:27:08.841 [eventLoopGroupProxy-4-1] INFO Application - Saving media file cover.png for post 6769f048aa1cfe128ed672ee with 0 bytes
OK, so bytes is clearly empty.
If I change my code to execute the same way the documentation example works, the files get saved successfully!
call.receiveMultipart().forEachPart { part ->
    when (part) {
        is PartData.FileItem -> {
            val bytes = part.provider().readRemaining().readByteArray()
            mediaRepository.savePostMedia("foo", part.originalFileName!!, bytes)
        }
        is PartData.FormItem -> {
            postId = part.value
        }
        else -> {
            // noop
        }
    }
}
Of course, in place of the postId argument for savePostMedia, I use a dummy value in my test. But this works. So the question is, why can't I read the file data outside of this forEachPart lambda? My original code keeps a reference to each PartData.FileItem and then loops through them later. Why doesn't that work?
The answer lies in the MultiPartData object that call.receiveMultipart() returns. Let's look at the source code for its forEachPart method:
/**
 * Transforms the multipart data stream into a [Flow] of [PartData].
 *
 * @return a [Flow] emitting each part of the multipart data until the end of the stream.
 */
public fun MultiPartData.asFlow(): Flow<PartData> = flow {
    while (true) {
        val part = readPart() ?: break
        emit(part)
    }
}
/**
 * Parse multipart data stream and invoke [partHandler] for each [PartData] encountered.
 * @param partHandler to be invoked for every part item
 */
public suspend fun MultiPartData.forEachPart(partHandler: suspend (PartData) -> Unit): Unit =
    asFlow().collect(partHandler)
So under the hood, it's converting the multipart data stream into a Flow and calling collect on that Flow with the supplied lambda function. Flows are part of the kotlinx.coroutines library.
A Flow is an asynchronous stream of values, and a cold one: the producer block doesn't run until something collects it. Flows operate like generators, but because emitting and collecting are suspending operations, the producer and the consumer can each suspend without blocking a thread.
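Here's a small sketch of what "cold" means in practice, assuming kotlinx.coroutines is on the classpath. The `numbers` and `runDemo` functions are hypothetical names of mine; the trace list just records the order in which things happen.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical producer: the body of flow { } runs only once a collector starts.
fun numbers(trace: MutableList<String>): Flow<Int> = flow {
    for (i in 1..3) {
        trace += "emit $i"
        emit(i) // suspends until the collector has handled the value
    }
}

fun runDemo(): List<String> {
    val trace = mutableListOf<String>()
    val cold = numbers(trace)   // building the flow executes nothing yet
    trace += "created"
    runBlocking {
        cold.collect { value -> trace += "collect $value" }
    }
    return trace
}
```

The trace comes back as created, emit 1, collect 1, emit 2, collect 2, emit 3, collect 3. Each value is handed to the collector before the producer moves on to the next one, which is why forEachPart only reads the next part after my lambda has finished with the current one.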
Digging into the Ktor source code, I find the problem is in readPart:
override suspend fun readPart(): PartData? {
    previousPart?.dispose?.invoke()
    while (true) {
        val event = events.tryReceive().getOrNull() ?: break
        eventToData(event)?.let {
            previousPart = it
            return it
        }
    }
    return readPartSuspend()
}
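As part of reading the next part, the previous part's dispose function is called. This discards the body content of the previous part and allows the memory to be reclaimed. So by the time forEachPart returns, every FileItem I stashed in my list has already been disposed, and reading from its provider() yields zero bytes.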
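The effect is easy to reproduce without Ktor at all. This is a simplified simulation, not Ktor's implementation: `FakePart`, `FakeMultipart`, and the helper functions are hypothetical stand-ins that only mimic the "dispose the previous part on the next read" behaviour.

```kotlin
// Hypothetical stand-ins for Ktor's types, for illustration only.
class FakePart(val name: String, private var body: ByteArray) {
    fun read(): ByteArray = body
    fun dispose() { body = ByteArray(0) } // drop the body so memory can be reclaimed
}

class FakeMultipart(parts: List<FakePart>) {
    private val iterator = parts.iterator()
    private var previousPart: FakePart? = null

    // Same shape as readPart(): dispose the previous part, then hand out the next.
    fun readPart(): FakePart? {
        previousPart?.dispose()
        previousPart = if (iterator.hasNext()) iterator.next() else null
        return previousPart
    }
}

fun newUpload() = FakeMultipart(
    listOf(
        FakePart("cover.png", byteArrayOf(1, 2, 3)),
        FakePart("lb.png", byteArrayOf(4, 5)),
    )
)

// Keep references and read afterwards: every body has been disposed by then.
fun sizesWhenReadLater(): List<Int> {
    val upload = newUpload()
    val kept = mutableListOf<FakePart>()
    while (true) {
        val part = upload.readPart() ?: break
        kept += part
    }
    return kept.map { it.read().size }
}

// Read inside the loop, while the part is still the current one.
fun sizesWhenReadImmediately(): List<Int> {
    val upload = newUpload()
    val sizes = mutableListOf<Int>()
    while (true) {
        val part = upload.readPart() ?: break
        sizes += part.read().size
    }
    return sizes
}
```

In this sketch `sizesWhenReadLater()` gives `[0, 0]` and `sizesWhenReadImmediately()` gives `[3, 2]`. Even the last part ends up empty in the first case, because the final `readPart()` call (the one that returns null) disposes it too.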
So, for my case I had to modify my code to read each file part's data as it's emitted and store that in memory before I can save it:
val files = mutableMapOf<String, ByteArray>()
var postId: String? = null
call.receiveMultipart().forEachPart { part ->
    when (part) {
        is PartData.FileItem -> {
            files[part.originalFileName!!] = part.provider().readRemaining().readByteArray()
        }
        is PartData.FormItem -> {
            postId = part.value
        }
        else -> {
            // noop
        }
    }
}
if (postId == null || files.isEmpty()) {
    call.respond(HttpStatusCode.BadRequest)
    return@post
}
files.forEach { (filename, data) ->
    mediaRepository.savePostMedia(postId, filename, data)
}
Of course, I could also change the API to take the post ID as a path parameter on the upload endpoint instead, so I could save each file to disk as it's processed. That's a needless optimization for now, so I'll save it for another day.