Attempt to fix socket data processing

This commit is contained in:
Christoph Hagen 2023-12-08 19:40:49 +01:00
parent d9bd0c6e30
commit 160c9a1a97
2 changed files with 75 additions and 20 deletions

View File

@ -86,7 +86,7 @@ enum MessageResult: UInt8 {
/// A valid server challenge was received /// A valid server challenge was received
case deviceAvailable = 37 case deviceAvailable = 37
case invalidSignatureByDevice = 38 case invalidSignatureFromDevice = 38
case invalidMessageTypeFromDevice = 39 case invalidMessageTypeFromDevice = 39
@ -162,7 +162,7 @@ extension MessageResult: CustomStringConvertible {
return "Unexpected server response code" return "Unexpected server response code"
case .deviceAvailable: case .deviceAvailable:
return "Device available" return "Device available"
case .invalidSignatureByDevice: case .invalidSignatureFromDevice:
return "Invalid device signature" return "Invalid device signature"
case .invalidMessageTypeFromDevice: case .invalidMessageTypeFromDevice:
return "Message type from device invalid" return "Message type from device invalid"

View File

@ -4,20 +4,20 @@ import Vapor
import Clairvoyant import Clairvoyant
final class DeviceManager { final class DeviceManager {
/// The connection to the device /// The connection to the device
private var connection: WebSocket? private var connection: WebSocket?
/// The authentication token of the device for the socket connection /// The authentication token of the device for the socket connection
private let deviceKey: Data private let deviceKey: Data
/// The authentication token of the remote /// The authentication token of the remote
private let remoteKey: Data private let remoteKey: Data
private let deviceTimeout: Int64 private let deviceTimeout: Int64
private let deviceConnectedMetric: Metric<Bool> private let deviceConnectedMetric: Metric<Bool>
private let messagesToDeviceMetric: Metric<Int> private let messagesToDeviceMetric: Metric<Int>
let serverStatus: Metric<ServerStatus> let serverStatus: Metric<ServerStatus>
@ -31,6 +31,7 @@ final class DeviceManager {
/// A promise to finish the request once the device responds or times out /// A promise to finish the request once the device responds or times out
private var requestInProgress: CheckedContinuation<Data, Error>? private var requestInProgress: CheckedContinuation<Data, Error>?
private var receivedMessageData: Data?
init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64, serverStatus: Metric<ServerStatus>) { init(deviceKey: Data, remoteKey: Data, deviceTimeout: Int64, serverStatus: Metric<ServerStatus>) {
self.deviceKey = deviceKey self.deviceKey = deviceKey
@ -51,11 +52,11 @@ final class DeviceManager {
_ = try? await serverStatus.update(deviceIsConnected ? .nominal : .reducedFunctionality) _ = try? await serverStatus.update(deviceIsConnected ? .nominal : .reducedFunctionality)
await updateDeviceConnectionMetric() await updateDeviceConnectionMetric()
} }
private func updateDeviceConnectionMetric() async { private func updateDeviceConnectionMetric() async {
_ = try? await deviceConnectedMetric.update(deviceIsConnected) _ = try? await deviceConnectedMetric.update(deviceIsConnected)
} }
private func updateMessageCountMetric() async { private func updateMessageCountMetric() async {
let lastValue = await messagesToDeviceMetric.lastValue()?.value ?? 0 let lastValue = await messagesToDeviceMetric.lastValue()?.value ?? 0
_ = try? await messagesToDeviceMetric.update(lastValue + 1) _ = try? await messagesToDeviceMetric.update(lastValue + 1)
@ -74,9 +75,11 @@ final class DeviceManager {
connection = nil connection = nil
throw MessageResult.deviceNotConnected throw MessageResult.deviceNotConnected
} }
guard requestInProgress == nil else { guard receivedMessageData == nil else {
throw MessageResult.tooManyRequests throw MessageResult.tooManyRequests
} }
// Indicate that a message is in transit
receivedMessageData = Data()
do { do {
try await socket.send(Array(message)) try await socket.send(Array(message))
} catch { } catch {
@ -84,6 +87,12 @@ final class DeviceManager {
} }
startTimeoutForDeviceRequest(on: eventLoop) startTimeoutForDeviceRequest(on: eventLoop)
// Check if a full message has already been received
if let receivedMessageData, receivedMessageData.count == SignedMessage.size {
self.receivedMessageData = nil
return receivedMessageData
}
// Wait until a fill message is received, or a timeout occurs
let result: Data = try await withCheckedThrowingContinuation { continuation in let result: Data = try await withCheckedThrowingContinuation { continuation in
self.requestInProgress = continuation self.requestInProgress = continuation
} }
@ -98,13 +107,44 @@ final class DeviceManager {
} }
private func resumeDeviceRequest(with data: Data) { private func resumeDeviceRequest(with data: Data) {
requestInProgress?.resume(returning: data) guard let receivedMessageData else {
requestInProgress = nil print("[WARN] Received \(data.count) bytes after message completion")
self.requestInProgress = nil
return
}
let newData = receivedMessageData + data
if newData.count < SignedMessage.size {
// Wait for more data
self.receivedMessageData = newData
return
}
self.receivedMessageData = nil
guard let requestInProgress else {
print("[WARN] Received \(newData.count) bytes, but no continuation to resume")
return
}
self.requestInProgress = nil
guard newData.count == SignedMessage.size else {
print("[WARN] Received \(newData.count) bytes, expected \(SignedMessage.size) for a message.")
requestInProgress.resume(throwing: MessageResult.invalidMessageSizeFromDevice)
return
}
requestInProgress.resume(returning: newData)
} }
private func resumeDeviceRequest(with result: MessageResult) { private func resumeDeviceRequest(with result: MessageResult) {
requestInProgress?.resume(throwing: result) guard let receivedMessageData else {
requestInProgress = nil print("[WARN] Result after message completed: \(result)")
self.requestInProgress = nil
return
}
self.receivedMessageData = nil
guard let requestInProgress else {
print("[WARN] Request in progress (\(receivedMessageData.count) bytes), but no continuation found for result: \(result)")
return
}
self.requestInProgress = nil
requestInProgress.resume(throwing: result)
} }
func authenticateRemote(_ token: Data) -> Bool { func authenticateRemote(_ token: Data) -> Bool {
@ -114,7 +154,7 @@ final class DeviceManager {
func processDeviceResponse(_ buffer: ByteBuffer) { func processDeviceResponse(_ buffer: ByteBuffer) {
guard let data = buffer.getData(at: 0, length: buffer.readableBytes) else { guard let data = buffer.getData(at: 0, length: buffer.readableBytes) else {
log("Failed to get data buffer received from device") print("Failed to get data buffer received from device")
self.resumeDeviceRequest(with: .invalidMessageSizeFromDevice) self.resumeDeviceRequest(with: .invalidMessageSizeFromDevice)
return return
} }
@ -140,7 +180,7 @@ final class DeviceManager {
func createNewDeviceConnection(socket: WebSocket, auth: String) async { func createNewDeviceConnection(socket: WebSocket, auth: String) async {
guard let key = Data(fromHexEncodedString: auth), guard let key = Data(fromHexEncodedString: auth),
SHA256.hash(data: key) == self.deviceKey else { SHA256.hash(data: key) == self.deviceKey else {
log("Invalid device key while opening socket") log("[WARN] Invalid device key while opening socket")
try? await socket.close() try? await socket.close()
return return
} }
@ -150,17 +190,32 @@ final class DeviceManager {
socket.eventLoop.execute { socket.eventLoop.execute {
socket.pingInterval = .seconds(10) socket.pingInterval = .seconds(10)
socket.onText { socket, text in socket.onText { [weak self] socket, text in
print("[WARN] Received text over socket: \(text)") print("[WARN] Received text over socket: \(text)")
// Close connection to prevent spamming the log // Close connection to prevent spamming the log
try? await socket.close() try? await socket.close()
guard let self else {
print("[WARN] No reference to self to handle text over socket")
return
}
self.didCloseDeviceSocket()
} }
socket.onBinary { [weak self] _, data in socket.onBinary { [weak self] _, data in
self?.processDeviceResponse(data) guard let self else {
print("[WARN] No reference to self to process binary data on socket")
return
}
self.processDeviceResponse(data)
} }
socket.onClose.whenComplete { [weak self] _ in socket.onClose.whenComplete { [weak self] _ in
self?.didCloseDeviceSocket() guard let self else {
print("[WARN] No reference to self to handle socket closing")
return
}
self.didCloseDeviceSocket()
} }
} }
log("[INFO] Socket connected") log("[INFO] Socket connected")