Compare commits
No commits in common. "fe5128fa8bcf5e74c615c0b2b733dd999471ced1" and "44b776ef32413febcf0e140cb5d839a798e35e3d" have entirely different histories.
fe5128fa8b
...
44b776ef32
@ -11,6 +11,7 @@ let package = Package(
|
||||
.package(url: "https://github.com/christophhagen/Clairvoyant", from: "0.9.0"),
|
||||
.package(url: "https://github.com/christophhagen/ClairvoyantVapor", from: "0.2.0"),
|
||||
.package(url: "https://github.com/christophhagen/ClairvoyantBinaryCodable", from: "0.3.1"),
|
||||
.package(url:"https://github.com/christophhagen/CBORCoding", from: "1.0.0"),
|
||||
],
|
||||
targets: [
|
||||
.target(
|
||||
@ -20,6 +21,7 @@ let package = Package(
|
||||
.product(name: "Clairvoyant", package: "Clairvoyant"),
|
||||
.product(name: "ClairvoyantVapor", package: "ClairvoyantVapor"),
|
||||
.product(name: "ClairvoyantBinaryCodable", package: "ClairvoyantBinaryCodable"),
|
||||
.product(name: "CBORCoding", package: "CBORCoding")
|
||||
],
|
||||
swiftSettings: [
|
||||
// Enable better optimizations when building in Release configuration. Despite the use of
|
||||
|
@ -141,7 +141,7 @@ final class DeviceManager {
|
||||
guard let socket = connection else {
|
||||
return
|
||||
}
|
||||
socket.close().whenSuccess { log("Socket closed") }
|
||||
try? socket.close().wait()
|
||||
connection = nil
|
||||
log("Removed device connection")
|
||||
}
|
||||
|
@ -86,3 +86,111 @@ func log(_ message: String) {
|
||||
}
|
||||
observer.log(message)
|
||||
}
|
||||
|
||||
import CBORCoding
|
||||
|
||||
public func migrate(folder: URL) throws {
|
||||
try migrateMetric("sesame.log", containing: String.self, in: folder)
|
||||
try migrateMetric("sesame.status", containing: ServerStatus.self, in: folder)
|
||||
try migrateMetric("sesame.connected", containing: Bool.self, in: folder)
|
||||
try migrateMetric("sesame.messages", containing: Int.self, in: folder)
|
||||
}
|
||||
|
||||
private func migrateMetric<T>(_ id: String, containing type: T.Type, in folder: URL) throws where T: MetricValue {
|
||||
print("Processing metric \(id)")
|
||||
let file = id.hashed()
|
||||
let url = folder.appendingPathComponent(file)
|
||||
|
||||
let files = try FileManager.default.contentsOfDirectory(at: url, includingPropertiesForKeys: nil)
|
||||
.filter { Int($0.lastPathComponent) != nil }
|
||||
print("Found \(files.count) files for \(id)")
|
||||
|
||||
let all: [Timestamped<T>] = try files.map(readElements(from:))
|
||||
.reduce([], +)
|
||||
.sorted { $0.timestamp < $1.timestamp }
|
||||
|
||||
print("Found \(all.count) items for \(id)")
|
||||
|
||||
try FileManager.default.removeItem(at: url)
|
||||
|
||||
print("Removed log folder")
|
||||
|
||||
// TODO: Write values back to disk
|
||||
let observer = MetricObserver(logFileFolder: folder, logMetricId: "sesame.migration")
|
||||
let metric: Metric<T> = observer.addMetric(id: id)
|
||||
private let semaphore = DispatchSemaphore(value: 0)
|
||||
Task {
|
||||
try await metric.update(all)
|
||||
print("Saved all values for metric \(id)")
|
||||
semaphore.signal()
|
||||
}
|
||||
semaphore.wait()
|
||||
print("Finished metric \(id)")
|
||||
}
|
||||
|
||||
private func readElements<T>(from url: URL) throws -> [Timestamped<T>] where T: MetricValue {
|
||||
let data = try Data(contentsOf: url)
|
||||
let file = url.lastPathComponent
|
||||
print("File \(file): Loaded \(data.count) bytes")
|
||||
|
||||
|
||||
|
||||
let decoder = CBORDecoder()
|
||||
let timestampLength = 9
|
||||
let byteCountLength = 2
|
||||
|
||||
var result: [Timestamped<T>] = []
|
||||
var currentIndex = data.startIndex
|
||||
var skippedValues = 0
|
||||
while currentIndex < data.endIndex {
|
||||
let startIndexOfTimestamp = currentIndex + byteCountLength
|
||||
guard startIndexOfTimestamp <= data.endIndex else {
|
||||
print("File \(file): Only \(data.endIndex - currentIndex) bytes, needed \(byteCountLength) for byte count")
|
||||
throw MetricError.logFileCorrupted
|
||||
}
|
||||
guard let byteCount = UInt16(fromData: data[currentIndex..<startIndexOfTimestamp]) else {
|
||||
print("File \(file): Invalid byte count")
|
||||
throw MetricError.logFileCorrupted
|
||||
}
|
||||
let nextIndex = startIndexOfTimestamp + Int(byteCount)
|
||||
guard nextIndex <= data.endIndex else {
|
||||
print("File \(file): Needed \(byteCountLength + Int(byteCount)) for timestamped value, has \(data.endIndex - startIndexOfTimestamp)")
|
||||
throw MetricError.logFileCorrupted
|
||||
}
|
||||
guard byteCount >= timestampLength else {
|
||||
print("File \(file): Only \(byteCount) bytes, needed \(timestampLength) for timestamp")
|
||||
throw MetricError.logFileCorrupted
|
||||
}
|
||||
let timestampData = data[startIndexOfTimestamp..<startIndexOfTimestamp+timestampLength]
|
||||
let timestamp = try decoder.decode(Double.self, from: timestampData)
|
||||
let date = Date(timeIntervalSince1970: timestamp)
|
||||
let elementData = data[startIndexOfTimestamp+timestampLength..<nextIndex]
|
||||
do {
|
||||
let element: T = try decoder.decode(from: elementData)
|
||||
result.append(.init(value: element, timestamp: date))
|
||||
} catch {
|
||||
skippedValues += 1
|
||||
}
|
||||
currentIndex = nextIndex
|
||||
if result.count % 100 == 1 {
|
||||
print("File \(file): \(result.count) entries loaded (\(currentIndex)/\(data.endIndex) bytes)")
|
||||
}
|
||||
}
|
||||
print("Loaded \(result.count) data points (\(skippedValues) skipped)")
|
||||
return result
|
||||
}
|
||||
|
||||
extension UInt16 {
|
||||
|
||||
func toData() -> Data {
|
||||
Data([UInt8(self >> 8 & 0xFF), UInt8(self & 0xFF)])
|
||||
}
|
||||
|
||||
init?<T: DataProtocol>(fromData data: T) {
|
||||
guard data.count == 2 else {
|
||||
return nil
|
||||
}
|
||||
let bytes = Array(data)
|
||||
self = UInt16(UInt32(bytes[0]) << 8 | UInt32(bytes[1]))
|
||||
}
|
||||
}
|
||||
|
@ -6,6 +6,12 @@ try LoggingSystem.bootstrap(from: &env)
|
||||
let app = Application(env)
|
||||
defer { app.shutdown() }
|
||||
|
||||
let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory)
|
||||
let logFolder = storageFolder.appendingPathComponent("logs")
|
||||
print("Starting migration")
|
||||
try migrate(folder: logFolder)
|
||||
print("Finished migration")
|
||||
/*
|
||||
private let semaphore = DispatchSemaphore(value: 0)
|
||||
Task {
|
||||
try await configure(app)
|
||||
@ -13,3 +19,4 @@ Task {
|
||||
}
|
||||
semaphore.wait()
|
||||
try app.run()
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user