diff --git a/.gitignore b/.gitignore index a6ef976e..47280d7f 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,5 @@ AIProject/iCo/App/Resource/GoogleService-Info.plist AIProject/fastlane/.env AIProject/fastlane/.env AIProject/fastlane/report.xml + +AIProject/build diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift deleted file mode 100644 index 0d7b9635..00000000 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ /dev/null @@ -1,249 +0,0 @@ -// -// BaseWebSocketClient.swift -// AIProject -// -// Created by kangho lee on 8/17/25. -// - -import Foundation -import AsyncAlgorithms - -/// 웹소켓 client입니다. 메세지와 연결 상태 stream, 연결, 해제를 책임집니다. -public final actor BaseWebSocketClient: NSObject, SocketEngine { - - /// 소켓 상태 채널 - private var stateChannel: AsyncChannel - - /// 메세지 채널 - private var incomingChannel: AsyncChannel> - - private let url: URL - private let session: URLSession - private var task: URLSessionWebSocketTask? - - private var pongContinuation: CheckedContinuation? - - /// 핑 전송 task - private var healthCheck: Task? - - public nonisolated var state: AsyncStream { - AsyncStream { continuation in - Task { - for await state in await stateChannel { - continuation.yield(state) - } - continuation.finish() - } - } - } - - public nonisolated var incoming: AsyncStream> { - AsyncStream { continuation in - Task { - for await message in await incomingChannel { - continuation.yield(message) - } - continuation.finish() - } - } - } - - public init(url: URL, session: URLSession = .shared) { - - self.url = url - self.session = session - - stateChannel = AsyncChannel() - incomingChannel = AsyncChannel>() - - super.init() - debugPrint(String(describing: Self.self), #function) - } - - /// 채널을 새로 개설하고 소켓을 엽니다. - /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. - public func connect() async { - - stateChannel = .init() - incomingChannel = .init() - - await stateChannel.send(.connecting) - - self.task = session.webSocketTask(with: url) - task?.delegate = self - task?.resume() - - // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 - try? await sendPing() - } - - public func send(_ data: Data) async throws { - do { - try await task?.send(.data(data)) - } catch { - await handleClose(with: error) - } - } - - public func close() async { - guard let task else { - await handleClose(code: .normalClosure, reason: nil) - return - } - task.cancel(with: .normalClosure, reason: nil) - } - - deinit { - debugPrint(String(describing: Self.self), #function) - task?.cancel() - task = nil - stateChannel.finish() - incomingChannel.finish() - } - - - /// 상태가 연결된 동안 메세지를 수신하여 채널로 보냅니다. - private func receiveLoop() async { - guard let task else { return } - while true { - do { - let message = try await task.receive() - - switch message { - case .data(let data): - await incomingChannel.send(.success(data)) - case .string(let string): - await incomingChannel.send(.success(Data(string.utf8))) - @unknown default: - await incomingChannel.send(.failure(.frameCorrupted)) - } - } catch is CancellationError { - await handleClose(code: .normalClosure, reason: nil) - return - } catch { - if (error as NSError).code == 57 { - debugPrint("WebSocekt is not connected Error 57") - await handleClose(with: NetworkError.webSocketError) - return - } else if let urlError = error as? URLError, urlError.code == .cancelled { - await handleClose(code: .normalClosure, reason: nil) - } else { - await handleClose(with: error) - return - } - } - } - } - - private func handleClose(with error: Error) async { - guard task != nil else { return } - - await stateChannel.send(.failed(error)) - release() - } - - private func handleClose(code: URLSessionWebSocketTask.CloseCode, reason: Data?) async { - guard task != nil else { return } - - await stateChannel.send(.closed(code: code, reason: reason)) - - Task.detached { [weak self] in - await self?.release() - } - } - - func release() { - task = nil - - healthCheck?.cancel() - healthCheck = nil - stateChannel.finish() - incomingChannel.finish() - } - - /// 인터벌마다 핑 보내는 메소드 - private func checkingAlive(duration: Duration) { - self.healthCheck?.cancel() - - self.healthCheck = Task { - while task != nil { - do { - try await Task.sleep(for: duration, clock: .suspending) - try await sendPing() - } catch { - await handleClose(with: error) - break - } - } - } - } - - private func sendPing() async throws { - guard let task else { return } - - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.pongContinuation = continuation - debugPrint("Send Ping") - task.sendPing { [weak self] error in - Task { - if let error { - await self?.pongContinuation?.resume(throwing: error) - await self?.releaseCont() - debugPrint("Ping Failed: \(error)") - return - } - debugPrint("Received Pong") - await self?.pongContinuation?.resume() - await self?.releaseCont() - } - } - } - } - - private func releaseCont() async { - self.pongContinuation = nil - } - - /// Socket으로부터 Open 응답을 받으면 상태를 변경하고 메세지 stream을 시작합니다. - /// 핑을 주기적으로 보냅니다. - /// 여기서 120은 업비트의 최소 주기 시간입니다. - private func handleConnect() async { - await stateChannel.send(.connected) - - Task { - await receiveLoop() - } - - checkingAlive(duration: .seconds(120)) - } -} - -// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. -extension BaseWebSocketClient: URLSessionWebSocketDelegate { - public nonisolated func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { - debugPrint("didOpen") - Task { - await handleConnect() - } - } - - public nonisolated func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { - debugPrint("didClose") - Task { - await handleClose(code: closeCode, reason: reason) - } - } -} - -extension AsyncChannel { - public func makeStream() -> AsyncStream { - AsyncStream { continuation in - Task { - for await value in self { - continuation.yield(value) - } - continuation.finish() - } - } - } -} diff --git a/AIProject/iCo/Core/Remote/WebSocket/Reconnectable+Model.swift b/AIProject/iCo/Core/Remote/WebSocket/Reconnectable+Model.swift deleted file mode 100644 index d80888bc..00000000 --- a/AIProject/iCo/Core/Remote/WebSocket/Reconnectable+Model.swift +++ /dev/null @@ -1,54 +0,0 @@ -// -// Reconnectable+Model.swift -// AIProject -// -// Created by kangho lee on 8/17/25. -// - -import Foundation - -/// 재연결 정책. 지수적으로 재연결 인터벌을 증가시켜 서버 부하를 줄임 -public struct ReconnectPolicy { - public var base: Duration = .milliseconds(500) - public var factor: Double = 2.0 - public var max: Duration = .seconds(10) - public var jitter: Double = 0.3 - public var foregroundOnly: Bool = true - public var maxAttemps = 3 - - public init(base: Duration, factor: Double, max: Duration, jitter: Double, foregroundOnly: Bool) { - self.base = base - self.factor = factor - self.max = max - self.jitter = jitter - self.foregroundOnly = foregroundOnly - } - - public static func defaultPolicy() -> ReconnectPolicy { - ReconnectPolicy(base: .milliseconds(500), factor: 2.0, max: .seconds(10), jitter: 0.3, foregroundOnly: true) - } -} - -public struct ExponentialBackoff { - let policy: ReconnectPolicy - private(set) var attempt: Int = 0 - - mutating func next() -> Duration { - defer { attempt += 1 } - let expo = min( - Double(policy.base.components.seconds) * pow(policy.factor, Double(attempt)), - Double(policy.max.components.seconds) - ) - let jitterRatio = 1.0 + (Double.random(in: 0...(policy.jitter))) - let seconds = max(0.1, expo * jitterRatio) - return .seconds(seconds) - } - - mutating func reset() { attempt = 0 } -} - -private extension Duration { - var seconds: Double { - Double(components.seconds) + Double(components.attoseconds) / 1e18 - } -} diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift deleted file mode 100644 index bdbf6438..00000000 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ /dev/null @@ -1,272 +0,0 @@ -// -// ReconnectableSocket.swift -// AIProject -// -// Created by kangho lee on 8/17/25. -// - -import Foundation -import AsyncAlgorithms - -/// Socket을 상태와 메세지를 포워딩하고 재연결을 책임집니다. -public actor ReconnectableWebSocketClient: SocketEngine { - private var stateChannel: AsyncChannel - private var incomingChannel: AsyncChannel> - - /// 시도한 횟수 - private var attempts: Int = 0 - - /// SocketEngine Protocol - private var base: Base? - - /// Socket 상태와 메세지를 forwarding - private var forwardStateTask: Task? - private var forwardIncomingTask: Task? - - /// 소켓 상태 재연결하기 위한 Loop - private var loopTask: Task? - - /// 소켓 연결 상태 flag - private var isClosed = true - - /// 지수적으로 증가하는 재연결 대기 - private var backoff: ExponentialBackoff - - /// 재연결 정책 - private let policy: ReconnectPolicy - - /// 소켓은 재사용하기 어렵기 때문에 closure로 캡처하여 재연결 시 사용 - private let makeBase: () -> Base - - public nonisolated var state: AsyncStream { - AsyncStream { continuation in - Task { - for await state in await stateChannel { - continuation.yield(state) - } - continuation.finish() - } - } - } - - public nonisolated var incoming: AsyncStream> { - AsyncStream { continuation in - Task { - for await message in await incomingChannel { - continuation.yield(message) - } - continuation.finish() - } - } - } - - public init(makeBase: @escaping () -> Base, policy: ReconnectPolicy = .defaultPolicy()) { - self.makeBase = makeBase - self.policy = policy - self.backoff = ExponentialBackoff(policy: policy) - - self.stateChannel = .init() - self.incomingChannel = .init() - - debugPrint(String(describing: Self.self), "init") - } - - /// 소켓 연결 및 재연결 loop 실행 - public func connect() async { - guard loopTask == nil else { return } - isClosed = false - loopTask?.cancel() - loopTask = Task { [weak self] in - do { - try await self?.runLoop() - } catch { - await self?.loopTask?.cancel() - await self?.close() - } - } - } - - public func close() async { - isClosed = true - forwardIncomingTask?.cancel() - await base?.close() - base = nil - forwardStateTask?.cancel() - - release() - } - - public func send(_ data: Data) async throws { - try await base?.send(data) - } - - deinit { - debugPrint(String(describing: Self.self), #function) - forwardStateTask?.cancel() - forwardIncomingTask?.cancel() - - base = nil - - stateChannel.finish() - incomingChannel.finish() - loopTask?.cancel() - loopTask = nil - } - - /// 소켓을 재연결하기 위한 loop입니다. - /// 소켓이 죽으면 종료 원인을 분기하여 재시도 또는 종료합니다. - private func runLoop() async throws { - while !isClosed { - let base = makeBase() - self.base = base - - // 채널 재생성 및 기존 포워딩 task 취소 - self.stateChannel = .init() - self.incomingChannel = .init() - - forwardStateTask?.cancel() - forwardIncomingTask?.cancel() - - // forwarding 채널 시작 - forwardStateTask = Task { [weak self] in await self?.forwardState(from: base) } - - forwardIncomingTask = Task { [weak self] in await self?.forwardIncoming(from: base) } - - await base.connect() - - // 소켓이 종료될 때 까지 대기 및 종료 원인 응답 대기 - let terminal = await waitTerminalEvent(from: base) - - // 사용자가 종료한 것이면 그냥 종료 - if isClosed { - break - } - - // 종료 원인 분기 - switch classify(closeCode: terminal.closeCode, error: terminal.error) { - case let .closed(code, reason): - await stateChannel.send(.closed(code: code, reason: reason)) - release() - return - case .nonRetryable(let error): - await stateChannel.send(.failed(error ?? URLError(.networkConnectionLost))) - release() - return - case .retryable: - - // 재시도 가능한 에러이면 재시도 - // 재연결 시간 정책 반영하여 계산 - let delay = backoff.next() - print(backoff.attempt) - attempts += 1 - print("object's attemps: \(attempts) after \(delay) sec.") - await stateChannel.send(.reconnecting(nextAttempsIn: delay)) - try await Task.sleep(for: delay) - } - } - } - - - /// 종료 원인 분기 - /// - Parameters: - /// - closeCode: 종료 코드 // 1000 정상 종료등 - /// - error: urlError // 네트워크 연결 에러 등 - /// - Returns: 에러타입 반환 예) retryable , closed, nonRetryable - private func classify(closeCode: URLSessionWebSocketTask.CloseCode?, - error: Error?) -> WebSocket.Failure { -// if closeCode == nil, error == nil { -// return .closed(code: .normalClosure, reason: nil) -// } - if let code = closeCode { - switch code { - // 일시적 - 재시도 - case .goingAway, .abnormalClosure, .internalServerError, .noStatusReceived: - return .retryable(underlying: error) - // 정상 종료 - case .normalClosure: - return .closed(code: code, reason: nil) - // 정책/프로토콜/보안 - 비재시도 - case .protocolError, .unsupportedData, .policyViolation, .messageTooBig, .tlsHandshakeFailure, .invalidFramePayloadData, .invalid, .mandatoryExtensionMissing: - return .nonRetryable(underlying: error) - default: - return .retryable(underlying: error) - } - } - - if let urlErr = error as? URLError { - switch urlErr.code { - // 일시적 네트워크 - case .notConnectedToInternet, .timedOut, .networkConnectionLost: - return .retryable(underlying: urlErr) - - // 앱 전환/작업 취소 등 - case .cancelled: - return .retryable(underlying: urlErr) - - // 환경/설정/서버 응답 이상은 보수적으로 비재시도 - case .cannotFindHost, .cannotConnectToHost, .badServerResponse, .secureConnectionFailed, .serverCertificateUntrusted, .serverCertificateHasBadDate, .serverCertificateHasUnknownRoot: - return .nonRetryable(underlying: urlErr) - default: - return .retryable(underlying: urlErr) - } - } - - // 알 수 없으면 재시도 쪽으로 - return .retryable(underlying: error) - } - - private func forwardState(from base: Base) async { - for await _state in base.state { - switch _state { - case .connecting: - await stateChannel.send(.connecting) - case .connected: - backoff.reset() - await stateChannel.send(.connected) - case .failed(let error): - await stateChannel.send(.failed(error)) - case .closed(let code, let reason): - await stateChannel.send(.closed(code: code, reason: reason)) - case .reconnecting: - break - } - } - } - - private func forwardIncoming(from base: Base) async { - for await message in base.incoming { - await incomingChannel.send(message) - } - } - - private func waitTerminalEvent(from base: Base) async -> (closeCode: URLSessionWebSocketTask.CloseCode?, error: Error?) { - for await _state in base.state { - switch _state { - case .failed(let error): - return (nil, error) - case .closed(let code, let reason): - var reasonString: String = "내용 없음" - if let reason, let text = String(data: reason, encoding: .utf8) { - reasonString = text - } - debugPrint(reasonString) - return (code, nil) - default: continue - } - } - - return (nil, nil) - } - - private func release() { - forwardStateTask?.cancel() - forwardIncomingTask?.cancel() - - base = nil - - stateChannel.finish() - incomingChannel.finish() - loopTask?.cancel() - loopTask = nil - } -} diff --git a/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift b/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift index b76f07d3..3e70a2e1 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift @@ -6,10 +6,11 @@ // import Foundation +import AsyncAlgorithms public protocol SocketEngine { - var state: AsyncStream { get } - var incoming: AsyncStream> { get } + var stateChannel: AsyncChannel { get set } + var incomingChannel: AsyncChannel> { get set } func connect() async func send(_ data: Data) async throws func close() async @@ -18,8 +19,8 @@ public protocol SocketEngine { public enum WebSocket { public enum State: Sendable { case connecting, connected - case failed(Error) - case closed(code: URLSessionWebSocketTask.CloseCode, reason: Data?) + case failed + case closed case reconnecting(nextAttempsIn: Duration) } @@ -50,10 +51,6 @@ extension WebSocket.State: Equatable { return true case (.connected, .connected): return true - case (.failed(let lhsError), .failed(let rhsError)): - return lhsError as NSError == rhsError as NSError - case (.closed(let lhsCode, let lhsReason), .closed(let rhsCode, let rhsReason)): - return lhsCode == rhsCode && lhsReason == rhsReason case (.reconnecting(let lhsDelay), .reconnecting(let rhsDelay)): return lhsDelay == rhsDelay default: diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift new file mode 100644 index 00000000..3721d207 --- /dev/null +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -0,0 +1,220 @@ +import Foundation +import AsyncAlgorithms + +public final class WebSocketClient: NSObject { + /// 소켓 상태 채널 + private var stateStream: AsyncStream + /// WebSocket의 상태 변화를 여러 Consumer에게 동시에 전달하는 브로드캐스터 + public var stateBroadCaster: AsyncStreamBroadcaster = .init() + /// 메세지 채널 + public var incomingChannel: AsyncChannel + + private let url: URL + private let session: URLSession + private var task: URLSessionWebSocketTask? + + private var stateTask: Task? + private var receiveTask: Task? + + /// 핑 전송 task + private var healthCheck: Task? + private var pingInterval: Duration = .seconds(30) + private var pingTimeout: Duration = .seconds(10) + + public init(url: URL, session: URLSession = .shared) { + self.url = url + self.session = session + + stateStream = stateBroadCaster.stream() + incomingChannel = AsyncChannel() + + super.init() + observeState() + } + + /// 웹소켓 세션을 연결하고 작업을 생성합니다. + public func connect() async { + await stateBroadCaster.send(.connecting) + self.task = session.webSocketTask(with: url) + task?.delegate = self + task?.resume() + + // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 + try? await performWithTimeout(sendPing, at: pingTimeout) + } + + /// 명시적으로 현재 WebSocket 연결을 정상적으로 종료합니다. + /// + /// 이 메서드는 서버와의 WebSocket 연결을 `normalClosure` 코드로 닫습니다. + public func disconnect() async { + task?.cancel(with: .normalClosure, reason: nil) + } + + /// 텍스트 형태의 메시지를 WebSocket 서버로 전송합니다. + public func send(text: String) async throws { + try await task?.send(.string(text)) + } + + /// 바이너리(Data) 형태의 메시지를 WebSocket 서버로 전송합니다. + public func send(data: Data) async throws { + try await task?.send(.data(data)) + } + + deinit { + debugPrint(String(describing: Self.self), #function) + task?.cancel() + task = nil + stateBroadCaster.finish() + incomingChannel.finish() + } +} + +// MARK: - Test용 메소드 +// TODO: Deprecated 예정입니다. +extension WebSocketClient { + public func sendState(with state: WebSocket.State) async { + await stateBroadCaster.send(state) + } + + public func cancel(with code: URLSessionWebSocketTask.CloseCode) { + task?.cancel(with: code, reason: nil) + task = nil + } + + public func cancel() { + task?.cancel() + task = nil + } +} + +// MARK: - Private +extension WebSocketClient { + /// 서버로 Ping 프레임을 전송하여 연결 상태를 확인합니다. + private func sendPing() async throws { + return try await withCheckedThrowingContinuation { continuation in + task?.sendPing { error in + Task { + if let error { + debugPrint("Ping Failed: \(error)") + continuation.resume(throwing: error) + return + } + + continuation.resume() + } + } + } + } + + /// WebSocket의 상태 변화를 관찰하고 각 상태에 맞는 동작을 수행합니다. + private func observeState() { + stateTask = Task { + for await state in stateStream { + switch state { + case .connecting: + debugPrint("Connecting") + continue + case .connected: + debugPrint("Connected") + receive() + checkingAlive() + case .failed, .closed: + debugPrint("Closed") + release() + case .reconnecting: + debugPrint("Reconnecting") + await reconnect() + } + } + } + } + + // FIXME: 개선이 필요한지 한 번 더 생각해보기 + /// 서버로부터 WebSocket 메시지를 지속적으로 수신합니다. + private func receive() { + receiveTask?.cancel() + + receiveTask = Task { + do { + guard let task else { return } + let message = try await task.receive() + await incomingChannel.send(message) + receive() + } catch { + print("종료되어 더 이상 웹소켓 데이터를 받지 않습니다.") + } + } + } + + /// 주기적으로 Ping을 전송하여 WebSocket 연결 상태를 점검합니다. + private func checkingAlive() { + healthCheck?.cancel() + + healthCheck = Task { + do { + while true { + try await Task.sleep(until: .now + pingInterval) + try await performWithTimeout(sendPing, at: .seconds(10)) + } + } catch is CancellationError { + debugPrint("작업이 취소되었습니다.") + } catch { + await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + } + + /// WebSocket 연결 종료 시 상태를 처리합니다. + private func handleDisconnected(_ userClose: Bool) async { + if userClose { + await stateBroadCaster.send(.closed) + } else { + await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + + /// WebSocket 재연결을 시도합니다. + private func reconnect() async { + guard task?.state != .running else { + return + } + + try? await Task.sleep(for: .seconds(2)) + await connect() + } + + /// WebSocket 클라이언트의 모든 비동기 작업과 연결을 종료하고 리소스를 정리합니다. + private func release() { + receiveTask?.cancel() + receiveTask = nil + healthCheck?.cancel() + healthCheck = nil + + if task?.state == .running { + task?.cancel(with: .goingAway, reason: nil) + } + + task = nil + } +} + +// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. +extension WebSocketClient: URLSessionWebSocketDelegate { + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + Task { await stateBroadCaster.send(.connected) } + } + + // 웹소켓으로부터 Close Code를 받았을 때. (정상 종료로 닫혔을 때) + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + Task { await handleDisconnected(closeCode == .normalClosure) } + } + + // 세션 레벨에서 작업이 완전히 종료됐을 때. + // 1. 네트워크 닫힘, 2. 에러로 종료, 3. 정상적으로 완료 + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { + if let _ = error { + Task { await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) } + } + } +} + diff --git a/AIProject/iCo/Core/Util/Async+BroadCaster.swift b/AIProject/iCo/Core/Util/Async+BroadCaster.swift new file mode 100644 index 00000000..4b041646 --- /dev/null +++ b/AIProject/iCo/Core/Util/Async+BroadCaster.swift @@ -0,0 +1,59 @@ +// +// Async+BroadCaster.swift +// iCo +// +// Created by 강대훈 on 10/26/25. +// + +import Foundation + +@globalActor +actor BroadCaster { + static let shared = BroadCaster() + + private init() {} +} + +/// AsyncStream의 다중 소비를 위해 만든 객체로 스트림 전파 수행 +public class AsyncStreamBroadcaster { + + /// 구독할 continuation 값들 + private var continuations: [UUID: AsyncStream.Continuation] = [:] + + /// 구독 메서드로 stream을 반환 + /// - Returns: stream 반환 + public func stream() -> AsyncStream { + let id = UUID() + + return AsyncStream { continuation in + continuation.onTermination = { [weak self] _ in + guard let self = self else { return } + self.finish(id: id) + } + + continuations[id] = continuation + } + } + + + /// 전파할 메세지를 전송하고 구독자들에게 전파 + /// - Parameter element: 메세지 + @BroadCaster + public func send(_ element: Element) async { + for (_, c) in continuations { + c.yield(element) + } + } + + + /// 구독을 해제하고 스트림을 종료함 + public func finish() { + for (_, c) in continuations { + c.finish() + } + } + + private func finish(id: UUID) { + continuations[id] = nil + } +} diff --git a/AIProject/iCo/Core/Util/Async+Timeout.swift b/AIProject/iCo/Core/Util/Async+Timeout.swift new file mode 100644 index 00000000..09670e2a --- /dev/null +++ b/AIProject/iCo/Core/Util/Async+Timeout.swift @@ -0,0 +1,43 @@ +// +// Async+Timeout.swift +// iCo +// +// Created by kangho on 10/25/25. +// + +import Foundation + + +/// 두 클로저 중 먼저 완료되는 클로저만 실행하고 나머지는 취소 +/// - Parameters: +/// - Returns: 먼저 완료되는 클로저 실행 +public func race( + _ lhs: sending @escaping () async throws -> T, + _ rhs: sending @escaping () async throws -> T +) async throws -> T { + return try await withThrowingTaskGroup(of: T.self) { group in + group.addTask { try await lhs() } + group.addTask { try await rhs() } + + defer { group.cancelAll() } + + return try await group.next()! + } +} + + +/// async 작업에 timeout을 줌 +/// - Parameters: +/// - action: 수행할 비동기 클로저 +/// - timeout: 타임아웃 duration +/// - Throws: 타임아웃 에러 또는 작업에서 발생한 에러 +/// - Returns: 비동기 작업 결과값 +public func performWithTimeout( + _ action: sending @escaping () async throws -> T, + at timeout: Duration +) async throws -> T { + return try await race(action) { + try await Task.sleep(until: .now + timeout) + throw URLError(.timedOut) + } +} diff --git a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift index 9e288ce5..2bcc2abe 100644 --- a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift +++ b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift @@ -9,27 +9,21 @@ import Foundation /// 업비트 실시간 코인 시세 웹소켓 서비스 final class UpbitTickerService: RealTimeTickerProvider { - private let client: any SocketEngine + private let client: WebSocketClient /// 소켓 상태 stream private var stateStreamTask: Task? - init( - client: any SocketEngine = - ReconnectableWebSocketClient { - BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) - } - ) { + init(client: WebSocketClient = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) { self.client = client } func connect() async { await client.connect() - streamingState() } func disconnect() async { - await client.close() + await client.disconnect() } /// 업비트의 코인 시세 stream을 가져와 디코딩하여 Model로 만들고 forwarding @@ -37,21 +31,26 @@ final class UpbitTickerService: RealTimeTickerProvider { func subscribeTickerStream() -> AsyncStream { AsyncStream { continuation in Task { - for await message in client.incoming { + for await message in client.incomingChannel { switch message { - case .success(let data): + case .data(let data): if let ticker = mapTicker(data) { continuation.yield(ticker) } - case .failure(let error): - debugPrint(error) + case .string(let string): + print(string) } } + continuation.finish() } } } + func subscribeStateStream() -> AsyncStream { + client.stateBroadCaster.stream() + } + /// 티켓과 코인 ID를 가지고 업비트에 코인 시세를 구독합니다. /// - Parameters: /// - ticket: 티켓 iD @@ -61,7 +60,7 @@ final class UpbitTickerService: RealTimeTickerProvider { do { let ticketData = try JSONEncoder().encode(SubscribeRequest.ticker(ticket: ticket, codes: coins)) - try await client.send(ticketData) + try await client.send(data: ticketData) } catch { debugPrint(error) } @@ -80,16 +79,5 @@ final class UpbitTickerService: RealTimeTickerProvider { return nil } } - - - /// 소켓 상태를 구독합니다. - private func streamingState() { - self.stateStreamTask?.cancel() - - self.stateStreamTask = Task { - for await state in client.state { - debugPrint("client State: \(state)") - } - } - } } + diff --git a/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift b/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift index fff20dea..719a986d 100644 --- a/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift +++ b/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift @@ -11,5 +11,6 @@ protocol RealTimeTickerProvider { func connect() async func disconnect() async func subscribeTickerStream() -> AsyncStream + func subscribeStateStream() -> AsyncStream func sendTicket(ticket: String, coins: [CoinListModel.ID]) async } diff --git a/AIProject/iCo/Features/Market/CoinList/CoinListView.swift b/AIProject/iCo/Features/Market/CoinList/CoinListView.swift index bb712c39..678a84c1 100644 --- a/AIProject/iCo/Features/Market/CoinList/CoinListView.swift +++ b/AIProject/iCo/Features/Market/CoinList/CoinListView.swift @@ -9,7 +9,7 @@ import SwiftUI import AsyncAlgorithms struct CoinListView: View { - @Bindable var store: MarketStore + @ObservedObject var store: MarketStore @State private var visibleCoins: Set = [] @State private var isActive: Bool = false diff --git a/AIProject/iCo/Features/Market/MarketStore.swift b/AIProject/iCo/Features/Market/MarketStore.swift index f4587d94..bb8597f8 100644 --- a/AIProject/iCo/Features/Market/MarketStore.swift +++ b/AIProject/iCo/Features/Market/MarketStore.swift @@ -16,8 +16,7 @@ enum CoinFilter: Int, Equatable { /// 마켓 이벤트 처리를 담당 /// 검색 / 시세 정보 / 웹소켓 상태 / 정렬 / 필터링 이벤트 처리 @MainActor -@Observable -class MarketStore { +final class MarketStore: ObservableObject { /// 최초 한 번만 로드하기 위한 flag private var hasLoaded = false @@ -28,11 +27,12 @@ class MarketStore { private let coinService: UpBitAPIService private let tickerService: RealTimeTickerProvider private let searchRecordManager: SearchRecordManaging = SearchRecordManager() + private var stateTask: Task? private(set) var errorMessage: String? /// 변동성이 적은 메타 정보 - private(set) var coinMeta: [CoinID: Coin] = [:] + @Published private(set) var coinMeta: [CoinID: Coin] = [:] /// 변동성이 큰 시세 정보 private var ticker: [CoinID: TickerStore] = [:] @@ -113,6 +113,8 @@ class MarketStore { init(coinService: UpBitAPIService, tickerService: RealTimeTickerProvider) { self.coinService = coinService self.tickerService = tickerService + + Task { await observeState() } } } @@ -275,14 +277,6 @@ extension MarketStore { // service 연결 await tickerService.connect() - if !subscriptionSnapshot.isEmpty { - await sendTicket(subscriptionSnapshot) - } - - // 시세가 - self.tickerStreamTask = Task { - await consume() - } } /// 서비스 연결 해제 @@ -306,7 +300,6 @@ extension MarketStore { private func ticketStream() async { let stream = visibleCoinsChannel .filter { !$0.isEmpty } - .removeDuplicates() .debounce(for: .milliseconds(300)) for await visibleCoin in stream { self.subscriptionSnapshot = visibleCoin @@ -326,7 +319,24 @@ extension MarketStore { store.apply(ticker) } - + private func observeState() async { + stateTask?.cancel() + + stateTask = Task { + for await state in tickerService.subscribeStateStream() { + if case .connected = state { + if !subscriptionSnapshot.isEmpty { + await sendTicket(subscriptionSnapshot) + } + + // 시세가 + self.tickerStreamTask = Task { + await consume() + } + } + } + } + } } extension MarketStore { diff --git a/AIProject/iCo/Features/Market/MarketView.swift b/AIProject/iCo/Features/Market/MarketView.swift index 84d9079a..277091a5 100644 --- a/AIProject/iCo/Features/Market/MarketView.swift +++ b/AIProject/iCo/Features/Market/MarketView.swift @@ -9,7 +9,7 @@ import SwiftUI struct MarketView: View { - @State var store: MarketStore + @StateObject var store: MarketStore @State private var searchText: String = "" @State private var selectedCoinID: CoinID? @@ -25,7 +25,7 @@ struct MarketView: View { @State private var showNewBadge: Bool = false init(coinService: UpBitAPIService, tickerService: RealTimeTickerProvider) { - store = MarketStore(coinService: coinService, tickerService: tickerService) + _store = StateObject(wrappedValue: MarketStore(coinService: coinService, tickerService: tickerService)) } var body: some View { @@ -124,9 +124,6 @@ extension MarketView { #Preview { MarketView( coinService: UpBitAPIService(), - tickerService: UpbitTickerService(client: - ReconnectableWebSocketClient { - BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) - }) + tickerService: UpbitTickerService(client: WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) ) } diff --git a/AIProject/iCoTests/Socket/WebSocketTest.swift b/AIProject/iCoTests/Socket/WebSocketTest.swift new file mode 100644 index 00000000..005f60af --- /dev/null +++ b/AIProject/iCoTests/Socket/WebSocketTest.swift @@ -0,0 +1,71 @@ +// +// WebSocketTest.swift +// iCoTests +// +// Created by kangho on 10/25/25. +// + +import XCTest +import iCo + +final class WebSocketTest: XCTestCase { + + override func setUpWithError() throws { + // Put setup code here. This method is called before the invocation of each test method in the class. + } + + override func tearDownWithError() throws { + // Put teardown code here. This method is called after the invocation of each test method in the class. + } + + func testExample() async throws { + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) + await sut.connect() + try await sut.send(text: "[{ticket:test},{type:ticker,codes:[KRW-BTC]}]") + } + + func testReconnenctWhenAbnormalClose() async throws { + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) + + await sut.connect() + + try await Task.sleep(for: .seconds(2)) + sut.cancel(with: .abnormalClosure) + try await Task.sleep(for: .seconds(2)) + // connecting -> connected -> abnormal close -> handleDisconnect -> reconnect + // .... -> abnormal close -> didCompletWithError -> reconnect + } + + func testReceiveData() async throws { + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) + let requestFormat = "[{ticket:test},{type:ticker,codes:[KRW-BTC]}]" + + await sut.connect() + try await sut.send(text: requestFormat) + try await Task.sleep(for: .seconds(3)) + + sut.cancel() + + try await Task.sleep(for: .seconds(3)) + try await sut.send(text: requestFormat) + try await Task.sleep(for: .seconds(10)) + } + + func testUserClose() async throws { + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) + await sut.connect() + try await Task.sleep(for: .seconds(2)) + await sut.disconnect() + try await Task.sleep(for: .seconds(2)) + await sut.connect() + try await Task.sleep(for: .seconds(10)) + } + + func testPerformanceExample() throws { + // This is an example of a performance test case. + self.measure { + // Put the code you want to measure the time of here. + } + } +} + diff --git a/AIProject/iCoTests/Socket/WebSocketTests.swift b/AIProject/iCoTests/Socket/WebSocketTests.swift deleted file mode 100644 index 7bf11c3c..00000000 --- a/AIProject/iCoTests/Socket/WebSocketTests.swift +++ /dev/null @@ -1,52 +0,0 @@ -// -// WebSocketTests.swift -// AIProjectTests -// -// Created by kangho lee on 8/17/25. -// - -import XCTest -@testable import iCo - -final class WebSocketTests: XCTestCase { - - static let echoURL = URL(string: "wss://echo.websocket.org")! - static let upbitURL = URL(string: "wss://api.upbit.com/websocket/v1")! - - func test_connect() async throws { - var (socket, states) = makeSUT() - - Task { - for await state in socket.state { - states.append(state) - } - } - - await socket.connect() - XCTAssertEqual(states, [.connecting, .connected]) - } - - func test_user_disconnect() async throws { - var (socket, states) = makeSUT() - - Task { - for await state in socket.state { - states.append(state) - } - } - - await socket.connect() - await socket.close() - try await Task.sleep(for: .milliseconds(100)) - XCTAssertEqual(states, [.connecting, .connected, .closed(code: .normalClosure, reason: nil)]) - } - - // MARK: Helper - - private func makeSUT(url: URL = upbitURL) -> (SocketEngine, [WebSocket.State]) { - let socket = BaseWebSocketClient(url: url) - - return (socket, [WebSocket.State]()) - } - -}