iOS/Sources/Shared/API/Models/ModelManager.swift

392 lines
14 KiB
Swift

import Foundation
import HAKit
import PromiseKit
import RealmSwift
public class ModelManager: ServerObserver {
private var notificationTokens = [NotificationToken]()
private var hakitTokens = [HACancellable]()
private var subscribedSubscriptions = [SubscribeDefinition]()
private var cleanupDefinitions = [CleanupDefinition]()
public var workQueue: DispatchQueue = .global(qos: .userInitiated)
static var isAppInForeground: () -> Bool = { false }
deinit {
hakitTokens.forEach { $0.cancel() }
notificationTokens.forEach { $0.invalidate() }
NotificationCenter.default.removeObserver(self)
}
public func observe<T>(
for collection: AnyRealmCollection<T>,
handler: @escaping (AnyRealmCollection<T>) -> Promise<Void>
) {
notificationTokens.append(collection.observe { change in
switch change {
case .initial:
break
case .update(let collection, deletions: _, insertions: _, modifications: _):
handler(collection).cauterize()
case let .error(error):
Current.Log.error("failed to watch \(collection): \(error)")
}
})
}
public struct CleanupDefinition {
public enum OrphanMode {
case delete(handler: (Realm, [Object]) -> Void)
case replace
}
public enum CleanupType {
case age(createdKey: String, duration: Measurement<UnitDuration>)
case orphaned(serverIdentifierKey: String, allowedPredicate: NSPredicate, mode: OrphanMode)
}
public var model: Object.Type
public var cleanupTypes: [CleanupType]
public init(
model: Object.Type,
createdKey: String,
duration: Measurement<UnitDuration> = .init(value: 256, unit: .hours)
) {
self.model = model
self.cleanupTypes = [.age(createdKey: createdKey, duration: duration)]
}
init<UM: Object & UpdatableModel>(
orphansOf model: UM.Type
) {
self.model = model
self.cleanupTypes = [
.orphaned(
serverIdentifierKey: model.serverIdentifierKey(),
allowedPredicate: model.updateEligiblePredicate,
mode: .delete(handler: { realm, objects in
if let objects = objects as? [UM] {
model.willDelete(objects: objects, server: nil, realm: realm)
} else {
preconditionFailure("invalid object type passed into delete handler")
}
})
),
.orphaned(
serverIdentifierKey: model.serverIdentifierKey(),
allowedPredicate: NSCompoundPredicate(notPredicateWithSubpredicate: model.updateEligiblePredicate),
mode: .replace
),
]
}
public init(
orphansOf model: Object.Type,
serverIdentifierKey: String,
allowedPredicate: NSPredicate,
mode: OrphanMode
) {
self.model = model
self.cleanupTypes = [
.orphaned(
serverIdentifierKey: serverIdentifierKey,
allowedPredicate: allowedPredicate,
mode: mode
),
]
}
public static let defaults: [Self] = [
CleanupDefinition(
model: LocationHistoryEntry.self,
createdKey: #keyPath(LocationHistoryEntry.CreatedAt)
),
CleanupDefinition(
model: LocationError.self,
createdKey: #keyPath(LocationError.CreatedAt)
),
CleanupDefinition(
model: ClientEvent.self,
createdKey: #keyPath(ClientEvent.date)
),
CleanupDefinition(orphansOf: RLMScene.self),
CleanupDefinition(orphansOf: RLMZone.self),
CleanupDefinition(orphansOf: Action.self),
CleanupDefinition(orphansOf: NotificationCategory.self),
CleanupDefinition(
orphansOf: WatchComplication.self,
serverIdentifierKey: #keyPath(WatchComplication.serverIdentifier),
allowedPredicate: .init(value: true),
mode: .replace
),
]
}
public func cleanup(
definitions: [CleanupDefinition] = CleanupDefinition.defaults
) -> Promise<Void> {
let (promise, seal) = Promise<Void>.pending()
Current.servers.add(observer: self)
cleanupDefinitions = definitions
workQueue.async {
let realm = Current.realm()
let writes = definitions.map { definition in
realm.reentrantWrite {
self.cleanup(using: definition, realm: realm)
}
}
when(fulfilled: writes).pipe(to: seal.resolve)
}
return promise
}
private func cleanup(
using definition: CleanupDefinition,
realm: Realm
) {
let deleteObjects = { (_ objects: Results<Object>) in
if objects.isEmpty == false {
Current.Log.info("delete \(definition.model): \(objects.count)")
realm.delete(objects)
}
}
for cleanupType in definition.cleanupTypes {
switch cleanupType {
case let .age(createdKey: createdKey, duration: duration):
let duration = duration.converted(to: .seconds).value
let date = Current.date().addingTimeInterval(-duration)
deleteObjects(
realm
.objects(definition.model)
.filter("%K < %@", createdKey, date)
)
case let .orphaned(
serverIdentifierKey: serverIdentifierKey,
allowedPredicate: allowedPredicate,
mode: mode
):
let serverIdentifiers = Current.servers.all.map(\.identifier.rawValue)
let objects = realm.objects(definition.model)
.filter(allowedPredicate)
.filter("not %K in %@", serverIdentifierKey, serverIdentifiers)
switch mode {
case let .delete(handler):
handler(realm, Array(objects))
deleteObjects(objects)
case .replace:
if let replacement = Current.servers.all.first, !objects.isEmpty {
Current.Log.info("migrate \(definition.model): \(objects.count) to \(replacement.identifier)")
for object in objects {
object[serverIdentifierKey] = replacement.identifier.rawValue
}
}
}
}
}
}
public struct SubscribeDefinition {
public var subscribe: (
_ connection: HAConnection,
_ server: Server,
_ queue: DispatchQueue,
_ modelManager: ModelManager
) -> [HACancellable]
static func states<
UM: Object & UpdatableModel
>(
domain: String,
type: UM.Type
) -> Self where UM.Source == HAEntity {
.init(subscribe: { connection, server, queue, manager in
// working around a swift compiler crash, xcode 12.4
let someManager = manager
var lastEntities = Set<HAEntity>()
var lastUpdate: Date?
return [
connection.caches.states.subscribe { [weak someManager] token, value in
queue.async {
guard let manager = someManager else {
token.cancel()
return
}
DispatchQueue.main.async {
guard ModelManager.isAppInForeground() else { return }
Current.appEntitiesModel.updateModel(value.all, server: server)
if let lastUpdate {
// Prevent sequential updates in short time
guard Date().timeIntervalSince(lastUpdate) > 15 else { return }
}
let entitiesForDomain = value.all.filter({ $0.domain == domain })
if entitiesForDomain != lastEntities {
manager.store(type: type, from: server, sourceModels: entitiesForDomain).cauterize()
lastEntities = entitiesForDomain
lastUpdate = Date()
}
}
}
},
]
})
}
public static let defaults: [Self] = [
.states(domain: "zone", type: RLMZone.self),
.states(domain: "scene", type: RLMScene.self),
]
}
public func subscribe(
definitions: [SubscribeDefinition] = SubscribeDefinition.defaults,
isAppInForeground: @escaping () -> Bool
) {
ModelManager.isAppInForeground = isAppInForeground
Current.servers.add(observer: self)
subscribedSubscriptions.removeAll()
hakitTokens.forEach { $0.cancel() }
hakitTokens = definitions.flatMap { definition -> [HACancellable] in
Current.apis.filter({ $0.server.info.connection.activeURL() != nil }).flatMap { api in
// Since we filtered activeURL, we can force unwrap here
definition.subscribe(api.connection!, api.server, workQueue, self)
}
}
subscribedSubscriptions = definitions
}
public func unsubscribe() {
subscribedSubscriptions.removeAll()
hakitTokens.forEach { $0.cancel() }
subscribedSubscriptions = []
}
public struct FetchDefinition {
public var update: (
_ api: HomeAssistantAPI,
_ queue: DispatchQueue,
_ modelManager: ModelManager
) -> Promise<Void>
public static let defaults: [Self] = [
FetchDefinition(update: { api, queue, manager in
api.GetMobileAppConfig().then(on: queue) {
when(fulfilled: [
manager.store(
type: NotificationCategory.self,
from: api.server,
sourceModels: $0.push.categories
),
manager.store(type: Action.self, from: api.server, sourceModels: $0.actions),
])
}
}),
]
}
public func fetch(
definitions: [FetchDefinition] = FetchDefinition.defaults,
apis: [HomeAssistantAPI] = Current.apis
) -> Promise<Void> {
when(fulfilled: apis.map { api in
when(fulfilled: definitions.map { $0.update(api, workQueue, self) })
}).asVoid()
}
enum StoreError: Error {
case missingPrimaryKey
}
func store<UM: Object & UpdatableModel>(
type realmObjectType: UM.Type,
from server: Server,
sourceModels: some Collection<UM.Source>
) -> Promise<Void> {
let realm = Current.realm()
return realm.reentrantWrite {
guard let realmPrimaryKey = realmObjectType.primaryKey() else {
Current.Log.error("invalid realm object type: \(realmObjectType)")
throw StoreError.missingPrimaryKey
}
let allObjects = realm.objects(UM.self)
.filter(UM.updateEligiblePredicate)
.filter("%K = %@", UM.serverIdentifierKey(), server.identifier.rawValue)
let existingIDs = Set(allObjects.compactMap { $0[realmPrimaryKey] as? String })
let incomingIDs = Set(sourceModels.map {
UM.primaryKey(sourceIdentifier: $0.primaryKey, serverIdentifier: server.identifier.rawValue)
})
let deletedIDs = existingIDs.subtracting(incomingIDs)
let newIDs = incomingIDs.subtracting(existingIDs)
let deleteObjects = allObjects
.filter("%K in %@", realmPrimaryKey, deletedIDs)
Current.Log.verbose(
[
"updating \(UM.self)",
"server(\(server.identifier))",
"from(\(existingIDs.count))",
"eligible(\(incomingIDs.count))",
"deleted(\(deleteObjects.count))",
"ignored(\(deletedIDs.count - deleteObjects.count))",
"new(\(newIDs.count))",
].joined(separator: " ")
)
let updatedModels: [UM] = sourceModels.compactMap { model in
let updating: UM
let fullPrimaryKey = UM.primaryKey(
sourceIdentifier: model.primaryKey,
serverIdentifier: server.identifier.rawValue
)
if let existing = realm.object(ofType: UM.self, forPrimaryKey: fullPrimaryKey) {
updating = existing
} else {
Current.Log.verbose("creating \(fullPrimaryKey)")
updating = UM()
}
if updating.realm == nil {
updating.setValue(fullPrimaryKey, forKey: realmPrimaryKey)
} else {
assert(updating.value(forKey: realmPrimaryKey) as? String == fullPrimaryKey)
}
updating.setValue(server.identifier.rawValue, forKey: UM.serverIdentifierKey())
if updating.update(with: model, server: server, using: realm) {
return updating
} else {
return nil
}
}
realm.add(updatedModels, update: .all)
UM.didUpdate(objects: updatedModels, server: server, realm: realm)
UM.willDelete(objects: Array(deleteObjects), server: server, realm: realm)
realm.delete(deleteObjects)
}
}
public func serversDidChange(_ serverManager: ServerManager) {
subscribe(definitions: subscribedSubscriptions, isAppInForeground: ModelManager.isAppInForeground)
cleanup(definitions: cleanupDefinitions).cauterize()
}
}