Files
yjs/src/Connector.js

367 lines
11 KiB
JavaScript
Raw Normal View History

2015-11-30 12:26:02 +01:00
/* @flow */
2015-09-10 19:41:07 +02:00
'use strict'
module.exports = function (Y/* :any */) {
2015-11-07 22:12:48 +01:00
/*
  Base class for connectors: keeps track of the known peers, drives the
  sync handshake ('sync step 1' -> 'sync step 2' -> 'sync done') and applies
  incoming 'update' messages to the database (this.y.db).

  This class calls this.broadcast(message) but does not define it, and its
  own send() only logs in debug mode - NOTE(review): concrete connectors are
  presumably expected to provide the real transport for both.
*/
class AbstractConnector {
  /* ::
  y: YConfig;
  role: SyncRole;
  connections: Object;
  isSynced: boolean;
  userEventListeners: Array<Function>;
  whenSyncedListeners: Array<Function>;
  currentSyncTarget: ?UserId;
  syncingClients: Array<UserId>;
  forwardToSyncingClients: boolean;
  debug: boolean;
  broadcastedHB: boolean;
  syncStep2: Promise;
  userId: UserId;
  send: Function;
  broadcast: Function;
  */
  /*
    opts contains the following information:
     role : String Role of this client ("master" or "slave"). Defaults to "master".
     debug: Boolean Whether to print debug messages (optional)
     forwardToSyncingClients: Boolean Forward received updates to clients that
                              are still syncing with us (optional, default true)
     forwardAppliedOperations: Boolean Stored on y.db.forwardAppliedOperations
                               (optional, default false)

    The user id is not read from opts here; it is set through setUserId().

    Throws an Error if opts.role is neither null/undefined, 'master' nor 'slave'.
  */
  constructor (y, opts) {
    this.y = y
    if (opts == null) {
      opts = {}
    }
    // Validate the role. The validated value must not be overwritten with the
    // raw opts.role afterwards, otherwise the 'master' default is lost.
    if (opts.role == null || opts.role === 'master') {
      this.role = 'master'
    } else if (opts.role === 'slave') {
      this.role = 'slave'
    } else {
      throw new Error("Role must be either 'master' or 'slave'!")
    }
    this.y.db.forwardAppliedOperations = opts.forwardAppliedOperations || false
    this.connections = {}
    this.isSynced = false
    this.userEventListeners = []
    this.whenSyncedListeners = []
    this.currentSyncTarget = null
    this.syncingClients = []
    // enabled unless explicitly set to false
    this.forwardToSyncingClients = opts.forwardToSyncingClients !== false
    this.debug = opts.debug === true
    this.broadcastedHB = false
    this.syncStep2 = Promise.resolve()
  }

  // No-op in this base class.
  reconnect () {
  }

  /*
    Reset all connection and sync state (including pending whenSynced
    listeners) and stop the garbage collector of the database.
    Returns whatever y.db.stopGarbageCollector() returns.
  */
  disconnect () {
    this.connections = {}
    this.isSynced = false
    this.currentSyncTarget = null
    this.broadcastedHB = false
    this.syncingClients = []
    this.whenSyncedListeners = []
    return this.y.db.stopGarbageCollector()
  }

  // Remember the id of this client and pass it on to the database.
  setUserId (userId) {
    this.userId = userId
    return this.y.db.setUserId(userId)
  }

  // Register a listener that is called with {action: 'userJoined' | 'userLeft', user, [role]}.
  onUserEvent (f) {
    this.userEventListeners.push(f)
  }

  /*
    Forget a user. If we were currently syncing with that user,
    try to find another sync target. Notifies the user event listeners.
  */
  userLeft (user) {
    delete this.connections[user]
    if (user === this.currentSyncTarget) {
      this.currentSyncTarget = null
      this.findNextSyncTarget()
    }
    this.syncingClients = this.syncingClients.filter(function (cli) {
      return cli !== user
    })
    for (var f of this.userEventListeners) {
      f({
        action: 'userLeft',
        user: user
      })
    }
  }

  /*
    Register a new user with the given role, notify the user event listeners
    and start syncing if no sync is currently in progress.
    Throws an Error if no role is given or if the user already joined.
  */
  userJoined (user, role) {
    if (role == null) {
      throw new Error('You must specify the role of the joined user!')
    }
    if (this.connections[user] != null) {
      throw new Error('This user already joined!')
    }
    this.connections[user] = {
      isSynced: false,
      role: role
    }
    for (var f of this.userEventListeners) {
      f({
        action: 'userJoined',
        user: user,
        role: role
      })
    }
    if (this.currentSyncTarget == null) {
      this.findNextSyncTarget()
    }
  }

  // Execute a function _when_ we are synced.
  // If not synced yet, the function is queued until we are.
  whenSynced (f) {
    if (this.isSynced) {
      f()
    } else {
      this.whenSyncedListeners.push(f)
    }
  }

  /*
    Pick the next connection that is not synced yet and send it 'sync step 1'.
    If every known connection is synced, mark this connector as synced,
    call (and clear) the whenSynced listeners and request a garbage collection.
    Does nothing while a sync is in progress or when already synced.
    Has no return value.
  */
  findNextSyncTarget () {
    if (this.currentSyncTarget != null || this.isSynced) {
      return // "The current sync has not finished!"
    }

    var syncUser = null
    for (var uid in this.connections) {
      if (!this.connections[uid].isSynced) {
        syncUser = uid
        break
      }
    }

    if (syncUser != null) {
      var conn = this
      this.currentSyncTarget = syncUser
      // inside the transaction, `this` is the transaction, hence `conn`
      this.y.db.requestTransaction(function *() {
        var stateSet = yield* this.getStateSet()
        var deleteSet = yield* this.getDeleteSet()
        conn.send(syncUser, {
          type: 'sync step 1',
          stateSet: stateSet,
          deleteSet: deleteSet
        })
      })
    } else {
      this.isSynced = true
      // call when synced listeners
      for (var f of this.whenSyncedListeners) {
        f()
      }
      this.whenSyncedListeners = []
      this.y.db.requestTransaction(function *() {
        yield* this.garbageCollectAfterSync()
      })
    }
  }

  // Base implementation: only logs the outgoing message when debug is enabled.
  send (uid, message) {
    if (this.debug) {
      console.log(`send ${this.userId} -> ${uid}: ${message.type}`, message) // eslint-disable-line
    }
  }

  /*
    You received a raw message, and you know that it is intended for Yjs. Then call this function.

    Handles the message types 'sync step 1', 'sync step 2', 'sync done' and 'update'.
    Messages sent by this client itself (sender === this.userId) are ignored.
  */
  receiveMessage (sender/* :UserId */, message/* :Message */) {
    if (sender === this.userId) {
      return
    }
    if (this.debug) {
      // log a deep copy, so that later mutations do not change what was logged
      console.log(`receive ${sender} -> ${this.userId}: ${message.type}`, JSON.parse(JSON.stringify(message))) // eslint-disable-line
    }
    if (message.type === 'sync step 1') {
      // TODO: make transaction, stream the ops
      let conn = this
      let m = message
      this.y.db.requestTransaction(function *() {
        var currentStateSet = yield* this.getStateSet()
        yield* this.applyDeleteSet(m.deleteSet)
        var ds = yield* this.getDeleteSet()
        var ops = yield* this.getOperations(m.stateSet)
        conn.send(sender, {
          type: 'sync step 2',
          os: ops,
          stateSet: currentStateSet,
          deleteSet: ds
        })
        // `this` is the transaction here, so the option has to be read from
        // the connector (`conn`), not from `this`.
        if (conn.forwardToSyncingClients) {
          // forward incoming updates to the sender for a while,
          // then tell it that the sync is done
          conn.syncingClients.push(sender)
          setTimeout(function () {
            conn.syncingClients = conn.syncingClients.filter(function (cli) {
              return cli !== sender
            })
            conn.send(sender, {
              type: 'sync done'
            })
          }, 5000) // TODO: conn.syncingClientDuration)
        } else {
          conn.send(sender, {
            type: 'sync done'
          })
        }
        conn._setSyncedWith(sender)
      })
    } else if (message.type === 'sync step 2') {
      let conn = this
      var broadcastHB = !this.broadcastedHB
      this.broadcastedHB = true
      var db = this.y.db
      // 'sync done' must wait until this message is fully processed
      var defer = {}
      defer.promise = new Promise(function (resolve) {
        defer.resolve = resolve
      })
      this.syncStep2 = defer.promise
      let m /* :MessageSyncStep2 */ = message
      db.requestTransaction(function * () {
        yield* this.applyDeleteSet(m.deleteSet)
        this.store.apply(m.os)
        db.requestTransaction(function * () {
          var ops = yield* this.getOperations(m.stateSet)
          if (ops.length > 0) {
            var update /* :MessageUpdate */ = {
              type: 'update',
              ops: ops
            }
            if (!broadcastHB) { // TODO: consider to broadcast here..
              conn.send(sender, update)
            } else {
              // broadcast only once!
              conn.broadcast(update)
            }
          }
          defer.resolve()
        })
      })
    } else if (message.type === 'sync done') {
      var self = this
      this.syncStep2.then(function () {
        self._setSyncedWith(sender)
      })
    } else if (message.type === 'update') {
      if (this.forwardToSyncingClients) {
        for (var client of this.syncingClients) {
          this.send(client, message)
        }
      }
      if (this.y.db.forwardAppliedOperations) {
        var delops = message.ops.filter(function (o) {
          return o.struct === 'Delete'
        })
        if (delops.length > 0) {
          this.broadcast({
            type: 'update',
            ops: delops
          })
        }
      }
      this.y.db.apply(message.ops)
    }
  }

  /*
    Mark the connection to `user` as synced (if it is still known) and,
    if it was the current sync target, continue with the next one.
  */
  _setSyncedWith (user) {
    var conn = this.connections[user]
    if (conn != null) {
      conn.isSynced = true
    }
    if (user === this.currentSyncTarget) {
      this.currentSyncTarget = null
      this.findNextSyncTarget()
    }
  }

  /*
    Parse a message that was encoded by encodeMessageToXml.

    We don't want to embed JSON as a string in XML (character escaping makes
    it pretty much unreadable), so messages are encoded as XML elements:
    primitive properties become attributes, nested objects / arrays become
    child elements (arrays are marked with isArray="true").

    - every attribute of an element becomes a property of the result
    - attribute values that are canonical base-10 integers are converted
      to numbers, all other values stay strings
    - does not support primitive values as array elements
    - expects an ltx (less than xml) object

    Returns the parsed object (or array, if m is marked with isArray="true").
  */
  parseMessageFromXml (m/* :any */) {
    function parseNode (node/* :any */) {
      return node.getAttribute('isArray') === 'true' ? parseArray(node) : parseObject(node)
    }
    function parseArray (node/* :any */) {
      // collect _all_ elements, not just the first one
      var elements = []
      for (var n of node.children) {
        elements.push(parseNode(n))
      }
      return elements
    }
    function parseObject (node/* :any */) {
      var json = {}
      for (var attrName in node.attrs) {
        var value = node.attrs[attrName]
        var int = parseInt(value, 10)
        // only convert if the string is exactly the printed integer
        if (isNaN(int) || ('' + int) !== value) {
          json[attrName] = value
        } else {
          json[attrName] = int
        }
      }
      // for..of: we need the child elements, not their indices
      for (var n/* :any */ of node.children) {
        json[n.name] = parseNode(n)
      }
      return json
    }
    return parseNode(m)
  }

  /*
    encode message in xml
    we use string because Strophe only accepts an "xml-string"..
    So {a:4,b:{c:5}} will look like
    <y a="4">
      <b c="5"></b>
    </y>
    msg - ltx element, the encoded message is appended as a <y> child
    obj - Object or Array

    Properties whose value is null or undefined are skipped.
    Throws an Error if obj is neither a plain Object nor an Array, or if an
    array contains a primitive (or null) element.
  */
  encodeMessageToXml (msg, obj) {
    function encodeObject (m, json) {
      for (var name in json) {
        var value = json[name]
        if (value == null) {
          // nop - nothing to encode
        } else if (value.constructor === Object) {
          encodeObject(m.c(name), value)
        } else if (value.constructor === Array) {
          encodeArray(m.c(name), value)
        } else {
          m.setAttribute(name, value)
        }
      }
    }

    function encodeArray (m, array) {
      m.setAttribute('isArray', 'true')
      for (var e of array) {
        if (e == null || typeof e !== 'object') {
          // a string element would otherwise recurse forever
          throw new Error("I can't encode primitive array elements!")
        } else if (e.constructor === Object) {
          encodeObject(m.c('array-element'), e)
        } else {
          encodeArray(m.c('array-element'), e)
        }
      }
    }

    if (obj != null && obj.constructor === Object) {
      encodeObject(msg.c('y', { xmlns: 'http://y.ninja/connector-stanza' }), obj)
    } else if (obj != null && obj.constructor === Array) {
      encodeArray(msg.c('y', { xmlns: 'http://y.ninja/connector-stanza' }), obj)
    } else {
      throw new Error("I can't encode this json!")
    }
  }
}
2015-11-07 22:12:48 +01:00
Y.AbstractConnector = AbstractConnector
}