From 727efeab07c7dce489a41e1a752484d7efb7313d Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 15:54:54 +0900 Subject: [PATCH 01/33] =?UTF-8?q?test:=20=EC=9B=B9=EC=86=8C=EC=BC=93=20?= =?UTF-8?q?=EC=9E=AC=EC=97=B0=EA=B2=B0=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= =?UTF-8?q?=EB=A5=BC=20=EC=9C=84=ED=95=9C=20=EC=A0=84=EC=97=AD=20=EB=B3=80?= =?UTF-8?q?=EC=88=98=20=EC=83=9D=EC=84=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 0d7b9635..71ebcf62 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -8,6 +8,8 @@ import Foundation import AsyncAlgorithms +var flag = true + /// 웹소켓 client입니다. 메세지와 연결 상태 stream, 연결, 해제를 책임집니다. public final actor BaseWebSocketClient: NSObject, SocketEngine { From ffda9e07c77d1a62c5454116b8a9a1f0eef74079 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 15:59:02 +0900 Subject: [PATCH 02/33] =?UTF-8?q?refactor:=20=EC=95=A1=ED=84=B0=EC=97=90?= =?UTF-8?q?=EC=84=9C=20=ED=81=B4=EB=9E=98=EC=8A=A4=EB=A1=9C=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 2 +- .../iCo/Core/Remote/WebSocket/ReconnectableSocket.swift | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 71ebcf62..67e1c7fd 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -11,7 +11,7 @@ import AsyncAlgorithms var flag = true /// 웹소켓 client입니다. 메세지와 연결 상태 stream, 연결, 해제를 책임집니다. -public final actor BaseWebSocketClient: NSObject, SocketEngine { +public final class BaseWebSocketClient: NSObject { /// 소켓 상태 채널 private var stateChannel: AsyncChannel diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift index bdbf6438..647bb194 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift @@ -9,10 +9,7 @@ import Foundation import AsyncAlgorithms /// Socket을 상태와 메세지를 포워딩하고 재연결을 책임집니다. -public actor ReconnectableWebSocketClient: SocketEngine { - private var stateChannel: AsyncChannel - private var incomingChannel: AsyncChannel> - +public class ReconnectableWebSocketClient { /// 시도한 횟수 private var attempts: Int = 0 From 06eaa8c9d7e61189701bfb3772549f846e832170 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 15:59:32 +0900 Subject: [PATCH 03/33] =?UTF-8?q?refactor:=20=EC=8A=A4=ED=8A=B8=EB=A6=BC?= =?UTF-8?q?=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WebSocket/BaseWebSocketClient.swift | 22 ------------------- .../WebSocket/ReconnectableSocket.swift | 22 ------------------- 2 files changed, 44 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 67e1c7fd..de4ae19d 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -28,28 +28,6 @@ public final class BaseWebSocketClient: NSObject { /// 핑 전송 task private var healthCheck: Task? - public nonisolated var state: AsyncStream { - AsyncStream { continuation in - Task { - for await state in await stateChannel { - continuation.yield(state) - } - continuation.finish() - } - } - } - - public nonisolated var incoming: AsyncStream> { - AsyncStream { continuation in - Task { - for await message in await incomingChannel { - continuation.yield(message) - } - continuation.finish() - } - } - } - public init(url: URL, session: URLSession = .shared) { self.url = url diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift index 647bb194..b1e74a87 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift @@ -35,28 +35,6 @@ public class ReconnectableWebSocketClient { /// 소켓은 재사용하기 어렵기 때문에 closure로 캡처하여 재연결 시 사용 private let makeBase: () -> Base - public nonisolated var state: AsyncStream { - AsyncStream { continuation in - Task { - for await state in await stateChannel { - continuation.yield(state) - } - continuation.finish() - } - } - } - - public nonisolated var incoming: AsyncStream> { - AsyncStream { continuation in - Task { - for await message in await incomingChannel { - continuation.yield(message) - } - continuation.finish() - } - } - } - public init(makeBase: @escaping () -> Base, policy: ReconnectPolicy = .defaultPolicy()) { self.makeBase = makeBase self.policy = policy From f9db84eac376f4e000f90a9acc3dd0d04db0c06a Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:00:15 +0900 Subject: [PATCH 04/33] =?UTF-8?q?chore:=20makeStream()=20=EB=A9=94?= =?UTF-8?q?=EC=86=8C=EB=93=9C=20=EB=AF=B8=EC=82=AC=EC=9A=A9=EC=9C=BC?= =?UTF-8?q?=EB=A1=9C=20=EC=9D=B8=ED=95=9C=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 9 --------- 1 file changed, 9 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index de4ae19d..a479a5c5 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -215,15 +215,6 @@ extension BaseWebSocketClient: URLSessionWebSocketDelegate { } } -extension AsyncChannel { - public func makeStream() -> AsyncStream { - AsyncStream { continuation in - Task { - for await value in self { - continuation.yield(value) - } - continuation.finish() - } } } } From 55251221aead6060b176e87472d54eb469c8b0b9 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:00:54 +0900 Subject: [PATCH 05/33] =?UTF-8?q?feat:=20URLSession=20=EC=97=90=EB=9F=AC?= =?UTF-8?q?=20=EC=B2=98=EB=A6=AC=20=EB=8D=B8=EB=A6=AC=EA=B2=8C=EC=9D=B4?= =?UTF-8?q?=ED=8A=B8=20=EB=A9=94=EC=86=8C=EB=93=9C=20=EC=B1=84=ED=83=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index a479a5c5..932a9c7a 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -215,6 +215,14 @@ extension BaseWebSocketClient: URLSessionWebSocketDelegate { } } + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { + debugPrint("didCompleteWithError") + + guard let error else { + Task { await handleClose(code: .goingAway, reason: nil)} + return } + + Task { await handleClose(with: error) } } } From 7949207f3390d93e19f6e56f27cc68a4fc4205d3 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:02:39 +0900 Subject: [PATCH 06/33] =?UTF-8?q?chore:=20=EB=94=94=EB=B2=84=EA=B7=B8=20?= =?UTF-8?q?=ED=94=84=EB=A6=B0=ED=8A=B8=20=EC=A0=9C=EA=B1=B0=20=EB=B0=8F=20?= =?UTF-8?q?=EC=82=AC=EC=9A=A9=ED=95=98=EC=A7=80=20=EC=95=8A=EB=8A=94=20?= =?UTF-8?q?=EC=A3=BC=EC=84=9D=20=EC=BD=94=EB=93=9C=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 3 --- .../iCo/Core/Remote/WebSocket/ReconnectableSocket.swift | 6 ------ 2 files changed, 9 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 932a9c7a..96722c19 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -37,7 +37,6 @@ public final class BaseWebSocketClient: NSObject { incomingChannel = AsyncChannel>() super.init() - debugPrint(String(describing: Self.self), #function) } /// 채널을 새로 개설하고 소켓을 엽니다. @@ -163,7 +162,6 @@ public final class BaseWebSocketClient: NSObject { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in self.pongContinuation = continuation - debugPrint("Send Ping") task.sendPing { [weak self] error in Task { if let error { @@ -172,7 +170,6 @@ public final class BaseWebSocketClient: NSObject { debugPrint("Ping Failed: \(error)") return } - debugPrint("Received Pong") await self?.pongContinuation?.resume() await self?.releaseCont() } diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift index b1e74a87..c2d5063f 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift @@ -43,7 +43,6 @@ public class ReconnectableWebSocketClient { self.stateChannel = .init() self.incomingChannel = .init() - debugPrint(String(describing: Self.self), "init") } /// 소켓 연결 및 재연결 loop 실행 @@ -134,7 +133,6 @@ public class ReconnectableWebSocketClient { let delay = backoff.next() print(backoff.attempt) attempts += 1 - print("object's attemps: \(attempts) after \(delay) sec.") await stateChannel.send(.reconnecting(nextAttempsIn: delay)) try await Task.sleep(for: delay) } @@ -149,9 +147,6 @@ public class ReconnectableWebSocketClient { /// - Returns: 에러타입 반환 예) retryable , closed, nonRetryable private func classify(closeCode: URLSessionWebSocketTask.CloseCode?, error: Error?) -> WebSocket.Failure { -// if closeCode == nil, error == nil { -// return .closed(code: .normalClosure, reason: nil) -// } if let code = closeCode { switch code { // 일시적 - 재시도 @@ -224,7 +219,6 @@ public class ReconnectableWebSocketClient { if let reason, let text = String(data: reason, encoding: .utf8) { reasonString = text } - debugPrint(reasonString) return (code, nil) default: continue } From 1eaeed2e440c80bb9cabddad48b945770b37927c Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:03:09 +0900 Subject: [PATCH 07/33] =?UTF-8?q?refactor:=20=EC=A0=91=EA=B7=BC=20?= =?UTF-8?q?=EC=A0=9C=EC=96=B4=EC=9E=90=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 96722c19..67c4af99 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -14,10 +14,10 @@ var flag = true public final class BaseWebSocketClient: NSObject { /// 소켓 상태 채널 - private var stateChannel: AsyncChannel + public var stateChannel: AsyncChannel /// 메세지 채널 - private var incomingChannel: AsyncChannel> + public var incomingChannel: AsyncChannel> private let url: URL private let session: URLSession From a3c6208b43ca2e47be2090236edf6d9d9155ffa1 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:04:28 +0900 Subject: [PATCH 08/33] =?UTF-8?q?feat:=20=ED=8F=AC=EC=9B=8C=EB=94=A9=20?= =?UTF-8?q?=EB=B0=A9=EC=8B=9D=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WebSocket/ReconnectableSocket.swift | 55 ------------------- 1 file changed, 55 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift index c2d5063f..38d7cc71 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift @@ -17,7 +17,6 @@ public class ReconnectableWebSocketClient { private var base: Base? /// Socket 상태와 메세지를 forwarding - private var forwardStateTask: Task? private var forwardIncomingTask: Task? /// 소켓 상태 재연결하기 위한 Loop @@ -40,9 +39,6 @@ public class ReconnectableWebSocketClient { self.policy = policy self.backoff = ExponentialBackoff(policy: policy) - self.stateChannel = .init() - self.incomingChannel = .init() - } /// 소켓 연결 및 재연결 loop 실행 @@ -62,10 +58,8 @@ public class ReconnectableWebSocketClient { public func close() async { isClosed = true - forwardIncomingTask?.cancel() await base?.close() base = nil - forwardStateTask?.cancel() release() } @@ -76,13 +70,7 @@ public class ReconnectableWebSocketClient { deinit { debugPrint(String(describing: Self.self), #function) - forwardStateTask?.cancel() - forwardIncomingTask?.cancel() - base = nil - - stateChannel.finish() - incomingChannel.finish() loopTask?.cancel() loopTask = nil } @@ -93,19 +81,6 @@ public class ReconnectableWebSocketClient { while !isClosed { let base = makeBase() self.base = base - - // 채널 재생성 및 기존 포워딩 task 취소 - self.stateChannel = .init() - self.incomingChannel = .init() - - forwardStateTask?.cancel() - forwardIncomingTask?.cancel() - - // forwarding 채널 시작 - forwardStateTask = Task { [weak self] in await self?.forwardState(from: base) } - - forwardIncomingTask = Task { [weak self] in await self?.forwardIncoming(from: base) } - await base.connect() // 소켓이 종료될 때 까지 대기 및 종료 원인 응답 대기 @@ -185,30 +160,6 @@ public class ReconnectableWebSocketClient { return .retryable(underlying: error) } - private func forwardState(from base: Base) async { - for await _state in base.state { - switch _state { - case .connecting: - await stateChannel.send(.connecting) - case .connected: - backoff.reset() - await stateChannel.send(.connected) - case .failed(let error): - await stateChannel.send(.failed(error)) - case .closed(let code, let reason): - await stateChannel.send(.closed(code: code, reason: reason)) - case .reconnecting: - break - } - } - } - - private func forwardIncoming(from base: Base) async { - for await message in base.incoming { - await incomingChannel.send(message) - } - } - private func waitTerminalEvent(from base: Base) async -> (closeCode: URLSessionWebSocketTask.CloseCode?, error: Error?) { for await _state in base.state { switch _state { @@ -228,13 +179,7 @@ public class ReconnectableWebSocketClient { } private func release() { - forwardStateTask?.cancel() - forwardIncomingTask?.cancel() - base = nil - - stateChannel.finish() - incomingChannel.finish() loopTask?.cancel() loopTask = nil } From 8acaaba385681bdcdfd69492ff77a2e9f0842d14 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:05:34 +0900 Subject: [PATCH 09/33] =?UTF-8?q?refactor:=20=EC=95=A1=ED=84=B0=20?= =?UTF-8?q?=EC=A0=9C=EA=B1=B0=EB=A1=9C=20=EC=9D=B8=ED=95=B4=20nonisolated?= =?UTF-8?q?=20=ED=82=A4=EC=9B=8C=EB=93=9C=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 67c4af99..b30277c1 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -197,21 +197,20 @@ public final class BaseWebSocketClient: NSObject { // MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. extension BaseWebSocketClient: URLSessionWebSocketDelegate { - public nonisolated func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { debugPrint("didOpen") Task { await handleConnect() } } - public nonisolated func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { debugPrint("didClose") Task { await handleClose(code: closeCode, reason: reason) } } -} - + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { debugPrint("didCompleteWithError") From e824bd494bc7fba6be0cf1af56a89b74e463fbe2 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:07:40 +0900 Subject: [PATCH 10/33] =?UTF-8?q?refactor:=20Task=20weak=20self=20?= =?UTF-8?q?=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Core/Remote/WebSocket/BaseWebSocketClient.swift | 6 +++--- .../Core/Remote/WebSocket/ReconnectableSocket.swift | 12 +++++------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index b30277c1..d3c8a78f 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -127,7 +127,7 @@ public final class BaseWebSocketClient: NSObject { await stateChannel.send(.closed(code: code, reason: reason)) Task.detached { [weak self] in - await self?.release() + self?.release() } } @@ -165,12 +165,12 @@ public final class BaseWebSocketClient: NSObject { task.sendPing { [weak self] error in Task { if let error { - await self?.pongContinuation?.resume(throwing: error) + self?.pongContinuation?.resume(throwing: error) await self?.releaseCont() debugPrint("Ping Failed: \(error)") return } - await self?.pongContinuation?.resume() + self?.pongContinuation?.resume() await self?.releaseCont() } } diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift index 38d7cc71..365f94cc 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift @@ -16,9 +16,6 @@ public class ReconnectableWebSocketClient { /// SocketEngine Protocol private var base: Base? - /// Socket 상태와 메세지를 forwarding - private var forwardIncomingTask: Task? - /// 소켓 상태 재연결하기 위한 Loop private var loopTask: Task? @@ -46,12 +43,12 @@ public class ReconnectableWebSocketClient { guard loopTask == nil else { return } isClosed = false loopTask?.cancel() - loopTask = Task { [weak self] in + loopTask = Task { do { - try await self?.runLoop() + try await runLoop() } catch { - await self?.loopTask?.cancel() - await self?.close() + loopTask?.cancel() + await close() } } } @@ -70,6 +67,7 @@ public class ReconnectableWebSocketClient { deinit { debugPrint(String(describing: Self.self), #function) + base = nil loopTask?.cancel() loopTask = nil From fbd69fb81b0810a92b9910ee817c924f0a2b18c7 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:09:04 +0900 Subject: [PATCH 11/33] =?UTF-8?q?feat:=20=EC=9E=AC=EC=97=B0=EA=B2=B0=20?= =?UTF-8?q?=EC=9B=B9=EC=86=8C=EC=BC=93=20=EA=B0=9D=EC=B2=B4=20=EC=8A=A4?= =?UTF-8?q?=ED=8A=B8=EB=A6=BC=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WebSocket/ReconnectableSocket.swift | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift index 365f94cc..b86e8b58 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift @@ -31,11 +31,18 @@ public class ReconnectableWebSocketClient { /// 소켓은 재사용하기 어렵기 때문에 closure로 캡처하여 재연결 시 사용 private let makeBase: () -> Base + typealias IncomeStream = AsyncStream> + var stream: IncomeStream? + var incomeContinuation: IncomeStream.Continuation? + public init(makeBase: @escaping () -> Base, policy: ReconnectPolicy = .defaultPolicy()) { self.makeBase = makeBase self.policy = policy self.backoff = ExponentialBackoff(policy: policy) + stream = IncomeStream { continuation in + incomeContinuation = continuation + } } /// 소켓 연결 및 재연결 loop 실행 @@ -81,6 +88,8 @@ public class ReconnectableWebSocketClient { self.base = base await base.connect() + Task { await observeData() } + // 소켓이 종료될 때 까지 대기 및 종료 원인 응답 대기 let terminal = await waitTerminalEvent(from: base) @@ -91,27 +100,28 @@ public class ReconnectableWebSocketClient { // 종료 원인 분기 switch classify(closeCode: terminal.closeCode, error: terminal.error) { - case let .closed(code, reason): - await stateChannel.send(.closed(code: code, reason: reason)) + case .closed: release() return - case .nonRetryable(let error): - await stateChannel.send(.failed(error ?? URLError(.networkConnectionLost))) + case .nonRetryable: release() return case .retryable: - - // 재시도 가능한 에러이면 재시도 - // 재연결 시간 정책 반영하여 계산 let delay = backoff.next() - print(backoff.attempt) attempts += 1 - await stateChannel.send(.reconnecting(nextAttempsIn: delay)) try await Task.sleep(for: delay) } } } + private func observeData() async { + guard let base else { return } + + for await value in base.incomingChannel { + incomeContinuation?.yield(value) + } + } + /// 종료 원인 분기 /// - Parameters: @@ -159,17 +169,15 @@ public class ReconnectableWebSocketClient { } private func waitTerminalEvent(from base: Base) async -> (closeCode: URLSessionWebSocketTask.CloseCode?, error: Error?) { - for await _state in base.state { - switch _state { + for await state in base.stateChannel { + print(#function, state) + switch state { case .failed(let error): return (nil, error) - case .closed(let code, let reason): - var reasonString: String = "내용 없음" - if let reason, let text = String(data: reason, encoding: .utf8) { - reasonString = text - } + case .closed(let code, _): return (code, nil) - default: continue + default: + continue } } From 2424e3a0182f218ca1eed6074dd4d585c5f5b3da Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:10:11 +0900 Subject: [PATCH 12/33] =?UTF-8?q?refactor:=20UpbitTickerService=20?= =?UTF-8?q?=ED=85=8C=EC=8A=A4=ED=8A=B8=EB=A5=BC=20=EC=9C=84=ED=95=B4=20?= =?UTF-8?q?=EC=97=85=EC=BA=90=EC=8A=A4=ED=8C=85=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Data/API/Upbit/UpbitTickerService.swift | 25 +++---------------- 1 file changed, 4 insertions(+), 21 deletions(-) diff --git a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift index 9e288ce5..81eca84e 100644 --- a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift +++ b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift @@ -9,23 +9,17 @@ import Foundation /// 업비트 실시간 코인 시세 웹소켓 서비스 final class UpbitTickerService: RealTimeTickerProvider { - private let client: any SocketEngine + private let client: ReconnectableWebSocketClient /// 소켓 상태 stream private var stateStreamTask: Task? - init( - client: any SocketEngine = - ReconnectableWebSocketClient { - BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) - } - ) { + init(client: ReconnectableWebSocketClient = ReconnectableWebSocketClient { BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) }) { self.client = client } func connect() async { await client.connect() - streamingState() } func disconnect() async { @@ -37,7 +31,8 @@ final class UpbitTickerService: RealTimeTickerProvider { func subscribeTickerStream() -> AsyncStream { AsyncStream { continuation in Task { - for await message in client.incoming { + guard let stream = client.stream else { return } + for await message in stream { switch message { case .success(let data): if let ticker = mapTicker(data) { @@ -80,16 +75,4 @@ final class UpbitTickerService: RealTimeTickerProvider { return nil } } - - - /// 소켓 상태를 구독합니다. - private func streamingState() { - self.stateStreamTask?.cancel() - - self.stateStreamTask = Task { - for await state in client.state { - debugPrint("client State: \(state)") - } - } - } } From 1beb74590c5e78f5789b471315e2047005d5619b Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:10:49 +0900 Subject: [PATCH 13/33] =?UTF-8?q?fix:=20AsyncChannel=20=EB=B8=94=EB=A1=9C?= =?UTF-8?q?=ED=82=B9=20=ED=98=84=EC=83=81=20=ED=95=B4=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index d3c8a78f..5ddfee5e 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -46,7 +46,7 @@ public final class BaseWebSocketClient: NSObject { stateChannel = .init() incomingChannel = .init() - await stateChannel.send(.connecting) + Task { await stateChannel.send(.connecting) } self.task = session.webSocketTask(with: url) task?.delegate = self From 8b2c59f87347ef3b2935d643ac81e849dd65aa50 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:11:45 +0900 Subject: [PATCH 14/33] =?UTF-8?q?refactor:=20SocketEngine=20=ED=94=84?= =?UTF-8?q?=EB=A1=9C=ED=86=A0=EC=BD=9C=20=EC=9A=94=EA=B5=AC=EC=82=AC?= =?UTF-8?q?=ED=95=AD=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift b/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift index b76f07d3..f073a1d2 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift @@ -6,10 +6,11 @@ // import Foundation +import AsyncAlgorithms public protocol SocketEngine { - var state: AsyncStream { get } - var incoming: AsyncStream> { get } + var stateChannel: AsyncChannel { get set } + var incomingChannel: AsyncChannel> { get set } func connect() async func send(_ data: Data) async throws func close() async From 180117ceb5f80bb871663f468f212b9c82c71192 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Wed, 22 Oct 2025 16:12:24 +0900 Subject: [PATCH 15/33] =?UTF-8?q?chore:=20=EC=9B=B9=EC=86=8C=EC=BC=93=20?= =?UTF-8?q?=EC=9E=AC=EC=97=B0=EA=B2=B0=20=ED=85=8C=EC=8A=A4=ED=8A=B8?= =?UTF-8?q?=EB=A5=BC=20=EC=9C=84=ED=95=9C=20=EC=BD=94=EB=93=9C=20=EC=9E=84?= =?UTF-8?q?=EC=8B=9C=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 5ddfee5e..0943cd79 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -42,6 +42,13 @@ public final class BaseWebSocketClient: NSObject { /// 채널을 새로 개설하고 소켓을 엽니다. /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. public func connect() async { + if flag { + Task { + try await Task.sleep(for: .seconds(5)) + task?.cancel(with: .goingAway, reason: nil) + flag = false + } + } stateChannel = .init() incomingChannel = .init() From 94c9803204c967902efe440b8f336935403d4047 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Sat, 25 Oct 2025 21:37:47 +0900 Subject: [PATCH 16/33] =?UTF-8?q?chore:=20Build=20=ED=8C=8C=EC=9D=BC=20?= =?UTF-8?q?=EB=AC=B4=EC=8B=9C=ED=95=98=EB=8F=84=EB=A1=9D=20=EC=88=98?= =?UTF-8?q?=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index a6ef976e..47280d7f 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,5 @@ AIProject/iCo/App/Resource/GoogleService-Info.plist AIProject/fastlane/.env AIProject/fastlane/.env AIProject/fastlane/report.xml + +AIProject/build From b96df6c8844a81cd3c77dcb66891169018048e5b Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Sat, 25 Oct 2025 22:10:29 +0900 Subject: [PATCH 17/33] =?UTF-8?q?refactor:=20=EC=9B=B9=EC=86=8C=EC=BC=93?= =?UTF-8?q?=20=EA=B5=AC=EC=A1=B0=20=EA=B0=9C=ED=8E=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WebSocket/ReconnectableSocket.swift | 362 +++++++++--------- .../Remote/WebSocket/WebSocketClient.swift | 113 ++++++ .../Data/API/Upbit/UpbitTickerService.swift | 6 +- 3 files changed, 297 insertions(+), 184 deletions(-) create mode 100644 AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift index b86e8b58..0869d415 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift @@ -9,184 +9,184 @@ import Foundation import AsyncAlgorithms /// Socket을 상태와 메세지를 포워딩하고 재연결을 책임집니다. -public class ReconnectableWebSocketClient { - /// 시도한 횟수 - private var attempts: Int = 0 - - /// SocketEngine Protocol - private var base: Base? - - /// 소켓 상태 재연결하기 위한 Loop - private var loopTask: Task? - - /// 소켓 연결 상태 flag - private var isClosed = true - - /// 지수적으로 증가하는 재연결 대기 - private var backoff: ExponentialBackoff - - /// 재연결 정책 - private let policy: ReconnectPolicy - - /// 소켓은 재사용하기 어렵기 때문에 closure로 캡처하여 재연결 시 사용 - private let makeBase: () -> Base - - typealias IncomeStream = AsyncStream> - var stream: IncomeStream? - var incomeContinuation: IncomeStream.Continuation? - - public init(makeBase: @escaping () -> Base, policy: ReconnectPolicy = .defaultPolicy()) { - self.makeBase = makeBase - self.policy = policy - self.backoff = ExponentialBackoff(policy: policy) - - stream = IncomeStream { continuation in - incomeContinuation = continuation - } - } - - /// 소켓 연결 및 재연결 loop 실행 - public func connect() async { - guard loopTask == nil else { return } - isClosed = false - loopTask?.cancel() - loopTask = Task { - do { - try await runLoop() - } catch { - loopTask?.cancel() - await close() - } - } - } - - public func close() async { - isClosed = true - await base?.close() - base = nil - - release() - } - - public func send(_ data: Data) async throws { - try await base?.send(data) - } - - deinit { - debugPrint(String(describing: Self.self), #function) - - base = nil - loopTask?.cancel() - loopTask = nil - } - - /// 소켓을 재연결하기 위한 loop입니다. - /// 소켓이 죽으면 종료 원인을 분기하여 재시도 또는 종료합니다. - private func runLoop() async throws { - while !isClosed { - let base = makeBase() - self.base = base - await base.connect() - - Task { await observeData() } - - // 소켓이 종료될 때 까지 대기 및 종료 원인 응답 대기 - let terminal = await waitTerminalEvent(from: base) - - // 사용자가 종료한 것이면 그냥 종료 - if isClosed { - break - } - - // 종료 원인 분기 - switch classify(closeCode: terminal.closeCode, error: terminal.error) { - case .closed: - release() - return - case .nonRetryable: - release() - return - case .retryable: - let delay = backoff.next() - attempts += 1 - try await Task.sleep(for: delay) - } - } - } - - private func observeData() async { - guard let base else { return } - - for await value in base.incomingChannel { - incomeContinuation?.yield(value) - } - } - - - /// 종료 원인 분기 - /// - Parameters: - /// - closeCode: 종료 코드 // 1000 정상 종료등 - /// - error: urlError // 네트워크 연결 에러 등 - /// - Returns: 에러타입 반환 예) retryable , closed, nonRetryable - private func classify(closeCode: URLSessionWebSocketTask.CloseCode?, - error: Error?) -> WebSocket.Failure { - if let code = closeCode { - switch code { - // 일시적 - 재시도 - case .goingAway, .abnormalClosure, .internalServerError, .noStatusReceived: - return .retryable(underlying: error) - // 정상 종료 - case .normalClosure: - return .closed(code: code, reason: nil) - // 정책/프로토콜/보안 - 비재시도 - case .protocolError, .unsupportedData, .policyViolation, .messageTooBig, .tlsHandshakeFailure, .invalidFramePayloadData, .invalid, .mandatoryExtensionMissing: - return .nonRetryable(underlying: error) - default: - return .retryable(underlying: error) - } - } - - if let urlErr = error as? URLError { - switch urlErr.code { - // 일시적 네트워크 - case .notConnectedToInternet, .timedOut, .networkConnectionLost: - return .retryable(underlying: urlErr) - - // 앱 전환/작업 취소 등 - case .cancelled: - return .retryable(underlying: urlErr) - - // 환경/설정/서버 응답 이상은 보수적으로 비재시도 - case .cannotFindHost, .cannotConnectToHost, .badServerResponse, .secureConnectionFailed, .serverCertificateUntrusted, .serverCertificateHasBadDate, .serverCertificateHasUnknownRoot: - return .nonRetryable(underlying: urlErr) - default: - return .retryable(underlying: urlErr) - } - } - - // 알 수 없으면 재시도 쪽으로 - return .retryable(underlying: error) - } - - private func waitTerminalEvent(from base: Base) async -> (closeCode: URLSessionWebSocketTask.CloseCode?, error: Error?) { - for await state in base.stateChannel { - print(#function, state) - switch state { - case .failed(let error): - return (nil, error) - case .closed(let code, _): - return (code, nil) - default: - continue - } - } - - return (nil, nil) - } - - private func release() { - base = nil - loopTask?.cancel() - loopTask = nil - } -} +//public class ReconnectableWebSocketClient { +// /// 시도한 횟수 +// private var attempts: Int = 0 +// +// /// SocketEngine Protocol +// private var base: Base? +// +// /// 소켓 상태 재연결하기 위한 Loop +// private var loopTask: Task? +// +// /// 소켓 연결 상태 flag +// private var isClosed = true +// +// /// 지수적으로 증가하는 재연결 대기 +// private var backoff: ExponentialBackoff +// +// /// 재연결 정책 +// private let policy: ReconnectPolicy +// +// /// 소켓은 재사용하기 어렵기 때문에 closure로 캡처하여 재연결 시 사용 +// private let makeBase: () -> Base +// +// typealias IncomeStream = AsyncStream> +// var stream: IncomeStream? +// var incomeContinuation: IncomeStream.Continuation? +// +// public init(makeBase: @escaping () -> Base, policy: ReconnectPolicy = .defaultPolicy()) { +// self.makeBase = makeBase +// self.policy = policy +// self.backoff = ExponentialBackoff(policy: policy) +// +// stream = IncomeStream { continuation in +// incomeContinuation = continuation +// } +// } +// +// /// 소켓 연결 및 재연결 loop 실행 +// public func connect() async { +// guard loopTask == nil else { return } +// isClosed = false +// loopTask?.cancel() +// loopTask = Task { +// do { +// try await runLoop() +// } catch { +// loopTask?.cancel() +// await close() +// } +// } +// } +// +// public func close() async { +// isClosed = true +// await base?.close() +// base = nil +// +// release() +// } +// +// public func send(_ data: Data) async throws { +// try await base?.send(data) +// } +// +// deinit { +// debugPrint(String(describing: Self.self), #function) +// +// base = nil +// loopTask?.cancel() +// loopTask = nil +// } +// +// /// 소켓을 재연결하기 위한 loop입니다. +// /// 소켓이 죽으면 종료 원인을 분기하여 재시도 또는 종료합니다. +// private func runLoop() async throws { +// while !isClosed { +// let base = makeBase() +// self.base = base +// await base.connect() +// +// Task { await observeData() } +// +// // 소켓이 종료될 때 까지 대기 및 종료 원인 응답 대기 +// let terminal = await waitTerminalEvent(from: base) +// +// // 사용자가 종료한 것이면 그냥 종료 +// if isClosed { +// break +// } +// +// // 종료 원인 분기 +// switch classify(closeCode: terminal.closeCode, error: terminal.error) { +// case .closed: +// release() +// return +// case .nonRetryable: +// release() +// return +// case .retryable: +// let delay = backoff.next() +// attempts += 1 +// try await Task.sleep(for: delay) +// } +// } +// } +// +// private func observeData() async { +// guard let base else { return } +// +// for await value in base.incomingChannel { +// incomeContinuation?.yield(value) +// } +// } +// +// +// /// 종료 원인 분기 +// /// - Parameters: +// /// - closeCode: 종료 코드 // 1000 정상 종료등 +// /// - error: urlError // 네트워크 연결 에러 등 +// /// - Returns: 에러타입 반환 예) retryable , closed, nonRetryable +// private func classify(closeCode: URLSessionWebSocketTask.CloseCode?, +// error: Error?) -> WebSocket.Failure { +// if let code = closeCode { +// switch code { +// // 일시적 - 재시도 +// case .goingAway, .abnormalClosure, .internalServerError, .noStatusReceived: +// return .retryable(underlying: error) +// // 정상 종료 +// case .normalClosure: +// return .closed(code: code, reason: nil) +// // 정책/프로토콜/보안 - 비재시도 +// case .protocolError, .unsupportedData, .policyViolation, .messageTooBig, .tlsHandshakeFailure, .invalidFramePayloadData, .invalid, .mandatoryExtensionMissing: +// return .nonRetryable(underlying: error) +// default: +// return .retryable(underlying: error) +// } +// } +// +// if let urlErr = error as? URLError { +// switch urlErr.code { +// // 일시적 네트워크 +// case .notConnectedToInternet, .timedOut, .networkConnectionLost: +// return .retryable(underlying: urlErr) +// +// // 앱 전환/작업 취소 등 +// case .cancelled: +// return .retryable(underlying: urlErr) +// +// // 환경/설정/서버 응답 이상은 보수적으로 비재시도 +// case .cannotFindHost, .cannotConnectToHost, .badServerResponse, .secureConnectionFailed, .serverCertificateUntrusted, .serverCertificateHasBadDate, .serverCertificateHasUnknownRoot: +// return .nonRetryable(underlying: urlErr) +// default: +// return .retryable(underlying: urlErr) +// } +// } +// +// // 알 수 없으면 재시도 쪽으로 +// return .retryable(underlying: error) +// } +// +// private func waitTerminalEvent(from base: Base) async -> (closeCode: URLSessionWebSocketTask.CloseCode?, error: Error?) { +// for await state in base.stateChannel { +// print(#function, state) +// switch state { +// case .failed(let error): +// return (nil, error) +// case .closed(let code, _): +// return (code, nil) +// default: +// continue +// } +// } +// +// return (nil, nil) +// } +// +// private func release() { +// base = nil +// loopTask?.cancel() +// loopTask = nil +// } +//} diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift new file mode 100644 index 00000000..1dde091c --- /dev/null +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -0,0 +1,113 @@ +// +// BaseWebSocketClient 2.swift +// iCo +// +// Created by 강대훈 on 10/25/25. +// + +import Foundation +import AsyncAlgorithms + +public final class WebSocketClient: NSObject { + + /// 소켓 상태 채널 + public var stateChannel: AsyncChannel + + /// 메세지 채널 + public var incomingChannel: AsyncChannel> + + private let url: URL + private let session: URLSession + private var task: URLSessionWebSocketTask? + + private var stateTask: Task? + private var incomingTask: Task? + + /// 핑 전송 task + private var healthCheck: Task? + + public init(url: URL, session: URLSession = .shared) { + self.url = url + self.session = session + + stateChannel = AsyncChannel() + incomingChannel = AsyncChannel>() + + super.init() + configureTask() + } + + /// 채널을 새로 개설하고 소켓을 엽니다. + /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. + public func connect() async { + self.task = session.webSocketTask(with: url) + task?.delegate = self + task?.resume() + + await stateChannel.send(.connecting) + + // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 + try? await sendPing() + + // connect 되었다 보내야 됨. + } + + private func sendPing() async throws { + return try await withCheckedThrowingContinuation { continuation in + task?.sendPing { error in + Task { + if let error { + debugPrint("Ping Failed: \(error)") + continuation.resume(throwing: error) + return + } + + continuation.resume() + } + } + } + } + + private func configureTask() { + stateTask?.cancel() + incomingTask?.cancel() + + stateTask = Task { + for await value in stateChannel { + // close, error가 왔을 때 state 초기화 로직이 있어야 함. + } + } + + incomingTask = Task { + for await value in incomingChannel { + print(value) + } + } + } + + deinit { + debugPrint(String(describing: Self.self), #function) + task?.cancel() + task = nil + stateChannel.finish() + incomingChannel.finish() + } +} + +// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. +extension WebSocketClient: URLSessionWebSocketDelegate { + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + debugPrint("didOpen") + + Task { await stateChannel.send(.connected) } + } + + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + debugPrint("didClose") + } + + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { + debugPrint("didCompleteWithError") + } +} + diff --git a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift index 81eca84e..61f659ed 100644 --- a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift +++ b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift @@ -9,12 +9,12 @@ import Foundation /// 업비트 실시간 코인 시세 웹소켓 서비스 final class UpbitTickerService: RealTimeTickerProvider { - private let client: ReconnectableWebSocketClient + private let client: SocketEngine /// 소켓 상태 stream private var stateStreamTask: Task? - init(client: ReconnectableWebSocketClient = ReconnectableWebSocketClient { BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) }) { + init(client: SocketEngine) { self.client = client } @@ -31,7 +31,7 @@ final class UpbitTickerService: RealTimeTickerProvider { func subscribeTickerStream() -> AsyncStream { AsyncStream { continuation in Task { - guard let stream = client.stream else { return } + guard let stream = client.stateChannel else { return } for await message in stream { switch message { case .success(let data): From 71e604e541a63459cdfbdc591a3d5ebb4a2cbc27 Mon Sep 17 00:00:00 2001 From: kangho Date: Sun, 26 Oct 2025 00:00:29 +0900 Subject: [PATCH 18/33] add: async timeout --- AIProject/iCo/Core/Util/Async+Timeout.swift | 32 +++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 AIProject/iCo/Core/Util/Async+Timeout.swift diff --git a/AIProject/iCo/Core/Util/Async+Timeout.swift b/AIProject/iCo/Core/Util/Async+Timeout.swift new file mode 100644 index 00000000..2d7efb2b --- /dev/null +++ b/AIProject/iCo/Core/Util/Async+Timeout.swift @@ -0,0 +1,32 @@ +// +// Async+Timeout.swift +// iCo +// +// Created by kangho on 10/25/25. +// + +import Foundation + +public func race( + _ lhs: sending @escaping () async throws -> T, + _ rhs: sending @escaping () async throws -> T +) async throws -> T { + return try await withThrowingTaskGroup(of: T.self) { group in + group.addTask { try await lhs() } + group.addTask { try await rhs() } + + defer { group.cancelAll() } + + return try await group.next()! + } +} + +public func performWithTimeout( + _ action: sending @escaping () async throws -> T, + at timeout: Duration +) async throws -> T { + return try await race(action) { + try await Task.sleep(until: .now + timeout) + throw URLError(.timedOut) + } +} From 0ccc5a48ba22fd5bcd919251e45a52db37aaac7c Mon Sep 17 00:00:00 2001 From: kangho Date: Sun, 26 Oct 2025 00:01:27 +0900 Subject: [PATCH 19/33] Test: WebSocket --- .../WebSocket/BaseWebSocketClient.swift | 4 +- .../Core/Remote/WebSocket/SocketEngine.swift | 8 +- .../Remote/WebSocket/WebSocketClient.swift | 112 -------- .../Data/API/Upbit/UpbitTickerService.swift | 8 +- .../iCo/Features/Market/MarketView.swift | 5 +- AIProject/iCoTests/Socket/WebSocketTest.swift | 272 ++++++++++++++++++ .../iCoTests/Socket/WebSocketTests.swift | 98 +++---- 7 files changed, 330 insertions(+), 177 deletions(-) create mode 100644 AIProject/iCoTests/Socket/WebSocketTest.swift diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift index 0943cd79..675dedc0 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift @@ -124,14 +124,14 @@ public final class BaseWebSocketClient: NSObject { private func handleClose(with error: Error) async { guard task != nil else { return } - await stateChannel.send(.failed(error)) + await stateChannel.send(.failed) release() } private func handleClose(code: URLSessionWebSocketTask.CloseCode, reason: Data?) async { guard task != nil else { return } - await stateChannel.send(.closed(code: code, reason: reason)) + await stateChannel.send(.closed) Task.detached { [weak self] in self?.release() diff --git a/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift b/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift index f073a1d2..3e70a2e1 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/SocketEngine.swift @@ -19,8 +19,8 @@ public protocol SocketEngine { public enum WebSocket { public enum State: Sendable { case connecting, connected - case failed(Error) - case closed(code: URLSessionWebSocketTask.CloseCode, reason: Data?) + case failed + case closed case reconnecting(nextAttempsIn: Duration) } @@ -51,10 +51,6 @@ extension WebSocket.State: Equatable { return true case (.connected, .connected): return true - case (.failed(let lhsError), .failed(let rhsError)): - return lhsError as NSError == rhsError as NSError - case (.closed(let lhsCode, let lhsReason), .closed(let rhsCode, let rhsReason)): - return lhsCode == rhsCode && lhsReason == rhsReason case (.reconnecting(let lhsDelay), .reconnecting(let rhsDelay)): return lhsDelay == rhsDelay default: diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index 1dde091c..8b137891 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -1,113 +1 @@ -// -// BaseWebSocketClient 2.swift -// iCo -// -// Created by 강대훈 on 10/25/25. -// - -import Foundation -import AsyncAlgorithms - -public final class WebSocketClient: NSObject { - - /// 소켓 상태 채널 - public var stateChannel: AsyncChannel - - /// 메세지 채널 - public var incomingChannel: AsyncChannel> - - private let url: URL - private let session: URLSession - private var task: URLSessionWebSocketTask? - - private var stateTask: Task? - private var incomingTask: Task? - - /// 핑 전송 task - private var healthCheck: Task? - - public init(url: URL, session: URLSession = .shared) { - self.url = url - self.session = session - - stateChannel = AsyncChannel() - incomingChannel = AsyncChannel>() - - super.init() - configureTask() - } - - /// 채널을 새로 개설하고 소켓을 엽니다. - /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. - public func connect() async { - self.task = session.webSocketTask(with: url) - task?.delegate = self - task?.resume() - - await stateChannel.send(.connecting) - - // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 - try? await sendPing() - - // connect 되었다 보내야 됨. - } - - private func sendPing() async throws { - return try await withCheckedThrowingContinuation { continuation in - task?.sendPing { error in - Task { - if let error { - debugPrint("Ping Failed: \(error)") - continuation.resume(throwing: error) - return - } - - continuation.resume() - } - } - } - } - - private func configureTask() { - stateTask?.cancel() - incomingTask?.cancel() - - stateTask = Task { - for await value in stateChannel { - // close, error가 왔을 때 state 초기화 로직이 있어야 함. - } - } - - incomingTask = Task { - for await value in incomingChannel { - print(value) - } - } - } - - deinit { - debugPrint(String(describing: Self.self), #function) - task?.cancel() - task = nil - stateChannel.finish() - incomingChannel.finish() - } -} - -// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. -extension WebSocketClient: URLSessionWebSocketDelegate { - public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { - debugPrint("didOpen") - - Task { await stateChannel.send(.connected) } - } - - public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { - debugPrint("didClose") - } - - public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { - debugPrint("didCompleteWithError") - } -} diff --git a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift index 61f659ed..0dc07747 100644 --- a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift +++ b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift @@ -9,12 +9,12 @@ import Foundation /// 업비트 실시간 코인 시세 웹소켓 서비스 final class UpbitTickerService: RealTimeTickerProvider { - private let client: SocketEngine + private let client: BaseWebSocketClient /// 소켓 상태 stream private var stateStreamTask: Task? - init(client: SocketEngine) { + init(client: BaseWebSocketClient = BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) { self.client = client } @@ -31,8 +31,8 @@ final class UpbitTickerService: RealTimeTickerProvider { func subscribeTickerStream() -> AsyncStream { AsyncStream { continuation in Task { - guard let stream = client.stateChannel else { return } - for await message in stream { + for await message in client.incomingChannel { + switch message { case .success(let data): if let ticker = mapTicker(data) { diff --git a/AIProject/iCo/Features/Market/MarketView.swift b/AIProject/iCo/Features/Market/MarketView.swift index 84d9079a..e6123a74 100644 --- a/AIProject/iCo/Features/Market/MarketView.swift +++ b/AIProject/iCo/Features/Market/MarketView.swift @@ -124,9 +124,6 @@ extension MarketView { #Preview { MarketView( coinService: UpBitAPIService(), - tickerService: UpbitTickerService(client: - ReconnectableWebSocketClient { - BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) - }) + tickerService: UpbitTickerService(client: BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) ) } diff --git a/AIProject/iCoTests/Socket/WebSocketTest.swift b/AIProject/iCoTests/Socket/WebSocketTest.swift new file mode 100644 index 00000000..0dc80ede --- /dev/null +++ b/AIProject/iCoTests/Socket/WebSocketTest.swift @@ -0,0 +1,272 @@ +// +// WebSocketTest.swift +// iCoTests +// +// Created by kangho on 10/25/25. +// + +import XCTest +import iCo + +final class WebSocketTest: XCTestCase { + + override func setUpWithError() throws { + // Put setup code here. This method is called before the invocation of each test method in the class. + } + + override func tearDownWithError() throws { + // Put teardown code here. This method is called after the invocation of each test method in the class. + } + + func testExample() async throws { + let sut = WebSocketClient(url: URL(string: "wss://echo.websocket.org")!) + + await sut.connect() + + try await sut.send(text: "hi") + } + + func testReconnenctWhenAbnormalClose() async throws { + let sut = WebSocketClient(url: URL(string: "wss://echo.websocket.org")!) + + await sut.connect() + + try await Task.sleep(for: .seconds(2)) + + sut.cancel(with: .abnormalClosure) + + try await Task.sleep(for: .seconds(2)) + + // connecting -> connected -> abnormal close -> handleDisconnect -> reconnect + // .... -> abnormal close -> didCompletWithError -> reconnect + } + + func testUserClose() async throws { + let sut = WebSocketClient(url: URL(string: "wss://echo.websocket.org")!) + + await sut.connect() + + try await Task.sleep(for: .seconds(2)) + + sut.cancel(with: .normalClosure) + + try await Task.sleep(for: .seconds(2)) + } + + func testPerformanceExample() throws { + // This is an example of a performance test case. + self.measure { + // Put the code you want to measure the time of here. + } + } +} + +import Foundation +import AsyncAlgorithms + +public final class WebSocketClient: NSObject { + + /// 소켓 상태 채널 + public var stateChannel: AsyncChannel + + /// 메세지 채널 + public var incomingChannel: AsyncChannel + + private let url: URL + private let session: URLSession + private var task: URLSessionWebSocketTask? + + private var stateTask: Task? + private var incomingTask: Task? + private var receiveTask: Task? + + /// 핑 전송 task + private var healthCheck: Task? + private var pingInterval: Duration = .seconds(30) + private var pingTimeout: Duration = .seconds(10) + + public init(url: URL, session: URLSession = .shared) { + self.url = url + self.session = session + + stateChannel = AsyncChannel() + incomingChannel = AsyncChannel() + + super.init() + + configureTask() + } + + /// 채널을 새로 개설하고 소켓을 엽니다. + /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. + public func connect() async { + await stateChannel.send(.connecting) + self.task = session.webSocketTask(with: url) + task?.delegate = self + task?.resume() + + // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 + do { + try await performWithTimeout(sendPing, at: pingTimeout) + } catch { + await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + + // TODO: 고치기 + public func send(text: String) async throws { + try await self.task?.send(.data(Data(text.utf8))) + } + + deinit { + debugPrint(String(describing: Self.self), #function) + task?.cancel() + task = nil + stateChannel.finish() + incomingChannel.finish() + } +} + +// MARK: - Private + +extension WebSocketClient { + + public func sendState(with state: WebSocket.State) async { + await stateChannel.send(state) + } + + public func cancel(with code: URLSessionWebSocketTask.CloseCode) { + task?.cancel(with: code, reason: nil) + } +} + +// MARK: - Private + +extension WebSocketClient { + + private func sendPing() async throws { + return try await withCheckedThrowingContinuation { continuation in + task?.sendPing { error in + Task { + if let error { + debugPrint("Ping Failed: \(error)") + continuation.resume(throwing: error) + return + } + + continuation.resume() + } + } + } + } + + private func configureTask() { + stateTask?.cancel() + incomingTask?.cancel() + + stateTask = Task { + for await state in stateChannel { + switch state { + case .connecting: + debugPrint("connecting...") + break + case .connected: + receive() + checkingAlive() + case .failed, .closed: release() + case .reconnecting: + print("in reconnecting state") +// release() + await reconnect() + } + } + } + + incomingTask = Task { + for await value in incomingChannel { + if case let .string(text) = value { + print(text) + } + } + } + } + + private func receive() { + receiveTask?.cancel() + receiveTask = Task { + // TODO: handle task is nil + guard let task else { return } + let message = try await task.receive() + await incomingChannel.send(message) + receive() + } + } + + private func checkingAlive() { + healthCheck?.cancel() + + healthCheck = Task { + do { + while true { + try await Task.sleep(until: .now + pingInterval) + try await performWithTimeout(sendPing, at: .seconds(10)) + } + } catch { + await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + } + + private func handleDisconneted(_ userClose: Bool) async { + if userClose { + await stateChannel.send(.closed) + } else { + await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + + private func reconnect() async { + debugPrint("try reconnecting...") + await connect() + } + + private func release() { + stateTask?.cancel() + stateTask = nil + receiveTask?.cancel() + receiveTask = nil + healthCheck?.cancel() + healthCheck = nil + incomingTask?.cancel() + incomingTask = nil + + if task?.state == .running { + task?.cancel(with: .goingAway, reason: nil) + } + task = nil + } +} + +// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. +extension WebSocketClient: URLSessionWebSocketDelegate { + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + debugPrint("didOpen") + + Task { await stateChannel.send(.connected) } + } + + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + debugPrint("didClose") + + Task { await handleDisconneted(closeCode == .normalClosure) } + } + + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { + debugPrint("didCompleteWithError") + + Task { + + await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) } + } +} + diff --git a/AIProject/iCoTests/Socket/WebSocketTests.swift b/AIProject/iCoTests/Socket/WebSocketTests.swift index 7bf11c3c..29aeae43 100644 --- a/AIProject/iCoTests/Socket/WebSocketTests.swift +++ b/AIProject/iCoTests/Socket/WebSocketTests.swift @@ -1,52 +1,52 @@ +//// +//// WebSocketTests.swift +//// AIProjectTests +//// +//// Created by kangho lee on 8/17/25. +//// // -// WebSocketTests.swift -// AIProjectTests +//import XCTest +//@testable import iCo // -// Created by kangho lee on 8/17/25. +//final class WebSocketTests: XCTestCase { +// +// static let echoURL = URL(string: "wss://echo.websocket.org")! +// static let upbitURL = URL(string: "wss://api.upbit.com/websocket/v1")! // - -import XCTest -@testable import iCo - -final class WebSocketTests: XCTestCase { - - static let echoURL = URL(string: "wss://echo.websocket.org")! - static let upbitURL = URL(string: "wss://api.upbit.com/websocket/v1")! - - func test_connect() async throws { - var (socket, states) = makeSUT() - - Task { - for await state in socket.state { - states.append(state) - } - } - - await socket.connect() - XCTAssertEqual(states, [.connecting, .connected]) - } - - func test_user_disconnect() async throws { - var (socket, states) = makeSUT() - - Task { - for await state in socket.state { - states.append(state) - } - } - - await socket.connect() - await socket.close() - try await Task.sleep(for: .milliseconds(100)) - XCTAssertEqual(states, [.connecting, .connected, .closed(code: .normalClosure, reason: nil)]) - } - - // MARK: Helper - - private func makeSUT(url: URL = upbitURL) -> (SocketEngine, [WebSocket.State]) { - let socket = BaseWebSocketClient(url: url) - - return (socket, [WebSocket.State]()) - } - -} +// func test_connect() async throws { +// var (socket, states) = makeSUT() +// +// Task { +// for await state in socket.state { +// states.append(state) +// } +// } +// +// await socket.connect() +// XCTAssertEqual(states, [.connecting, .connected]) +// } +// +// func test_user_disconnect() async throws { +// var (socket, states) = makeSUT() +// +// Task { +// for await state in socket.state { +// states.append(state) +// } +// } +// +// await socket.connect() +// await socket.close() +// try await Task.sleep(for: .milliseconds(100)) +// XCTAssertEqual(states, [.connecting, .connected, .closed(code: .normalClosure, reason: nil)]) +// } +// +// // MARK: Helper +// +// private func makeSUT(url: URL = upbitURL) -> (SocketEngine, [WebSocket.State]) { +// let socket = BaseWebSocketClient(url: url) +// +// return (socket, [WebSocket.State]()) +// } +// +//} From bee2f782eada03ddf2fe7998d6fb0f3c9a59c585 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Sun, 26 Oct 2025 16:35:58 +0900 Subject: [PATCH 20/33] =?UTF-8?q?fix:=20=EC=9E=AC=EC=97=B0=EA=B2=B0?= =?UTF-8?q?=EC=9D=B4=20=EB=90=98=EC=A7=80=20=EC=95=8A=EB=8A=94=20=EB=AC=B8?= =?UTF-8?q?=EC=A0=9C=20=ED=95=B4=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Remote/WebSocket/WebSocketClient.swift | 217 +++++++++++++++++ AIProject/iCoTests/Socket/WebSocketTest.swift | 229 ++---------------- 2 files changed, 234 insertions(+), 212 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index 8b137891..e5dea094 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -1 +1,218 @@ +import Foundation +import AsyncAlgorithms +public final class WebSocketClient: NSObject { + /// 소켓 상태 채널 + public var stateChannel: AsyncChannel + + /// 메세지 채널 + public var incomingChannel: AsyncChannel + + private let url: URL + private let session: URLSession + private var task: URLSessionWebSocketTask? + + private var stateTask: Task? + private var incomingTask: Task? + private var receiveTask: Task? + + /// 핑 전송 task + private var healthCheck: Task? + private var pingInterval: Duration = .seconds(30) + private var pingTimeout: Duration = .seconds(10) + + public init(url: URL, session: URLSession = .shared) { + self.url = url + self.session = session + + stateChannel = AsyncChannel() + incomingChannel = AsyncChannel() + + super.init() + + configureTask() + } + + /// 채널을 새로 개설하고 소켓을 엽니다. + /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. + public func connect() async { + //await stateChannel.send(.connecting) + self.task = session.webSocketTask(with: url) + task?.delegate = self + task?.resume() + + // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 + do { + try await performWithTimeout(sendPing, at: pingTimeout) + } catch { + await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + + public func send(text: String) async throws { + try await task?.send(.string(text)) + } + + public func send(data: Data) async throws { + try await task?.send(.data(data)) + } + + deinit { + debugPrint(String(describing: Self.self), #function) + task?.cancel() + task = nil + stateChannel.finish() + incomingChannel.finish() + } +} + +// MARK: - Test용 메소드 +extension WebSocketClient { + public func sendState(with state: WebSocket.State) async { + await stateChannel.send(state) + } + + public func cancel(with code: URLSessionWebSocketTask.CloseCode) { + task?.cancel(with: code, reason: nil) + task = nil + } + + public func cancel() { + task?.cancel() + task = nil + } +} + +// MARK: - Private +extension WebSocketClient { + private func sendPing() async throws { + return try await withCheckedThrowingContinuation { continuation in + task?.sendPing { error in + Task { + if let error { + debugPrint("Ping Failed: \(error)") + continuation.resume(throwing: error) + return + } + + continuation.resume() + } + } + } + } + + private func configureTask() { + stateTask = Task { + for await state in stateChannel { + switch state { + case .connecting: + print("Connecting") + continue + case .connected: + print("Connected") + receive() + checkingAlive() + case .failed, .closed: + release() + case .reconnecting: + print("Reconnecting") + await reconnect() + } + } + } + + incomingTask = Task { + for await value in incomingChannel { + print(value) + } + } + } + + private func receive() { + receiveTask?.cancel() + + receiveTask = Task { + do { + guard let task else { return } + let message = try await task.receive() + await incomingChannel.send(message) + receive() + } catch { + print("Receive Error!!!!!") + } + } + } + + private func checkingAlive() { + healthCheck?.cancel() + + healthCheck = Task { + do { + while true { + try await Task.sleep(until: .now + pingInterval) + try await performWithTimeout(sendPing, at: .seconds(10)) + } + } catch is CancellationError { + debugPrint("작업이 취소되었습니다.") + } catch { + // TODO: 정말 필요한 코드일까? + await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + } + + private func handleDisconnected(_ userClose: Bool) async { + if userClose { + await stateChannel.send(.closed) + } else { + await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + } + } + + private func reconnect() async { + await connect() + } + + private func release() { + stateTask?.cancel() + stateTask = nil + receiveTask?.cancel() + receiveTask = nil + healthCheck?.cancel() + healthCheck = nil + incomingTask?.cancel() + incomingTask = nil + + if task?.state == .running { + task?.cancel(with: .goingAway, reason: nil) + } + + task = nil + } +} + +// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. +extension WebSocketClient: URLSessionWebSocketDelegate { + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { + debugPrint("didOpen") + + Task { await stateChannel.send(.connected) } + } + + // 웹소켓으로부터 Close Code를 받았을 때. (정상 종료로 닫혔을 때) + public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { + debugPrint("didClose") + + Task { await handleDisconnected(closeCode == .normalClosure) } + } + + // 세션 레벨에서 작업이 완전히 종료됐을 때. + // 1. 네트워크 닫힘, 2. 에러로 종료, 3. 정상적으로 완료 + public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { + debugPrint("didCompleteWithError") + + if let error { + Task { await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) } + } + } +} diff --git a/AIProject/iCoTests/Socket/WebSocketTest.swift b/AIProject/iCoTests/Socket/WebSocketTest.swift index 0dc80ede..7ac32943 100644 --- a/AIProject/iCoTests/Socket/WebSocketTest.swift +++ b/AIProject/iCoTests/Socket/WebSocketTest.swift @@ -41,15 +41,29 @@ final class WebSocketTest: XCTestCase { // .... -> abnormal close -> didCompletWithError -> reconnect } + func testReceiveData() async throws { + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) + let requestFormat = "[{ticket:test},{type:ticker,codes:[KRW-BTC]}]" + + await sut.connect() + try await sut.send(text: requestFormat) + try await Task.sleep(for: .seconds(3)) + + sut.cancel() + + // TODO: Connected를 기다리지 않으면 아래 Send를 해도 데이터가 들어오지 않음. + try await Task.sleep(for: .seconds(3)) + try await sut.send(text: requestFormat) + try await Task.sleep(for: .seconds(10)) + } + func testUserClose() async throws { let sut = WebSocketClient(url: URL(string: "wss://echo.websocket.org")!) await sut.connect() try await Task.sleep(for: .seconds(2)) - - sut.cancel(with: .normalClosure) - + sut.cancel() try await Task.sleep(for: .seconds(2)) } @@ -61,212 +75,3 @@ final class WebSocketTest: XCTestCase { } } -import Foundation -import AsyncAlgorithms - -public final class WebSocketClient: NSObject { - - /// 소켓 상태 채널 - public var stateChannel: AsyncChannel - - /// 메세지 채널 - public var incomingChannel: AsyncChannel - - private let url: URL - private let session: URLSession - private var task: URLSessionWebSocketTask? - - private var stateTask: Task? - private var incomingTask: Task? - private var receiveTask: Task? - - /// 핑 전송 task - private var healthCheck: Task? - private var pingInterval: Duration = .seconds(30) - private var pingTimeout: Duration = .seconds(10) - - public init(url: URL, session: URLSession = .shared) { - self.url = url - self.session = session - - stateChannel = AsyncChannel() - incomingChannel = AsyncChannel() - - super.init() - - configureTask() - } - - /// 채널을 새로 개설하고 소켓을 엽니다. - /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. - public func connect() async { - await stateChannel.send(.connecting) - self.task = session.webSocketTask(with: url) - task?.delegate = self - task?.resume() - - // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 - do { - try await performWithTimeout(sendPing, at: pingTimeout) - } catch { - await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) - } - } - - // TODO: 고치기 - public func send(text: String) async throws { - try await self.task?.send(.data(Data(text.utf8))) - } - - deinit { - debugPrint(String(describing: Self.self), #function) - task?.cancel() - task = nil - stateChannel.finish() - incomingChannel.finish() - } -} - -// MARK: - Private - -extension WebSocketClient { - - public func sendState(with state: WebSocket.State) async { - await stateChannel.send(state) - } - - public func cancel(with code: URLSessionWebSocketTask.CloseCode) { - task?.cancel(with: code, reason: nil) - } -} - -// MARK: - Private - -extension WebSocketClient { - - private func sendPing() async throws { - return try await withCheckedThrowingContinuation { continuation in - task?.sendPing { error in - Task { - if let error { - debugPrint("Ping Failed: \(error)") - continuation.resume(throwing: error) - return - } - - continuation.resume() - } - } - } - } - - private func configureTask() { - stateTask?.cancel() - incomingTask?.cancel() - - stateTask = Task { - for await state in stateChannel { - switch state { - case .connecting: - debugPrint("connecting...") - break - case .connected: - receive() - checkingAlive() - case .failed, .closed: release() - case .reconnecting: - print("in reconnecting state") -// release() - await reconnect() - } - } - } - - incomingTask = Task { - for await value in incomingChannel { - if case let .string(text) = value { - print(text) - } - } - } - } - - private func receive() { - receiveTask?.cancel() - receiveTask = Task { - // TODO: handle task is nil - guard let task else { return } - let message = try await task.receive() - await incomingChannel.send(message) - receive() - } - } - - private func checkingAlive() { - healthCheck?.cancel() - - healthCheck = Task { - do { - while true { - try await Task.sleep(until: .now + pingInterval) - try await performWithTimeout(sendPing, at: .seconds(10)) - } - } catch { - await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) - } - } - } - - private func handleDisconneted(_ userClose: Bool) async { - if userClose { - await stateChannel.send(.closed) - } else { - await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) - } - } - - private func reconnect() async { - debugPrint("try reconnecting...") - await connect() - } - - private func release() { - stateTask?.cancel() - stateTask = nil - receiveTask?.cancel() - receiveTask = nil - healthCheck?.cancel() - healthCheck = nil - incomingTask?.cancel() - incomingTask = nil - - if task?.state == .running { - task?.cancel(with: .goingAway, reason: nil) - } - task = nil - } -} - -// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. -extension WebSocketClient: URLSessionWebSocketDelegate { - public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { - debugPrint("didOpen") - - Task { await stateChannel.send(.connected) } - } - - public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { - debugPrint("didClose") - - Task { await handleDisconneted(closeCode == .normalClosure) } - } - - public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { - debugPrint("didCompleteWithError") - - Task { - - await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) } - } -} - From 0a3fb25bfa77172bc1323cbbbda2caa879283fe8 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Sun, 26 Oct 2025 23:43:58 +0900 Subject: [PATCH 21/33] =?UTF-8?q?chore:=20=EA=B8=B0=EC=A1=B4=20=EC=9B=B9?= =?UTF-8?q?=EC=86=8C=EC=BC=93=20=ED=8C=8C=EC=9D=BC=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../WebSocket/BaseWebSocketClient.swift | 231 ------------------ .../WebSocket/Reconnectable+Model.swift | 54 ---- .../WebSocket/ReconnectableSocket.swift | 192 --------------- .../iCoTests/Socket/WebSocketTests.swift | 52 ---- 4 files changed, 529 deletions(-) delete mode 100644 AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift delete mode 100644 AIProject/iCo/Core/Remote/WebSocket/Reconnectable+Model.swift delete mode 100644 AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift delete mode 100644 AIProject/iCoTests/Socket/WebSocketTests.swift diff --git a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift deleted file mode 100644 index 675dedc0..00000000 --- a/AIProject/iCo/Core/Remote/WebSocket/BaseWebSocketClient.swift +++ /dev/null @@ -1,231 +0,0 @@ -// -// BaseWebSocketClient.swift -// AIProject -// -// Created by kangho lee on 8/17/25. -// - -import Foundation -import AsyncAlgorithms - -var flag = true - -/// 웹소켓 client입니다. 메세지와 연결 상태 stream, 연결, 해제를 책임집니다. -public final class BaseWebSocketClient: NSObject { - - /// 소켓 상태 채널 - public var stateChannel: AsyncChannel - - /// 메세지 채널 - public var incomingChannel: AsyncChannel> - - private let url: URL - private let session: URLSession - private var task: URLSessionWebSocketTask? - - private var pongContinuation: CheckedContinuation? - - /// 핑 전송 task - private var healthCheck: Task? - - public init(url: URL, session: URLSession = .shared) { - - self.url = url - self.session = session - - stateChannel = AsyncChannel() - incomingChannel = AsyncChannel>() - - super.init() - } - - /// 채널을 새로 개설하고 소켓을 엽니다. - /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. - public func connect() async { - if flag { - Task { - try await Task.sleep(for: .seconds(5)) - task?.cancel(with: .goingAway, reason: nil) - flag = false - } - } - - stateChannel = .init() - incomingChannel = .init() - - Task { await stateChannel.send(.connecting) } - - self.task = session.webSocketTask(with: url) - task?.delegate = self - task?.resume() - - // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 - try? await sendPing() - } - - public func send(_ data: Data) async throws { - do { - try await task?.send(.data(data)) - } catch { - await handleClose(with: error) - } - } - - public func close() async { - guard let task else { - await handleClose(code: .normalClosure, reason: nil) - return - } - task.cancel(with: .normalClosure, reason: nil) - } - - deinit { - debugPrint(String(describing: Self.self), #function) - task?.cancel() - task = nil - stateChannel.finish() - incomingChannel.finish() - } - - - /// 상태가 연결된 동안 메세지를 수신하여 채널로 보냅니다. - private func receiveLoop() async { - guard let task else { return } - while true { - do { - let message = try await task.receive() - - switch message { - case .data(let data): - await incomingChannel.send(.success(data)) - case .string(let string): - await incomingChannel.send(.success(Data(string.utf8))) - @unknown default: - await incomingChannel.send(.failure(.frameCorrupted)) - } - } catch is CancellationError { - await handleClose(code: .normalClosure, reason: nil) - return - } catch { - if (error as NSError).code == 57 { - debugPrint("WebSocekt is not connected Error 57") - await handleClose(with: NetworkError.webSocketError) - return - } else if let urlError = error as? URLError, urlError.code == .cancelled { - await handleClose(code: .normalClosure, reason: nil) - } else { - await handleClose(with: error) - return - } - } - } - } - - private func handleClose(with error: Error) async { - guard task != nil else { return } - - await stateChannel.send(.failed) - release() - } - - private func handleClose(code: URLSessionWebSocketTask.CloseCode, reason: Data?) async { - guard task != nil else { return } - - await stateChannel.send(.closed) - - Task.detached { [weak self] in - self?.release() - } - } - - func release() { - task = nil - - healthCheck?.cancel() - healthCheck = nil - stateChannel.finish() - incomingChannel.finish() - } - - /// 인터벌마다 핑 보내는 메소드 - private func checkingAlive(duration: Duration) { - self.healthCheck?.cancel() - - self.healthCheck = Task { - while task != nil { - do { - try await Task.sleep(for: duration, clock: .suspending) - try await sendPing() - } catch { - await handleClose(with: error) - break - } - } - } - } - - private func sendPing() async throws { - guard let task else { return } - - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - self.pongContinuation = continuation - task.sendPing { [weak self] error in - Task { - if let error { - self?.pongContinuation?.resume(throwing: error) - await self?.releaseCont() - debugPrint("Ping Failed: \(error)") - return - } - self?.pongContinuation?.resume() - await self?.releaseCont() - } - } - } - } - - private func releaseCont() async { - self.pongContinuation = nil - } - - /// Socket으로부터 Open 응답을 받으면 상태를 변경하고 메세지 stream을 시작합니다. - /// 핑을 주기적으로 보냅니다. - /// 여기서 120은 업비트의 최소 주기 시간입니다. - private func handleConnect() async { - await stateChannel.send(.connected) - - Task { - await receiveLoop() - } - - checkingAlive(duration: .seconds(120)) - } -} - -// MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. -extension BaseWebSocketClient: URLSessionWebSocketDelegate { - public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { - debugPrint("didOpen") - Task { - await handleConnect() - } - } - - public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { - debugPrint("didClose") - Task { - await handleClose(code: closeCode, reason: reason) - } - } - - public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { - debugPrint("didCompleteWithError") - - guard let error else { - Task { await handleClose(code: .goingAway, reason: nil)} - return - } - - Task { await handleClose(with: error) } - } -} diff --git a/AIProject/iCo/Core/Remote/WebSocket/Reconnectable+Model.swift b/AIProject/iCo/Core/Remote/WebSocket/Reconnectable+Model.swift deleted file mode 100644 index d80888bc..00000000 --- a/AIProject/iCo/Core/Remote/WebSocket/Reconnectable+Model.swift +++ /dev/null @@ -1,54 +0,0 @@ -// -// Reconnectable+Model.swift -// AIProject -// -// Created by kangho lee on 8/17/25. -// - -import Foundation - -/// 재연결 정책. 지수적으로 재연결 인터벌을 증가시켜 서버 부하를 줄임 -public struct ReconnectPolicy { - public var base: Duration = .milliseconds(500) - public var factor: Double = 2.0 - public var max: Duration = .seconds(10) - public var jitter: Double = 0.3 - public var foregroundOnly: Bool = true - public var maxAttemps = 3 - - public init(base: Duration, factor: Double, max: Duration, jitter: Double, foregroundOnly: Bool) { - self.base = base - self.factor = factor - self.max = max - self.jitter = jitter - self.foregroundOnly = foregroundOnly - } - - public static func defaultPolicy() -> ReconnectPolicy { - ReconnectPolicy(base: .milliseconds(500), factor: 2.0, max: .seconds(10), jitter: 0.3, foregroundOnly: true) - } -} - -public struct ExponentialBackoff { - let policy: ReconnectPolicy - private(set) var attempt: Int = 0 - - mutating func next() -> Duration { - defer { attempt += 1 } - let expo = min( - Double(policy.base.components.seconds) * pow(policy.factor, Double(attempt)), - Double(policy.max.components.seconds) - ) - let jitterRatio = 1.0 + (Double.random(in: 0...(policy.jitter))) - let seconds = max(0.1, expo * jitterRatio) - return .seconds(seconds) - } - - mutating func reset() { attempt = 0 } -} - -private extension Duration { - var seconds: Double { - Double(components.seconds) + Double(components.attoseconds) / 1e18 - } -} diff --git a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift b/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift deleted file mode 100644 index 0869d415..00000000 --- a/AIProject/iCo/Core/Remote/WebSocket/ReconnectableSocket.swift +++ /dev/null @@ -1,192 +0,0 @@ -// -// ReconnectableSocket.swift -// AIProject -// -// Created by kangho lee on 8/17/25. -// - -import Foundation -import AsyncAlgorithms - -/// Socket을 상태와 메세지를 포워딩하고 재연결을 책임집니다. -//public class ReconnectableWebSocketClient { -// /// 시도한 횟수 -// private var attempts: Int = 0 -// -// /// SocketEngine Protocol -// private var base: Base? -// -// /// 소켓 상태 재연결하기 위한 Loop -// private var loopTask: Task? -// -// /// 소켓 연결 상태 flag -// private var isClosed = true -// -// /// 지수적으로 증가하는 재연결 대기 -// private var backoff: ExponentialBackoff -// -// /// 재연결 정책 -// private let policy: ReconnectPolicy -// -// /// 소켓은 재사용하기 어렵기 때문에 closure로 캡처하여 재연결 시 사용 -// private let makeBase: () -> Base -// -// typealias IncomeStream = AsyncStream> -// var stream: IncomeStream? -// var incomeContinuation: IncomeStream.Continuation? -// -// public init(makeBase: @escaping () -> Base, policy: ReconnectPolicy = .defaultPolicy()) { -// self.makeBase = makeBase -// self.policy = policy -// self.backoff = ExponentialBackoff(policy: policy) -// -// stream = IncomeStream { continuation in -// incomeContinuation = continuation -// } -// } -// -// /// 소켓 연결 및 재연결 loop 실행 -// public func connect() async { -// guard loopTask == nil else { return } -// isClosed = false -// loopTask?.cancel() -// loopTask = Task { -// do { -// try await runLoop() -// } catch { -// loopTask?.cancel() -// await close() -// } -// } -// } -// -// public func close() async { -// isClosed = true -// await base?.close() -// base = nil -// -// release() -// } -// -// public func send(_ data: Data) async throws { -// try await base?.send(data) -// } -// -// deinit { -// debugPrint(String(describing: Self.self), #function) -// -// base = nil -// loopTask?.cancel() -// loopTask = nil -// } -// -// /// 소켓을 재연결하기 위한 loop입니다. -// /// 소켓이 죽으면 종료 원인을 분기하여 재시도 또는 종료합니다. -// private func runLoop() async throws { -// while !isClosed { -// let base = makeBase() -// self.base = base -// await base.connect() -// -// Task { await observeData() } -// -// // 소켓이 종료될 때 까지 대기 및 종료 원인 응답 대기 -// let terminal = await waitTerminalEvent(from: base) -// -// // 사용자가 종료한 것이면 그냥 종료 -// if isClosed { -// break -// } -// -// // 종료 원인 분기 -// switch classify(closeCode: terminal.closeCode, error: terminal.error) { -// case .closed: -// release() -// return -// case .nonRetryable: -// release() -// return -// case .retryable: -// let delay = backoff.next() -// attempts += 1 -// try await Task.sleep(for: delay) -// } -// } -// } -// -// private func observeData() async { -// guard let base else { return } -// -// for await value in base.incomingChannel { -// incomeContinuation?.yield(value) -// } -// } -// -// -// /// 종료 원인 분기 -// /// - Parameters: -// /// - closeCode: 종료 코드 // 1000 정상 종료등 -// /// - error: urlError // 네트워크 연결 에러 등 -// /// - Returns: 에러타입 반환 예) retryable , closed, nonRetryable -// private func classify(closeCode: URLSessionWebSocketTask.CloseCode?, -// error: Error?) -> WebSocket.Failure { -// if let code = closeCode { -// switch code { -// // 일시적 - 재시도 -// case .goingAway, .abnormalClosure, .internalServerError, .noStatusReceived: -// return .retryable(underlying: error) -// // 정상 종료 -// case .normalClosure: -// return .closed(code: code, reason: nil) -// // 정책/프로토콜/보안 - 비재시도 -// case .protocolError, .unsupportedData, .policyViolation, .messageTooBig, .tlsHandshakeFailure, .invalidFramePayloadData, .invalid, .mandatoryExtensionMissing: -// return .nonRetryable(underlying: error) -// default: -// return .retryable(underlying: error) -// } -// } -// -// if let urlErr = error as? URLError { -// switch urlErr.code { -// // 일시적 네트워크 -// case .notConnectedToInternet, .timedOut, .networkConnectionLost: -// return .retryable(underlying: urlErr) -// -// // 앱 전환/작업 취소 등 -// case .cancelled: -// return .retryable(underlying: urlErr) -// -// // 환경/설정/서버 응답 이상은 보수적으로 비재시도 -// case .cannotFindHost, .cannotConnectToHost, .badServerResponse, .secureConnectionFailed, .serverCertificateUntrusted, .serverCertificateHasBadDate, .serverCertificateHasUnknownRoot: -// return .nonRetryable(underlying: urlErr) -// default: -// return .retryable(underlying: urlErr) -// } -// } -// -// // 알 수 없으면 재시도 쪽으로 -// return .retryable(underlying: error) -// } -// -// private func waitTerminalEvent(from base: Base) async -> (closeCode: URLSessionWebSocketTask.CloseCode?, error: Error?) { -// for await state in base.stateChannel { -// print(#function, state) -// switch state { -// case .failed(let error): -// return (nil, error) -// case .closed(let code, _): -// return (code, nil) -// default: -// continue -// } -// } -// -// return (nil, nil) -// } -// -// private func release() { -// base = nil -// loopTask?.cancel() -// loopTask = nil -// } -//} diff --git a/AIProject/iCoTests/Socket/WebSocketTests.swift b/AIProject/iCoTests/Socket/WebSocketTests.swift deleted file mode 100644 index 29aeae43..00000000 --- a/AIProject/iCoTests/Socket/WebSocketTests.swift +++ /dev/null @@ -1,52 +0,0 @@ -//// -//// WebSocketTests.swift -//// AIProjectTests -//// -//// Created by kangho lee on 8/17/25. -//// -// -//import XCTest -//@testable import iCo -// -//final class WebSocketTests: XCTestCase { -// -// static let echoURL = URL(string: "wss://echo.websocket.org")! -// static let upbitURL = URL(string: "wss://api.upbit.com/websocket/v1")! -// -// func test_connect() async throws { -// var (socket, states) = makeSUT() -// -// Task { -// for await state in socket.state { -// states.append(state) -// } -// } -// -// await socket.connect() -// XCTAssertEqual(states, [.connecting, .connected]) -// } -// -// func test_user_disconnect() async throws { -// var (socket, states) = makeSUT() -// -// Task { -// for await state in socket.state { -// states.append(state) -// } -// } -// -// await socket.connect() -// await socket.close() -// try await Task.sleep(for: .milliseconds(100)) -// XCTAssertEqual(states, [.connecting, .connected, .closed(code: .normalClosure, reason: nil)]) -// } -// -// // MARK: Helper -// -// private func makeSUT(url: URL = upbitURL) -> (SocketEngine, [WebSocket.State]) { -// let socket = BaseWebSocketClient(url: url) -// -// return (socket, [WebSocket.State]()) -// } -// -//} From 32db5f212dedf051e17dc41cb1b581d145f6729b Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Sun, 26 Oct 2025 23:45:19 +0900 Subject: [PATCH 22/33] =?UTF-8?q?refactor:=20Reconnect=20=EB=A1=9C?= =?UTF-8?q?=EC=A7=81=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Remote/WebSocket/WebSocketClient.swift | 67 +++++++++---------- .../iCo/Core/Util/Async+BroadCaster.swift | 36 ++++++++++ 2 files changed, 66 insertions(+), 37 deletions(-) create mode 100644 AIProject/iCo/Core/Util/Async+BroadCaster.swift diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index e5dea094..2527ca89 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -3,17 +3,17 @@ import AsyncAlgorithms public final class WebSocketClient: NSObject { /// 소켓 상태 채널 - public var stateChannel: AsyncChannel + private var stateStream: AsyncStream /// 메세지 채널 public var incomingChannel: AsyncChannel + public var stateBroadCaster: AsyncStreamBroadcaster = .init() private let url: URL private let session: URLSession private var task: URLSessionWebSocketTask? private var stateTask: Task? - private var incomingTask: Task? private var receiveTask: Task? /// 핑 전송 task @@ -25,18 +25,17 @@ public final class WebSocketClient: NSObject { self.url = url self.session = session - stateChannel = AsyncChannel() + stateStream = stateBroadCaster.stream() incomingChannel = AsyncChannel() super.init() - - configureTask() + observeState() } /// 채널을 새로 개설하고 소켓을 엽니다. /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. public func connect() async { - //await stateChannel.send(.connecting) + await stateBroadCaster.send(.connecting) self.task = session.webSocketTask(with: url) task?.delegate = self task?.resume() @@ -45,9 +44,13 @@ public final class WebSocketClient: NSObject { do { try await performWithTimeout(sendPing, at: pingTimeout) } catch { - await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) } } + + public func disconnect() async { + task?.cancel(with: .normalClosure, reason: nil) + } public func send(text: String) async throws { try await task?.send(.string(text)) @@ -61,7 +64,7 @@ public final class WebSocketClient: NSObject { debugPrint(String(describing: Self.self), #function) task?.cancel() task = nil - stateChannel.finish() + stateBroadCaster.finish() incomingChannel.finish() } } @@ -69,7 +72,7 @@ public final class WebSocketClient: NSObject { // MARK: - Test용 메소드 extension WebSocketClient { public func sendState(with state: WebSocket.State) async { - await stateChannel.send(state) + await stateBroadCaster.send(state) } public func cancel(with code: URLSessionWebSocketTask.CloseCode) { @@ -101,33 +104,29 @@ extension WebSocketClient { } } - private func configureTask() { + private func observeState() { stateTask = Task { - for await state in stateChannel { + for await state in stateStream { switch state { case .connecting: - print("Connecting") + debugPrint("Connecting") continue case .connected: - print("Connected") + debugPrint("Connected") receive() checkingAlive() case .failed, .closed: + debugPrint("Closed") release() case .reconnecting: - print("Reconnecting") + debugPrint("Reconnecting") await reconnect() } } } - - incomingTask = Task { - for await value in incomingChannel { - print(value) - } - } } + // FIXME: 개선이 필요한지 한 번 더 생각해보기 private func receive() { receiveTask?.cancel() @@ -155,33 +154,32 @@ extension WebSocketClient { } catch is CancellationError { debugPrint("작업이 취소되었습니다.") } catch { - // TODO: 정말 필요한 코드일까? - await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) } } } private func handleDisconnected(_ userClose: Bool) async { if userClose { - await stateChannel.send(.closed) + await stateBroadCaster.send(.closed) } else { - await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) + await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) } } private func reconnect() async { + guard task?.state != .running else { + return + } + await connect() } private func release() { - stateTask?.cancel() - stateTask = nil receiveTask?.cancel() receiveTask = nil healthCheck?.cancel() healthCheck = nil - incomingTask?.cancel() - incomingTask = nil if task?.state == .running { task?.cancel(with: .goingAway, reason: nil) @@ -194,25 +192,20 @@ extension WebSocketClient { // MARK: 웹 소켓 Delegate로 소켓 응답 및 종료 event를 받아 처리합니다. extension WebSocketClient: URLSessionWebSocketDelegate { public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) { - debugPrint("didOpen") - - Task { await stateChannel.send(.connected) } + Task { await stateBroadCaster.send(.connected) } } // 웹소켓으로부터 Close Code를 받았을 때. (정상 종료로 닫혔을 때) public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) { - debugPrint("didClose") - Task { await handleDisconnected(closeCode == .normalClosure) } } // 세션 레벨에서 작업이 완전히 종료됐을 때. // 1. 네트워크 닫힘, 2. 에러로 종료, 3. 정상적으로 완료 public func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: (any Error)?) { - debugPrint("didCompleteWithError") - - if let error { - Task { await stateChannel.send(.reconnecting(nextAttempsIn: .seconds(2))) } + if let _ = error { + Task { await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) } } } } + diff --git a/AIProject/iCo/Core/Util/Async+BroadCaster.swift b/AIProject/iCo/Core/Util/Async+BroadCaster.swift new file mode 100644 index 00000000..a651ddbf --- /dev/null +++ b/AIProject/iCo/Core/Util/Async+BroadCaster.swift @@ -0,0 +1,36 @@ +// +// Async+BroadCaster.swift +// iCo +// +// Created by 강대훈 on 10/26/25. +// + +@globalActor +actor BroadCaster { + static let shared = BroadCaster() + + private init() {} +} + +public class AsyncStreamBroadcaster { + private var continuations: [AsyncStream.Continuation] = [] + + public func stream() -> AsyncStream { + AsyncStream { continuation in + continuations.append(continuation) + } + } + + @BroadCaster + public func send(_ element: Element) async { + for c in continuations { + c.yield(element) + } + } + + public func finish() { + for c in continuations { + c.finish() + } + } +} From b4c044e369571755bf377f24db9f02aa33f55020 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Sun, 26 Oct 2025 23:46:10 +0900 Subject: [PATCH 23/33] =?UTF-8?q?feature:=20=EC=BD=94=EC=9D=B8=20=EC=9E=AC?= =?UTF-8?q?=EA=B5=AC=EB=8F=85=20=EB=A1=9C=EC=A7=81=20=EC=B6=94=EA=B0=80=20?= =?UTF-8?q?=EB=B0=8F=20=EB=A6=AC=ED=8C=A9=ED=86=A0=EB=A7=81=ED=95=9C=20?= =?UTF-8?q?=EC=9B=B9=EC=86=8C=EC=BC=93=20=EA=B0=9D=EC=B2=B4=EB=A1=9C=20?= =?UTF-8?q?=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Data/API/Upbit/UpbitTickerService.swift | 21 ++++++++------ .../Interface/RealTimeTickerProvider.swift | 1 + .../iCo/Features/Market/MarketStore.swift | 28 ++++++++++++------- .../iCo/Features/Market/MarketView.swift | 2 +- 4 files changed, 33 insertions(+), 19 deletions(-) diff --git a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift index 0dc07747..2bcc2abe 100644 --- a/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift +++ b/AIProject/iCo/Data/API/Upbit/UpbitTickerService.swift @@ -9,12 +9,12 @@ import Foundation /// 업비트 실시간 코인 시세 웹소켓 서비스 final class UpbitTickerService: RealTimeTickerProvider { - private let client: BaseWebSocketClient + private let client: WebSocketClient /// 소켓 상태 stream private var stateStreamTask: Task? - init(client: BaseWebSocketClient = BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) { + init(client: WebSocketClient = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) { self.client = client } @@ -23,7 +23,7 @@ final class UpbitTickerService: RealTimeTickerProvider { } func disconnect() async { - await client.close() + await client.disconnect() } /// 업비트의 코인 시세 stream을 가져와 디코딩하여 Model로 만들고 forwarding @@ -32,21 +32,25 @@ final class UpbitTickerService: RealTimeTickerProvider { AsyncStream { continuation in Task { for await message in client.incomingChannel { - switch message { - case .success(let data): + case .data(let data): if let ticker = mapTicker(data) { continuation.yield(ticker) } - case .failure(let error): - debugPrint(error) + case .string(let string): + print(string) } } + continuation.finish() } } } + func subscribeStateStream() -> AsyncStream { + client.stateBroadCaster.stream() + } + /// 티켓과 코인 ID를 가지고 업비트에 코인 시세를 구독합니다. /// - Parameters: /// - ticket: 티켓 iD @@ -56,7 +60,7 @@ final class UpbitTickerService: RealTimeTickerProvider { do { let ticketData = try JSONEncoder().encode(SubscribeRequest.ticker(ticket: ticket, codes: coins)) - try await client.send(ticketData) + try await client.send(data: ticketData) } catch { debugPrint(error) } @@ -76,3 +80,4 @@ final class UpbitTickerService: RealTimeTickerProvider { } } } + diff --git a/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift b/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift index fff20dea..719a986d 100644 --- a/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift +++ b/AIProject/iCo/Domain/Interface/RealTimeTickerProvider.swift @@ -11,5 +11,6 @@ protocol RealTimeTickerProvider { func connect() async func disconnect() async func subscribeTickerStream() -> AsyncStream + func subscribeStateStream() -> AsyncStream func sendTicket(ticket: String, coins: [CoinListModel.ID]) async } diff --git a/AIProject/iCo/Features/Market/MarketStore.swift b/AIProject/iCo/Features/Market/MarketStore.swift index f4587d94..b2e504b7 100644 --- a/AIProject/iCo/Features/Market/MarketStore.swift +++ b/AIProject/iCo/Features/Market/MarketStore.swift @@ -113,6 +113,8 @@ class MarketStore { init(coinService: UpBitAPIService, tickerService: RealTimeTickerProvider) { self.coinService = coinService self.tickerService = tickerService + + Task { await observeState() } } } @@ -275,14 +277,6 @@ extension MarketStore { // service 연결 await tickerService.connect() - if !subscriptionSnapshot.isEmpty { - await sendTicket(subscriptionSnapshot) - } - - // 시세가 - self.tickerStreamTask = Task { - await consume() - } } /// 서비스 연결 해제 @@ -306,7 +300,6 @@ extension MarketStore { private func ticketStream() async { let stream = visibleCoinsChannel .filter { !$0.isEmpty } - .removeDuplicates() .debounce(for: .milliseconds(300)) for await visibleCoin in stream { self.subscriptionSnapshot = visibleCoin @@ -326,7 +319,22 @@ extension MarketStore { store.apply(ticker) } - + private func observeState() async { + for await state in tickerService.subscribeStateStream() { + // TODO: 여러 번 호출되는 이유 찾기 + print(#function, state) + if case .connected = state { + if !subscriptionSnapshot.isEmpty { + await sendTicket(subscriptionSnapshot) + } + + // 시세가 + self.tickerStreamTask = Task { + await consume() + } + } + } + } } extension MarketStore { diff --git a/AIProject/iCo/Features/Market/MarketView.swift b/AIProject/iCo/Features/Market/MarketView.swift index e6123a74..2b61b38e 100644 --- a/AIProject/iCo/Features/Market/MarketView.swift +++ b/AIProject/iCo/Features/Market/MarketView.swift @@ -124,6 +124,6 @@ extension MarketView { #Preview { MarketView( coinService: UpBitAPIService(), - tickerService: UpbitTickerService(client: BaseWebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) + tickerService: UpbitTickerService(client: WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!)) ) } From c78ea8635206556cf690aecffc59e76e980bc9d8 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Sun, 26 Oct 2025 23:46:30 +0900 Subject: [PATCH 24/33] =?UTF-8?q?chore:=20=EC=A3=BC=EC=84=9D=20=EC=A0=9C?= =?UTF-8?q?=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCoTests/Socket/WebSocketTest.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/AIProject/iCoTests/Socket/WebSocketTest.swift b/AIProject/iCoTests/Socket/WebSocketTest.swift index 7ac32943..873fe46c 100644 --- a/AIProject/iCoTests/Socket/WebSocketTest.swift +++ b/AIProject/iCoTests/Socket/WebSocketTest.swift @@ -50,8 +50,7 @@ final class WebSocketTest: XCTestCase { try await Task.sleep(for: .seconds(3)) sut.cancel() - - // TODO: Connected를 기다리지 않으면 아래 Send를 해도 데이터가 들어오지 않음. + try await Task.sleep(for: .seconds(3)) try await sut.send(text: requestFormat) try await Task.sleep(for: .seconds(10)) From 70c5ffde58782c006586f5f407530dd601eafd53 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Mon, 27 Oct 2025 00:45:21 +0900 Subject: [PATCH 25/33] =?UTF-8?q?chore:=20=EC=9B=B9=EC=86=8C=EC=BC=93=20Do?= =?UTF-8?q?cC=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Remote/WebSocket/WebSocketClient.swift | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index 2527ca89..4c31fdfb 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -4,10 +4,10 @@ import AsyncAlgorithms public final class WebSocketClient: NSObject { /// 소켓 상태 채널 private var stateStream: AsyncStream - + /// WebSocket의 상태 변화를 여러 Consumer에게 동시에 전달하는 브로드캐스터 + public var stateBroadCaster: AsyncStreamBroadcaster = .init() /// 메세지 채널 public var incomingChannel: AsyncChannel - public var stateBroadCaster: AsyncStreamBroadcaster = .init() private let url: URL private let session: URLSession @@ -32,8 +32,7 @@ public final class WebSocketClient: NSObject { observeState() } - /// 채널을 새로 개설하고 소켓을 엽니다. - /// 핑을 보내는 이유는 연결된 상태를 확정적으로 기다리기 위해서입니다. + /// 웹소켓 세션을 연결하고 작업을 생성합니다. public func connect() async { await stateBroadCaster.send(.connecting) self.task = session.webSocketTask(with: url) @@ -48,14 +47,19 @@ public final class WebSocketClient: NSObject { } } + /// 명시적으로 현재 WebSocket 연결을 정상적으로 종료합니다. + /// + /// 이 메서드는 서버와의 WebSocket 연결을 `normalClosure` 코드로 닫습니다. public func disconnect() async { task?.cancel(with: .normalClosure, reason: nil) } + /// 텍스트 형태의 메시지를 WebSocket 서버로 전송합니다. public func send(text: String) async throws { try await task?.send(.string(text)) } + /// 바이너리(Data) 형태의 메시지를 WebSocket 서버로 전송합니다. public func send(data: Data) async throws { try await task?.send(.data(data)) } @@ -70,6 +74,7 @@ public final class WebSocketClient: NSObject { } // MARK: - Test용 메소드 +// TODO: Deprecated 예정입니다. extension WebSocketClient { public func sendState(with state: WebSocket.State) async { await stateBroadCaster.send(state) @@ -88,6 +93,7 @@ extension WebSocketClient { // MARK: - Private extension WebSocketClient { + /// 서버로 Ping 프레임을 전송하여 연결 상태를 확인합니다. private func sendPing() async throws { return try await withCheckedThrowingContinuation { continuation in task?.sendPing { error in @@ -104,6 +110,7 @@ extension WebSocketClient { } } + /// WebSocket의 상태 변화를 관찰하고 각 상태에 맞는 동작을 수행합니다. private func observeState() { stateTask = Task { for await state in stateStream { @@ -127,6 +134,7 @@ extension WebSocketClient { } // FIXME: 개선이 필요한지 한 번 더 생각해보기 + /// 서버로부터 WebSocket 메시지를 지속적으로 수신합니다. private func receive() { receiveTask?.cancel() @@ -142,6 +150,7 @@ extension WebSocketClient { } } + /// 주기적으로 Ping을 전송하여 WebSocket 연결 상태를 점검합니다. private func checkingAlive() { healthCheck?.cancel() @@ -159,6 +168,7 @@ extension WebSocketClient { } } + /// WebSocket 연결 종료 시 상태를 처리합니다. private func handleDisconnected(_ userClose: Bool) async { if userClose { await stateBroadCaster.send(.closed) @@ -167,6 +177,7 @@ extension WebSocketClient { } } + /// WebSocket 재연결을 시도합니다. private func reconnect() async { guard task?.state != .running else { return @@ -175,6 +186,7 @@ extension WebSocketClient { await connect() } + /// WebSocket 클라이언트의 모든 비동기 작업과 연결을 종료하고 리소스를 정리합니다. private func release() { receiveTask?.cancel() receiveTask = nil From f548137a871c64cf6198fac777604a358579138a Mon Sep 17 00:00:00 2001 From: kangho Date: Mon, 27 Oct 2025 11:42:47 +0900 Subject: [PATCH 26/33] =?UTF-8?q?comment:=20timeout,=20broadcaster=20?= =?UTF-8?q?=EC=A3=BC=EC=84=9D=20=EC=9E=91=EC=97=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Util/Async+BroadCaster.swift | 12 ++++++++++++ AIProject/iCo/Core/Util/Async+Timeout.swift | 11 +++++++++++ 2 files changed, 23 insertions(+) diff --git a/AIProject/iCo/Core/Util/Async+BroadCaster.swift b/AIProject/iCo/Core/Util/Async+BroadCaster.swift index a651ddbf..beaff81d 100644 --- a/AIProject/iCo/Core/Util/Async+BroadCaster.swift +++ b/AIProject/iCo/Core/Util/Async+BroadCaster.swift @@ -12,15 +12,25 @@ actor BroadCaster { private init() {} } + +/// AsyncStream의 다중 소비를 위해 만든 객체로 스트림 전파 수행 public class AsyncStreamBroadcaster { + + /// 구독할 continuation 값들 private var continuations: [AsyncStream.Continuation] = [] + + /// 구독 메서드로 stream을 반환 + /// - Returns: stream 반환 public func stream() -> AsyncStream { AsyncStream { continuation in continuations.append(continuation) } } + + /// 전파할 메세지를 전송하고 구독자들에게 전파 + /// - Parameter element: 메세지 @BroadCaster public func send(_ element: Element) async { for c in continuations { @@ -28,6 +38,8 @@ public class AsyncStreamBroadcaster { } } + + /// 구독을 해제하고 스트림을 종료함 public func finish() { for c in continuations { c.finish() diff --git a/AIProject/iCo/Core/Util/Async+Timeout.swift b/AIProject/iCo/Core/Util/Async+Timeout.swift index 2d7efb2b..09670e2a 100644 --- a/AIProject/iCo/Core/Util/Async+Timeout.swift +++ b/AIProject/iCo/Core/Util/Async+Timeout.swift @@ -7,6 +7,10 @@ import Foundation + +/// 두 클로저 중 먼저 완료되는 클로저만 실행하고 나머지는 취소 +/// - Parameters: +/// - Returns: 먼저 완료되는 클로저 실행 public func race( _ lhs: sending @escaping () async throws -> T, _ rhs: sending @escaping () async throws -> T @@ -21,6 +25,13 @@ public func race( } } + +/// async 작업에 timeout을 줌 +/// - Parameters: +/// - action: 수행할 비동기 클로저 +/// - timeout: 타임아웃 duration +/// - Throws: 타임아웃 에러 또는 작업에서 발생한 에러 +/// - Returns: 비동기 작업 결과값 public func performWithTimeout( _ action: sending @escaping () async throws -> T, at timeout: Duration From d80989bae2cd4bb8275cfa4c13d7858987dfb519 Mon Sep 17 00:00:00 2001 From: kangho Date: Mon, 27 Oct 2025 11:43:01 +0900 Subject: [PATCH 27/33] =?UTF-8?q?fix:=20boradcaster=20=ED=95=B4=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index 4c31fdfb..001e76c1 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -70,6 +70,7 @@ public final class WebSocketClient: NSObject { task = nil stateBroadCaster.finish() incomingChannel.finish() + stateBroadCaster.finish() } } From a0e38484b560681c4323e3de691d0f0a3371e0a8 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Mon, 27 Oct 2025 17:41:35 +0900 Subject: [PATCH 28/33] =?UTF-8?q?chore:=20=EC=A4=91=EB=B3=B5=20=EC=BD=94?= =?UTF-8?q?=EB=93=9C=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index 001e76c1..4c31fdfb 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -70,7 +70,6 @@ public final class WebSocketClient: NSObject { task = nil stateBroadCaster.finish() incomingChannel.finish() - stateBroadCaster.finish() } } From c9a38cf8a0f8b9354adb3ff5ca5879e14092885f Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Mon, 27 Oct 2025 17:42:09 +0900 Subject: [PATCH 29/33] =?UTF-8?q?fix:=20=EC=9E=AC=EC=97=B0=EA=B2=B0=20?= =?UTF-8?q?=EC=83=81=ED=83=9C=EA=B0=80=202=EB=B2=88=20=ED=98=B8=EC=B6=9C?= =?UTF-8?q?=EB=90=98=EB=8A=94=20=EB=AC=B8=EC=A0=9C=20=ED=95=B4=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index 4c31fdfb..ff1cd506 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -40,11 +40,7 @@ public final class WebSocketClient: NSObject { task?.resume() // 핑 응답은 연결 후에 오기 때문에 connected 시점을 캐치할 수 있음 - do { - try await performWithTimeout(sendPing, at: pingTimeout) - } catch { - await stateBroadCaster.send(.reconnecting(nextAttempsIn: .seconds(2))) - } + try? await performWithTimeout(sendPing, at: pingTimeout) } /// 명시적으로 현재 WebSocket 연결을 정상적으로 종료합니다. From a79bac0016e149e0831c71a66fa986a9748f3207 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Mon, 27 Oct 2025 17:45:45 +0900 Subject: [PATCH 30/33] =?UTF-8?q?feat:=20=EC=9E=AC=EC=97=B0=EA=B2=B0=20?= =?UTF-8?q?=EB=A9=94=EC=86=8C=EB=93=9C=20=EC=9E=84=EC=8B=9C=20=EC=93=B0?= =?UTF-8?q?=EB=A1=9C=ED=8B=80=20=EC=A0=81=EC=9A=A9=20(=EA=B0=9C=EC=84=A0?= =?UTF-8?q?=20=EC=98=88=EC=A0=95)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift | 3 ++- AIProject/iCo/Features/Market/MarketStore.swift | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift index ff1cd506..3721d207 100644 --- a/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift +++ b/AIProject/iCo/Core/Remote/WebSocket/WebSocketClient.swift @@ -141,7 +141,7 @@ extension WebSocketClient { await incomingChannel.send(message) receive() } catch { - print("Receive Error!!!!!") + print("종료되어 더 이상 웹소켓 데이터를 받지 않습니다.") } } } @@ -179,6 +179,7 @@ extension WebSocketClient { return } + try? await Task.sleep(for: .seconds(2)) await connect() } diff --git a/AIProject/iCo/Features/Market/MarketStore.swift b/AIProject/iCo/Features/Market/MarketStore.swift index b2e504b7..e98c30f3 100644 --- a/AIProject/iCo/Features/Market/MarketStore.swift +++ b/AIProject/iCo/Features/Market/MarketStore.swift @@ -322,7 +322,6 @@ extension MarketStore { private func observeState() async { for await state in tickerService.subscribeStateStream() { // TODO: 여러 번 호출되는 이유 찾기 - print(#function, state) if case .connected = state { if !subscriptionSnapshot.isEmpty { await sendTicket(subscriptionSnapshot) From 55c25436ac715eb2390c8eabf11fb2ccd91fa65d Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Tue, 28 Oct 2025 10:27:37 +0900 Subject: [PATCH 31/33] =?UTF-8?q?fix:=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20?= =?UTF-8?q?=EC=BD=94=EB=93=9C=20=EC=BB=B4=ED=8C=8C=EC=9D=BC=20=EC=97=90?= =?UTF-8?q?=EB=9F=AC=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AIProject/iCoTests/Socket/WebSocketTest.swift | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/AIProject/iCoTests/Socket/WebSocketTest.swift b/AIProject/iCoTests/Socket/WebSocketTest.swift index 873fe46c..005f60af 100644 --- a/AIProject/iCoTests/Socket/WebSocketTest.swift +++ b/AIProject/iCoTests/Socket/WebSocketTest.swift @@ -19,24 +19,19 @@ final class WebSocketTest: XCTestCase { } func testExample() async throws { - let sut = WebSocketClient(url: URL(string: "wss://echo.websocket.org")!) - + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) await sut.connect() - - try await sut.send(text: "hi") + try await sut.send(text: "[{ticket:test},{type:ticker,codes:[KRW-BTC]}]") } func testReconnenctWhenAbnormalClose() async throws { - let sut = WebSocketClient(url: URL(string: "wss://echo.websocket.org")!) + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) await sut.connect() try await Task.sleep(for: .seconds(2)) - sut.cancel(with: .abnormalClosure) - try await Task.sleep(for: .seconds(2)) - // connecting -> connected -> abnormal close -> handleDisconnect -> reconnect // .... -> abnormal close -> didCompletWithError -> reconnect } @@ -57,13 +52,13 @@ final class WebSocketTest: XCTestCase { } func testUserClose() async throws { - let sut = WebSocketClient(url: URL(string: "wss://echo.websocket.org")!) - + let sut = WebSocketClient(url: URL(string: "wss://api.upbit.com/websocket/v1")!) await sut.connect() - try await Task.sleep(for: .seconds(2)) - sut.cancel() + await sut.disconnect() try await Task.sleep(for: .seconds(2)) + await sut.connect() + try await Task.sleep(for: .seconds(10)) } func testPerformanceExample() throws { From 83723f15a55714b2c6771f4832e65aa8327c3b6b Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Thu, 30 Oct 2025 20:03:35 +0900 Subject: [PATCH 32/33] =?UTF-8?q?fix:=20=EB=B8=8C=EB=A1=9C=EB=93=9C?= =?UTF-8?q?=EC=BA=90=EC=8A=A4=ED=84=B0=20=EB=A9=94=EB=AA=A8=EB=A6=AC=20?= =?UTF-8?q?=EB=88=84=EC=88=98=20=ED=95=B4=EA=B2=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iCo/Core/Util/Async+BroadCaster.swift | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/AIProject/iCo/Core/Util/Async+BroadCaster.swift b/AIProject/iCo/Core/Util/Async+BroadCaster.swift index beaff81d..4b041646 100644 --- a/AIProject/iCo/Core/Util/Async+BroadCaster.swift +++ b/AIProject/iCo/Core/Util/Async+BroadCaster.swift @@ -5,6 +5,8 @@ // Created by 강대훈 on 10/26/25. // +import Foundation + @globalActor actor BroadCaster { static let shared = BroadCaster() @@ -12,19 +14,24 @@ actor BroadCaster { private init() {} } - /// AsyncStream의 다중 소비를 위해 만든 객체로 스트림 전파 수행 public class AsyncStreamBroadcaster { /// 구독할 continuation 값들 - private var continuations: [AsyncStream.Continuation] = [] - + private var continuations: [UUID: AsyncStream.Continuation] = [:] /// 구독 메서드로 stream을 반환 /// - Returns: stream 반환 public func stream() -> AsyncStream { - AsyncStream { continuation in - continuations.append(continuation) + let id = UUID() + + return AsyncStream { continuation in + continuation.onTermination = { [weak self] _ in + guard let self = self else { return } + self.finish(id: id) + } + + continuations[id] = continuation } } @@ -33,7 +40,7 @@ public class AsyncStreamBroadcaster { /// - Parameter element: 메세지 @BroadCaster public func send(_ element: Element) async { - for c in continuations { + for (_, c) in continuations { c.yield(element) } } @@ -41,8 +48,12 @@ public class AsyncStreamBroadcaster { /// 구독을 해제하고 스트림을 종료함 public func finish() { - for c in continuations { + for (_, c) in continuations { c.finish() } } + + private func finish(id: UUID) { + continuations[id] = nil + } } From dde0bdfab0b83302ff231d664106dd134a1d1705 Mon Sep 17 00:00:00 2001 From: kanghun1121 Date: Thu, 30 Oct 2025 20:04:11 +0900 Subject: [PATCH 33/33] =?UTF-8?q?refactor:=20MarketStore=20Observable=20->?= =?UTF-8?q?=20ObservableObject=EB=A1=9C=20=EB=B3=80=EA=B2=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Market/CoinList/CoinListView.swift | 2 +- .../iCo/Features/Market/MarketStore.swift | 29 ++++++++++--------- .../iCo/Features/Market/MarketView.swift | 4 +-- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/AIProject/iCo/Features/Market/CoinList/CoinListView.swift b/AIProject/iCo/Features/Market/CoinList/CoinListView.swift index bb712c39..678a84c1 100644 --- a/AIProject/iCo/Features/Market/CoinList/CoinListView.swift +++ b/AIProject/iCo/Features/Market/CoinList/CoinListView.swift @@ -9,7 +9,7 @@ import SwiftUI import AsyncAlgorithms struct CoinListView: View { - @Bindable var store: MarketStore + @ObservedObject var store: MarketStore @State private var visibleCoins: Set = [] @State private var isActive: Bool = false diff --git a/AIProject/iCo/Features/Market/MarketStore.swift b/AIProject/iCo/Features/Market/MarketStore.swift index e98c30f3..bb8597f8 100644 --- a/AIProject/iCo/Features/Market/MarketStore.swift +++ b/AIProject/iCo/Features/Market/MarketStore.swift @@ -16,8 +16,7 @@ enum CoinFilter: Int, Equatable { /// 마켓 이벤트 처리를 담당 /// 검색 / 시세 정보 / 웹소켓 상태 / 정렬 / 필터링 이벤트 처리 @MainActor -@Observable -class MarketStore { +final class MarketStore: ObservableObject { /// 최초 한 번만 로드하기 위한 flag private var hasLoaded = false @@ -28,11 +27,12 @@ class MarketStore { private let coinService: UpBitAPIService private let tickerService: RealTimeTickerProvider private let searchRecordManager: SearchRecordManaging = SearchRecordManager() + private var stateTask: Task? private(set) var errorMessage: String? /// 변동성이 적은 메타 정보 - private(set) var coinMeta: [CoinID: Coin] = [:] + @Published private(set) var coinMeta: [CoinID: Coin] = [:] /// 변동성이 큰 시세 정보 private var ticker: [CoinID: TickerStore] = [:] @@ -320,16 +320,19 @@ extension MarketStore { } private func observeState() async { - for await state in tickerService.subscribeStateStream() { - // TODO: 여러 번 호출되는 이유 찾기 - if case .connected = state { - if !subscriptionSnapshot.isEmpty { - await sendTicket(subscriptionSnapshot) - } - - // 시세가 - self.tickerStreamTask = Task { - await consume() + stateTask?.cancel() + + stateTask = Task { + for await state in tickerService.subscribeStateStream() { + if case .connected = state { + if !subscriptionSnapshot.isEmpty { + await sendTicket(subscriptionSnapshot) + } + + // 시세가 + self.tickerStreamTask = Task { + await consume() + } } } } diff --git a/AIProject/iCo/Features/Market/MarketView.swift b/AIProject/iCo/Features/Market/MarketView.swift index 2b61b38e..277091a5 100644 --- a/AIProject/iCo/Features/Market/MarketView.swift +++ b/AIProject/iCo/Features/Market/MarketView.swift @@ -9,7 +9,7 @@ import SwiftUI struct MarketView: View { - @State var store: MarketStore + @StateObject var store: MarketStore @State private var searchText: String = "" @State private var selectedCoinID: CoinID? @@ -25,7 +25,7 @@ struct MarketView: View { @State private var showNewBadge: Bool = false init(coinService: UpBitAPIService, tickerService: RealTimeTickerProvider) { - store = MarketStore(coinService: coinService, tickerService: tickerService) + _store = StateObject(wrappedValue: MarketStore(coinService: coinService, tickerService: tickerService)) } var body: some View {