All files / src/crdt OperationQueue.ts

70.58% Statements 12/17
100% Branches 1/1
66.66% Functions 8/12
73.33% Lines 11/15

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69                          1x 1x           1x   1x     3x         2x               1x             1x         2x                     1x 1x            
import Dexie from 'dexie'
import type { CrdtOperation } from './types'
 
/**
 * Persistent offline operation queue backed by IndexedDB.
 *
 * When the device is offline, mutations are stored here.
 * On reconnect, they are flushed to the server in order.
 */
class OperationQueueDb extends Dexie {
  queue!: Dexie.Table<CrdtOperation & { _synced: 0 | 1 }, string>
 
  constructor() {
    super('listme-opqueue')
    this.version(1).stores({
      queue: 'id, listId, createdAt, _synced',
    })
  }
}
 
const db = new OperationQueueDb()
 
export const OperationQueue = {
  /** Enqueue an operation to be synced later. */
  async enqueue(op: CrdtOperation): Promise<void> {
    await db.queue.put({ ...op, _synced: 0 })
  },
 
  /** Return all unsynced operations for a list, oldest first. */
  async getPending(listId: string): Promise<CrdtOperation[]> {
    return db.queue
      .where('listId').equals(listId)
      .and(op => op._synced === 0)
      .sortBy('createdAt')
  },
 
  /** Return ALL unsynced operations across all lists, oldest first. */
  async getAllPending(): Promise<CrdtOperation[]> {
    return db.queue
      .where('_synced').equals(0)
      .sortBy('createdAt')
  },
 
  /** Mark an operation as successfully synced. */
  async markSynced(opId: string): Promise<void> {
    await db.queue.update(opId, { _synced: 1 })
  },
 
  /** Mark multiple operations as synced (batch). */
  async markAllSynced(opIds: string[]): Promise<void> {
    await db.queue.bulkUpdate(opIds.map(id => ({ key: id, changes: { _synced: 1 as const } })))
  },
 
  /** Update all queued ops that reference tempId to use serverId instead. */
  async remapListId(tempId: string, serverId: string): Promise<void> {
    const ops = await db.queue.where('listId').equals(tempId).toArray()
    await Promise.all(ops.map(op => db.queue.update(op.id, { listId: serverId })))
  },
 
  /** Remove synced operations older than given age (ms) to keep DB small. */
  async pruneOld(maxAgeMs = 7 * 24 * 60 * 60 * 1000): Promise<void> {
    const cutoff = Date.now() - maxAgeMs
    await db.queue
      .where('_synced').equals(1)
      .and(op => op.createdAt < cutoff)
      .delete()
  },
}