Swift

Swift Concurrency #7 - AsyncSequence와 AsyncStream

빨간체리반지 2026. 3. 2. 13:47

WWDC21: Meet AsyncSequence | Apple

AsyncSequence란?

AsyncSequence는 본질적으로 async가 적용된 Sequence이다.

  • Sequence: 일반 Sequence와 마찬가지로 정해진 순서를 가진다. 순서가 뒤바뀌거나 임의로 접근(random access)하는 것은 불가능하다.
  • Suspend & Resume: 각각의 요소(element)에 대해 작업을 일시 중단(suspend)하고, iterator가 값을 생성하거나 에러를 방출하면 다시 재개(resume)된다.
  • 에러 처리와 종료: 비동기 작업을 수행하므로 에러가 발생할 수 있다. 컴파일러는 이를 일반 throwing 함수와 동일하게 처리한다. 에러가 발생하거나 더 이상 데이터가 없으면 iterator.next()는 nil을 반환하며 종료된다.

 

사용 방법

  • 표준 방식: for-await-in 또는 for-try-await-in(에러 발생 가능성이 있는 경우) 구문을 사용한다.
  • iterator 사용하기: makeAsyncIterator()를 통해 명시적으로 iterator를 생성하고, while let quake = await iterator.next() 방식으로 순회할 수도 있다.
  • 일반 for문과 동일하게 break나 continue를 사용하여 반복문을 종료하거나 스킵할 수 있다.
// 표준 방식
for await quake in quakes {
    if quake.magnitude > 3 {
        displaySignificantEarthquake(quake)
    }
}

// iterator 방식
var iterator = quakes.makeAsyncIterator()
while let quake = await iterator.next() {
    if quake.magnitude > 3 {
        displaySignificantEarthquake(quake)
    }
}

 

무기한 AsyncStream

여러 AsyncSequence를 동시에 순회해야 하거나, 무기한으로 실행될 가능성이 있는 스트림을 다룰 때는 Task로 감싸서(encapsulate) 실행하면 된다. 생성한 Task를 변수에 저장해 두면 추후 외부에서 .cancel()을 호출하여 작업을 안전하게 중단시킬 수 있다.

let quakeTask = Task {
    for await quake in quakes {
        if quake.magnitude > 3 {
            displaySignificantEarthquake(quake)
        }
    }
}

// 나중에...
quakeTask.cancel()

AsyncStream 사용처 및 제공 API

기존에 복잡했던 비동기 데이터 처리를 AsyncStream을 이용해 손쉽게 처리할 수 있도록 Apple에서는 다양한 표준 API를 제공한다.

FileHandle

 

bytes | Apple Developer Documentation

The file’s contents, as an asynchronous sequence of bytes.

developer.apple.com

 

for try await line in FileHandle.standardInput.bytes.lines {
    /* 생략 */
}

 

URL

let url = URL(fileURLWithPath: "/tmp/somefile.txt")

for try await line in url.lines {
    /* 생략 */	
}

 

URLSession

네트워크 요청 시 데이터를 바이트 스트림으로 받는다. 응답과 비동기 바이트 시퀀스를 함께 리턴받아 사용할 수 있다.

  • func bytes(from: URL) async throws -> (AsyncBytes, URLResponse)
  • func bytes(for: URLRequest) async throws -> (AsyncBytes, URLResponse)
let (bytes, response) = try await URLSession.shared.bytes(from: url)

guard let httpResponse = response as? HTTPURLResponse,
    httpResponse.statusCode == 200 else {
    throw MyNetworkingError.invalidServerResponse
}

for try await byte in bytes {
    /* 생략 */
}

 

Notifications

Notification을 비동기 시퀀스로 대기하며 처리할 수 있다.

  • public func notificatoins(named: Notification.Name, object: AnyObject) -> Notifications
let center = NotificationCenter.default

let notification = await center.notifications(named: .NSPersistentStoreRemoteChange).first {
    $0.userInfo[NSStoreUUIDKey] == storeUUID
}

 

🛠️ AsyncSequence 값 가공하기 (Manipulating)

일반적인 Sequence를 다룰 때 사용하던 고차 함수들을 AsyncSequence에서도 그대로 사용할 수 있다.

 

지원 함수 예시:
allSatisfy, max, prefix, joined, dropFirst, flatMap, zip, compactMap, min, reduce, filter, contains, first 등..


AsyncSequence 도입하기

기존에 콜백(Callback)이나 델리게이트(Delegate) 패턴으로 작성된 코드는 AsyncStream를 사용하면 AsyncSequence 형태로 변환할 수 있다.

 

우선 AsyncStream을 어떻게 사용하는지 알아보면 아래와 같다.

public struct AsyncStream<Element>: AsyncSequence {
    public init(_ elementType: Element.Type = Element.self,
                maxBufferedElements limit: Int = .max,
                _ build: (Continuation) -> Void)
}
  • 데이터 방출: AsyncStream.Continuation을 사용해 continuation.yield(값) 형태로 핸들러가 호출될 때마다 데이터를 방출한다.
  • 정상 종료/에러 방출: continuation.finish()를 호출하면 iterator가 nil을 생성하여 시퀀스를 종료한다. (별도의 finish 호출이 없는 경우 Task가 취소될 때까지 무한 수신)
  • 버퍼링 지원: 버퍼가 가득 차면 이전 데이터를 버리는 등의 처리 방식을 자체적으로 지원한다.
  • Task 취소: Task가 취소되면 continuation.onTermination 클로저가 호출된다. 클로저의 termination 파라미터를 통해 finished/cancelled 상태 확인이 가능하며, 해당 클로저를 통해 네트워크나 타이머 등을 안전하게 중단(Cleanup)시킬 수 있다.

 

원래 코드가 아래와 같았다면,

class QuakeMonitor {
    var quakeHandler: (Quake) -> Void
    func startMonitoring()
    func stopMonitoring()
}

// ------

let monitor = QuakeMonitor()
monitor.quakeHandler = { quake in
	...
}

monitor.startMonitoring()
...
monitor.stopMonitoring()

 

AsyncStream을 사용하면 아래로 변환할 수 있다.

let quakes = AsyncStream(Quake.self) { continuation in
    let monitor = QuakeMonitor()
    monitor.quakeHandler = { quake in
        continuation.yield(quake) // 핸들러 호출될 때마다 element 방출
    }
    continuation.onTermination = { _ in
        monitor.stopMonitoring() // onTermination 에서 cleanup 해주기
    }

    monitor.startMonitoring()
}

// ------
// 아래와 같이 사용할 수 있다.

let significantQuakes = quakes.filter { quake in
    quake.magnitude > 3
}
for await quake in significantQuakes {
    ...
}

 

🚨 에러 처리가 필요한 경우

에러를 방출해야 하는 커스텀 시퀀스를 만들 때는 AsyncThrowingStream을 사용하며, 호출부에서는 for-try-await-in을 사용해 에러를 안전하게 처리(do-catch)하면 된다.


유익했다면 댓글/공감 남겨주세요~~ 작성자에게 큰 힘이 됩니다 ☺️