mirror of
https://github.com/yjs/yjs.git
synced 2025-12-25 16:09:30 +01:00
read struct refs as array
This commit is contained in:
@@ -23,9 +23,7 @@ import {
|
||||
|
||||
import * as encoding from 'lib0/encoding.js'
|
||||
import * as decoding from 'lib0/decoding.js'
|
||||
import * as map from 'lib0/map.js'
|
||||
import * as binary from 'lib0/binary.js'
|
||||
import * as iterator from 'lib0/iterator.js'
|
||||
|
||||
/**
|
||||
* @typedef {Map<number, number>} StateMap
|
||||
@@ -43,48 +41,53 @@ export const structRefs = [
|
||||
]
|
||||
|
||||
/**
|
||||
* @param {decoding.Decoder} decoder
|
||||
* @param {number} structsLen
|
||||
* @param {ID} nextID
|
||||
* @param {number} localState next expected clock by nextID.client
|
||||
* @return {IterableIterator<AbstractRef>}
|
||||
* @param {encoding.Encoder} encoder
|
||||
* @param {Array<AbstractStruct>} structs All structs by `client`
|
||||
* @param {number} client
|
||||
* @param {number} clock write structs starting with `ID(client,clock)`
|
||||
*/
|
||||
const createStructReaderIterator = (decoder, structsLen, nextID, localState) => iterator.createIterator(() => {
|
||||
let done = false
|
||||
let value
|
||||
do {
|
||||
if (structsLen === 0) {
|
||||
done = true
|
||||
value = undefined
|
||||
break
|
||||
}
|
||||
const info = decoding.readUint8(decoder)
|
||||
value = new structRefs[binary.BITS5 & info](decoder, nextID, info)
|
||||
nextID = createID(nextID.client, nextID.clock + value.length)
|
||||
structsLen--
|
||||
} while (nextID.clock <= localState) // read until we find something new (check nextID.clock instead because it equals `clock+len`)
|
||||
return { done, value }
|
||||
})
|
||||
const writeStructs = (encoder, structs, client, clock) => {
|
||||
// write first id
|
||||
const startNewStructs = findIndexSS(structs, clock)
|
||||
// write # encoded structs
|
||||
encoding.writeVarUint(encoder, structs.length - startNewStructs)
|
||||
writeID(encoder, createID(client, clock))
|
||||
const firstStruct = structs[startNewStructs]
|
||||
// write first struct with an offset
|
||||
firstStruct.write(encoder, clock - firstStruct.id.clock, 0)
|
||||
for (let i = startNewStructs + 1; i < structs.length; i++) {
|
||||
structs[i].write(encoder, 0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {encoding.Encoder} encoder
|
||||
* @param {Transaction} transaction
|
||||
* @param {decoding.Decoder} decoder
|
||||
* @param {number} numOfStructs
|
||||
* @param {ID} nextID
|
||||
* @return {Array<AbstractRef>}
|
||||
*/
|
||||
export const writeStructsFromTransaction = (encoder, transaction) => writeStructs(encoder, transaction.y.store, transaction.beforeState)
|
||||
const readStructRefs = (decoder, numOfStructs, nextID) => {
|
||||
/**
|
||||
* @type {Array<AbstractRef>}
|
||||
*/
|
||||
const refs = []
|
||||
for (let i = 0; i < numOfStructs; i++) {
|
||||
const info = decoding.readUint8(decoder)
|
||||
const ref = new structRefs[binary.BITS5 & info](decoder, nextID, info)
|
||||
nextID = createID(nextID.client, nextID.clock + ref.length)
|
||||
refs.push(ref)
|
||||
}
|
||||
return refs
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {encoding.Encoder} encoder
|
||||
* @param {StructStore} store
|
||||
* @param {StateMap} _sm
|
||||
*/
|
||||
export const writeStructs = (encoder, store, _sm) => {
|
||||
export const writeClientsStructs = (encoder, store, _sm) => {
|
||||
// we filter all valid _sm entries into sm
|
||||
const sm = new Map()
|
||||
const encoderUserPosMap = map.create()
|
||||
const startMessagePos = encoding.length(encoder)
|
||||
// write diff to pos of end of this message
|
||||
// we use it in readStructs to jump ahead to the end of the message
|
||||
encoding.writeUint32(encoder, 0)
|
||||
_sm.forEach((clock, client) => {
|
||||
// only write if new structs are available
|
||||
if (getState(store, client) > clock) {
|
||||
@@ -99,59 +102,28 @@ export const writeStructs = (encoder, store, _sm) => {
|
||||
// write # states that were updated
|
||||
encoding.writeVarUint(encoder, sm.size)
|
||||
sm.forEach((clock, client) => {
|
||||
// write first id
|
||||
writeID(encoder, createID(client, clock))
|
||||
encoderUserPosMap.set(client, encoding.length(encoder))
|
||||
// write diff to pos where structs are written
|
||||
encoding.writeUint32(encoder, 0)
|
||||
})
|
||||
sm.forEach((clock, client) => {
|
||||
const decPos = encoderUserPosMap.get(client)
|
||||
// fill out diff to pos where structs are written
|
||||
encoding.setUint32(encoder, decPos, encoding.length(encoder) - decPos)
|
||||
/**
|
||||
* @type {Array<AbstractStruct>}
|
||||
*/
|
||||
// @ts-ignore
|
||||
const structs = store.clients.get(client)
|
||||
const startNewStructs = findIndexSS(structs, clock)
|
||||
// write # encoded structs
|
||||
encoding.writeVarUint(encoder, structs.length - startNewStructs)
|
||||
const firstStruct = structs[startNewStructs]
|
||||
// write first struct with an offset (may be 0)
|
||||
firstStruct.write(encoder, clock - firstStruct.id.clock, 0)
|
||||
for (let i = startNewStructs + 1; i < structs.length; i++) {
|
||||
structs[i].write(encoder, 0, 0)
|
||||
}
|
||||
writeStructs(encoder, store.clients.get(client), client, clock)
|
||||
})
|
||||
// fill out diff to pos of end of message
|
||||
encoding.setUint32(encoder, startMessagePos, encoding.length(encoder) - startMessagePos)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {decoding.Decoder} decoder The decoder object to read data from.
|
||||
* @param {Map<number,number>} localState
|
||||
* @return {Map<number,IterableIterator<AbstractRef>>}
|
||||
* @return {Map<number,Array<AbstractRef>>}
|
||||
*/
|
||||
const readStructReaders = (decoder, localState) => {
|
||||
export const readClientsStructRefs = decoder => {
|
||||
/**
|
||||
* @type {Map<number,IterableIterator<AbstractRef>>}
|
||||
* @type {Map<number,Array<AbstractRef>>}
|
||||
*/
|
||||
const structReaders = new Map()
|
||||
const endOfMessagePos = decoder.pos + decoding.readUint32(decoder)
|
||||
const clientbeforeState = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < clientbeforeState; i++) {
|
||||
const clientRefs = new Map()
|
||||
const numOfStateUpdates = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < numOfStateUpdates; i++) {
|
||||
const numberOfStructs = decoding.readVarUint(decoder)
|
||||
const nextID = readID(decoder)
|
||||
const decoderPos = decoder.pos + decoding.readUint32(decoder)
|
||||
const structReaderDecoder = decoding.clone(decoder, decoderPos)
|
||||
const numberOfStructs = decoding.readVarUint(structReaderDecoder)
|
||||
structReaders.set(nextID.client, createStructReaderIterator(structReaderDecoder, numberOfStructs, nextID, localState.get(nextID.client) || 0))
|
||||
const refs = readStructRefs(decoder, numberOfStructs, nextID)
|
||||
clientRefs.set(nextID.client, refs)
|
||||
}
|
||||
// Decoder is still stuck at creating struct readers.
|
||||
// Jump ahead to end of message so that reading can continue.
|
||||
// We will use the created struct readers for the remaining part of this workflow.
|
||||
decoder.pos = endOfMessagePos
|
||||
return structReaders
|
||||
return clientRefs
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -175,86 +147,69 @@ const readStructReaders = (decoder, localState) => {
|
||||
*
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
* @param {Map<number,number>} localState
|
||||
* @param {Map<number,IterableIterator<AbstractRef>>} structReaders
|
||||
* @param {Array<AbstractRef>} stack Stack of pending structs waiting for struct dependencies.
|
||||
* Maximum length of stack is structReaders.size.
|
||||
* @param {IterableIterator<IterableIterator<AbstractRef>>} structReaderIterator
|
||||
* @param {IteratorResult<IterableIterator<AbstractRef>>} structReaderIteratorResult
|
||||
*
|
||||
* @todo reimplement without iterators - read everything in arrays instead
|
||||
*/
|
||||
const execStructReaders = (transaction, store, localState, structReaders, stack, structReaderIterator, structReaderIteratorResult) => {
|
||||
const resumeStructIntegration = (transaction, store) => {
|
||||
const stack = store.pendingStack
|
||||
const clientsStructRefs = store.pendingClientsStructRefs
|
||||
// iterate over all struct readers until we are done
|
||||
while (stack.length !== 0 || !structReaderIteratorResult.done) {
|
||||
while (stack.length !== 0 || clientsStructRefs.size !== 0) {
|
||||
if (stack.length === 0) {
|
||||
// stack is empty. We know that there there are more structReaders to be processed
|
||||
const nextStructRes = structReaderIteratorResult.value.next()
|
||||
if (nextStructRes.done) {
|
||||
// current structReaderIteratorResult is empty, use next one
|
||||
structReaderIteratorResult = structReaderIterator.next()
|
||||
} else {
|
||||
stack.push(nextStructRes.value)
|
||||
}
|
||||
} else {
|
||||
const ref = stack[stack.length - 1]
|
||||
const m = ref._missing
|
||||
while (m.length > 0) {
|
||||
const missing = m[m.length - 1]
|
||||
if (!exists(store, missing)) {
|
||||
// get the struct reader that has the missing struct
|
||||
const reader = structReaders.get(missing.client)
|
||||
const nextRef = reader === undefined ? undefined : reader.next().value
|
||||
if (nextRef === undefined) {
|
||||
// This update message causally depends on another update message.
|
||||
// Store current stack and readers in StructStore and resume the computation at another time
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing, structReaderIterator, structReaderIteratorResult })
|
||||
return
|
||||
}
|
||||
stack.push(nextRef)
|
||||
break
|
||||
}
|
||||
ref._missing.pop()
|
||||
}
|
||||
if (m.length === 0) {
|
||||
const localClock = (localState.get(ref.id.client) || 0)
|
||||
const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0
|
||||
if (offset < ref.length) {
|
||||
if (ref.id.clock + offset !== localClock) {
|
||||
// A previous message from this client is missing
|
||||
// Store current stack and readers in StructStore and resume the computation at another time
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing: createID(ref.id.client, localClock), structReaderIterator, structReaderIteratorResult })
|
||||
return
|
||||
}
|
||||
ref.toStruct(transaction.y, store, offset).integrate(transaction)
|
||||
}
|
||||
stack.pop()
|
||||
// take any first struct from clientsStructRefs and put it on the stack
|
||||
const [client, structRefs] = clientsStructRefs.entries().next().value
|
||||
stack.push(structRefs.refs[structRefs.i++])
|
||||
if (structRefs.refs.length === structRefs.i) {
|
||||
clientsStructRefs.delete(client)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (stack.length > 0) {
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing: stack[stack.length - 1].id, structReaderIterator, structReaderIteratorResult })
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to resume pending struct readers in `store.pendingReaders` while `pendingReaders.nextMissing`
|
||||
* exists.
|
||||
*
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
*/
|
||||
const tryResumePendingStructReaders = (transaction, store) => {
|
||||
let resume = true
|
||||
const pendingReaders = store.pendingStructReaders
|
||||
while (resume) {
|
||||
resume = false
|
||||
for (const pendingReader of pendingReaders) {
|
||||
if (exists(store, pendingReader.missing)) {
|
||||
resume = true // found at least one more reader to execute
|
||||
pendingReaders.delete(pendingReader)
|
||||
execStructReaders(transaction, store, getStates(store), pendingReader.structReaders, pendingReader.stack, pendingReader.structReaderIterator, pendingReader.structReaderIteratorResult)
|
||||
const ref = stack[stack.length - 1]
|
||||
const m = ref._missing
|
||||
const client = ref.id.client
|
||||
const localClock = getState(store, client)
|
||||
const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0
|
||||
if (ref.id.clock + offset !== localClock) {
|
||||
// A previous message from this client is missing
|
||||
// check if there is a pending structRef with a smaller clock and switch them
|
||||
const structRefs = clientsStructRefs.get(client)
|
||||
if (structRefs !== undefined) {
|
||||
const r = structRefs.refs[structRefs.i]
|
||||
if (r.id.clock < ref.id.clock) {
|
||||
// put ref with smaller clock on stack instead and continue
|
||||
structRefs.refs[structRefs.i] = ref
|
||||
stack[stack.length - 1] = r
|
||||
// sort the set because this approach might bring the list out of order
|
||||
structRefs.refs = structRefs.refs.slice(structRefs.i).sort((r1, r2) => r1.id.client - r2.id.client)
|
||||
structRefs.i = 0
|
||||
continue
|
||||
}
|
||||
}
|
||||
// wait until missing struct is available
|
||||
return
|
||||
}
|
||||
while (m.length > 0) {
|
||||
const missing = m[m.length - 1]
|
||||
if (!exists(store, missing)) {
|
||||
const client = missing.client
|
||||
// get the struct reader that has the missing struct
|
||||
const structRefs = clientsStructRefs.get(client)
|
||||
if (structRefs === undefined) {
|
||||
// This update message causally depends on another update message.
|
||||
return
|
||||
}
|
||||
stack.push(structRefs.refs[structRefs.i++])
|
||||
if (structRefs.i === structRefs.refs.length) {
|
||||
clientsStructRefs.delete(client)
|
||||
}
|
||||
break
|
||||
}
|
||||
ref._missing.pop()
|
||||
}
|
||||
if (m.length === 0) {
|
||||
if (offset < ref.length) {
|
||||
ref.toStruct(transaction.y, store, offset).integrate(transaction)
|
||||
}
|
||||
stack.pop()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -271,6 +226,43 @@ export const tryResumePendingDeleteReaders = (transaction, store) => {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Map<number,{refs:Array<AbstractRef>,i:number}>} pendingClientsStructRefs
|
||||
* @param {number} client
|
||||
* @param {Array<AbstractRef>} refs
|
||||
*/
|
||||
const setPendingClientsStructRefs = (pendingClientsStructRefs, client, refs) => {
|
||||
pendingClientsStructRefs.set(client, { refs, i: 0 })
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {encoding.Encoder} encoder
|
||||
* @param {Transaction} transaction
|
||||
*/
|
||||
export const writeStructsFromTransaction = (encoder, transaction) => writeClientsStructs(encoder, transaction.y.store, transaction.beforeState)
|
||||
|
||||
/**
|
||||
* @param {StructStore} store
|
||||
* @param {Map<number, Array<AbstractRef>>} clientsStructsRefs
|
||||
*/
|
||||
const mergeReadStructsIntoPendingReads = (store, clientsStructsRefs) => {
|
||||
const pendingClientsStructRefs = store.pendingClientsStructRefs
|
||||
for (const [client, structRefs] of clientsStructsRefs) {
|
||||
const pendingStructRefs = pendingClientsStructRefs.get(client)
|
||||
if (pendingStructRefs === undefined) {
|
||||
setPendingClientsStructRefs(pendingClientsStructRefs, client, structRefs)
|
||||
} else {
|
||||
// merge into existing structRefs
|
||||
const merged = pendingStructRefs.i > 0 ? pendingStructRefs.refs.slice(pendingStructRefs.i) : pendingStructRefs.refs
|
||||
for (let i = 0; i < structRefs.length; i++) {
|
||||
merged.push(structRefs[i])
|
||||
}
|
||||
pendingStructRefs.i = 0
|
||||
pendingStructRefs.refs = merged.sort((r1, r2) => r1.id.clock - r2.id.clock)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the next Item in a Decoder and fill this Item with the read data.
|
||||
*
|
||||
@@ -283,11 +275,9 @@ export const tryResumePendingDeleteReaders = (transaction, store) => {
|
||||
* @private
|
||||
*/
|
||||
export const readStructs = (decoder, transaction, store) => {
|
||||
const localState = getStates(store)
|
||||
const readers = readStructReaders(decoder, localState)
|
||||
const structReaderIterator = readers.values()
|
||||
execStructReaders(transaction, store, localState, readers, [], structReaderIterator, structReaderIterator.next())
|
||||
tryResumePendingStructReaders(transaction, store)
|
||||
const clientsStructRefs = readClientsStructRefs(decoder)
|
||||
mergeReadStructsIntoPendingReads(store, clientsStructRefs)
|
||||
resumeStructIntegration(transaction, store)
|
||||
tryResumePendingDeleteReaders(transaction, store)
|
||||
}
|
||||
|
||||
@@ -307,6 +297,6 @@ export const readModel = (decoder, transaction, store) => {
|
||||
* @param {Map<number,number>} [targetState] The state of the target that receives the update. Leave empty to write all known structs
|
||||
*/
|
||||
export const writeModel = (encoder, store, targetState = new Map()) => {
|
||||
writeStructs(encoder, store, targetState)
|
||||
writeClientsStructs(encoder, store, targetState)
|
||||
writeDeleteSet(encoder, createDeleteSetFromStructStore(store))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user