Compare commits

...

2 Commits

Author SHA1 Message Date
Christoph Hagen
62ae594e06 Convert database to actor 2023-11-01 11:56:02 +01:00
Christoph Hagen
f3c59e0f3f Remove uses of Task 2023-11-01 11:52:04 +01:00
5 changed files with 48 additions and 47 deletions

View File

@ -37,7 +37,7 @@ private extension Configuration.EMail {
} }
} }
final class SQLiteDatabase { actor SQLiteDatabase {
/// A mapping between player name and generated access tokens for a session /// A mapping between player name and generated access tokens for a session
private var sessionTokenForPlayer = [PlayerName: SessionToken]() private var sessionTokenForPlayer = [PlayerName: SessionToken]()
@ -207,18 +207,18 @@ final class SQLiteDatabase {
playerNameForToken[token] != nil playerNameForToken[token] != nil
} }
func startSession(socket: WebSocket, sessionToken token: SessionToken) -> Bool { func startSession(socket: WebSocket, sessionToken token: SessionToken) async -> Bool {
guard let player = playerNameForToken[token] else { guard let player = playerNameForToken[token] else {
return false return false
} }
return tables.connect(player: player, using: socket) return await tables.connect(player: player, using: socket)
} }
func endSession(forSessionToken sessionToken: SessionToken) { func endSession(forSessionToken sessionToken: SessionToken) async {
guard let player = endExistingSession(forSessionToken: sessionToken) else { guard let player = endExistingSession(forSessionToken: sessionToken) else {
return return
} }
tables.disconnect(player: player) await tables.disconnect(player: player)
} }
private func endExistingSession(forSessionToken token: SessionToken) -> PlayerName? { private func endExistingSession(forSessionToken token: SessionToken) -> PlayerName? {
@ -320,7 +320,7 @@ final class SQLiteDatabase {
return try await tables.play(card: card, player: player, in: database) return try await tables.play(card: card, player: player, in: database)
} }
func disconnectAllSockets() { func disconnectAllSockets() async {
tables.disconnectAllSockets() await tables.disconnectAllSockets()
} }
} }

View File

@ -172,25 +172,21 @@ final class TableManagement {
try await player.update(on: database) try await player.update(on: database)
} }
func connect(player: PlayerName, using socket: WebSocket) -> Bool { func connect(player: PlayerName, using socket: WebSocket) async -> Bool {
guard let table = currentTable(for: player) else { guard let table = currentTable(for: player) else {
return false return false
} }
let result = table.connect(player: player, using: socket) let result = table.connect(player: player, using: socket)
Task { await logConnectedPlayerCount()
await logConnectedPlayerCount()
}
return result return result
} }
func disconnect(player: PlayerName) { func disconnect(player: PlayerName) async {
guard let table = currentTable(for: player) else { guard let table = currentTable(for: player) else {
return return
} }
table.disconnect(player: player) table.disconnect(player: player)
Task { await logConnectedPlayerCount()
await logConnectedPlayerCount()
}
} }
func performAction(player: PlayerName, action: PlayerAction) -> PlayerActionResult { func performAction(player: PlayerName, action: PlayerAction) -> PlayerActionResult {
@ -257,10 +253,8 @@ final class TableManagement {
return .success return .success
} }
func disconnectAllSockets() { func disconnectAllSockets() async {
tables.values.forEach { $0.disconnectAllPlayers() } tables.values.forEach { $0.disconnectAllPlayers() }
Task { await logConnectedPlayerCount()
await logConnectedPlayerCount()
}
} }
} }

View File

