Update logic to async

This commit is contained in:
Christoph Hagen 2023-11-10 13:45:42 +01:00
parent b8c7256b9d
commit 9f20563877
4 changed files with 119 additions and 102 deletions

View File

@ -3,7 +3,7 @@ import Foundation
/** /**
A result from sending a key to the device. A result from sending a key to the device.
*/ */
enum MessageResult: UInt8 { enum MessageResult: UInt8, Error {
/// Text content was received, although binary data was expected /// Text content was received, although binary data was expected
case textReceived = 1 case textReceived = 1
@ -48,6 +48,8 @@ enum MessageResult: UInt8 {
case invalidUrlParameter = 20 case invalidUrlParameter = 20
case invalidResponseAuthentication = 21 case invalidResponseAuthentication = 21
case invalidDeviceResponse = 22
} }
extension MessageResult: CustomStringConvertible { extension MessageResult: CustomStringConvertible {
@ -84,6 +86,8 @@ extension MessageResult: CustomStringConvertible {
return "The url parameter could not be found" return "The url parameter could not be found"
case .invalidResponseAuthentication: case .invalidResponseAuthentication:
return "The response could not be authenticated" return "The response could not be authenticated"
case .invalidDeviceResponse:
return "The device responded with invalid data"
} }
} }
} }

View File

