Encapsulate control state into ControlChannel
First step: variables + mutating funcs.
This commit is contained in:
parent
d80c0b5460
commit
19ce7de819
@ -32,6 +32,8 @@
|
||||
0E12B2A32145341B00B4BAE9 /* PacketTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0E12B2A22145341B00B4BAE9 /* PacketTests.swift */; };
|
||||
0E12B2A521454F7F00B4BAE9 /* BidirectionalState.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0E12B2A421454F7F00B4BAE9 /* BidirectionalState.swift */; };
|
||||
0E12B2A621454F7F00B4BAE9 /* BidirectionalState.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0E12B2A421454F7F00B4BAE9 /* BidirectionalState.swift */; };
|
||||
0E12B2A821456C0200B4BAE9 /* ControlChannel.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0E12B2A721456C0200B4BAE9 /* ControlChannel.swift */; };
|
||||
0E12B2A921456C0200B4BAE9 /* ControlChannel.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0E12B2A721456C0200B4BAE9 /* ControlChannel.swift */; };
|
||||
0E245D692135972800B012A2 /* PushTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0E245D682135972800B012A2 /* PushTests.swift */; };
|
||||
0E245D6C2137F73600B012A2 /* CompressionFramingNative.h in Headers */ = {isa = PBXBuildFile; fileRef = 0E245D6B2137F73600B012A2 /* CompressionFramingNative.h */; };
|
||||
0E39BCE8214B2AB60035E9DE /* ControlPacket.h in Headers */ = {isa = PBXBuildFile; fileRef = 0E39BCE6214B2AB60035E9DE /* ControlPacket.h */; };
|
||||
@ -198,6 +200,7 @@
|
||||
0E1108B71F77B9F900A92462 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
|
||||
0E12B2A22145341B00B4BAE9 /* PacketTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PacketTests.swift; sourceTree = "<group>"; };
|
||||
0E12B2A421454F7F00B4BAE9 /* BidirectionalState.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = BidirectionalState.swift; sourceTree = "<group>"; };
|
||||
0E12B2A721456C0200B4BAE9 /* ControlChannel.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ControlChannel.swift; sourceTree = "<group>"; };
|
||||
0E17D7F91F730D9F009EE129 /* TunnelKit.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = TunnelKit.framework; sourceTree = BUILT_PRODUCTS_DIR; };
|
||||
0E245D682135972800B012A2 /* PushTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = PushTests.swift; sourceTree = "<group>"; };
|
||||
0E245D6B2137F73600B012A2 /* CompressionFramingNative.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = CompressionFramingNative.h; sourceTree = "<group>"; };
|
||||
@ -431,6 +434,7 @@
|
||||
0E245D6B2137F73600B012A2 /* CompressionFramingNative.h */,
|
||||
0E39BCE6214B2AB60035E9DE /* ControlPacket.h */,
|
||||
0E39BCE7214B2AB60035E9DE /* ControlPacket.m */,
|
||||
0E12B2A721456C0200B4BAE9 /* ControlChannel.swift */,
|
||||
0EFEB44A2006D3C800F81029 /* CoreConfiguration.swift */,
|
||||
0E07597C20F0060E00F38FD8 /* CryptoAEAD.h */,
|
||||
0E07597D20F0060E00F38FD8 /* CryptoAEAD.m */,
|
||||
@ -863,6 +867,7 @@
|
||||
0EFEB45D2006D3C800F81029 /* CryptoBox.m in Sources */,
|
||||
0EBBF2FA2085061600E36B40 /* NETCPInterface.swift in Sources */,
|
||||
0E0C2125212ED29D008AB282 /* SessionError.swift in Sources */,
|
||||
0E12B2A821456C0200B4BAE9 /* ControlChannel.swift in Sources */,
|
||||
0EFEB4552006D3C800F81029 /* SessionProxy+EncryptionBridge.swift in Sources */,
|
||||
0EFEB45C2006D3C800F81029 /* ZeroingData.m in Sources */,
|
||||
0EFEB4632006D3C800F81029 /* ProtocolMacros.swift in Sources */,
|
||||
@ -916,6 +921,7 @@
|
||||
0EFEB4962006D7F300F81029 /* ProtocolMacros.swift in Sources */,
|
||||
0EFEB48A2006D7C400F81029 /* TunnelKitProvider.swift in Sources */,
|
||||
0E0C2126212ED29D008AB282 /* SessionError.swift in Sources */,
|
||||
0E12B2A921456C0200B4BAE9 /* ControlChannel.swift in Sources */,
|
||||
0EBBF2FB2085061600E36B40 /* NETCPInterface.swift in Sources */,
|
||||
0EFEB4982006D7F300F81029 /* ZeroingData.swift in Sources */,
|
||||
0EFEB4A32006D7F300F81029 /* Errors.m in Sources */,
|
||||
|
@ -280,9 +280,10 @@ open class TunnelKitProvider: NEPacketTunnelProvider {
|
||||
|
||||
case .dataCount:
|
||||
if let proxy = proxy {
|
||||
let dataCount = proxy.dataCount()
|
||||
response = Data()
|
||||
response?.append(UInt64(proxy.bytesCount.inbound))
|
||||
response?.append(UInt64(proxy.bytesCount.outbound))
|
||||
response?.append(UInt64(dataCount.0)) // inbound
|
||||
response?.append(UInt64(dataCount.1)) // outbound
|
||||
}
|
||||
|
||||
default:
|
||||
|
81
TunnelKit/Sources/Core/ControlChannel.swift
Normal file
81
TunnelKit/Sources/Core/ControlChannel.swift
Normal file
@ -0,0 +1,81 @@
|
||||
//
|
||||
// ControlChannel.swift
|
||||
// TunnelKit
|
||||
//
|
||||
// Created by Davide De Rosa on 9/9/18.
|
||||
// Copyright (c) 2018 Davide De Rosa. All rights reserved.
|
||||
//
|
||||
// https://github.com/keeshux
|
||||
//
|
||||
// This file is part of TunnelKit.
|
||||
//
|
||||
// TunnelKit is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// TunnelKit is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with TunnelKit. If not, see <http://www.gnu.org/licenses/>.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import __TunnelKitNative
|
||||
|
||||
// TODO: make all private
|
||||
|
||||
class ControlChannel {
|
||||
private(set) var queue: BidirectionalState<[ControlPacket]>
|
||||
|
||||
private(set) var packetId: BidirectionalState<UInt32>
|
||||
|
||||
private var pendingAcks: Set<UInt32>
|
||||
|
||||
private(set) var plainBuffer: ZeroingData
|
||||
|
||||
private var dataCount: BidirectionalState<Int>
|
||||
|
||||
init() {
|
||||
queue = BidirectionalState(withResetValue: [])
|
||||
packetId = BidirectionalState(withResetValue: 0)
|
||||
pendingAcks = []
|
||||
plainBuffer = Z(count: TLSBoxMaxBufferLength)
|
||||
dataCount = BidirectionalState(withResetValue: 0)
|
||||
}
|
||||
|
||||
func addPendingAck(_ packetId: UInt32) {
|
||||
pendingAcks.insert(packetId)
|
||||
}
|
||||
|
||||
func removePendingAcks(_ packetIds: [UInt32]) {
|
||||
pendingAcks.subtract(packetIds)
|
||||
}
|
||||
|
||||
func hasPendingAcks() -> Bool {
|
||||
return !pendingAcks.isEmpty
|
||||
}
|
||||
|
||||
func addReceivedDataCount(_ count: Int) {
|
||||
dataCount.inbound += count
|
||||
}
|
||||
|
||||
func addSentDataCount(_ count: Int) {
|
||||
dataCount.outbound += count
|
||||
}
|
||||
|
||||
func currentDataCount() -> (Int, Int) {
|
||||
return dataCount.pair
|
||||
}
|
||||
|
||||
func reset() {
|
||||
plainBuffer.zero()
|
||||
queue.reset()
|
||||
pendingAcks.removeAll()
|
||||
packetId.reset()
|
||||
dataCount.reset()
|
||||
}
|
||||
}
|
@ -155,20 +155,10 @@ public class SessionProxy {
|
||||
|
||||
// MARK: Control
|
||||
|
||||
private let controlPlainBuffer: ZeroingData
|
||||
|
||||
private var controlQueue: BidirectionalState<[ControlPacket]>
|
||||
|
||||
private var controlPendingAcks: Set<UInt32>
|
||||
|
||||
private var controlPacketId: BidirectionalState<UInt32>
|
||||
private var controlChannel: ControlChannel
|
||||
|
||||
private var authenticator: Authenticator?
|
||||
|
||||
// MARK: Data
|
||||
|
||||
private(set) var bytesCount: BidirectionalState<Int>
|
||||
|
||||
// MARK: Init
|
||||
|
||||
/**
|
||||
@ -187,11 +177,7 @@ public class SessionProxy {
|
||||
lastPing = BidirectionalState(withResetValue: Date.distantPast)
|
||||
isStopping = false
|
||||
|
||||
controlPlainBuffer = Z(count: TLSBoxMaxBufferLength)
|
||||
controlQueue = BidirectionalState(withResetValue: [])
|
||||
controlPendingAcks = []
|
||||
controlPacketId = BidirectionalState(withResetValue: 0)
|
||||
bytesCount = BidirectionalState(withResetValue: 0)
|
||||
controlChannel = ControlChannel()
|
||||
}
|
||||
|
||||
deinit {
|
||||
@ -273,6 +259,15 @@ public class SessionProxy {
|
||||
loopTunnel()
|
||||
}
|
||||
|
||||
/**
|
||||
Returns the current data bytes count.
|
||||
|
||||
- Returns: The current data bytes count as a pair, inbound first.
|
||||
*/
|
||||
public func dataCount() -> (Int, Int) {
|
||||
return controlChannel.currentDataCount()
|
||||
}
|
||||
|
||||
/**
|
||||
Shuts down the session with an optional `Error` reason. Does nothing if the session is already stopped or about to stop.
|
||||
|
||||
@ -533,22 +528,22 @@ public class SessionProxy {
|
||||
}
|
||||
|
||||
let controlPacket = ControlPacket(code: code, key: key, sessionId: sessionId, packetId: packetId, payload: payload)
|
||||
controlQueue.inbound.append(controlPacket)
|
||||
controlQueue.inbound.sort { $0.packetId < $1.packetId }
|
||||
controlChannel.queue.inbound.append(controlPacket)
|
||||
controlChannel.queue.inbound.sort { $0.packetId < $1.packetId }
|
||||
|
||||
for queuedControlPacket in controlQueue.inbound {
|
||||
if (queuedControlPacket.packetId < controlPacketId.inbound) {
|
||||
controlQueue.inbound.removeFirst()
|
||||
for queuedControlPacket in controlChannel.queue.inbound {
|
||||
if (queuedControlPacket.packetId < controlChannel.packetId.inbound) {
|
||||
controlChannel.queue.inbound.removeFirst()
|
||||
continue
|
||||
}
|
||||
if (queuedControlPacket.packetId != controlPacketId.inbound) {
|
||||
if (queuedControlPacket.packetId != controlChannel.packetId.inbound) {
|
||||
continue
|
||||
}
|
||||
|
||||
handleControlPacket(queuedControlPacket)
|
||||
|
||||
controlPacketId.inbound += 1
|
||||
controlQueue.inbound.removeFirst()
|
||||
controlChannel.packetId.inbound += 1
|
||||
controlChannel.queue.inbound.removeFirst()
|
||||
}
|
||||
}
|
||||
|
||||
@ -613,12 +608,8 @@ public class SessionProxy {
|
||||
|
||||
// Ruby: reset_ctrl
|
||||
private func resetControlChannel() {
|
||||
controlPlainBuffer.zero()
|
||||
controlQueue.reset()
|
||||
controlPendingAcks.removeAll()
|
||||
controlPacketId.reset()
|
||||
controlChannel.reset()
|
||||
authenticator = nil
|
||||
bytesCount.reset()
|
||||
}
|
||||
|
||||
// Ruby: hard_reset
|
||||
@ -816,9 +807,9 @@ public class SessionProxy {
|
||||
|
||||
do {
|
||||
var length = 0
|
||||
try negotiationKey.tls.pullRawPlainText(controlPlainBuffer.mutableBytes, length: &length)
|
||||
try negotiationKey.tls.pullRawPlainText(controlChannel.plainBuffer.mutableBytes, length: &length)
|
||||
|
||||
let controlData = controlPlainBuffer.withOffset(0, count: length)
|
||||
let controlData = controlChannel.plainBuffer.withOffset(0, count: length)
|
||||
handleControlData(controlData)
|
||||
} catch _ {
|
||||
}
|
||||
@ -827,7 +818,9 @@ public class SessionProxy {
|
||||
|
||||
// Ruby: handle_ctrl_data
|
||||
private func handleControlData(_ data: ZeroingData) {
|
||||
guard let auth = authenticator else { return }
|
||||
guard let auth = authenticator else {
|
||||
return
|
||||
}
|
||||
|
||||
if CoreConfiguration.logsSensitiveData {
|
||||
log.debug("Pulled plain control data (\(data.count) bytes): \(data.toHex())")
|
||||
@ -927,7 +920,7 @@ public class SessionProxy {
|
||||
fatalError("Missing sessionId, do hardReset() first")
|
||||
}
|
||||
|
||||
let oldIdOut = controlPacketId.outbound
|
||||
let oldIdOut = controlChannel.packetId.outbound
|
||||
let maxCount = link.mtu
|
||||
var queuedCount = 0
|
||||
var offset = 0
|
||||
@ -935,19 +928,19 @@ public class SessionProxy {
|
||||
repeat {
|
||||
let subPayloadLength = min(maxCount, payload.count - offset)
|
||||
let subPayloadData = payload.subdata(offset: offset, count: subPayloadLength)
|
||||
let packet = ControlPacket(code: code, key: key, sessionId: sessionId, packetId: controlPacketId.outbound, payload: subPayloadData)
|
||||
let packet = ControlPacket(code: code, key: key, sessionId: sessionId, packetId: controlChannel.packetId.outbound, payload: subPayloadData)
|
||||
|
||||
controlQueue.outbound.append(packet)
|
||||
controlPacketId.outbound += 1
|
||||
controlChannel.queue.outbound.append(packet)
|
||||
controlChannel.packetId.outbound += 1
|
||||
offset += maxCount
|
||||
queuedCount += subPayloadLength
|
||||
} while (offset < payload.count)
|
||||
|
||||
assert(queuedCount == payload.count)
|
||||
|
||||
let packetCount = controlPacketId.outbound - oldIdOut
|
||||
let packetCount = controlChannel.packetId.outbound - oldIdOut
|
||||
if (packetCount > 1) {
|
||||
log.debug("Enqueued \(packetCount) control packets [\(oldIdOut)-\(controlPacketId.outbound - 1)]")
|
||||
log.debug("Enqueued \(packetCount) control packets [\(oldIdOut)-\(controlChannel.packetId.outbound - 1)]")
|
||||
} else {
|
||||
log.debug("Enqueued 1 control packet [\(oldIdOut)]")
|
||||
}
|
||||
@ -957,7 +950,7 @@ public class SessionProxy {
|
||||
|
||||
// Ruby: flush_ctrl_q_out
|
||||
private func flushControlQueue() {
|
||||
for controlPacket in controlQueue.outbound {
|
||||
for controlPacket in controlChannel.queue.outbound {
|
||||
if let sentDate = controlPacket.sentDate {
|
||||
let timeAgo = -sentDate.timeIntervalSinceNow
|
||||
guard (timeAgo >= CoreConfiguration.retransmissionLimit) else {
|
||||
@ -980,7 +973,7 @@ public class SessionProxy {
|
||||
log.debug("Send control packet (\(raw.count) bytes): \(raw.toHex())")
|
||||
|
||||
// track pending acks for sent packets
|
||||
controlPendingAcks.insert(controlPacket.packetId)
|
||||
controlChannel.addPendingAck(controlPacket.packetId)
|
||||
|
||||
// WARNING: runs in Network.framework queue
|
||||
link?.writePacket(raw) { [weak self] (error) in
|
||||
@ -1068,7 +1061,7 @@ public class SessionProxy {
|
||||
|
||||
// Ruby: handle_data_pkt
|
||||
private func handleDataPackets(_ packets: [Data], key: SessionKey) {
|
||||
bytesCount.inbound += packets.flatCount
|
||||
controlChannel.addReceivedDataCount(packets.flatCount)
|
||||
do {
|
||||
guard let decryptedPackets = try key.decrypt(packets: packets) else {
|
||||
log.warning("Could not decrypt packets, is SessionKey properly configured (dataPath, peerId)?")
|
||||
@ -1103,7 +1096,7 @@ public class SessionProxy {
|
||||
}
|
||||
|
||||
// WARNING: runs in Network.framework queue
|
||||
bytesCount.outbound += encryptedPackets.flatCount
|
||||
controlChannel.addSentDataCount(encryptedPackets.flatCount)
|
||||
link?.writePackets(encryptedPackets) { [weak self] (error) in
|
||||
if let error = error {
|
||||
self?.queue.sync {
|
||||
@ -1136,18 +1129,18 @@ public class SessionProxy {
|
||||
}
|
||||
|
||||
// drop queued out packets if ack-ed
|
||||
for (i, controlPacket) in controlQueue.outbound.enumerated() {
|
||||
for (i, controlPacket) in controlChannel.queue.outbound.enumerated() {
|
||||
if packetIds.contains(controlPacket.packetId) {
|
||||
controlQueue.outbound.remove(at: i)
|
||||
controlChannel.queue.outbound.remove(at: i)
|
||||
}
|
||||
}
|
||||
|
||||
// remove ack-ed packets from pending
|
||||
controlPendingAcks.subtract(packetIds)
|
||||
controlChannel.removePendingAcks(packetIds)
|
||||
// log.verbose("Packets still pending ack: \(controlPendingAcks)")
|
||||
|
||||
// retry PUSH_REQUEST if ack queue is empty (all sent packets were ack'ed)
|
||||
if (isReliableLink && controlPendingAcks.isEmpty) {
|
||||
if isReliableLink && !controlChannel.hasPendingAcks() {
|
||||
pushRequest()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user