@ -7,17 +7,27 @@ import ClairvoyantVapor
var server: SQLiteDatabase! var server: SQLiteDatabase!
private var provider: VaporMetricProvider! = nil private var provider: VaporMetricProvider! = nil
private let scheduler = EventLoopScheduler()
private var status: Metric<ServerStatus>! private var status: Metric<ServerStatus>!
private let scheduler = EventLoopScheduler()
private var configurationError: Error? = nil
private func update(status newStatus: ServerStatus) { public func configure(_ app: Application) throws {
let semaphore = DispatchSemaphore(value: 0)
scheduler.schedule { scheduler.schedule {
_ = try? await status.update(newStatus) do {
try await configureAsync(app)
} catch {
configurationError = error
}
semaphore.signal()
}
semaphore.wait()
if let configurationError {
throw configurationError
} }
} }
// configures your application private func configureAsync(_ app: Application) async throws {
public func configure(_ app: Application) async throws {
let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory) let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory)
let logFolder = storageFolder.appendingPathComponent("logs") let logFolder = storageFolder.appendingPathComponent("logs")
@ -31,7 +41,7 @@ public func configure(_ app: Application) async throws {
name: "Status", name: "Status",
description: "The main status of the server") description: "The main status of the server")
update(status: .initializing) _ = try? await status.update(.initializing)
let configPath = URL(fileURLWithPath: app.directory.resourcesDirectory) let configPath = URL(fileURLWithPath: app.directory.resourcesDirectory)
.appendingPathComponent("config.json") .appendingPathComponent("config.json")
@ -40,7 +50,7 @@ public func configure(_ app: Application) async throws {
do { do {
configuration = try Configuration(loadFromUrl: configPath) configuration = try Configuration(loadFromUrl: configPath)
} catch { } catch {
update(status: .initializationFailure) _ = try? await status.update(.initializationFailure)
await monitor.log("Failed to read configuration: \(error)") await monitor.log("Failed to read configuration: \(error)")
// Note: If configuration can't be loaded, then the server will run on the wrong port // Note: If configuration can't be loaded, then the server will run on the wrong port
// and access to metrics is impossible, since no tokens are loaded // and access to metrics is impossible, since no tokens are loaded
@ -69,7 +79,7 @@ public func configure(_ app: Application) async throws {
try await app.autoMigrate() try await app.autoMigrate()
} catch { } catch {
await monitor.log("Failed to migrate database: \(error)") await monitor.log("Failed to migrate database: \(error)")
update(status: .initializationFailure) _ = try? await status.update(.initializationFailure)
return return
} }
@ -82,7 +92,9 @@ public func configure(_ app: Application) async throws {
// Gracefully shut down by closing potentially open socket // Gracefully shut down by closing potentially open socket
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + .seconds(5)) { DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + .seconds(5)) {
_ = app.server.onShutdown.always { _ in _ = app.server.onShutdown.always { _ in
server.disconnectAllSockets() scheduler.schedule {
await server.disconnectAllSockets()
}
} }
} }
@ -94,7 +106,7 @@ public func configure(_ app: Application) async throws {
provider.asyncScheduler = scheduler provider.asyncScheduler = scheduler
provider.registerRoutes(app) provider.registerRoutes(app)
update(status: .nominal) _ = try? await status.update(.nominal)
} }
func log(_ message: String) { func log(_ message: String) {

View File

@ -156,7 +156,7 @@ func loginPlayer(_ app: Application) {
guard try request.password.verify(password, created: hash) else { guard try request.password.verify(password, created: hash) else {
throw Abort(.unauthorized) // 401 throw Abort(.unauthorized) // 401
} }
return server.startNewSessionForRegisteredPlayer(named: name) return await server.startNewSessionForRegisteredPlayer(named: name)
} }
} }
@ -177,7 +177,7 @@ func resumeSession(_ app: Application) {
app.post("player", "resume") { request -> String in app.post("player", "resume") { request -> String in
let token = try request.header(.token) let token = try request.header(.token)
guard let player = server.registeredPlayerExists(withSessionToken: token) else { guard let player = await server.registeredPlayerExists(withSessionToken: token) else {
throw Abort(.unauthorized) // 401 throw Abort(.unauthorized) // 401
} }
return player return player
@ -199,7 +199,7 @@ func logoutPlayer(_ app: Application) {
app.post("player", "logout") { request -> HTTPResponseStatus in app.post("player", "logout") { request -> HTTPResponseStatus in
let token = try request.header(.token) let token = try request.header(.token)
server.endSession(forSessionToken: token) await server.endSession(forSessionToken: token)
return .ok return .ok
} }
} }
@ -221,10 +221,10 @@ func getTableForPlayer(_ app: Application) {
app.post("player", "table") { request -> String in app.post("player", "table") { request -> String in
let token = try request.header(.token) let token = try request.header(.token)
guard let player = server.registeredPlayerExists(withSessionToken: token) else { guard let player = await server.registeredPlayerExists(withSessionToken: token) else {
throw Abort(.unauthorized) // 401 throw Abort(.unauthorized) // 401
} }
guard let info = server.currentTableOfPlayer(named: player) else { guard let info = await server.currentTableOfPlayer(named: player) else {
return "" return ""
} }
return try encodeJSON(info) return try encodeJSON(info)
@ -239,8 +239,8 @@ func getTableForPlayer(_ app: Application) {
func openWebsocket(_ app: Application) { func openWebsocket(_ app: Application) {
app.webSocket("session", "start") { req, socket in app.webSocket("session", "start") { req, socket in
socket.onText { socket, text in socket.onText { socket, text in
guard server.startSession(socket: socket, sessionToken: text) else { guard await server.startSession(socket: socket, sessionToken: text) else {
_ = socket.close() try? await socket.close()
return return
} }
} }
@ -279,7 +279,7 @@ func createTable(_ app: Application) {
throw Abort(.badRequest) // 400 throw Abort(.badRequest) // 400
} }
guard let player = server.registeredPlayerExists(withSessionToken: token) else { guard let player = await server.registeredPlayerExists(withSessionToken: token) else {
throw Abort(.unauthorized) // 401 throw Abort(.unauthorized) // 401
} }
let result = try await server.createTable(named: tableName, player: player, isPublic: isPublic, in: request.db) let result = try await server.createTable(named: tableName, player: player, isPublic: isPublic, in: request.db)
@ -304,10 +304,10 @@ func getPublicTables(_ app: Application) {
app.post("tables", "public") { request -> String in app.post("tables", "public") { request -> String in
let token = try request.header(.token) let token = try request.header(.token)
guard server.isValid(sessionToken: token) else { guard await server.isValid(sessionToken: token) else {
throw Abort(.unauthorized) // 401 throw Abort(.unauthorized) // 401
} }
let list = server.getPublicTableInfos() let list = await server.getPublicTableInfos()
return try encodeJSON(list) return try encodeJSON(list)
} }
} }
@ -376,9 +376,9 @@ func performActionForPlayer(_ app: Application) {
let result: PlayerActionResult let result: PlayerActionResult
if let action = PlayerAction(rawValue: actionString) { if let action = PlayerAction(rawValue: actionString) {
result = server.performAction(playerToken: token, action: action) result = await server.performAction(playerToken: token, action: action)
} else if let game = GameType(rawValue: actionString) { } else if let game = GameType(rawValue: actionString) {
result = server.select(game: game, playerToken: token) result = await server.select(game: game, playerToken: token)
} else { } else {
throw Abort(.badRequest) throw Abort(.badRequest)
} }

View File

@ -6,10 +6,5 @@ try LoggingSystem.bootstrap(from: &env)
let app = Application(env) let app = Application(env)
defer { app.shutdown() } defer { app.shutdown() }
private let semaphore = DispatchSemaphore(value: 0) try configure(app)
Task {
try await configure(app)
semaphore.signal()
}
semaphore.wait()
try app.run() try app.run()