WWDC21: Meet AsyncSequence | Apple
AsyncSequence란?
AsyncSequence는 본질적으로 async가 적용된 Sequence이다.
- Sequence: 일반 Sequence와 마찬가지로 정해진 순서를 가진다. 순서가 뒤바뀌거나 임의로 접근(random access)하는 것은 불가능하다.
- Suspend & Resume: 각각의 요소(element)에 대해 작업을 일시 중단(suspend)하고, iterator가 값을 생성하거나 에러를 방출하면 다시 재개(resume)된다.
- 에러 처리와 종료: 비동기 작업을 수행하므로 에러가 발생할 수 있다. 컴파일러는 이를 일반 throwing 함수와 동일하게 처리한다. 에러가 발생하거나 더 이상 데이터가 없으면 iterator.next()는 nil을 반환하며 종료된다.
사용 방법
- 표준 방식: for-await-in 또는 for-try-await-in(에러 발생 가능성이 있는 경우) 구문을 사용한다.
- iterator 사용하기: makeAsyncIterator()를 통해 명시적으로 iterator를 생성하고, while let quake = await iterator.next() 방식으로 순회할 수도 있다.
- 일반 for문과 동일하게 break나 continue를 사용하여 반복문을 종료하거나 스킵할 수 있다.
// 표준 방식
for await quake in quakes {
if quake.magnitude > 3 {
displaySignificantEarthquake(quake)
}
}
// iterator 방식
var iterator = quakes.makeAsyncIterator()
while let quake = await iterator.next() {
if quake.magnitude > 3 {
displaySignificantEarthquake(quake)
}
}
무기한 AsyncStream
여러 AsyncSequence를 동시에 순회해야 하거나, 무기한으로 실행될 가능성이 있는 스트림을 다룰 때는 Task로 감싸서(encapsulate) 실행하면 된다. 생성한 Task를 변수에 저장해 두면 추후 외부에서 .cancel()을 호출하여 작업을 안전하게 중단시킬 수 있다.
let quakeTask = Task {
for await quake in quakes {
if quake.magnitude > 3 {
displaySignificantEarthquake(quake)
}
}
}
// 나중에...
quakeTask.cancel()
AsyncStream 사용처 및 제공 API
기존에 복잡했던 비동기 데이터 처리를 AsyncStream을 이용해 손쉽게 처리할 수 있도록 Apple에서는 다양한 표준 API를 제공한다.
FileHandle
- public var bytes: AsyncBytes
파일에서 바이트를 비동기적으로 읽어온다.
bytes | Apple Developer Documentation
The file’s contents, as an asynchronous sequence of bytes.
developer.apple.com
for try await line in FileHandle.standardInput.bytes.lines {
/* 생략 */
}
URL
- public var resourceBytes: AsyncBytes
URL에서 데이터를 바이트 단위로 읽어온다. - public var lines: AsyncLineSequence<AsyncBytes>
URL에서 데이터를 줄 단위로 읽어온다.
let url = URL(fileURLWithPath: "/tmp/somefile.txt")
for try await line in url.lines {
/* 생략 */
}
URLSession
네트워크 요청 시 데이터를 바이트 스트림으로 받는다. 응답과 비동기 바이트 시퀀스를 함께 리턴받아 사용할 수 있다.
- func bytes(from: URL) async throws -> (AsyncBytes, URLResponse)
- func bytes(for: URLRequest) async throws -> (AsyncBytes, URLResponse)
let (bytes, response) = try await URLSession.shared.bytes(from: url)
guard let httpResponse = response as? HTTPURLResponse,
httpResponse.statusCode == 200 else {
throw MyNetworkingError.invalidServerResponse
}
for try await byte in bytes {
/* 생략 */
}
Notifications
Notification을 비동기 시퀀스로 대기하며 처리할 수 있다.
- public func notificatoins(named: Notification.Name, object: AnyObject) -> Notifications
let center = NotificationCenter.default
let notification = await center.notifications(named: .NSPersistentStoreRemoteChange).first {
$0.userInfo[NSStoreUUIDKey] == storeUUID
}
🛠️ AsyncSequence 값 가공하기 (Manipulating)
일반적인 Sequence를 다룰 때 사용하던 고차 함수들을 AsyncSequence에서도 그대로 사용할 수 있다.
지원 함수 예시:
allSatisfy, max, prefix, joined, dropFirst, flatMap, zip, compactMap, min, reduce, filter, contains, first 등..
AsyncSequence 도입하기
기존에 콜백(Callback)이나 델리게이트(Delegate) 패턴으로 작성된 코드는 AsyncStream를 사용하면 AsyncSequence 형태로 변환할 수 있다.
우선 AsyncStream을 어떻게 사용하는지 알아보면 아래와 같다.
public struct AsyncStream<Element>: AsyncSequence {
public init(_ elementType: Element.Type = Element.self,
maxBufferedElements limit: Int = .max,
_ build: (Continuation) -> Void)
}
- 데이터 방출: AsyncStream.Continuation을 사용해 continuation.yield(값) 형태로 핸들러가 호출될 때마다 데이터를 방출한다.
- 정상 종료/에러 방출: continuation.finish()를 호출하면 iterator가 nil을 생성하여 시퀀스를 종료한다. (별도의 finish 호출이 없는 경우 Task가 취소될 때까지 무한 수신)
- 버퍼링 지원: 버퍼가 가득 차면 이전 데이터를 버리는 등의 처리 방식을 자체적으로 지원한다.
- Task 취소: Task가 취소되면 continuation.onTermination 클로저가 호출된다. 클로저의 termination 파라미터를 통해 finished/cancelled 상태 확인이 가능하며, 해당 클로저를 통해 네트워크나 타이머 등을 안전하게 중단(Cleanup)시킬 수 있다.
원래 코드가 아래와 같았다면,
class QuakeMonitor {
var quakeHandler: (Quake) -> Void
func startMonitoring()
func stopMonitoring()
}
// ------
let monitor = QuakeMonitor()
monitor.quakeHandler = { quake in
...
}
monitor.startMonitoring()
...
monitor.stopMonitoring()
AsyncStream을 사용하면 아래로 변환할 수 있다.
let quakes = AsyncStream(Quake.self) { continuation in
let monitor = QuakeMonitor()
monitor.quakeHandler = { quake in
continuation.yield(quake) // 핸들러 호출될 때마다 element 방출
}
continuation.onTermination = { _ in
monitor.stopMonitoring() // onTermination 에서 cleanup 해주기
}
monitor.startMonitoring()
}
// ------
// 아래와 같이 사용할 수 있다.
let significantQuakes = quakes.filter { quake in
quake.magnitude > 3
}
for await quake in significantQuakes {
...
}
🚨 에러 처리가 필요한 경우
에러를 방출해야 하는 커스텀 시퀀스를 만들 때는 AsyncThrowingStream을 사용하며, 호출부에서는 for-try-await-in을 사용해 에러를 안전하게 처리(do-catch)하면 된다.
유익했다면 댓글/공감 남겨주세요~~ 작성자에게 큰 힘이 됩니다 ☺️
'Swift' 카테고리의 다른 글
| Swift Concurrency #마무리 - 여러 번 보면서 이해한 Swift Concurrency (0) | 2026.03.02 |
|---|---|
| Swift Concurrency #6 - Data Race 제거하기 (0) | 2026.03.02 |
| Swift Concurrency #5 - Concurrency와 Thread (0) | 2026.02.24 |
| Swift Concurrency #4 - Actor로 Mutable State 보호하기 (0) | 2026.02.24 |
| Swift Concurrency #3 - Task cancellation과 hierarchy 심화 (1) | 2026.01.14 |