Compare commits

...

2 Commits

Author SHA1 Message Date
Christoph Hagen
a08dc4f175 Simplify scheduler 2023-12-06 10:02:43 +01:00
Christoph Hagen
e62ccb9241 Switch to new Vapor main 2023-12-06 10:01:19 +01:00
6 changed files with 61 additions and 96 deletions

View File

@ -20,7 +20,7 @@ let package = Package(
.package(url: "https://github.com/christophhagen/ClairvoyantBinaryCodable", from: "0.3.1"), .package(url: "https://github.com/christophhagen/ClairvoyantBinaryCodable", from: "0.3.1"),
], ],
targets: [ targets: [
.target( .executableTarget(
name: "App", name: "App",
dependencies: [ dependencies: [
.product(name: "Fluent", package: "fluent"), .product(name: "Fluent", package: "fluent"),
@ -30,18 +30,7 @@ let package = Package(
.product(name: "Clairvoyant", package: "Clairvoyant"), .product(name: "Clairvoyant", package: "Clairvoyant"),
.product(name: "ClairvoyantVapor", package: "ClairvoyantVapor"), .product(name: "ClairvoyantVapor", package: "ClairvoyantVapor"),
.product(name: "ClairvoyantBinaryCodable", package: "ClairvoyantBinaryCodable"), .product(name: "ClairvoyantBinaryCodable", package: "ClairvoyantBinaryCodable"),
], ]
swiftSettings: [ )
// Enable better optimizations when building in Release configuration. Despite the use of
// the `.unsafeFlags` construct required by SwiftPM, this flag is recommended for Release
// builds. See <https://github.com/swift-server/guides/blob/main/docs/building.md#building-for-production> for details.
.unsafeFlags(["-cross-module-optimization"], .when(configuration: .release))
]
),
.executableTarget(name: "Run", dependencies: [.target(name: "App")]),
// .testTarget(name: "AppTests", dependencies: [
// .target(name: "App"),
// .product(name: "XCTVapor", package: "vapor"),
// ])
] ]
) )

View File

@ -1,34 +1,10 @@
import Foundation import Foundation
import Clairvoyant
import Vapor import Vapor
import NIOCore import Clairvoyant
final class EventLoopScheduler { extension MultiThreadedEventLoopGroup: AsyncScheduler {
private let backgroundGroup: EventLoopGroup
init(numberOfThreads: Int = 2) {
backgroundGroup = MultiThreadedEventLoopGroup(numberOfThreads: numberOfThreads)
}
func next() -> EventLoop {
backgroundGroup.next()
}
func provider() -> NIOEventLoopGroupProvider {
return .shared(backgroundGroup)
}
func shutdown() {
backgroundGroup.shutdownGracefully { _ in
} public func schedule(asyncJob: @escaping @Sendable () async throws -> Void) {
} _ = any().makeFutureWithTask(asyncJob)
}
extension EventLoopScheduler: AsyncScheduler {
func schedule(asyncJob: @escaping @Sendable () async throws -> Void) {
_ = backgroundGroup.any().makeFutureWithTask(asyncJob)
} }
} }

View File

