OpenVPN: Resolve issues with long negotiations (#1094)

Clean up the naive abuse of async/await in OpenVPNSession. Encapsulate
the instances of ControlChannel/DataChannel inside the Negotiator actor,
so that actor-isolation for them becomes automatically unnecessary.
Synchronous methods inside the actor are the way to go.

After that, handle control packets in orderly fashion, because this is
not being done and may spoil negotiation very easily. Probably also
happening in TunnelKit.

Ultimately, skip some unnecessary XOR processing in UDP when no XOR
method is actually set.
This commit is contained in:
Davide 2025-01-22 22:33:29 +01:00 committed by GitHub
parent f76b1e26ba
commit 8ab7b0d143
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 207 additions and 193 deletions

View File

@ -27,7 +27,7 @@ internal import CPassepartoutOpenVPNOpenSSL
import Foundation
import PassepartoutKit
actor ControlChannel {
final class ControlChannel {
private let prng: PRNGProtocol
private let serializer: ControlChannelSerializer
@ -52,11 +52,11 @@ actor ControlChannel {
private var sentDates: [UInt32: Date]
init(prng: PRNGProtocol) {
convenience init(prng: PRNGProtocol) {
self.init(prng: prng, serializer: PlainSerializer())
}
init(
convenience init(
prng: PRNGProtocol,
crypto: OpenVPNCryptoProtocol,
authKey key: OpenVPN.StaticKey,
@ -65,7 +65,7 @@ actor ControlChannel {
self.init(prng: prng, serializer: try AuthSerializer(with: crypto, key: key, digest: digest))
}
init(
convenience init(
prng: PRNGProtocol,
crypto: OpenVPNCryptoProtocol,
cryptKey key: OpenVPN.StaticKey

View File

@ -27,7 +27,7 @@ internal import CPassepartoutOpenVPNOpenSSL
import Foundation
import PassepartoutKit
actor DataChannel {
final class DataChannel {
let key: UInt8
private let dataPath: DataPath

View File

@ -27,7 +27,7 @@ internal import CPassepartoutOpenVPNOpenSSL
import Foundation
import PassepartoutKit
actor Negotiator {
final class Negotiator {
struct Options {
let configuration: OpenVPN.Configuration
@ -44,6 +44,8 @@ actor Negotiator {
let onError: (UInt8, Error) async -> Void
}
private let parser = StandardOpenVPNParser()
let key: UInt8 // 3-bit
private(set) var history: NegotiationHistory?
@ -52,7 +54,7 @@ actor Negotiator {
let link: LinkInterface
let channel: ControlChannel
private var channel: ControlChannel
private let prng: PRNGProtocol
@ -76,6 +78,10 @@ actor Negotiator {
private let tlsBox: OpenVPNTLSProtocol
private var expectedPacketId: UInt32
private var pendingPackets: [UInt32: ControlPacket]
private var authenticator: Authenticator?
private var nextPushRequestDate: Date?
@ -86,7 +92,7 @@ actor Negotiator {
// MARK: Init
init(
convenience init(
link: LinkInterface,
channel: ControlChannel,
prng: PRNGProtocol,
@ -132,6 +138,8 @@ actor Negotiator {
negotiationTimeout = renegotiation != nil ? options.sessionOptions.softNegotiationTimeout : options.sessionOptions.negotiationTimeout
state = .idle
tlsBox = tlsFactory()
expectedPacketId = 0
pendingPackets = [:]
}
func forRenegotiation(initiatedBy newRenegotiation: RenegotiationType) -> Negotiator {
@ -165,21 +173,21 @@ extension Negotiator {
renegotiation != nil && state != .connected
}
func start() async throws {
await channel.reset(forNewSession: renegotiation == nil)
func start() throws {
channel.reset(forNewSession: renegotiation == nil)
// schedule this repeatedly
try await checkNegotiationComplete()
try checkNegotiationComplete()
switch renegotiation {
case .client:
try await enqueueControlPackets(code: .softResetV1, key: key, payload: Data())
try enqueueControlPackets(code: .softResetV1, key: key, payload: Data())
case .server:
break
default:
try await enqueueControlPackets(code: .hardResetClientV2, key: key, payload: hardResetPayload() ?? Data())
try enqueueControlPackets(code: .hardResetClientV2, key: key, payload: hardResetPayload() ?? Data())
}
}
@ -187,8 +195,41 @@ extension Negotiator {
checkNegotiationTask?.cancel()
}
func readPacket(_ packet: ControlPacket) async throws {
try await handleControlPacket(packet)
func readInboundPacket(withData packet: Data, offset: Int) throws -> ControlPacket {
try channel.readInboundPacket(withData: packet, offset: 0)
}
func enqueueInboundPacket(packet controlPacket: ControlPacket) -> [ControlPacket] {
channel.enqueueInboundPacket(packet: controlPacket)
}
func handleControlPacket(_ packet: ControlPacket) throws {
guard packet.packetId >= expectedPacketId else {
return
}
if packet.packetId > expectedPacketId {
pendingPackets[packet.packetId] = packet
return
}
try privateHandleControlPacket(packet)
expectedPacketId += 1
while let packet = pendingPackets[expectedPacketId] {
try privateHandleControlPacket(packet)
pendingPackets.removeValue(forKey: packet.packetId)
expectedPacketId += 1
}
}
func handleAcks() {
//
}
func sendAck(for controlPacket: ControlPacket, to link: LinkInterface) {
Task {
try await privateSendAck(for: controlPacket, to: link)
}
}
func shouldRenegotiate() -> Bool {
@ -226,7 +267,7 @@ private extension Negotiator {
return nil
}
func checkNegotiationComplete() async throws {
func checkNegotiationComplete() throws {
guard !didHardResetTimeout else {
throw OpenVPNSessionError.recoverable(OpenVPNSessionError.negotiationTimeout)
}
@ -235,10 +276,10 @@ private extension Negotiator {
}
if !isRenegotiating {
try await pushRequest()
try pushRequest()
}
if !link.isReliable {
try await flushControlQueue()
try flushControlQueue()
}
guard state == .connected else {
@ -252,7 +293,7 @@ private extension Negotiator {
return
}
do {
try await checkNegotiationComplete()
try checkNegotiationComplete()
} catch {
await options.onError(key, error)
}
@ -263,7 +304,7 @@ private extension Negotiator {
// let loop die when negotiation is complete
}
func pushRequest() async throws {
func pushRequest() throws {
guard state == .push else {
return
}
@ -287,25 +328,25 @@ private extension Negotiator {
}
pp_log(.openvpn, .info, "TLS.ifconfig: Send pulled ciphertext \(cipherTextOut.asSensitiveBytes)")
try await enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
try enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
self.nextPushRequestDate = Date().addingTimeInterval(options.sessionOptions.pushRequestInterval)
}
func enqueueControlPackets(code: PacketCode, key: UInt8, payload: Data) async throws {
try await channel.enqueueOutboundPackets(
func enqueueControlPackets(code: PacketCode, key: UInt8, payload: Data) throws {
try channel.enqueueOutboundPackets(
withCode: code,
key: key,
payload: payload,
maxPacketSize: Constants.maxPacketSize
)
try await flushControlQueue()
try flushControlQueue()
}
func flushControlQueue() async throws {
func flushControlQueue() throws {
let rawList: [Data]
do {
rawList = try await channel.writeOutboundPackets(resendAfter: options.sessionOptions.retxInterval)
rawList = try channel.writeOutboundPackets(resendAfter: options.sessionOptions.retxInterval)
} catch {
pp_log(.openvpn, .error, "Failed control packet serialization: \(error)")
throw error
@ -313,14 +354,16 @@ private extension Negotiator {
guard !rawList.isEmpty else {
return
}
do {
for raw in rawList {
pp_log(.openvpn, .info, "Send control packet \(raw.asSensitiveBytes)")
for raw in rawList {
pp_log(.openvpn, .info, "Send control packet \(raw.asSensitiveBytes)")
}
Task {
do {
try await link.writePackets(rawList)
} catch {
pp_log(.openvpn, .error, "Failed LINK write during control flush: \(error)")
await options.onError(key, PassepartoutError(.linkFailure, error))
}
try await link.writePackets(rawList)
} catch {
pp_log(.openvpn, .error, "Failed LINK write during control flush: \(error)")
throw PassepartoutError(.linkFailure)
}
}
}
@ -328,7 +371,7 @@ private extension Negotiator {
// MARK: - Inbound
private extension Negotiator {
func handleControlPacket(_ packet: ControlPacket) async throws {
func privateHandleControlPacket(_ packet: ControlPacket) throws {
guard packet.key == key else {
pp_log(.openvpn, .error, "Bad key in control packet (\(packet.key) != \(key))")
return
@ -343,9 +386,9 @@ private extension Negotiator {
if isRenegotiating {
pp_log(.openvpn, .error, "Sent SOFT_RESET but received HARD_RESET?")
}
await channel.setRemoteSessionId(packet.sessionId)
channel.setRemoteSessionId(packet.sessionId)
}
guard let remoteSessionId = await channel.remoteSessionId else {
guard let remoteSessionId = channel.remoteSessionId else {
let error = OpenVPNSessionError.missingSessionId
pp_log(.openvpn, .fault, "No remote sessionId (never set): \(error)")
throw error
@ -381,13 +424,13 @@ private extension Negotiator {
}
pp_log(.openvpn, .info, "TLS.connect: Pulled ciphertext \(cipherTextOut.asSensitiveBytes)")
try await enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
try enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
case .tls, .auth, .push, .connected:
guard packet.code == .controlV1 else {
return
}
guard let remoteSessionId = await channel.remoteSessionId else {
guard let remoteSessionId = channel.remoteSessionId else {
let error = OpenVPNSessionError.missingSessionId
pp_log(.openvpn, .fault, "No remote sessionId found in packet (control packets before server HARD_RESET): \(error)")
throw error
@ -402,14 +445,14 @@ private extension Negotiator {
return
}
pp_log(.openvpn, .info, "TLS.connect: Put received ciphertext \(cipherTextIn.asSensitiveBytes)")
pp_log(.openvpn, .info, "TLS.connect: Put received ciphertext [\(packet.packetId)] \(cipherTextIn.asSensitiveBytes)")
try? tlsBox.putCipherText(cipherTextIn)
let cipherTextOut: Data
do {
cipherTextOut = try tlsBox.pullCipherText()
pp_log(.openvpn, .info, "TLS.connect: Send pulled ciphertext \(cipherTextOut.asSensitiveBytes)")
try await enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
try enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
} catch {
if let nativeError = error.asNativeOpenVPNError {
pp_log(.openvpn, .fault, "TLS.connect: Failed pulling ciphertext: \(nativeError)")
@ -422,19 +465,35 @@ private extension Negotiator {
pp_log(.openvpn, .info, "TLS.connect: Handshake is complete")
state = .auth
try await onTLSConnect()
try onTLSConnect()
}
do {
while true {
let controlData = try await channel.currentControlData(withTLS: tlsBox)
try await handleControlData(controlData)
let controlData = try channel.currentControlData(withTLS: tlsBox)
try handleControlData(controlData)
}
} catch {
}
}
}
func onTLSConnect() async throws {
func privateSendAck(for controlPacket: ControlPacket, to link: LinkInterface) async throws {
do {
pp_log(.openvpn, .info, "Send ack for received packetId \(controlPacket.packetId)")
let raw = try channel.writeAcks(
withKey: controlPacket.key,
ackPacketIds: [controlPacket.packetId],
ackRemoteSessionId: controlPacket.sessionId
)
try await link.writePackets([raw])
pp_log(.openvpn, .info, "Ack successfully written to LINK for packetId \(controlPacket.packetId)")
} catch {
pp_log(.openvpn, .error, "Failed LINK write during send ack for packetId \(controlPacket.packetId): \(error)")
await options.onError(key, PassepartoutError(.linkFailure, error))
}
}
func onTLSConnect() throws {
authenticator = Authenticator(
prng: prng,
options.credentials?.username,
@ -456,10 +515,10 @@ private extension Negotiator {
}
pp_log(.openvpn, .info, "TLS.auth: Pulled ciphertext \(cipherTextOut.asSensitiveBytes)")
try await enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
try enqueueControlPackets(code: .controlV1, key: key, payload: cipherTextOut)
}
func handleControlData(_ data: ZeroingData) async throws {
func handleControlData(_ data: ZeroingData) throws {
guard let authenticator else {
return
}
@ -479,7 +538,7 @@ private extension Negotiator {
pp_log(.openvpn, .fault, "Renegotiating connection without former history")
throw OpenVPNSessionError.assertion
}
try await completeConnection(pushReply: pushReply)
try completeConnection(pushReply: pushReply)
return
}
@ -490,15 +549,17 @@ private extension Negotiator {
for message in authenticator.parseMessages() {
pp_log(.openvpn, .info, "Parsed control message \(message.asSensitiveBytes)")
do {
try await handleControlMessage(message)
try handleControlMessage(message)
} catch {
await options.onError(key, error)
Task {
await options.onError(key, error)
}
throw error
}
}
}
func handleControlMessage(_ message: String) async throws {
func handleControlMessage(_ message: String) throws {
pp_log(.openvpn, .info, "Received control message \(message.asSensitiveBytes)")
// disconnect on authentication failure
@ -532,7 +593,7 @@ private extension Negotiator {
}
let reply: PushReply
do {
guard let optionalReply = try StandardOpenVPNParser().pushReply(with: completeMessage) else {
guard let optionalReply = try parser.pushReply(with: completeMessage) else {
return
}
reply = optionalReply
@ -570,26 +631,28 @@ private extension Negotiator {
return
}
state = .connected
try await completeConnection(pushReply: reply)
try completeConnection(pushReply: reply)
}
}
private extension Negotiator {
func completeConnection(pushReply: PushReply) async throws {
func completeConnection(pushReply: PushReply) throws {
pp_log(.openvpn, .info, "Complete connection of key \(key)")
let history = NegotiationHistory(pushReply: pushReply)
let dataChannel = try await newDataChannel(with: history)
let dataChannel = try newDataChannel(with: history)
self.history = history
authenticator?.reset()
await options.onConnected(key, dataChannel, pushReply)
Task {
await options.onConnected(key, dataChannel, pushReply)
}
}
func newDataChannel(with history: NegotiationHistory) async throws -> DataChannel {
guard let sessionId = await channel.sessionId else {
func newDataChannel(with history: NegotiationHistory) throws -> DataChannel {
guard let sessionId = channel.sessionId else {
pp_log(.openvpn, .fault, "Setting up connection without a local sessionId")
throw OpenVPNSessionError.assertion
}
guard let remoteSessionId = await channel.remoteSessionId else {
guard let remoteSessionId = channel.remoteSessionId else {
pp_log(.openvpn, .fault, "Setting up connection without a remote sessionId")
throw OpenVPNSessionError.assertion
}

View File

@ -1,54 +0,0 @@
//
// OpenVPNSession+Acks.swift
// PassepartoutKit
//
// Created by Davide De Rosa on 3/28/24.
// Copyright (c) 2025 Davide De Rosa. All rights reserved.
//
// https://github.com/passepartoutvpn
//
// This file is part of PassepartoutKit.
//
// PassepartoutKit is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// PassepartoutKit is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with PassepartoutKit. If not, see <http://www.gnu.org/licenses/>.
//
internal import CPassepartoutOpenVPNOpenSSL
import Foundation
import PassepartoutKit
extension OpenVPNSession {
func handleAcks() {
//
}
func sendAck(
for controlPacket: ControlPacket,
to link: LinkInterface,
channel: ControlChannel
) async throws {
do {
pp_log(.openvpn, .info, "Send ack for received packetId \(controlPacket.packetId)")
let raw = try await channel.writeAcks(
withKey: controlPacket.key,
ackPacketIds: [controlPacket.packetId],
ackRemoteSessionId: controlPacket.sessionId
)
try await link.writePackets([raw])
pp_log(.openvpn, .info, "Ack successfully written to LINK for packetId \(controlPacket.packetId)")
} catch {
pp_log(.openvpn, .error, "Failed LINK write during send ack for packetId \(controlPacket.packetId): \(error)")
throw PassepartoutError(.linkFailure)
}
}
}

View File

@ -31,22 +31,24 @@ extension OpenVPNSession {
_ packets: [Data],
to tunnel: TunnelInterface,
dataChannel: DataChannel
) async throws {
do {
guard let decryptedPackets = try await dataChannel.decrypt(packets: packets) else {
pp_log(.openvpn, .error, "Unable to decrypt packets, is SessionKey properly configured (dataPath, peerId)?")
return
) {
Task {
do {
guard let decryptedPackets = try dataChannel.decrypt(packets: packets) else {
pp_log(.openvpn, .error, "Unable to decrypt packets, is SessionKey properly configured (dataPath, peerId)?")
return
}
guard !decryptedPackets.isEmpty else {
return
}
reportInboundDataCount(decryptedPackets.flatCount)
try await tunnel.writePackets(decryptedPackets)
} catch {
if let nativeError = error.asNativeOpenVPNError {
throw nativeError
}
throw OpenVPNSessionError.recoverable(error)
}
guard !decryptedPackets.isEmpty else {
return
}
reportInboundDataCount(decryptedPackets.flatCount)
try await tunnel.writePackets(decryptedPackets)
} catch {
if let nativeError = error.asNativeOpenVPNError {
throw nativeError
}
throw OpenVPNSessionError.recoverable(error)
}
}
@ -54,23 +56,25 @@ extension OpenVPNSession {
_ packets: [Data],
to link: LinkInterface,
dataChannel: DataChannel
) async throws {
do {
guard let encryptedPackets = try await dataChannel.encrypt(packets: packets) else {
pp_log(.openvpn, .error, "Unable to encrypt packets, is SessionKey properly configured (dataPath, peerId)?")
return
) {
Task {
do {
guard let encryptedPackets = try dataChannel.encrypt(packets: packets) else {
pp_log(.openvpn, .error, "Unable to encrypt packets, is SessionKey properly configured (dataPath, peerId)?")
return
}
guard !encryptedPackets.isEmpty else {
return
}
reportOutboundDataCount(encryptedPackets.flatCount)
try await link.writePackets(encryptedPackets)
} catch {
if let nativeError = error.asNativeOpenVPNError {
throw nativeError
}
pp_log(.openvpn, .error, "Data: Failed LINK write during send data: \(error)")
await shutdown(PassepartoutError(.linkFailure, error))
}
guard !encryptedPackets.isEmpty else {
return
}
reportOutboundDataCount(encryptedPackets.flatCount)
try await link.writePackets(encryptedPackets)
} catch {
if let nativeError = error.asNativeOpenVPNError {
throw nativeError
}
pp_log(.openvpn, .error, "Data: Failed LINK write during send data: \(error)")
throw PassepartoutError(.linkFailure)
}
}
}

View File

@ -60,7 +60,7 @@ extension OpenVPNSession {
}
if let error {
pp_log(.openvpn, .error, "Failed LINK read: \(error)")
await shutdown(PassepartoutError(.linkFailure))
await shutdown(PassepartoutError(.linkFailure, error))
}
guard let packets, !packets.isEmpty else {
return
@ -78,7 +78,7 @@ extension OpenVPNSession {
// MARK: - Private
private extension OpenVPNSession {
func receiveLink(packets: [Data]) async throws {
func receiveLink(packets: [Data]) throws {
guard !isStopped, let link else {
return
}
@ -90,8 +90,8 @@ private extension OpenVPNSession {
pp_log(.openvpn, .fault, "No negotiator")
throw OpenVPNSessionError.assertion
}
if await negotiator.shouldRenegotiate() {
negotiator = try await startRenegotiation(after: negotiator, on: link, isServerInitiated: false)
if negotiator.shouldRenegotiate() {
negotiator = try startRenegotiation(after: negotiator, on: link, isServerInitiated: false)
}
for packet in packets {
@ -131,8 +131,8 @@ private extension OpenVPNSession {
let controlPacket: ControlPacket
do {
let parsedPacket = try await negotiator.channel.readInboundPacket(withData: packet, offset: 0)
handleAcks()
let parsedPacket = try negotiator.readInboundPacket(withData: packet, offset: 0)
negotiator.handleAcks()
if parsedPacket.code == .ackV1 {
continue
}
@ -145,28 +145,26 @@ private extension OpenVPNSession {
case .hardResetServerV2:
// HARD_RESET coming while connected
guard await !negotiator.isConnected else {
guard !negotiator.isConnected else {
throw OpenVPNSessionError.recoverable(OpenVPNSessionError.staleSession)
}
case .softResetV1:
if await !negotiator.isRenegotiating {
negotiator = try await startRenegotiation(after: negotiator, on: link, isServerInitiated: true)
if !negotiator.isRenegotiating {
negotiator = try startRenegotiation(after: negotiator, on: link, isServerInitiated: true)
}
default:
break
}
try await sendAck(
for: controlPacket,
to: link,
channel: negotiator.channel
)
negotiator.sendAck(for: controlPacket, to: link)
let pendingInboundQueue = await negotiator.channel.enqueueInboundPacket(packet: controlPacket)
let pendingInboundQueue = negotiator.enqueueInboundPacket(packet: controlPacket)
pp_log(.openvpn, .debug, "Pending inbound queue: \(pendingInboundQueue.map(\.packetId))")
for inboundPacket in pendingInboundQueue {
try await negotiator.readPacket(inboundPacket)
pp_log(.openvpn, .debug, "Handle packet: \(inboundPacket.packetId)")
try negotiator.handleControlPacket(inboundPacket)
}
}
@ -177,7 +175,7 @@ private extension OpenVPNSession {
pp_log(.openvpn, .error, "Accounted a data packet for which the cryptographic key hadn't been found")
continue
}
try await handleDataPackets(
handleDataPackets(
dataPackets,
to: tunnel,
dataChannel: dataChannel
@ -186,7 +184,7 @@ private extension OpenVPNSession {
}
}
func receiveTunnel(packets: [Data]) async throws {
func receiveTunnel(packets: [Data]) throws {
guard !isStopped else {
return
}
@ -194,13 +192,13 @@ private extension OpenVPNSession {
pp_log(.openvpn, .fault, "No negotiator")
throw OpenVPNSessionError.assertion
}
guard await negotiator.isConnected, let currentDataChannel else {
guard negotiator.isConnected, let currentDataChannel else {
return
}
try checkPingTimeout()
try await sendDataPackets(
sendDataPackets(
packets,
to: negotiator.link,
dataChannel: currentDataChannel

View File

@ -29,12 +29,12 @@ import PassepartoutKit
extension OpenVPNSession {
@discardableResult
func startNegotiation(on link: LinkInterface) async throws -> Negotiator {
func startNegotiation(on link: LinkInterface) throws -> Negotiator {
pp_log(.openvpn, .info, "Start negotiation")
let neg = newNegotiator(on: link)
addNegotiator(neg)
loopLink()
try await neg.start()
try neg.start()
return neg
}
@ -42,8 +42,8 @@ extension OpenVPNSession {
after negotiator: Negotiator,
on link: LinkInterface,
isServerInitiated: Bool
) async throws -> Negotiator {
guard await !negotiator.isRenegotiating else {
) throws -> Negotiator {
guard !negotiator.isRenegotiating else {
pp_log(.openvpn, .error, "Renegotiation already in progress")
return negotiator
}
@ -52,9 +52,9 @@ extension OpenVPNSession {
} else {
pp_log(.openvpn, .notice, "Renegotiation request from client")
}
let neg = await negotiator.forRenegotiation(initiatedBy: isServerInitiated ? .server : .client)
let neg = negotiator.forRenegotiation(initiatedBy: isServerInitiated ? .server : .client)
addNegotiator(neg)
try await neg.start()
try neg.start()
return neg
}
}

View File

@ -196,7 +196,7 @@ extension OpenVPNSession: OpenVPNSessionProtocol {
self.link = link
sessionState = .starting
try await startNegotiation(on: link)
try startNegotiation(on: link)
}
func hasLink() async -> Bool {
@ -221,7 +221,7 @@ extension OpenVPNSession: OpenVPNSessionProtocol {
let link, !link.isReliable,
let currentDataChannel {
do {
if let packets = try await currentDataChannel.encrypt(packets: [OCCPacket.exit.serialized()]) {
if let packets = try currentDataChannel.encrypt(packets: [OCCPacket.exit.serialized()]) {
pp_log(.openvpn, .info, "Send OCCPacket exit")
let timeoutMillis = Int((timeout ?? options.writeTimeout) * 1000.0)
@ -262,7 +262,7 @@ private extension OpenVPNSession {
func cleanup() async {
link?.shutdown()
for neg in negotiators.values {
await neg.cancel()
neg.cancel()
}
negotiators.removeAll()
dataChannels.removeAll()
@ -453,7 +453,7 @@ private extension OpenVPNSession {
}
}
func ping() async throws {
func ping() throws {
guard !isStopped else {
pp_log(.openvpn, .debug, "Ping cancelled, session stopped")
return
@ -473,7 +473,7 @@ private extension OpenVPNSession {
// is keep-alive enabled?
if keepAliveInterval != nil {
pp_log(.openvpn, .debug, "Send ping")
try await sendDataPackets(
sendDataPackets(
[ProtocolMacros.pingString],
to: link,
dataChannel: currentDataChannel

View File

@ -31,17 +31,17 @@ import PassepartoutKit
final class OpenVPNUDPLink {
private let link: LinkInterface
private let xor: XORProcessor
private let xor: XORProcessor?
/// - Parameters:
/// - link: The underlying socket.
/// - xorMethod: The optional XOR method.
convenience init(link: LinkInterface, xorMethod: OpenVPN.XORMethod?) {
precondition(link.linkType.plainType == .udp)
self.init(link: link, xor: XORProcessor(method: xorMethod))
self.init(link: link, xor: xorMethod.map(XORProcessor.init(method:)))
}
init(link: LinkInterface, xor: XORProcessor) {
init(link: LinkInterface, xor: XORProcessor?) {
self.link = link
self.xor = xor
}
@ -80,19 +80,28 @@ extension OpenVPNUDPLink: LinkInterface {
extension OpenVPNUDPLink {
func setReadHandler(_ handler: @escaping ([Data]?, Error?) -> Void) {
link.setReadHandler { [weak self] packets, error in
guard let self else {
guard let self, let packets, !packets.isEmpty else {
return
}
var processedPackets: [Data]?
if let packets {
processedPackets = xor.processPackets(packets, outbound: false)
if let xor {
let processedPackets = xor.processPackets(packets, outbound: false)
handler(processedPackets, error)
return
}
handler(processedPackets, error)
handler(packets, error)
}
}
func writePackets(_ packets: [Data]) async throws {
let processedPackets = xor.processPackets(packets, outbound: true)
try await link.writePackets(processedPackets)
guard !packets.isEmpty else {
assertionFailure("Writing empty packets?")
return
}
if let xor {
let processedPackets = xor.processPackets(packets, outbound: true)
try await link.writePackets(processedPackets)
return
}
try await link.writePackets(packets)
}
}

View File

@ -29,9 +29,9 @@ import PassepartoutKit
/// Processes data packets according to a XOR method.
struct XORProcessor {
private let method: OpenVPN.XORMethod?
private let method: OpenVPN.XORMethod
init(method: OpenVPN.XORMethod?) {
init(method: OpenVPN.XORMethod) {
self.method = method
}
@ -43,10 +43,7 @@ struct XORProcessor {
- Returns: The array of packets after XOR processing.
**/
func processPackets(_ packets: [Data], outbound: Bool) -> [Data] {
guard method != nil else {
return packets
}
return packets.map {
packets.map {
processPacket($0, outbound: outbound)
}
}
@ -59,9 +56,6 @@ struct XORProcessor {
- Returns: The packet after XOR processing.
**/
func processPacket(_ packet: Data, outbound: Bool) -> Data {
guard let method else {
return packet
}
switch method {
case .xormask(let mask):
return Self.xormask(packet: packet, mask: mask.zData)