import Foundation
import WebSocketKit
import Vapor
import Clairvoyant

/// Relays messages from a remote to a single device connected via WebSocket
/// and hands the device's response back to the waiting caller.
///
/// Only one request may be outstanding at a time. A request finishes when the
/// device answers with a binary frame, when sending fails, or when the
/// timeout scheduled for the request fires.
final class DeviceManager {

    /// The connection to the device
    private var connection: WebSocket?

    /// The authentication token of the device for the socket connection
    private let deviceKey: Data

    /// The authentication token of the remote
    private let remoteKey: Data

    /// The number of seconds to wait for a device response before failing the request
    private let deviceTimeout: Int64

    private let deviceConnectedMetric: Metric<Bool>

    private let messagesToDeviceMetric: Metric<Int>

    /// `true` while a device socket is stored and has not been closed.
    var deviceIsConnected: Bool {
        guard let connection, !connection.isClosed else {
            return false
        }
        return true
    }

    /// A promise to finish the request once the device responds or times out
    private var requestInProgress: CheckedContinuation<Data, Error>?

    /// Set while a message is being written to the socket and the continuation
    /// has not been stored yet, so that a second caller is rejected instead of
    /// overwriting the first caller's continuation.
    private var isSendingRequest = false

    /// The timeout task belonging to the request currently in progress.
    private var requestTimeout: Scheduled<Void>?

    /// Creates a manager.
    /// - Parameters:
    ///   - deviceKey: The value the SHA256 hash of a device's key must equal.
    ///   - remoteKey: The value the SHA256 hash of a remote's token must equal.
    ///   - deviceTimeout: Seconds to wait for a device response.
    init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64) {
        self.deviceKey = deviceKey
        self.remoteKey = remoteKey
        self.deviceTimeout = deviceTimeout
        self.deviceConnectedMetric = .init(
            "sesame.connected",
            name: "Device connection",
            description: "Shows if the device is connected via WebSocket")
        self.messagesToDeviceMetric = .init(
            "sesame.messages",
            name: "Forwarded Messages",
            description: "The number of messages transmitted to the device")
    }

    /// Writes the current value of `deviceIsConnected` to the connection metric.
    /// Errors from the metric update are ignored.
    func updateDeviceConnectionMetric() async {
        _ = try? await deviceConnectedMetric.update(deviceIsConnected)
    }

    /// Increments the forwarded-messages metric by one, starting from zero if
    /// the metric has no previous value. Errors from the update are ignored.
    private func updateMessageCountMetric() async {
        let lastValue = await messagesToDeviceMetric.lastValue()?.value ?? 0
        _ = try? await messagesToDeviceMetric.update(lastValue + 1)
    }

    // MARK: API

    /// Sends a message to the device and waits for its response.
    /// - Parameters:
    ///   - message: The bytes to forward as a binary WebSocket message.
    ///   - eventLoop: The event loop on which the response timeout is scheduled.
    /// - Returns: The data received from the device.
    /// - Throws: `MessageResult.deviceNotConnected` if no open socket exists or
    ///   sending fails, `MessageResult.operationInProgress` if another request
    ///   is outstanding, `MessageResult.deviceTimedOut` if the timeout fires,
    ///   or `MessageResult.invalidDeviceResponse` if the response is unreadable.
    func sendMessageToDevice(_ message: Data, on eventLoop: EventLoop) async throws -> Data {
        guard let socket = connection, !socket.isClosed else {
            connection = nil
            throw MessageResult.deviceNotConnected
        }
        guard requestInProgress == nil, !isSendingRequest else {
            throw MessageResult.operationInProgress
        }
        // Reserve the request slot before the first suspension point. Without
        // this, a second caller could pass the guard above while this call is
        // suspended in `send`, and one continuation would be overwritten and
        // never resumed.
        isSendingRequest = true
        do {
            try await socket.send(Array(message))
        } catch {
            isSendingRequest = false
            throw MessageResult.deviceNotConnected
        }
        await updateMessageCountMetric()
        startTimeoutForDeviceRequest(on: eventLoop)
        // NOTE(review): a response arriving before the continuation is stored
        // below is still dropped and the request then ends with a timeout.
        return try await withCheckedThrowingContinuation { continuation in
            self.requestInProgress = continuation
            self.isSendingRequest = false
        }
    }

    /// Schedules the timeout for the current request, replacing any earlier one.
    private func startTimeoutForDeviceRequest(on eventLoop: EventLoop) {
        requestTimeout?.cancel()
        requestTimeout = eventLoop.scheduleTask(in: .seconds(deviceTimeout)) { [weak self] in
            self?.resumeDeviceRequest(with: .deviceTimedOut)
        }
    }

    /// Detaches the pending continuation (if any) and cancels its timeout, so
    /// that a timer left over from a finished request cannot fail a later one.
    private func takePendingRequest() -> CheckedContinuation<Data, Error>? {
        requestTimeout?.cancel()
        requestTimeout = nil
        let pending = requestInProgress
        requestInProgress = nil
        return pending
    }

    /// Completes the pending request successfully. Does nothing if no request is pending.
    private func resumeDeviceRequest(with data: Data) {
        takePendingRequest()?.resume(returning: data)
    }

    /// Fails the pending request. Does nothing if no request is pending.
    private func resumeDeviceRequest(with result: MessageResult) {
        takePendingRequest()?.resume(throwing: result)
    }

    /// Checks a remote's token.
    /// - Parameter token: The token presented by the remote.
    /// - Returns: `true` if the SHA256 hash of `token` equals the remote key.
    func authenticateRemote(_ token: Data) -> Bool {
        let hash = SHA256.hash(data: token)
        return hash == remoteKey
    }

    /// Handles a binary message from the device by completing the pending
    /// request with its bytes, or failing it if the bytes cannot be read.
    func processDeviceResponse(_ buffer: ByteBuffer) {
        guard let data = buffer.getData(at: 0, length: buffer.readableBytes) else {
            log("Failed to get data buffer received from device")
            resumeDeviceRequest(with: .invalidDeviceResponse)
            return
        }
        resumeDeviceRequest(with: data)
    }

    /// Forgets the stored device socket.
    func didCloseDeviceSocket() {
        connection = nil
    }

    /// Closes the stored device socket (ignoring errors), forgets it and
    /// updates the connection metric.
    func removeDeviceConnection() async {
        try? await connection?.close()
        connection = nil
        await updateDeviceConnectionMetric()
    }

    /// Replaces the device connection with `socket` if `auth` is valid.
    /// - Parameters:
    ///   - socket: The newly opened WebSocket of the device.
    ///   - auth: The hex-encoded device key; its SHA256 hash must equal the
    ///     stored device key, otherwise the call logs and returns without
    ///     touching the existing connection.
    func createNewDeviceConnection(socket: WebSocket, auth: String) async {
        guard let key = Data(fromHexEncodedString: auth),
              SHA256.hash(data: key) == self.deviceKey else {
            log("Invalid device key")
            return
        }
        await removeDeviceConnection()
        connection = socket
        socket.eventLoop.execute {
            socket.pingInterval = .seconds(10)
            socket.onBinary { [weak self] _, data in
                self?.processDeviceResponse(data)
            }
            socket.onClose.whenComplete { [weak self, weak socket] _ in
                // Only react if the closed socket is still the active one. The
                // close event of a replaced socket must not discard the
                // connection that took its place.
                guard let self, let socket, self.connection === socket else {
                    return
                }
                self.didCloseDeviceSocket()
            }
        }
        await updateDeviceConnectionMetric()
    }
}