@ -8,26 +8,10 @@ var server: SQLiteDatabase!
private var provider: VaporMetricProvider! = nil private var provider: VaporMetricProvider! = nil
private var status: Metric<ServerStatus>! private var status: Metric<ServerStatus>!
private let scheduler = EventLoopScheduler() private let scheduler = MultiThreadedEventLoopGroup(numberOfThreads: 2)
private var configurationError: Error? = nil private var configurationError: Error? = nil
public func configure(_ app: Application) throws { func configure(_ app: Application) async throws {
let semaphore = DispatchSemaphore(value: 0)
scheduler.schedule {
do {
try await configureAsync(app)
} catch {
configurationError = error
}
semaphore.signal()
}
semaphore.wait()
if let configurationError {
throw configurationError
}
}
private func configureAsync(_ app: Application) async throws {
let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory) let storageFolder = URL(fileURLWithPath: app.directory.resourcesDirectory)
let configPath = URL(fileURLWithPath: app.directory.resourcesDirectory) let configPath = URL(fileURLWithPath: app.directory.resourcesDirectory)
@ -45,7 +29,7 @@ private func configureAsync(_ app: Application) async throws {
name: "Status", name: "Status",
description: "The main status of the server") description: "The main status of the server")
_ = try? await status.update(.initializing) try await status.update(.initializing)
app.http.server.configuration.port = configuration.serverPort app.http.server.configuration.port = configuration.serverPort
@ -69,7 +53,7 @@ private func configureAsync(_ app: Application) async throws {
try await app.autoMigrate() try await app.autoMigrate()
} catch { } catch {
await monitor.log("Failed to migrate database: \(error)") await monitor.log("Failed to migrate database: \(error)")
_ = try? await status.update(.initializationFailure) try await status.update(.initializationFailure)
return return
} }
@ -79,15 +63,6 @@ private func configureAsync(_ app: Application) async throws {
let db = app.databases.database(.sqlite, logger: .init(label: "Init"), on: app.databases.eventLoopGroup.next())! let db = app.databases.database(.sqlite, logger: .init(label: "Init"), on: app.databases.eventLoopGroup.next())!
server = try await SQLiteDatabase(database: db, mail: configuration.mail) server = try await SQLiteDatabase(database: db, mail: configuration.mail)
// Gracefully shut down by closing potentially open socket
DispatchQueue.global(qos: .utility).asyncAfter(deadline: .now() + .seconds(5)) {
_ = app.server.onShutdown.always { _ in
scheduler.schedule {
await server.disconnectAllSockets()
}
}
}
// register routes // register routes
routes(app) routes(app)
@ -96,7 +71,14 @@ private func configureAsync(_ app: Application) async throws {
provider.asyncScheduler = scheduler provider.asyncScheduler = scheduler
provider.registerRoutes(app) provider.registerRoutes(app)
_ = try? await status.update(.nominal) try await status.update(.nominal)
}
func shutdown() {
scheduler.schedule {
await server.disconnectAllSockets()
try await scheduler.shutdownGracefully()
}
} }
func log(_ message: String) { func log(_ message: String) {

View File

@ -0,0 +1,43 @@
import Vapor
import Dispatch
import Logging
/// This extension is temporary and can be removed once Vapor gets this support.
private extension Vapor.Application {
static let baseExecutionQueue = DispatchQueue(label: "vapor.codes.entrypoint")
func runFromAsyncMainEntrypoint() async throws {
try await withCheckedThrowingContinuation { continuation in
Vapor.Application.baseExecutionQueue.async { [self] in
do {
try self.run()
continuation.resume()
} catch {
continuation.resume(throwing: error)
}
}
}
}
}
@main
enum Entrypoint {
static func main() async throws {
var env = try Environment.detect()
try LoggingSystem.bootstrap(from: &env)
let app = Application(env)
defer {
shutdown()
app.shutdown()
}
do {
try await configure(app)
} catch {
app.logger.report(error: error)
throw error
}
try await app.runFromAsyncMainEntrypoint()
}
}

View File

@ -1,10 +0,0 @@
import App
import Vapor
var env = try Environment.detect()
try LoggingSystem.bootstrap(from: &env)
let app = Application(env)
defer { app.shutdown() }
try configure(app)
try app.run()

View File

@ -1,15 +0,0 @@
@testable import App
import XCTVapor
final class AppTests: XCTestCase {
func testHelloWorld() throws {
let app = Application(.testing)
defer { app.shutdown() }
try configure(app)
try app.test(.GET, "hello", afterResponse: { res in
XCTAssertEqual(res.status, .ok)
XCTAssertEqual(res.body.string, "Hello, world!")
})
}
}