Move to newer metrics version
This commit is contained in:
parent
e96b85b1cc
commit
23fd5055cd
@ -8,7 +8,7 @@ let package = Package(
|
|||||||
],
|
],
|
||||||
dependencies: [
|
dependencies: [
|
||||||
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"),
|
.package(url: "https://github.com/vapor/vapor.git", from: "4.0.0"),
|
||||||
.package(url: "https://github.com/christophhagen/clairvoyant.git", from: "0.4.0"),
|
.package(url: "https://github.com/christophhagen/clairvoyant.git", from: "0.5.0"),
|
||||||
],
|
],
|
||||||
targets: [
|
targets: [
|
||||||
.target(
|
.target(
|
||||||
|
@ -33,27 +33,31 @@ final class DeviceManager {
|
|||||||
/// A promise to finish the request once the device responds or times out
|
/// A promise to finish the request once the device responds or times out
|
||||||
private var requestInProgress: EventLoopPromise<DeviceResponse>?
|
private var requestInProgress: EventLoopPromise<DeviceResponse>?
|
||||||
|
|
||||||
init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64) {
|
init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64) async {
|
||||||
self.deviceKey = deviceKey
|
self.deviceKey = deviceKey
|
||||||
self.remoteKey = remoteKey
|
self.remoteKey = remoteKey
|
||||||
self.deviceTimeout = deviceTimeout
|
self.deviceTimeout = deviceTimeout
|
||||||
self.deviceConnectedMetric = .init(
|
self.deviceConnectedMetric = try! await .init(
|
||||||
"sesame.connected",
|
"sesame.connected",
|
||||||
name: "Device connected",
|
name: "Device connected",
|
||||||
description: "Shows if the device is connected via WebSocket")
|
description: "Shows if the device is connected via WebSocket")
|
||||||
self.messagesToDeviceMetric = .init(
|
self.messagesToDeviceMetric = try! await .init(
|
||||||
"sesame.messages",
|
"sesame.messages",
|
||||||
name: "Forwarded Messages",
|
name: "Forwarded Messages",
|
||||||
description: "The number of messages transmitted to the device")
|
description: "The number of messages transmitted to the device")
|
||||||
}
|
}
|
||||||
|
|
||||||
private func updateDeviceConnectionMetric() {
|
private func updateDeviceConnectionMetric() {
|
||||||
deviceConnectedMetric.update(deviceIsConnected)
|
Task {
|
||||||
|
try? await deviceConnectedMetric.update(deviceIsConnected)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func updateMessageCountMetric() {
|
private func updateMessageCountMetric() {
|
||||||
let lastValue = messagesToDeviceMetric.lastValue()?.value ?? 0
|
Task {
|
||||||
messagesToDeviceMetric.update(lastValue + 1)
|
let lastValue = await messagesToDeviceMetric.lastValue()?.value ?? 0
|
||||||
|
try? await messagesToDeviceMetric.update(lastValue + 1)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// MARK: API
|
// MARK: API
|
||||||
@ -70,7 +74,8 @@ final class DeviceManager {
|
|||||||
guard requestInProgress == nil else {
|
guard requestInProgress == nil else {
|
||||||
return eventLoop.makeSucceededFuture(.operationInProgress)
|
return eventLoop.makeSucceededFuture(.operationInProgress)
|
||||||
}
|
}
|
||||||
requestInProgress = eventLoop.makePromise(of: DeviceResponse.self)
|
let result = eventLoop.makePromise(of: DeviceResponse.self)
|
||||||
|
self.requestInProgress = result
|
||||||
socket.send(message.bytes, promise: nil)
|
socket.send(message.bytes, promise: nil)
|
||||||
updateMessageCountMetric()
|
updateMessageCountMetric()
|
||||||
eventLoop.scheduleTask(in: .seconds(deviceTimeout)) { [weak self] in
|
eventLoop.scheduleTask(in: .seconds(deviceTimeout)) { [weak self] in
|
||||||
@ -80,7 +85,7 @@ final class DeviceManager {
|
|||||||
self?.requestInProgress = nil
|
self?.requestInProgress = nil
|
||||||
promise.succeed(.deviceTimedOut)
|
promise.succeed(.deviceTimedOut)
|
||||||
}
|
}
|
||||||
return requestInProgress!.futureResult
|
return result.futureResult
|
||||||
}
|
}
|
||||||
|
|
||||||
func authenticateDevice(hash: String) {
|
func authenticateDevice(hash: String) {
|
||||||
|
@ -9,21 +9,21 @@ enum ServerError: Error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// configures your application
|
// configures your application
|
||||||
public func configure(_ app: Application) throws {
|
public func configure(_ app: Application) async throws {
|
||||||
let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory)
|
let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory)
|
||||||
let logFolder = storageFolder.appendingPathComponent("logs")
|
let logFolder = storageFolder.appendingPathComponent("logs")
|
||||||
|
|
||||||
let accessManager = AccessTokenManager([])
|
let accessManager = AccessTokenManager([])
|
||||||
let monitor = MetricObserver(
|
let monitor = await MetricObserver(
|
||||||
logFolder: logFolder,
|
logFolder: logFolder,
|
||||||
accessManager: accessManager,
|
accessManager: accessManager,
|
||||||
logMetricId: "sesame.log")
|
logMetricId: "sesame.log")
|
||||||
MetricObserver.standard = monitor
|
MetricObserver.standard = monitor
|
||||||
|
|
||||||
let status = Metric<ServerStatus>("sesame.status")
|
let status = try await Metric<ServerStatus>("sesame.status")
|
||||||
status.update(.initializing)
|
try await status.update(.initializing)
|
||||||
|
|
||||||
monitor.registerRoutes(app)
|
await monitor.registerRoutes(app)
|
||||||
|
|
||||||
let configUrl = storageFolder.appendingPathComponent("config.json")
|
let configUrl = storageFolder.appendingPathComponent("config.json")
|
||||||
let config = try Config(loadFrom: configUrl)
|
let config = try Config(loadFrom: configUrl)
|
||||||
@ -35,7 +35,11 @@ public func configure(_ app: Application) throws {
|
|||||||
let keyFile = storageFolder.appendingPathComponent(config.keyFileName)
|
let keyFile = storageFolder.appendingPathComponent(config.keyFileName)
|
||||||
|
|
||||||
let (deviceKey, remoteKey) = try loadKeys(at: keyFile)
|
let (deviceKey, remoteKey) = try loadKeys(at: keyFile)
|
||||||
deviceManager = DeviceManager(deviceKey: deviceKey, remoteKey: remoteKey, deviceTimeout: config.deviceTimeout)
|
deviceManager = await DeviceManager(
|
||||||
|
deviceKey: deviceKey,
|
||||||
|
remoteKey: remoteKey,
|
||||||
|
deviceTimeout: config.deviceTimeout)
|
||||||
|
|
||||||
try routes(app)
|
try routes(app)
|
||||||
|
|
||||||
// Gracefully shut down by closing potentially open socket
|
// Gracefully shut down by closing potentially open socket
|
||||||
@ -45,7 +49,7 @@ public func configure(_ app: Application) throws {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
status.update(.nominal)
|
try await status.update(.nominal)
|
||||||
}
|
}
|
||||||
|
|
||||||
private func loadKeys(at url: URL) throws -> (deviceKey: Data, remoteKey: Data) {
|
private func loadKeys(at url: URL) throws -> (deviceKey: Data, remoteKey: Data) {
|
||||||
@ -69,6 +73,11 @@ private func loadKeys(at url: URL) throws -> (deviceKey: Data, remoteKey: Data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func log(_ message: String) {
|
func log(_ message: String) {
|
||||||
MetricObserver.standard?.log(message)
|
guard let observer = MetricObserver.standard else {
|
||||||
print(message)
|
print(message)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
Task {
|
||||||
|
await observer.log(message)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,15 @@
|
|||||||
import App
|
import App
|
||||||
import Vapor
|
import Vapor
|
||||||
|
|
||||||
var env = try Environment.detect()
|
var env = Environment.production //.detect()
|
||||||
try LoggingSystem.bootstrap(from: &env)
|
try LoggingSystem.bootstrap(from: &env)
|
||||||
let app = Application(env)
|
let app = Application(env)
|
||||||
defer { app.shutdown() }
|
defer { app.shutdown() }
|
||||||
try configure(app)
|
|
||||||
|
private let semaphore = DispatchSemaphore(value: 0)
|
||||||
|
Task {
|
||||||
|
try await configure(app)
|
||||||
|
semaphore.signal()
|
||||||
|
}
|
||||||
|
semaphore.wait()
|
||||||
try app.run()
|
try app.run()
|
||||||
|
@ -43,10 +43,10 @@ final class AppTests: XCTestCase {
|
|||||||
XCTAssertEqual(content, input.content)
|
XCTAssertEqual(content, input.content)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMessageTransmission() throws {
|
func testMessageTransmission() async throws {
|
||||||
let app = Application(.testing)
|
let app = Application(.testing)
|
||||||
defer { app.shutdown() }
|
defer { app.shutdown() }
|
||||||
try configure(app)
|
try await configure(app)
|
||||||
|
|
||||||
// How to open a socket via request?
|
// How to open a socket via request?
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user