こんにちは。コミュニケーションアプリ「LINE」のモバイルクライアントを開発している大石です。
この記事は、"Review Committee Report" 共有の連載第 71 回です。LINEヤフー社内には、高い開発生産性を維持するための Review Committee という活動があります。ここで集まった知見を定期的に社内に共有しており、その一部を本ブログ上でも公開しています。(Review Committee Report の詳細については、過去の記事一覧を参照してください)
Mutexの競合に注意
不定期に発生するイベントを以下のルールに従って処理するコードを書きたいとします。
- 一度イベントを処理したら、次のイベントを処理するまで一定のインターバルを空ける必要がある。
- このインターバル中に複数のイベントが発生した場合は、それらをまとめて一度に処理する。
この動作を図示すると以下のようになります。
この挙動は、debounceやthrottleといった操作に似ていますが、途中のイベントが削除されることがないようになっています。
これを実現するコードを以下のように実装してみました。
class EventProcessor(
// Injectable for unit testing.
private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
) {
private val queuingMutex: Mutex = Mutex()
private val consumingMutex: Mutex = Mutex()
@GuardedBy("queuingMutex")
private val eventQueue: MutableList<Event> = mutableListOf()
fun postEvent(event: Event) {
coroutineScope.launch {
enqueue(event)
consumeEvents()
}
}
private suspend fun enqueue(event: Event): Unit = queuingMutex.withLock {
eventQueue.add(event)
}
private suspend fun consumeEvents() {
if (!consumingMutex.tryLock()) {
// この場合、別のコルーチンが `consumeEvents()` を実行中だとわかるのでスキップする。
return
}
try {
while (true) {
val events = dequeueAll()
if (events.isEmpty()) {
break
}
processEvents(events)
delay(THROTTLING_DELAY_MILLIS)
}
} finally {
consumingMutex.unlock()
}
}
private suspend fun dequeueAll(): List<Event> = queuingMutex.withLock {
val events = eventQueue.toList()
eventQueue.clear()
events
}
private suspend fun processEvents(events: List<Event>) {
TODO()
}
companion object {
private const val THROTTLING_DELAY_MILLIS: Long = 1000
}
}
このコードにある問題がわかりますでしょうか?
tryLock の落とし穴
上記のコードにはレースコンディション(競合状態)の問題があります。それは以下のようなシナリオです。
コルーチンA | コルーチンB |
---|---|
enqueue(eventA) | |
consumingMutex.tryLock() が true を返す | |
dequeueAll() が [eventA] を返す | |
processEvents([eventA]) | |
delay(THROTTLING_DELAY_MILLIS) | |
dequeueAll() が [] を返す | |
enqueue(eventB) | |
consumingMutex.tryLock() が false を返す | |
consumingMutex.unlock() | |
(eventB がキューに残ったまま、AもBも終了してしまう) |
これは「tryLock()
が失敗したときは他のコルーチンが実行中だとみなし、自身の処理をスキップする」というロジックに起因する問題です。
そのため、このレースコンディションは tryLock()
を使わずに withLock { ... }
を普通に使用することで解消できます。さらに、そうすることで consumeEvents()
内の while (true)
ループも不要になります。
最終的に、修正後の consumeEvents()
は以下のようになります。
private suspend fun consumeEvents(): Unit = consumingMutex.withLock {
val events = dequeueAll()
if (events.isNotEmpty()) {
processEvents(events)
delay(THROTTLING_DELAY_MILLIS)
// もしここの時点で eventQueue が空でないのなら、キューにイベントを追加した別のコルーチンが居ることを意味し、
// そのコルーチンが次にこの consumeEvents() の処理を実行することが保証されている。
// よって、現在のコルーチンはここで単に終了してしまってよい。
}
}
tryLock の正しい使い方
tryLock()
の使用には注意が必要です。これは Kotlin に限らず、どのようなプログラミング言語でも同様です。
たとえば、Go言語の TryLock
には以下のような警告が記載されています。
https://pkg.go.dev/sync#Mutex.TryLock
(訳) TryLock の正しい使い方は確かに存在するが、それは稀であり、TryLock の使用は大抵 Mutex の使い方に根本的な問題があることの兆候である。
tryLock()
は通常、複数の Mutex をロックする際のデッドロック回避のために使用されます。つまり、複数の Mutex に tryLock()
を試みて一つでも失敗したら、すべて unlock()
してから適当なランダム遅延の後にはじめから再試行するような方法が、tryLock()
の典型的なユースケースです。
このように、tryLock()
に失敗した場合は 、必ず後で再試行するようになっていなければなりません。再試行するようになっていない場合、その tryLock()
の使い方は誤っていると考えるべきです。
補足
今回の題材となったアルゴリズムは Kotlin Coroutines の Flow
を使えば簡潔に書けるのでは?と思った方もいるかもしれません。
実際、こうした用途に適した Flow オペレータを追加しようという提案についての議論があります。
https://github.com/Kotlin/kotlinx.coroutines/issues/1302
しかし、汎用的なオペレータの設計は難しいようで、議論はあまり進展していないようです。
一言まとめ
Mutex.tryLock()
の使用には注意が必要です。誤った使い方はレースコンディションを引き起こします。
キーワード: race condition
, mutex
, tryLock