Compare commits
2 Commits
e7aa2774df
...
11be3e78b3
Author | SHA1 | Date | |
---|---|---|---|
|
11be3e78b3 | ||
|
29a72032c6 |
@ -8,13 +8,19 @@ let package = Package(
|
|||||||
],
|
],
|
||||||
dependencies: [
|
dependencies: [
|
||||||
.package(url: "https://github.com/vapor/vapor", from: "4.0.0"),
|
.package(url: "https://github.com/vapor/vapor", from: "4.0.0"),
|
||||||
.package(url: "https://github.com/christophhagen/Clairvoyant", from: "0.5.0"),
|
.package(url: "https://github.com/christophhagen/Clairvoyant", from: "0.9.0"),
|
||||||
|
.package(url: "https://github.com/christophhagen/ClairvoyantVapor", from: "0.2.0"),
|
||||||
|
.package(url: "https://github.com/christophhagen/ClairvoyantBinaryCodable", from: "0.3.0"),
|
||||||
|
.package(url:"https://github.com/christophhagen/CBORCoding", from: "1.0.0"),
|
||||||
],
|
],
|
||||||
targets: [
|
targets: [
|
||||||
.target(name: "App",
|
.target(name: "App",
|
||||||
dependencies: [
|
dependencies: [
|
||||||
.product(name: "Vapor", package: "vapor"),
|
.product(name: "Vapor", package: "vapor"),
|
||||||
.product(name: "Clairvoyant", package: "Clairvoyant"),
|
.product(name: "Clairvoyant", package: "Clairvoyant"),
|
||||||
|
.product(name: "ClairvoyantVapor", package: "ClairvoyantVapor"),
|
||||||
|
.product(name: "ClairvoyantBinaryCodable", package: "ClairvoyantBinaryCodable"),
|
||||||
|
.product(name: "CBORCoding", package: "CBORCoding"),
|
||||||
],
|
],
|
||||||
swiftSettings: [
|
swiftSettings: [
|
||||||
// Enable better optimizations when building in Release configuration. Despite the use of
|
// Enable better optimizations when building in Release configuration. Despite the use of
|
||||||
|
@ -1,8 +1,7 @@
|
|||||||
import Foundation
|
import Foundation
|
||||||
import Clairvoyant
|
|
||||||
import Vapor
|
import Vapor
|
||||||
|
|
||||||
final class Authenticator: MetricAccessManager {
|
final class Authenticator {
|
||||||
|
|
||||||
private var writers: Set<String>
|
private var writers: Set<String>
|
||||||
|
|
||||||
@ -10,7 +9,6 @@ final class Authenticator: MetricAccessManager {
|
|||||||
self.writers = Set(writers)
|
self.writers = Set(writers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
func hasAuthorization(for key: String) -> Bool {
|
func hasAuthorization(for key: String) -> Bool {
|
||||||
// Note: This is not a constant-time compare, so there may be an opportunity
|
// Note: This is not a constant-time compare, so there may be an opportunity
|
||||||
// for timing attack here. Sets perform hashed lookups, so this may be less of an issue,
|
// for timing attack here. Sets perform hashed lookups, so this may be less of an issue,
|
||||||
@ -20,20 +18,6 @@ final class Authenticator: MetricAccessManager {
|
|||||||
writers.contains(key)
|
writers.contains(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func metricListAccess(isAllowedForToken accessToken: AccessToken) throws {
|
|
||||||
guard let key = String(data: accessToken, encoding: .utf8) else {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
guard hasAuthorization(for: key) else {
|
|
||||||
throw MetricError.accessDenied
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func metricAccess(to metric: MetricId, isAllowedForToken accessToken: AccessToken) throws {
|
|
||||||
try metricListAccess(isAllowedForToken: accessToken)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
func authorize(_ request: Request) throws {
|
func authorize(_ request: Request) throws {
|
||||||
guard let key = request.headers.first(name: "key") else {
|
guard let key = request.headers.first(name: "key") else {
|
||||||
throw Abort(.badRequest) // 400
|
throw Abort(.badRequest) // 400
|
||||||
|
@ -67,8 +67,8 @@ final class CapServer {
|
|||||||
didSet {
|
didSet {
|
||||||
scheduleSave()
|
scheduleSave()
|
||||||
Task {
|
Task {
|
||||||
try? await capCountMetric.update(caps.count)
|
_ = try? await capCountMetric.update(caps.count)
|
||||||
try? await imageCountMetric.update(imageCount)
|
_ = try? await imageCountMetric.update(imageCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,10 @@
|
|||||||
import Vapor
|
import Vapor
|
||||||
import Foundation
|
import Foundation
|
||||||
import Clairvoyant
|
import Clairvoyant
|
||||||
|
import ClairvoyantVapor
|
||||||
|
import ClairvoyantBinaryCodable
|
||||||
|
|
||||||
|
private var provider: VaporMetricProvider!
|
||||||
|
|
||||||
public func configure(_ app: Application) async throws {
|
public func configure(_ app: Application) async throws {
|
||||||
|
|
||||||
@ -10,12 +14,7 @@ public func configure(_ app: Application) async throws {
|
|||||||
let config = Config(loadFrom: resourceDirectory)
|
let config = Config(loadFrom: resourceDirectory)
|
||||||
let authenticator = Authenticator(writers: config.writers)
|
let authenticator = Authenticator(writers: config.writers)
|
||||||
|
|
||||||
let monitor = await MetricObserver(
|
let monitor = MetricObserver(logFileFolder: config.logURL, logMetricId: "caps.log")
|
||||||
logFolder: config.logURL,
|
|
||||||
accessManager: authenticator,
|
|
||||||
logMetricId: "caps.log")
|
|
||||||
|
|
||||||
// All new metrics are automatically registered with the standard observer
|
|
||||||
MetricObserver.standard = monitor
|
MetricObserver.standard = monitor
|
||||||
|
|
||||||
let status = try await Metric<ServerStatus>("caps.status",
|
let status = try await Metric<ServerStatus>("caps.status",
|
||||||
@ -28,7 +27,8 @@ public func configure(_ app: Application) async throws {
|
|||||||
|
|
||||||
let server = await CapServer(in: URL(fileURLWithPath: publicDirectory))
|
let server = await CapServer(in: URL(fileURLWithPath: publicDirectory))
|
||||||
|
|
||||||
await monitor.registerRoutes(app)
|
provider = .init(observer: monitor, accessManager: config.writers)
|
||||||
|
provider.registerRoutes(app)
|
||||||
|
|
||||||
if config.serveFiles {
|
if config.serveFiles {
|
||||||
let middleware = FileMiddleware(publicDirectory: publicDirectory)
|
let middleware = FileMiddleware(publicDirectory: publicDirectory)
|
||||||
@ -52,7 +52,114 @@ func log(_ message: String) {
|
|||||||
print(message)
|
print(message)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
observer.log(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
import CBORCoding
|
||||||
|
|
||||||
|
public func migrate(folder: URL) throws {
|
||||||
|
try migrateMetric("caps.log", containing: String.self, in: folder)
|
||||||
|
try migrateMetric("caps.status", containing: ServerStatus.self, in: folder)
|
||||||
|
try migrateMetric("caps.count", containing: Int.self, in: folder)
|
||||||
|
try migrateMetric("caps.images", containing: Int.self, in: folder)
|
||||||
|
try migrateMetric("caps.classifier", containing: Int.self, in: folder)
|
||||||
|
}
|
||||||
|
|
||||||
|
private func migrateMetric<T>(_ id: String, containing type: T.Type, in folder: URL) throws where T: MetricValue {
|
||||||
|
print("Processing metric \(id)")
|
||||||
|
let file = id.hashed()
|
||||||
|
let url = folder.appendingPathComponent(file)
|
||||||
|
|
||||||
|
let files = try FileManager.default.contentsOfDirectory(at: url, includingPropertiesForKeys: nil)
|
||||||
|
.filter { Int($0.lastPathComponent) != nil }
|
||||||
|
print("Found \(files.count) files for \(id)")
|
||||||
|
|
||||||
|
let all: [Timestamped<T>] = try files.map(readElements(from:))
|
||||||
|
.reduce([], +)
|
||||||
|
.sorted { $0.timestamp < $1.timestamp }
|
||||||
|
|
||||||
|
print("Found \(all.count) items for \(id)")
|
||||||
|
|
||||||
|
try FileManager.default.removeItem(at: url)
|
||||||
|
|
||||||
|
print("Removed log folder")
|
||||||
|
|
||||||
|
// TODO: Write values back to disk
|
||||||
|
let observer = MetricObserver(logFileFolder: folder, logMetricId: "sesame.migration")
|
||||||
|
let metric: Metric<T> = observer.addMetric(id: id)
|
||||||
|
let semaphore = DispatchSemaphore(value: 0)
|
||||||
Task {
|
Task {
|
||||||
await observer.log(message)
|
try await metric.update(all)
|
||||||
|
print("Saved all values for metric \(id)")
|
||||||
|
semaphore.signal()
|
||||||
|
}
|
||||||
|
semaphore.wait()
|
||||||
|
print("Finished metric \(id)")
|
||||||
|
}
|
||||||
|
|
||||||
|
private func readElements<T>(from url: URL) throws -> [Timestamped<T>] where T: MetricValue {
|
||||||
|
let data = try Data(contentsOf: url)
|
||||||
|
let file = url.lastPathComponent
|
||||||
|
print("File \(file): Loaded \(data.count) bytes")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
let decoder = CBORDecoder()
|
||||||
|
let timestampLength = 9
|
||||||
|
let byteCountLength = 2
|
||||||
|
|
||||||
|
var result: [Timestamped<T>] = []
|
||||||
|
var currentIndex = data.startIndex
|
||||||
|
var skippedValues = 0
|
||||||
|
while currentIndex < data.endIndex {
|
||||||
|
let startIndexOfTimestamp = currentIndex + byteCountLength
|
||||||
|
guard startIndexOfTimestamp <= data.endIndex else {
|
||||||
|
print("File \(file): Only \(data.endIndex - currentIndex) bytes, needed \(byteCountLength) for byte count")
|
||||||
|
throw MetricError.logFileCorrupted
|
||||||
|
}
|
||||||
|
guard let byteCount = UInt16(fromData: data[currentIndex..<startIndexOfTimestamp]) else {
|
||||||
|
print("File \(file): Invalid byte count")
|
||||||
|
throw MetricError.logFileCorrupted
|
||||||
|
}
|
||||||
|
let nextIndex = startIndexOfTimestamp + Int(byteCount)
|
||||||
|
guard nextIndex <= data.endIndex else {
|
||||||
|
print("File \(file): Needed \(byteCountLength + Int(byteCount)) for timestamped value, has \(data.endIndex - startIndexOfTimestamp)")
|
||||||
|
throw MetricError.logFileCorrupted
|
||||||
|
}
|
||||||
|
guard byteCount >= timestampLength else {
|
||||||
|
print("File \(file): Only \(byteCount) bytes, needed \(timestampLength) for timestamp")
|
||||||
|
throw MetricError.logFileCorrupted
|
||||||
|
}
|
||||||
|
let timestampData = data[startIndexOfTimestamp..<startIndexOfTimestamp+timestampLength]
|
||||||
|
let timestamp = try decoder.decode(Double.self, from: timestampData)
|
||||||
|
let date = Date(timeIntervalSince1970: timestamp)
|
||||||
|
let elementData = data[startIndexOfTimestamp+timestampLength..<nextIndex]
|
||||||
|
do {
|
||||||
|
let element: T = try decoder.decode(from: elementData)
|
||||||
|
result.append(.init(value: element, timestamp: date))
|
||||||
|
} catch {
|
||||||
|
skippedValues += 1
|
||||||
|
}
|
||||||
|
currentIndex = nextIndex
|
||||||
|
if result.count % 100 == 1 {
|
||||||
|
print("File \(file): \(result.count) entries loaded (\(currentIndex)/\(data.endIndex) bytes)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
print("Loaded \(result.count) data points (\(skippedValues) skipped)")
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
extension UInt16 {
|
||||||
|
|
||||||
|
func toData() -> Data {
|
||||||
|
Data([UInt8(self >> 8 & 0xFF), UInt8(self & 0xFF)])
|
||||||
|
}
|
||||||
|
|
||||||
|
init?<T: DataProtocol>(fromData data: T) {
|
||||||
|
guard data.count == 2 else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
let bytes = Array(data)
|
||||||
|
self = UInt16(UInt32(bytes[0]) << 8 | UInt32(bytes[1]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,13 @@ try LoggingSystem.bootstrap(from: &env)
|
|||||||
let app = Application(env)
|
let app = Application(env)
|
||||||
defer { app.shutdown() }
|
defer { app.shutdown() }
|
||||||
|
|
||||||
|
let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory)
|
||||||
|
let logFolder = storageFolder.appendingPathComponent("logs")
|
||||||
|
print("Starting migration")
|
||||||
|
try migrate(folder: logFolder)
|
||||||
|
print("Finished migration")
|
||||||
|
/*
|
||||||
|
|
||||||
private let semaphore = DispatchSemaphore(value: 0)
|
private let semaphore = DispatchSemaphore(value: 0)
|
||||||
Task {
|
Task {
|
||||||
try await configure(app)
|
try await configure(app)
|
||||||
@ -13,3 +20,4 @@ Task {
|
|||||||
}
|
}
|
||||||
semaphore.wait()
|
semaphore.wait()
|
||||||
try app.run()
|
try app.run()
|
||||||
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user