Move control queue management

- Out packets
- In packets
- Acks
This commit is contained in:
Davide De Rosa 2018-09-09 17:31:11 +02:00
parent e6dd4de472
commit 1573b2070a
2 changed files with 121 additions and 96 deletions

View File

@ -25,13 +25,14 @@
import Foundation
import __TunnelKitNative
import SwiftyBeaver
// TODO: make all private
private let log = SwiftyBeaver.self
class ControlChannel {
private(set) var queue: BidirectionalState<[ControlPacket]>
private var queue: BidirectionalState<[ControlPacket]>
private(set) var packetId: BidirectionalState<UInt32>
private var packetId: BidirectionalState<UInt32>
private var pendingAcks: Set<UInt32>
@ -47,18 +48,112 @@ class ControlChannel {
dataCount = BidirectionalState(withResetValue: 0)
}
func addPendingAck(_ packetId: UInt32) {
pendingAcks.insert(packetId)
func readInboundPacket(withCode code: PacketCode, key: UInt8, sessionId inboundSessionId: Data, packetId inboundPacketId: UInt32, payload: Data?) -> [ControlPacket] {
let packet = ControlPacket(code: code, key: key, sessionId: inboundSessionId, packetId: inboundPacketId, payload: payload)
queue.inbound.append(packet)
queue.inbound.sort { $0.packetId < $1.packetId }
var toHandle: [ControlPacket] = []
for queuedPacket in queue.inbound {
if queuedPacket.packetId < packetId.inbound {
queue.inbound.removeFirst()
continue
}
if queuedPacket.packetId != packetId.inbound {
continue
}
toHandle.append(queuedPacket)
packetId.inbound += 1
queue.inbound.removeFirst()
}
return toHandle
}
func removePendingAcks(_ packetIds: [UInt32]) {
pendingAcks.subtract(packetIds)
func enqueueOutboundPackets(withCode code: PacketCode, key: UInt8, sessionId: Data, payload: Data, maxPacketSize: Int) {
let oldIdOut = packetId.outbound
var queuedCount = 0
var offset = 0
repeat {
let subPayloadLength = min(maxPacketSize, payload.count - offset)
let subPayloadData = payload.subdata(offset: offset, count: subPayloadLength)
let packet = ControlPacket(code: code, key: key, sessionId: sessionId, packetId: packetId.outbound, payload: subPayloadData)
queue.outbound.append(packet)
packetId.outbound += 1
offset += maxPacketSize
queuedCount += subPayloadLength
} while (offset < payload.count)
assert(queuedCount == payload.count)
// packet count
let packetCount = packetId.outbound - oldIdOut
if (packetCount > 1) {
log.debug("Enqueued \(packetCount) control packets [\(oldIdOut)-\(packetId.outbound - 1)]")
} else {
log.debug("Enqueued 1 control packet [\(oldIdOut)]")
}
}
func writeOutboundPackets() -> [Data] {
var rawList: [Data] = []
for packet in queue.outbound {
if let sentDate = packet.sentDate {
let timeAgo = -sentDate.timeIntervalSinceNow
guard (timeAgo >= CoreConfiguration.retransmissionLimit) else {
log.debug("Skip control packet with id \(packet.packetId) (sent on \(sentDate), \(timeAgo) seconds ago)")
continue
}
}
log.debug("Send control packet with code \(packet.code.rawValue)")
if let payload = packet.payload {
if CoreConfiguration.logsSensitiveData {
log.debug("Control packet has payload (\(payload.count) bytes): \(payload.toHex())")
} else {
log.debug("Control packet has payload (\(payload.count) bytes)")
}
}
let raw = packet.serialized()
rawList.append(raw)
packet.sentDate = Date()
// track pending acks for sent packets
pendingAcks.insert(packet.packetId)
}
// log.verbose("Packets now pending ack: \(pendingAcks)")
return rawList
}
func hasPendingAcks() -> Bool {
return !pendingAcks.isEmpty
}
func readAcks(_ packetIds: [UInt32]) {
// drop queued out packets if ack-ed
for (i, packet) in queue.outbound.enumerated() {
if packetIds.contains(packet.packetId) {
queue.outbound.remove(at: i)
}
}
// remove ack-ed packets from pending
pendingAcks.subtract(packetIds)
// log.verbose("Packets still pending ack: \(pendingAcks)")
}
func writeAcks(withKey key: UInt8, sessionId: Data, ackPacketIds: [UInt32], ackRemoteSessionId: Data) -> Data {
let ackPacket = ControlPacket(key: key, sessionId: sessionId, ackIds: ackPacketIds as [NSNumber], ackRemoteSessionId: ackRemoteSessionId)
return ackPacket.serialized()
}
func currentControlData(withTLS tls: TLSBox) throws -> ZeroingData {
var length = 0
try tls.pullRawPlainText(plainBuffer.mutableBytes, length: &length)

View File

@ -527,23 +527,9 @@ public class SessionProxy {
}
}
let controlPacket = ControlPacket(code: code, key: key, sessionId: sessionId, packetId: packetId, payload: payload)
controlChannel.queue.inbound.append(controlPacket)
controlChannel.queue.inbound.sort { $0.packetId < $1.packetId }
for queuedControlPacket in controlChannel.queue.inbound {
if (queuedControlPacket.packetId < controlChannel.packetId.inbound) {
controlChannel.queue.inbound.removeFirst()
continue
}
if (queuedControlPacket.packetId != controlChannel.packetId.inbound) {
continue
}
handleControlPacket(queuedControlPacket)
controlChannel.packetId.inbound += 1
controlChannel.queue.inbound.removeFirst()
let pendingInboundQueue = controlChannel.readInboundPacket(withCode: code, key: key, sessionId: sessionId, packetId: packetId, payload: payload)
for inboundPacket in pendingInboundQueue {
handleControlPacket(inboundPacket)
}
}
@ -916,75 +902,29 @@ public class SessionProxy {
guard let sessionId = sessionId else {
fatalError("Missing sessionId, do hardReset() first")
}
let oldIdOut = controlChannel.packetId.outbound
let maxCount = link.mtu
var queuedCount = 0
var offset = 0
repeat {
let subPayloadLength = min(maxCount, payload.count - offset)
let subPayloadData = payload.subdata(offset: offset, count: subPayloadLength)
let packet = ControlPacket(code: code, key: key, sessionId: sessionId, packetId: controlChannel.packetId.outbound, payload: subPayloadData)
controlChannel.queue.outbound.append(packet)
controlChannel.packetId.outbound += 1
offset += maxCount
queuedCount += subPayloadLength
} while (offset < payload.count)
assert(queuedCount == payload.count)
let packetCount = controlChannel.packetId.outbound - oldIdOut
if (packetCount > 1) {
log.debug("Enqueued \(packetCount) control packets [\(oldIdOut)-\(controlChannel.packetId.outbound - 1)]")
} else {
log.debug("Enqueued 1 control packet [\(oldIdOut)]")
}
// FIXME: init controlChannel with sessionId
controlChannel.enqueueOutboundPackets(withCode: code, key: key, sessionId: sessionId, payload: payload, maxPacketSize: link.mtu)
flushControlQueue()
}
// Ruby: flush_ctrl_q_out
private func flushControlQueue() {
for controlPacket in controlChannel.queue.outbound {
if let sentDate = controlPacket.sentDate {
let timeAgo = -sentDate.timeIntervalSinceNow
guard (timeAgo >= CoreConfiguration.retransmissionLimit) else {
log.debug("Skip control packet with id \(controlPacket.packetId) (sent on \(sentDate), \(timeAgo) seconds ago)")
continue
}
}
log.debug("Send control packet with code \(controlPacket.code.rawValue)")
if let payload = controlPacket.payload {
if CoreConfiguration.logsSensitiveData {
log.debug("Control packet has payload (\(payload.count) bytes): \(payload.toHex())")
} else {
log.debug("Control packet has payload (\(payload.count) bytes)")
}
}
let raw = controlPacket.serialized()
let rawList = controlChannel.writeOutboundPackets()
for raw in rawList {
log.debug("Send control packet (\(raw.count) bytes): \(raw.toHex())")
// track pending acks for sent packets
controlChannel.addPendingAck(controlPacket.packetId)
// WARNING: runs in Network.framework queue
link?.writePacket(raw) { [weak self] (error) in
if let error = error {
self?.queue.sync {
log.error("Failed LINK write during control flush: \(error)")
self?.deferStop(.reconnect, SessionError.failedLinkWrite)
return
}
}
// WARNING: runs in Network.framework queue
link?.writePackets(rawList) { [weak self] (error) in
if let error = error {
self?.queue.sync {
log.error("Failed LINK write during control flush: \(error)")
self?.deferStop(.reconnect, SessionError.failedLinkWrite)
return
}
}
controlPacket.sentDate = Date()
}
// log.verbose("Packets now pending ack: \(controlPendingAcks)")
}
// Ruby: setup_keys
@ -1124,17 +1064,8 @@ public class SessionProxy {
deferStop(.shutdown, SessionError.sessionMismatch)
return
}
// drop queued out packets if ack-ed
for (i, controlPacket) in controlChannel.queue.outbound.enumerated() {
if packetIds.contains(controlPacket.packetId) {
controlChannel.queue.outbound.remove(at: i)
}
}
// remove ack-ed packets from pending
controlChannel.removePendingAcks(packetIds)
// log.verbose("Packets still pending ack: \(controlPendingAcks)")
controlChannel.readAcks(packetIds)
// retry PUSH_REQUEST if ack queue is empty (all sent packets were ack'ed)
if isReliableLink && !controlChannel.hasPendingAcks() {
@ -1152,8 +1083,7 @@ public class SessionProxy {
return
}
let ackPacket = ControlPacket(key: key, sessionId: sessionId, ackIds: [packetId as NSNumber], ackRemoteSessionId: remoteSessionId)
let raw = ackPacket.serialized()
let raw = controlChannel.writeAcks(withKey: key, sessionId: sessionId, ackPacketIds: [packetId], ackRemoteSessionId: remoteSessionId)
// WARNING: runs in Network.framework queue
link?.writePacket(raw) { [weak self] (error) in