Problem: We have two sets of channels: (1) primary channels (all configuration documents) and (2) secondary channels (documents that contain attachments).
After user login, we start the cblite replication with only the primary channels. When the primary-channel sync reaches the Idle state, we stop the cblite replication, update the channel list to include the secondary channels, and restart the replication. After restarting the cblite replication with the new channels (both primary and secondary), we receive documents that are not part of the cblite replication channels.
Cblite Version: 2.8.4
Initialization of Replication:
/// Selects the channel list for this session and hands the sync setup to `DEDBHelper`.
///
/// Until the initial sync has been flagged as completed, only the primary channels are
/// stored in `Defaults[.channels]`; afterwards the primary and secondary channels are
/// stored together.
///
/// - Parameters:
///   - syncUrl: Base sync URL; `syncBucketName` is appended to it verbatim.
///   - syncBucketName: Bucket name appended to `syncUrl`.
///   - authenticator: Credentials forwarded to `DEDBHelper.shared.initWithUrl`.
func setupSyncWith(syncUrl: String, syncBucketName:String ,authenticator : DEAuthenticator?) {
    let primaryOnly = !Defaults[.isIntialSyncCompleted]
    Defaults[.channels] = primaryOnly
        ? Defaults[.primaryChannels]
        : Defaults[.primaryChannels] + Defaults[.secondaryChannels]

    let bucketUrl = syncUrl + syncBucketName
    DEDBHelper.shared.initWithUrl(syncUrl: bucketUrl, authenticator: authenticator, Channels: Defaults[.channels])
}
/// Stores the sync settings, opens the database if needed, and starts replication.
///
/// - Parameters:
///   - syncUrl: Full sync URL; its last path component is used as the database name.
///   - authenticator: When `nil`, nothing is stored and replication is not started.
///   - channels: When `nil`, the previously stored channel list is left untouched.
// NOTE(review): the generic argument of `Array` appears to have been lost in this paste
// (presumably `[String]?`) — confirm against the real source.
public func initWithUrl(syncUrl:String, authenticator: DEAuthenticator?, Channels channels:Array?) {
    self.syncUrl = syncUrl
    self.dbName = NSString(string: self.syncUrl).lastPathComponent

    if let requestedChannels = channels {
        self.channels = requestedChannels
    }

    // Open the database only once; later calls reuse the existing handle.
    if self.database == nil {
        self.database = self.database(name: self.dbName)
    }

    // Replication is only started when credentials were supplied.
    guard let credentials = authenticator else {
        return
    }
    self.authenticator = credentials
    self.startSync()
}
/// Creates a continuous push-and-pull replicator for `database` and starts it.
///
/// Does nothing when no database is open or when `syncUrl` cannot be turned into a
/// valid endpoint URL. The stored `channels` are applied to the configuration only when
/// the list is non-empty; otherwise the configuration's default channel setting is kept.
/// Replicator status changes and per-document replication errors are forwarded to
/// `delegate` when one is set.
internal func startSync() {
    guard database != nil else {
        return
    }

    //startLogging()
    print("\(Date().description(with: Locale.current)) Start Logging")

    // Swap only the URL *scheme* (http -> ws, https -> wss). A plain substring
    // replacement would also rewrite "http" inside the host, path or bucket name.
    let rawUrl: String = self.syncUrl
    guard var endpointComponents = URLComponents(string: rawUrl) else {
        print("\(Date().description(with: Locale.current)) Invalid sync URL: \(rawUrl)")
        return
    }
    switch (endpointComponents.scheme ?? "").lowercased() {
    case "http":
        endpointComponents.scheme = "ws"
    case "https":
        endpointComponents.scheme = "wss"
    default:
        break
    }
    guard let endpointUrl = endpointComponents.url else {
        print("\(Date().description(with: Locale.current)) Invalid sync URL: \(rawUrl)")
        return
    }
    let targetEndpoint = URLEndpoint(url: endpointUrl)

    replicatorConfig = ReplicatorConfiguration(database: self.database, target: targetEndpoint)
    replicatorConfig.replicatorType = .pushAndPull
    if !channels.isEmpty {
        replicatorConfig.channels = self.channels
    }
    replicatorConfig.continuous = true

    // Add authentication.
    if let authenticator = getAuthenticator() {
        replicatorConfig.authenticator = authenticator
    }

    // Create a replicator.
    self.replicator = Replicator(config: replicatorConfig)

    // Listen to replicator change events. `self` is captured weakly because `self`
    // owns the replicator, which in turn retains the listener closures.
    self.listenerToken = self.replicator.addChangeListener { [weak self] (change) in
        print("\(Date().description(with: Locale.current))\(change.status.progress)")
        // Only build and deliver a status object when the app registered a delegate.
        guard let self = self, self.delegate != nil else {
            return
        }
        var syncChange = DESyncChange()
        syncChange.status = self.getSyncStatus(change: change)
        syncChange.error = self.getSyncError(change: change)
        self.delegate.syncListener(status : syncChange)
    }

    // Log every successfully replicated document and report failures to the delegate.
    self.documentListenerToken = self.replicator.addDocumentReplicationListener { [weak self] (replication) in
        print("Replication type :: \(replication.isPush ? "Push" : "Pull")")
        for document in replication.documents {
            if document.error == nil {
                print("Doc ID :: \(document.id)")
            } else if let self = self, self.delegate != nil {
                self.delegate.documentSyncError(error: document.error, documentId: document.id)
            }
        }
    }

    // Start replication.
    self.replicator.start()
}
/// Converts the stored authenticator into a Couchbase Lite `Authenticator`.
///
/// - Returns: A `SessionAuthenticator` built from the session token (with the cookie
///   name when it is non-empty), or `nil` when no authenticator is stored or the stored
///   one is not a `DESessionAuthenticator`.
func getAuthenticator() -> Authenticator? {
    // The conditional cast also covers the "no authenticator stored" case.
    guard let sessionAuth = self.authenticator as? DESessionAuthenticator else {
        return nil
    }
    if sessionAuth.cookie.isEmpty {
        return SessionAuthenticator(sessionID: sessionAuth.token)
    }
    return SessionAuthenticator(sessionID: sessionAuth.token, cookieName: sessionAuth.cookie)
}
/// Requests the replicator to stop and detaches the document replication listener.
///
/// Does nothing when no replicator has been created. The stop request is issued
/// regardless of the current activity level; the status printed afterwards is the one
/// observed right after the call and may not yet be the final one. The replicator
/// change listener is left attached so status changes are still delivered.
// NOTE(review): callers that restart replication right after this call should confirm
// the replicator has actually reached the stopped state first.
internal func stopSync() {
    guard replicator != nil else {
        return
    }

    print("\(Date().description(with: Locale.current)) Stop Replicator")

    // Remove the document listener exactly once and forget the token afterwards.
    if let token = documentListenerToken {
        self.replicator.removeChangeListener(withToken: token)
        documentListenerToken = nil
    }

    // No retry loop: re-entering synchronously while the replicator is busy only
    // spins and can exhaust the stack.
    self.replicator.stop()
    print("\(Date().description(with: Locale.current)) Replication status after replication stop executed: \(self.replicator.status.activity)")
}
Logs:
cbllog.zip (4.5 MB)