프로그래밍/Android

Android Room은 어떻게 Flow를 지원할 수 있었을까?

Lou Park 2024. 9. 14. 00:03

Room의 room-ktx 종속항목을 추가하면 Kotlin Extension과 Coroutine 지원을 받을 수 있게 된다. 이는 가져다쓰는 개발자 입장에서도 굉장히 편리해지는데, 그중에서도 Flow의 지원은 DB내의 데이터가 바뀔때마다 쿼리를 실행해서 결과값을 방출 할 수 있도록 해준다.

implementation "androidx.room:room-ktx:$room_version"

 

데이터의 변화가 일어났을때 이를 감지해서 새로운 Flow를 방출하는 구조를 짜고싶었는데, 문득 Room이 생각나서 내부 구현을 찾아보게 되었다.

@JvmStatic  
public fun <R> createFlow(  
    db: RoomDatabase,  
    inTransaction: Boolean,  
    tableNames: Array<String>,  
    callable: Callable<R>  
): Flow<@JvmSuppressWildcards R> = flow {  
    coroutineScope {  
        val observerChannel = Channel<Unit>(Channel.CONFLATED)  
        val observer = object : InvalidationTracker.Observer(tableNames) {  
            override fun onInvalidated(tables: Set<String>) {  
                observerChannel.trySend(Unit)  
            }  
        }  
        observerChannel.trySend(Unit)
        val queryContext = coroutineContext[TransactionElement]?.transactionDispatcher  
            ?: if (inTransaction) db.transactionDispatcher else db.getQueryDispatcher()  
        val resultChannel = Channel<R>()  
        launch(queryContext) {  
            db.invalidationTracker.addObserver(observer)  
            try {               
                for (signal in observerChannel) {  
                    val result = callable.call()  
                    resultChannel.send(result)  
                }  
            } finally {  
                db.invalidationTracker.removeObserver(observer)  
            }  
        }  
        emitAll(resultChannel)  
    }  
}

 

Room에는 InvalidationTracker라는 존재가 있는데 이는 쿼리에 의해 수정된 테이블 목록을 관리하고 이 테이블들의 목록을 옵저버들에게 알리는 역할을 한다.

 

코드에서 observer선언부를 보면 InvalidationTracker에서 테이블의 수정이 일어날 때마다observerChannelUnit 신호를 보내는 것을 확인할 수 있다. observerChannelChannel(Channel.CONFLATED)로, 가장 최신의 신호 하나만 유지하는 채널이다.

try {  
    for (signal in observerChannel) {  
        val result = callable.call()  
        resultChannel.send(result)  
    }  
} finally {  
    db.invalidationTracker.removeObserver(observer)  
}  

 

Channel을 for문으로 돌리면 새로운 신호가 전송될 때까지 기다리게 된다.
observerChannel로 새로운 신호가 보내지면 callable.call()을 통해 다시 쿼리를 실행하고 resultChannel에 해당 쿼리에 대한 결과값을 담아서 최종 Flow에 방출하게 된다. (emitAll 부분)

 

이러한 패턴을 활용하면 내가 원하는 조건에서 원하는 함수가 실행되어 Flow가 방출되도록 할 수 있다. 활용할 부분은 무궁무진하다.