@ -3,6 +3,18 @@ import WebSocketKit
import Vapor import Vapor
import Clairvoyant import Clairvoyant
enum DeviceState: UInt8 {
case disconnected = 0
case connected = 1
case authenticated = 2
}
extension DeviceState: MetricValue {
static let valueType: MetricType = .customType(named: "DeviceState")
}
final class DeviceManager { final class DeviceManager {
/// The connection to the device /// The connection to the device
@ -17,30 +29,38 @@ final class DeviceManager {
/// Indicate that the socket is fully initialized with an authorized device /// Indicate that the socket is fully initialized with an authorized device
private var deviceIsAuthenticated = false private var deviceIsAuthenticated = false
private var isOpeningNewConnection = false
private let deviceTimeout: Int64 private let deviceTimeout: Int64
private let deviceConnectedMetric: Metric<Bool> private let deviceStateMetric: Metric<DeviceState>
private let messagesToDeviceMetric: Metric<Int> private let messagesToDeviceMetric: Metric<Int>
private let scheduler: AsyncScheduler private let scheduler: AsyncScheduler
var deviceState: DeviceState {
guard let connection, !connection.isClosed else {
return .disconnected
}
guard deviceIsAuthenticated else {
return .connected
}
return .authenticated
}
/// Indicator for device availability /// Indicator for device availability
var deviceIsConnected: Bool { var deviceIsConnected: Bool {
deviceIsAuthenticated && !(connection?.isClosed ?? true) deviceIsAuthenticated && !(connection?.isClosed ?? true)
} }
/// A promise to finish the request once the device responds or times out /// A promise to finish the request once the device responds or times out
private var requestInProgress: EventLoopPromise<Data>? private var requestInProgress: CheckedContinuation<Data, Error>?
init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64, scheduler: AsyncScheduler) { init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64, scheduler: AsyncScheduler) {
self.deviceKey = deviceKey self.deviceKey = deviceKey
self.remoteKey = remoteKey self.remoteKey = remoteKey
self.deviceTimeout = deviceTimeout self.deviceTimeout = deviceTimeout
self.deviceConnectedMetric = .init( self.deviceStateMetric = .init(
"sesame.connected", "sesame.device",
name: "Device connected", name: "Device connected",
description: "Shows if the device is connected via WebSocket") description: "Shows if the device is connected via WebSocket")
self.messagesToDeviceMetric = .init( self.messagesToDeviceMetric = .init(
@ -50,61 +70,72 @@ final class DeviceManager {
self.scheduler = scheduler self.scheduler = scheduler
} }
private func updateDeviceConnectionMetric() { private func updateDeviceConnectionMetric() async {
scheduler.schedule { [weak self] in _ = try? await deviceStateMetric.update(deviceState)
guard let self else { return }
_ = try? await deviceConnectedMetric.update(deviceIsConnected)
}
} }
private func updateMessageCountMetric() { private func updateMessageCountMetric() async {
scheduler.schedule { [weak self] in let lastValue = await messagesToDeviceMetric.lastValue()?.value ?? 0
guard let self else { return } _ = try? await messagesToDeviceMetric.update(lastValue + 1)
let lastValue = await self.messagesToDeviceMetric.lastValue()?.value ?? 0
_ = try? await messagesToDeviceMetric.update(lastValue + 1)
}
} }
// MARK: API // MARK: API
private var deviceStatus: String { private var deviceStatus: String {
deviceIsConnected ? "1" : "0" "\(deviceState.rawValue)"
} }
func sendMessageToDevice(_ message: Data, on eventLoop: EventLoop) -> EventLoopFuture<Data> { func sendMessageToDevice(_ message: Data, on eventLoop: EventLoop) async throws -> Data {
guard let socket = connection, !socket.isClosed else { guard let socket = connection, !socket.isClosed else {
connection = nil connection = nil
return eventLoop.makeSucceededFuture(MessageResult.deviceNotConnected.encoded) throw MessageResult.deviceNotConnected
} }
guard requestInProgress == nil else { guard requestInProgress == nil else {
return eventLoop.makeSucceededFuture(MessageResult.operationInProgress.encoded) throw MessageResult.operationInProgress
} }
let result = eventLoop.makePromise(of: Data.self) do {
self.requestInProgress = result try await socket.send(Array(message))
socket.send(Array(message), promise: nil) await updateMessageCountMetric()
updateMessageCountMetric() } catch {
eventLoop.scheduleTask(in: .seconds(deviceTimeout)) { [weak self] in throw MessageResult.deviceNotConnected
guard let promise = self?.requestInProgress else {
return
}
self?.requestInProgress = nil
log("Timed out waiting for device response")
promise.succeed(MessageResult.deviceTimedOut.encoded)
} }
return result.futureResult startTimeoutForDeviceRequest(on: eventLoop)
let result: Data = try await withCheckedThrowingContinuation { continuation in
self.requestInProgress = continuation
}
return result
} }
func authenticateDevice(hash: String) { private func startTimeoutForDeviceRequest(on eventLoop: EventLoop) {
defer { updateDeviceConnectionMetric() } eventLoop.scheduleTask(in: .seconds(deviceTimeout)) { [weak self] in
self?.resumeDeviceRequest(with: .deviceTimedOut)
}
}
private func resumeDeviceRequest(with data: Data) {
requestInProgress?.resume(returning: data)
requestInProgress = nil
}
private func resumeDeviceRequest(with result: MessageResult) {
requestInProgress?.resume(throwing: result)
requestInProgress = nil
}
func authenticateDevice(hash: String) async {
guard let key = Data(fromHexEncodedString: hash), guard let key = Data(fromHexEncodedString: hash),
SHA256.hash(data: key) == self.deviceKey else { SHA256.hash(data: key) == self.deviceKey else {
log("Invalid device key") log("Invalid device key")
_ = connection?.close() await removeDeviceConnection()
deviceIsAuthenticated = false return
}
guard let connection, !connection.isClosed else {
await updateDeviceConnectionMetric()
return return
} }
log("Device authenticated")
deviceIsAuthenticated = true deviceIsAuthenticated = true
await updateDeviceConnectionMetric()
} }
func authenticateRemote(_ token: Data) -> Bool { func authenticateRemote(_ token: Data) -> Bool {
@ -115,60 +146,39 @@ final class DeviceManager {
func processDeviceResponse(_ buffer: ByteBuffer) { func processDeviceResponse(_ buffer: ByteBuffer) {
guard let data = buffer.getData(at: 0, length: buffer.readableBytes) else { guard let data = buffer.getData(at: 0, length: buffer.readableBytes) else {
log("Failed to get data buffer received from device") log("Failed to get data buffer received from device")
self.resumeDeviceRequest(with: .invalidDeviceResponse)
return return
} }
guard let promise = requestInProgress else { self.resumeDeviceRequest(with: data)
log("Received device response \(data) without an active request")
return
}
defer { requestInProgress = nil }
log("Device response received")
promise.succeed(data)
} }
func didCloseDeviceSocket() { func didCloseDeviceSocket() {
defer { updateDeviceConnectionMetric() }
guard !isOpeningNewConnection else {
return
}
deviceIsAuthenticated = false deviceIsAuthenticated = false
guard connection != nil else {
log("Socket closed, but no connection anyway")
return
}
connection = nil connection = nil
log("Socket closed")
} }
func removeDeviceConnection() { func removeDeviceConnection() async {
defer { updateDeviceConnectionMetric() } try? await connection?.close()
deviceIsAuthenticated = false
guard let socket = connection else {
return
}
socket.close().whenSuccess { log("Socket closed") }
connection = nil connection = nil
log("Removed device connection") deviceIsAuthenticated = false
await updateDeviceConnectionMetric()
} }
func createNewDeviceConnection(_ socket: WebSocket) { func createNewDeviceConnection(_ socket: WebSocket) async {
defer { updateDeviceConnectionMetric() } await removeDeviceConnection()
socket.onBinary { _, data in socket.onBinary { [weak self] _, data in
self.processDeviceResponse(data) self?.processDeviceResponse(data)
} }
socket.onText { _, text in socket.onText { [weak self] _, text async in
self.authenticateDevice(hash: text) await self?.authenticateDevice(hash: text)
} }
_ = socket.onClose.always { _ in _ = socket.onClose.always { [weak self] _ in
self.didCloseDeviceSocket() self?.didCloseDeviceSocket()
} }
isOpeningNewConnection = true
removeDeviceConnection()
connection = socket connection = socket
log("Socket connected") await updateDeviceConnectionMetric()
isOpeningNewConnection = false
} }
} }

View File

@ -47,13 +47,6 @@ public func configure(_ app: Application) throws {
provider.asyncScheduler = asyncScheduler provider.asyncScheduler = asyncScheduler
provider.registerRoutes(app) provider.registerRoutes(app)
// Gracefully shut down by closing potentially open socket
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + .seconds(5)) {
_ = app.server.onShutdown.always { _ in
deviceManager.removeDeviceConnection()
}
}
asyncScheduler.schedule { asyncScheduler.schedule {
_ = try await status.update(.nominal) _ = try await status.update(.nominal)
} }
@ -62,6 +55,14 @@ public func configure(_ app: Application) throws {
df.dateStyle = .short df.dateStyle = .short
df.timeStyle = .short df.timeStyle = .short
print("[\(df.string(from: Date()))] Server started") print("[\(df.string(from: Date()))] Server started")
// Gracefully shut down by closing potentially open socket
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + .seconds(5)) {
_ = app.server.onShutdown.always { _ in
print("[\(df.string(from: Date()))] Server shutdown")
//await deviceManager.removeDeviceConnection()
}
}
} }
private func loadKeys(at url: URL) throws -> (deviceKey: Data, remoteKey: Data) { private func loadKeys(at url: URL) throws -> (deviceKey: Data, remoteKey: Data) {

View File

@ -11,33 +11,33 @@ extension RouteAPI {
} }
} }
private func messageTransmission(_ req: Request) -> EventLoopFuture<Data> { private func messageTransmission(_ req: Request) async throws -> Data {
guard let body = req.body.data else { guard let body = req.body.data else {
return req.eventLoop.makeSucceededFuture(MessageResult.noBodyData.encoded) throw MessageResult.noBodyData
} }
guard let message = ServerMessage(decodeFrom: body) else { guard let message = ServerMessage(decodeFrom: body) else {
return req.eventLoop.makeSucceededFuture(MessageResult.invalidMessageSize.encoded) throw MessageResult.invalidMessageSize
} }
guard deviceManager.authenticateRemote(message.authToken) else { guard deviceManager.authenticateRemote(message.authToken) else {
return req.eventLoop.makeSucceededFuture(MessageResult.messageAuthenticationFailed.encoded) throw MessageResult.messageAuthenticationFailed
} }
return deviceManager.sendMessageToDevice(message.message, on: req.eventLoop) return try await deviceManager.sendMessageToDevice(message.message, on: req.eventLoop)
} }
private func deviceStatus(_ req: Request) -> EventLoopFuture<MessageResult> { private func deviceStatus(_ req: Request) -> MessageResult {
guard let body = req.body.data else { guard let body = req.body.data else {
return req.eventLoop.makeSucceededFuture(.noBodyData) return .noBodyData
} }
guard let authToken = ServerMessage.token(from: body) else { guard let authToken = ServerMessage.token(from: body) else {
return req.eventLoop.makeSucceededFuture(.invalidMessageSize) return .invalidMessageSize
} }
guard deviceManager.authenticateRemote(authToken) else { guard deviceManager.authenticateRemote(authToken) else {
return req.eventLoop.makeSucceededFuture(.messageAuthenticationFailed) return .messageAuthenticationFailed
} }
guard deviceManager.deviceIsConnected else { guard deviceManager.deviceIsConnected else {
return req.eventLoop.makeSucceededFuture(.deviceNotConnected) return .deviceNotConnected
} }
return req.eventLoop.makeSucceededFuture(.deviceConnected) return .deviceConnected
} }
func routes(_ app: Application) throws { func routes(_ app: Application) throws {
@ -50,10 +50,9 @@ func routes(_ app: Application) throws {
The request returns one byte of data, which is the raw value of a `MessageResult`. The request returns one byte of data, which is the raw value of a `MessageResult`.
Possible results are `noBodyData`, `invalidMessageSize`, `deviceNotConnected`, `deviceConnected`. Possible results are `noBodyData`, `invalidMessageSize`, `deviceNotConnected`, `deviceConnected`.
*/ */
app.post(RouteAPI.getDeviceStatus.path) { req in app.post(RouteAPI.getDeviceStatus.path) { request in
deviceStatus(req).map { let result = deviceStatus(request)
Response(status: .ok, body: .init(data: $0.encoded)) return Response(status: .ok, body: .init(data: result.encoded))
}
} }
/** /**
@ -64,9 +63,12 @@ func routes(_ app: Application) throws {
The request returns one or `Message.length+1` bytes of data, where the first byte is the raw value of a `MessageResult`, The request returns one or `Message.length+1` bytes of data, where the first byte is the raw value of a `MessageResult`,
and the optional following bytes contain the response message of the device. This request does not complete until either the device responds or the request times out. The timeout is specified by `KeyManagement.deviceTimeout`. and the optional following bytes contain the response message of the device. This request does not complete until either the device responds or the request times out. The timeout is specified by `KeyManagement.deviceTimeout`.
*/ */
app.post(RouteAPI.postMessage.path) { req in app.post(RouteAPI.postMessage.path) { request async throws in
messageTransmission(req).map { do {
Response(status: .ok, body: .init(data: $0)) let result = try await messageTransmission(request)
return Response(status: .ok, body: .init(data: result))
} catch let error as MessageResult {
return Response(status: .ok, body: .init(data: error.encoded))
} }
} }
@ -75,7 +77,7 @@ func routes(_ app: Application) throws {
- Returns: Nothing - Returns: Nothing
- Note: The first message from the device over the connection must be a valid auth token. - Note: The first message from the device over the connection must be a valid auth token.
*/ */
app.webSocket(RouteAPI.socket.path) { req, socket in app.webSocket(RouteAPI.socket.path) { req, socket async in
deviceManager.createNewDeviceConnection(socket) await deviceManager.createNewDeviceConnection(socket)
} }
} }