Android Room은 어떻게 Flow를 지원할 수 있었을까?
Room의 room-ktx
종속항목을 추가하면 Kotlin Extension과 Coroutine 지원을 받을 수 있게 된다. 이는 가져다쓰는 개발자 입장에서도 굉장히 편리해지는데, 그중에서도 Flow의 지원은 DB내의 데이터가 바뀔때마다 쿼리를 실행해서 결과값을 방출 할 수 있도록 해준다.
implementation "androidx.room:room-ktx:$room_version"
데이터의 변화가 일어났을때 이를 감지해서 새로운 Flow를 방출하는 구조를 짜고싶었는데, 문득 Room이 생각나서 내부 구현을 찾아보게 되었다.
@JvmStatic
public fun <R> createFlow(
db: RoomDatabase,
inTransaction: Boolean,
tableNames: Array<String>,
callable: Callable<R>
): Flow<@JvmSuppressWildcards R> = flow {
coroutineScope {
val observerChannel = Channel<Unit>(Channel.CONFLATED)
val observer = object : InvalidationTracker.Observer(tableNames) {
override fun onInvalidated(tables: Set<String>) {
observerChannel.trySend(Unit)
}
}
observerChannel.trySend(Unit)
val queryContext = coroutineContext[TransactionElement]?.transactionDispatcher
?: if (inTransaction) db.transactionDispatcher else db.getQueryDispatcher()
val resultChannel = Channel<R>()
launch(queryContext) {
db.invalidationTracker.addObserver(observer)
try {
for (signal in observerChannel) {
val result = callable.call()
resultChannel.send(result)
}
} finally {
db.invalidationTracker.removeObserver(observer)
}
}
emitAll(resultChannel)
}
}
Room에는 InvalidationTracker라는 존재가 있는데 이는 쿼리에 의해 수정된 테이블 목록을 관리하고 이 테이블들의 목록을 옵저버들에게 알리는 역할을 한다.
코드에서 observer
선언부를 보면 InvalidationTracker에서 테이블의 수정이 일어날 때마다observerChannel
로 Unit
신호를 보내는 것을 확인할 수 있다. observerChannel
은 Channel(Channel.CONFLATED)
로, 가장 최신의 신호 하나만 유지하는 채널이다.
try {
for (signal in observerChannel) {
val result = callable.call()
resultChannel.send(result)
}
} finally {
db.invalidationTracker.removeObserver(observer)
}
Channel을 for
문으로 돌리면 새로운 신호가 전송될 때까지 기다리게 된다.observerChannel
로 새로운 신호가 보내지면 callable.call()
을 통해 다시 쿼리를 실행하고 resultChannel
에 해당 쿼리에 대한 결과값을 담아서 최종 Flow에 방출하게 된다. (emitAll
부분)
이러한 패턴을 활용하면 내가 원하는 조건에서 원하는 함수가 실행되어 Flow가 방출되도록 할 수 있다. 활용할 부분은 무궁무진하다.