Attempt metric log migration

This commit is contained in:
Christoph Hagen 2023-09-07 15:23:44 +02:00
parent 04248b04af
commit e52f44f168
2 changed files with 100 additions and 0 deletions

View File

@ -11,6 +11,7 @@ let package = Package(
.package(url: "https://github.com/christophhagen/Clairvoyant", from: "0.9.0"), .package(url: "https://github.com/christophhagen/Clairvoyant", from: "0.9.0"),
.package(url: "https://github.com/christophhagen/ClairvoyantVapor", from: "0.2.0"), .package(url: "https://github.com/christophhagen/ClairvoyantVapor", from: "0.2.0"),
.package(url: "https://github.com/christophhagen/ClairvoyantBinaryCodable", from: "0.3.1"), .package(url: "https://github.com/christophhagen/ClairvoyantBinaryCodable", from: "0.3.1"),
.package(url:"https://github.com/christophhagen/CBORCoding", from: "1.0.0"),
], ],
targets: [ targets: [
.target( .target(
@ -20,6 +21,7 @@ let package = Package(
.product(name: "Clairvoyant", package: "Clairvoyant"), .product(name: "Clairvoyant", package: "Clairvoyant"),
.product(name: "ClairvoyantVapor", package: "ClairvoyantVapor"), .product(name: "ClairvoyantVapor", package: "ClairvoyantVapor"),
.product(name: "ClairvoyantBinaryCodable", package: "ClairvoyantBinaryCodable"), .product(name: "ClairvoyantBinaryCodable", package: "ClairvoyantBinaryCodable"),
.product(name: "CBORCoding", package: "CBORCoding")
], ],
swiftSettings: [ swiftSettings: [
// Enable better optimizations when building in Release configuration. Despite the use of // Enable better optimizations when building in Release configuration. Despite the use of

View File

@ -23,6 +23,8 @@ private let dateFormatter: DateFormatter = {
public func configure(_ app: Application) async throws { public func configure(_ app: Application) async throws {
let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory) let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory)
let logFolder = storageFolder.appendingPathComponent("logs") let logFolder = storageFolder.appendingPathComponent("logs")
try await migrate(folder: logFolder)
fatalError("Done")
let monitor = MetricObserver(logFileFolder: logFolder, logMetricId: "sesame.log") let monitor = MetricObserver(logFileFolder: logFolder, logMetricId: "sesame.log")
MetricObserver.standard = monitor MetricObserver.standard = monitor
@ -86,3 +88,99 @@ func log(_ message: String) {
} }
observer.log(message) observer.log(message)
} }
import CBORCoding
private func migrate(folder: URL) async throws {
try await migrateMetric("sesame.log", containing: String.self, in: folder)
try await migrateMetric("sesame.status", containing: ServerStatus.self, in: folder)
try await migrateMetric("sesame.connected", containing: Bool.self, in: folder)
try await migrateMetric("sesame.messages", containing: Int.self, in: folder)
}
private func migrateMetric<T>(_ id: String, containing type: T.Type, in folder: URL) async throws where T: MetricValue {
print("Processing metric \(id)")
let file = id.hashed()
let url = folder.appendingPathComponent(file)
let files = try FileManager.default.contentsOfDirectory(at: url, includingPropertiesForKeys: nil)
.filter { Int($0.lastPathComponent) != nil }
print("Found \(files.count) files for \(id)")
let all: [Timestamped<T>] = try files.map(readElements(from:))
.reduce([], +)
.sorted { $0.timestamp < $1.timestamp }
print("Found \(all.count) items for \(id)")
try FileManager.default.removeItem(at: url)
print("Removed log folder")
// TODO: Write values back to disk
let observer = MetricObserver(logFileFolder: folder, logMetricId: "sesame.migration")
let metric: Metric<T> = observer.addMetric(id: id)
try await metric.update(all)
print("Finished metric \(id)")
}
private func readElements<T>(from url: URL) throws -> [Timestamped<T>] where T: MetricValue {
let data = try Data(contentsOf: url)
let file = url.lastPathComponent
print("File \(file): Loaded \(data.count) bytes")
let decoder = CBORDecoder()
let timestampLength = 9
let byteCountLength = 2
var result: [Timestamped<T>] = []
var currentIndex = data.startIndex
while currentIndex < data.endIndex {
let startIndexOfTimestamp = currentIndex + byteCountLength
guard startIndexOfTimestamp <= data.endIndex else {
print("File \(file): Only \(data.endIndex - currentIndex) bytes, needed \(byteCountLength) for byte count")
throw MetricError.logFileCorrupted
}
guard let byteCount = UInt16(fromData: data[currentIndex..<startIndexOfTimestamp]) else {
print("File \(file): Invalid byte count")
throw MetricError.logFileCorrupted
}
let nextIndex = startIndexOfTimestamp + Int(byteCount)
guard nextIndex <= data.endIndex else {
print("File \(file): Needed \(byteCountLength + Int(byteCount)) for timestamped value, has \(data.endIndex - startIndexOfTimestamp)")
throw MetricError.logFileCorrupted
}
guard byteCount >= timestampLength else {
print("File \(file): Only \(byteCount) bytes, needed \(timestampLength) for timestamp")
throw MetricError.logFileCorrupted
}
let timestampData = data[startIndexOfTimestamp..<startIndexOfTimestamp+timestampLength]
let timestamp = try decoder.decode(Double.self, from: timestampData)
let date = Date(timeIntervalSince1970: timestamp)
let elementData = data[currentIndex..<nextIndex]
let element: T = try decoder.decode(from: elementData)
result.append(.init(value: element, timestamp: date))
currentIndex = nextIndex
if result.count % 100 == 0 {
print("File \(file): \(result.count) entries loaded (\(currentIndex)/\(data.endIndex) bytes)")
}
}
print("Loaded \(result.count) data points")
return result
}
extension UInt16 {
func toData() -> Data {
Data([UInt8(self >> 8 & 0xFF), UInt8(self & 0xFF)])
}
init?<T: DataProtocol>(fromData data: T) {
guard data.count == 2 else {
return nil
}
let bytes = Array(data)
self = UInt16(UInt32(bytes[0]) << 8 | UInt32(bytes[1]))
}
}