Files
yjs/provider/websocket/WebSocketProvider.js

158 lines
3.9 KiB
JavaScript
Raw Normal View History

2018-11-25 03:17:00 +01:00
/**
* @module provider/websocket
*/
2018-10-30 00:51:09 +01:00
/* eslint-env browser */
2018-11-27 14:59:12 +01:00
import * as Y from '../../index.js'
import * as bc from '../../lib/broadcastchannel.js'
2018-10-30 00:51:09 +01:00
const messageSync = 0
const messageAwareness = 1
const reconnectTimeout = 3000
2018-10-30 00:51:09 +01:00
/**
* @param {WebsocketsSharedDocument} doc
* @param {ArrayBuffer} buf
* @return {Y.Encoder}
*/
const readMessage = (doc, buf) => {
const decoder = Y.createDecoder(buf)
const encoder = Y.createEncoder()
const messageType = Y.readVarUint(decoder)
switch (messageType) {
case messageSync:
Y.writeVarUint(encoder, messageSync)
doc.mux(() =>
Y.readSyncMessage(decoder, encoder, doc)
)
break
case messageAwareness:
Y.readAwarenessMessage(decoder, doc)
break
}
return encoder
}
2018-10-30 00:51:09 +01:00
const setupWS = (doc, url) => {
const websocket = new WebSocket(url)
websocket.binaryType = 'arraybuffer'
doc.ws = websocket
websocket.onmessage = event => {
const encoder = readMessage(doc, event.data)
if (Y.length(encoder) > 1) {
2018-10-30 00:51:09 +01:00
websocket.send(Y.toBuffer(encoder))
}
}
websocket.onclose = () => {
doc.ws = null
doc.wsconnected = false
doc.emit('status', {
status: 'connected'
})
setTimeout(setupWS, reconnectTimeout, doc, url)
}
websocket.onopen = () => {
doc.wsconnected = true
doc.emit('status', {
status: 'disconnected'
})
// always send sync step 1 when connected
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
2018-10-30 00:51:09 +01:00
Y.writeSyncStep1(encoder, doc)
websocket.send(Y.toBuffer(encoder))
// force send stored awareness info
doc.setAwarenessField(null, null)
2018-10-30 00:51:09 +01:00
}
}
const broadcastUpdate = (y, transaction) => {
if (transaction.encodedStructsLen > 0) {
2018-10-30 00:51:09 +01:00
y.mux(() => {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
2018-10-30 00:51:09 +01:00
Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
const buf = Y.toBuffer(encoder)
if (y.wsconnected) {
y.ws.send(buf)
}
bc.publish(y.url, buf)
2018-10-30 00:51:09 +01:00
})
}
}
class WebsocketsSharedDocument extends Y.Y {
constructor (url) {
super()
this.url = url
2018-10-30 00:51:09 +01:00
this.wsconnected = false
this.mux = Y.createMutex()
this.ws = null
this._localAwarenessState = {}
this.awareness = new Map()
2018-10-30 00:51:09 +01:00
setupWS(this, url)
this.on('afterTransaction', broadcastUpdate)
this._bcSubscriber = data => {
const encoder = readMessage(this, data)
if (Y.length(encoder) > 1) {
this.mux(() => {
bc.publish(url, Y.toBuffer(encoder))
})
}
}
bc.subscribe(url, this._bcSubscriber)
// send sync step1 to bc
this.mux(() => {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
Y.writeSyncStep1(encoder, this)
bc.publish(url, Y.toBuffer(encoder))
})
2018-10-30 00:51:09 +01:00
}
getLocalAwarenessInfo () {
return this._localAwarenessState
}
getAwarenessInfo () {
return this.awareness
}
setAwarenessField (field, value) {
if (field !== null) {
this._localAwarenessState[field] = value
}
2018-11-09 01:23:16 +01:00
if (this.wsconnected) {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageAwareness)
Y.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }])
const buf = Y.toBuffer(encoder)
this.ws.send(buf)
}
}
2018-10-30 00:51:09 +01:00
}
2018-11-25 03:17:00 +01:00
export class WebsocketProvider {
2018-10-30 00:51:09 +01:00
constructor (url) {
// ensure that url is always ends with /
while (url[url.length - 1] === '/') {
url = url.slice(0, url.length - 1)
}
this.url = url + '/'
/**
* @type {Map<string, WebsocketsSharedDocument>}
*/
this.docs = new Map()
}
/**
* @param {string} name
* @return {WebsocketsSharedDocument}
*/
get (name) {
let doc = this.docs.get(name)
if (doc === undefined) {
doc = new WebsocketsSharedDocument(this.url + name)
}
return doc
}
}