Update clairvoyant

This commit is contained in:
Christoph Hagen 2023-10-01 19:26:31 +02:00
parent 810bff0eb3
commit 00ac95df01
3 changed files with 45 additions and 6 deletions

View File

@ -33,15 +33,15 @@ final class DeviceManager {
/// A promise to finish the request once the device responds or times out /// A promise to finish the request once the device responds or times out
private var requestInProgress: EventLoopPromise<Data>? private var requestInProgress: EventLoopPromise<Data>?
init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64) async { init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64) {
self.deviceKey = deviceKey self.deviceKey = deviceKey
self.remoteKey = remoteKey self.remoteKey = remoteKey
self.deviceTimeout = deviceTimeout self.deviceTimeout = deviceTimeout
self.deviceConnectedMetric = try! await .init( self.deviceConnectedMetric = .init(
"sesame.connected", "sesame.connected",
name: "Device connected", name: "Device connected",
description: "Shows if the device is connected via WebSocket") description: "Shows if the device is connected via WebSocket")
self.messagesToDeviceMetric = try! await .init( self.messagesToDeviceMetric = .init(
"sesame.messages", "sesame.messages",
name: "Forwarded Messages", name: "Forwarded Messages",
description: "The number of messages transmitted to the device") description: "The number of messages transmitted to the device")

View File

@ -0,0 +1,34 @@
import Foundation
import Vapor
import NIOCore
import ClairvoyantVapor
final class EventLoopScheduler {
private let backgroundGroup: EventLoopGroup
init() {
backgroundGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
}
func next() -> EventLoop {
backgroundGroup.next()
}
func provider() -> NIOEventLoopGroupProvider {
return .shared(backgroundGroup)
}
func shutdown() {
backgroundGroup.shutdownGracefully { _ in
}
}
}
extension EventLoopScheduler: ClairvoyantVapor.AsyncScheduler {
func schedule(asyncJob: @escaping @Sendable () async throws -> Void) {
_ = backgroundGroup.any().makeFutureWithTask(asyncJob)
}
}

View File

@ -7,6 +7,8 @@ var deviceManager: DeviceManager!
private var provider: VaporMetricProvider! private var provider: VaporMetricProvider!
private var asyncScheduler = EventLoopScheduler()
enum ServerError: Error { enum ServerError: Error {
case invalidAuthenticationFileContent case invalidAuthenticationFileContent
case invalidAuthenticationToken case invalidAuthenticationToken
@ -27,7 +29,7 @@ public func configure(_ app: Application) async throws {
let monitor = MetricObserver(logFileFolder: logFolder, logMetricId: "sesame.log") let monitor = MetricObserver(logFileFolder: logFolder, logMetricId: "sesame.log")
MetricObserver.standard = monitor MetricObserver.standard = monitor
let status = try await Metric<ServerStatus>("sesame.status") let status = Metric<ServerStatus>("sesame.status")
_ = try await status.update(.initializing) _ = try await status.update(.initializing)
let configUrl = storageFolder.appendingPathComponent("config.json") let configUrl = storageFolder.appendingPathComponent("config.json")
@ -38,7 +40,7 @@ public func configure(_ app: Application) async throws {
let keyFile = storageFolder.appendingPathComponent(config.keyFileName) let keyFile = storageFolder.appendingPathComponent(config.keyFileName)
let (deviceKey, remoteKey) = try loadKeys(at: keyFile) let (deviceKey, remoteKey) = try loadKeys(at: keyFile)
deviceManager = await DeviceManager( deviceManager = DeviceManager(
deviceKey: deviceKey, deviceKey: deviceKey,
remoteKey: remoteKey, remoteKey: remoteKey,
deviceTimeout: config.deviceTimeout) deviceTimeout: config.deviceTimeout)
@ -46,6 +48,7 @@ public func configure(_ app: Application) async throws {
try routes(app) try routes(app)
provider = .init(observer: monitor, accessManager: config.authenticationTokens) provider = .init(observer: monitor, accessManager: config.authenticationTokens)
provider.asyncScheduler = asyncScheduler
provider.registerRoutes(app) provider.registerRoutes(app)
// Gracefully shut down by closing potentially open socket // Gracefully shut down by closing potentially open socket
@ -84,5 +87,7 @@ func log(_ message: String) {
print(message) print(message)
return return
} }
observer.log(message) asyncScheduler.schedule {
await observer.log(message)
}
} }