import Foundation import WebSocketKit import Vapor import Clairvoyant final class DeviceManager { /// The connection to the device private var connection: WebSocket? /// The authentication token of the device for the socket connection private let deviceKey: Data /// The authentication token of the remote private let remoteKey: Data /// Indicate that the socket is fully initialized with an authorized device var deviceIsAuthenticated = false private var isOpeningNewConnection = false private let deviceTimeout: Int64 private let deviceConnectedMetric: Metric private let messagesToDeviceMetric: Metric /// Indicator for device availability var deviceIsConnected: Bool { deviceIsAuthenticated && !(connection?.isClosed ?? true) } /// A promise to finish the request once the device responds or times out private var requestInProgress: EventLoopPromise? init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64) async { self.deviceKey = deviceKey self.remoteKey = remoteKey self.deviceTimeout = deviceTimeout self.deviceConnectedMetric = try! await .init( "sesame.connected", name: "Device connected", description: "Shows if the device is connected via WebSocket") self.messagesToDeviceMetric = try! await .init( "sesame.messages", name: "Forwarded Messages", description: "The number of messages transmitted to the device") } private func updateDeviceConnectionMetric() { Task { try? await deviceConnectedMetric.update(deviceIsConnected) } } private func updateMessageCountMetric() { Task { let lastValue = await messagesToDeviceMetric.lastValue()?.value ?? 0 try? await messagesToDeviceMetric.update(lastValue + 1) } } // MARK: API var deviceStatus: String { deviceIsConnected ? "1" : "0" } func sendMessageToDevice(_ message: Message, on eventLoop: EventLoop) -> EventLoopFuture { guard let socket = connection, !socket.isClosed else { connection = nil return eventLoop.makeSucceededFuture(.deviceNotConnected) } guard requestInProgress == nil else { return eventLoop.makeSucceededFuture(.operationInProgress) } let result = eventLoop.makePromise(of: DeviceResponse.self) self.requestInProgress = result socket.send(message.bytes, promise: nil) updateMessageCountMetric() eventLoop.scheduleTask(in: .seconds(deviceTimeout)) { [weak self] in guard let promise = self?.requestInProgress else { return } self?.requestInProgress = nil promise.succeed(.deviceTimedOut) } return result.futureResult } func authenticateDevice(hash: String) { defer { updateDeviceConnectionMetric() } guard let key = Data(fromHexEncodedString: hash), SHA256.hash(data: key) == self.deviceKey else { log("Invalid device key") _ = connection?.close() deviceIsAuthenticated = false return } log("Device authenticated") deviceIsAuthenticated = true } func authenticateRemote(_ token: Data) -> Bool { let hash = SHA256.hash(data: token) return hash == remoteKey } func processDeviceResponse(_ data: ByteBuffer) { guard let promise = requestInProgress else { return } defer { requestInProgress = nil } promise.succeed(DeviceResponse(data, request: RouteAPI.socket.rawValue) ?? .unexpectedSocketEvent) } func didCloseDeviceSocket() { defer { updateDeviceConnectionMetric() } guard !isOpeningNewConnection else { return } deviceIsAuthenticated = false guard connection != nil else { log("Socket closed, but no connection anyway") return } connection = nil log("Socket closed") } func removeDeviceConnection() { defer { updateDeviceConnectionMetric() } deviceIsAuthenticated = false guard let socket = connection else { return } try? socket.close().wait() connection = nil log("Removed device connection") } func createNewDeviceConnection(_ socket: WebSocket) { defer { updateDeviceConnectionMetric() } isOpeningNewConnection = true removeDeviceConnection() connection = socket log("Socket connected") isOpeningNewConnection = false } }