SyncController.java
package com.oliwier.listmebackend.api;
import com.oliwier.listmebackend.api.dto.CrdtOperationResponse;
import com.oliwier.listmebackend.crdt.IncomingOperation;
import com.oliwier.listmebackend.crdt.SyncEngine;
import com.oliwier.listmebackend.domain.model.Device;
import com.oliwier.listmebackend.domain.model.ShoppingList;
import com.oliwier.listmebackend.domain.model.User;
import com.oliwier.listmebackend.domain.repository.ListDeviceRepository;
import com.oliwier.listmebackend.domain.repository.ShoppingListRepository;
import com.oliwier.listmebackend.identity.CurrentDevice;
import com.oliwier.listmebackend.identity.CurrentUser;
import com.oliwier.listmebackend.notification.WebPushService;
import com.oliwier.listmebackend.websocket.ListSyncBroadcaster;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.server.ResponseStatusException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
/**
 * CRDT sync endpoints for a single shopping list.
 *
 * <ul>
 *   <li>{@code GET  /api/lists/{listId}/crdt/clock} – current server vector clock for a list</li>
 *   <li>{@code GET  /api/lists/{listId}/crdt/ops}   – ops the client hasn't seen yet (since given clock)</li>
 *   <li>{@code POST /api/lists/{listId}/crdt/ops}   – push ops from client to server</li>
 * </ul>
 *
 * <p>Every endpoint first checks that the calling device is linked to the list and answers
 * {@code 403 FORBIDDEN} otherwise. The class is read-only transactional by default;
 * {@link #pushOps} overrides that with a read-write transaction.
 */
@RestController
@RequestMapping("/api/lists/{listId}/crdt")
@RequiredArgsConstructor
@Slf4j
@Transactional(readOnly = true)
public class SyncController {

    /** Operation type string that triggers the "new item" push notification. */
    private static final String ITEM_CREATE = "ITEM_CREATE";

    private final SyncEngine syncEngine;
    private final ListDeviceRepository listDeviceRepository;
    private final ShoppingListRepository listRepository;
    private final ListSyncBroadcaster broadcaster;
    private final SimpMessagingTemplate messaging;
    private final WebPushService webPushService;

    /**
     * Returns the server's current clock for the list, as reported by the sync engine.
     *
     * @param listId list whose clock is requested
     * @param device calling device; must be a participant of the list
     * @return the clock map produced by {@code SyncEngine.getCurrentClock}
     * @throws ResponseStatusException 403 if the device is not a participant of the list
     */
    @GetMapping("/clock")
    public Map<String, Long> getClock(@PathVariable UUID listId, @CurrentDevice Device device) {
        requireAccess(listId, device);
        return syncEngine.getCurrentClock(listId);
    }

    /**
     * Returns the operations the sync engine reports as newer than the client's clock.
     *
     * <p>Because {@code since} is an unnamed {@code @RequestParam} map, Spring fills it with
     * <em>all</em> query parameters and stores the values as raw {@code String}s, regardless of
     * the declared {@code Long} value type. The map is therefore normalized before it is handed
     * to the sync engine; entries whose value is not a valid long are ignored.
     *
     * @param listId list to read operations from
     * @param device calling device; must be a participant of the list
     * @param since  client clock taken from the query string; may be {@code null} or empty
     * @return operations mapped to their API representation
     * @throws ResponseStatusException 403 if the device is not a participant of the list
     */
    @GetMapping("/ops")
    public List<CrdtOperationResponse> getOps(
            @PathVariable UUID listId,
            @CurrentDevice Device device,
            @RequestParam(required = false) Map<String, Long> since) {
        requireAccess(listId, device);
        Map<String, Long> clientClock = normalizeClock(since);
        return syncEngine.getOperationsSince(listId, clientClock)
                .stream()
                .map(CrdtOperationResponse::from)
                .toList();
    }

    /**
     * Applies client operations, broadcasts the applied ones and sends notifications.
     *
     * <p>NOTE(review): the incoming ops are passed to the sync engine without checking here that
     * they target {@code listId}; confirm that {@code SyncEngine.applyIncoming} enforces this,
     * otherwise a participant of one list could push ops for another.
     *
     * <p>NOTE(review): broadcasts and notifications are sent inside the transaction, i.e. before
     * it commits; a later failure would roll back ops that subscribers have already been told about.
     *
     * @param listId list the ops are pushed to
     * @param device calling device; must be a participant of the list
     * @param user   user bound to the device; may be {@code null}
     * @param ops    operations sent by the client
     * @throws ResponseStatusException 403 if the device is not a participant of the list
     */
    @PostMapping("/ops")
    @ResponseStatus(HttpStatus.NO_CONTENT)
    @Transactional
    public void pushOps(
            @PathVariable UUID listId,
            @CurrentDevice Device device,
            @CurrentUser User user,
            @RequestBody List<IncomingOperation> ops) {
        requireAccess(listId, device);

        SyncEngine.SyncResult result = syncEngine.applyIncoming(ops, device);
        result.applied().forEach(op -> broadcaster.broadcastOp(listId, op));

        // A missing list or a null name both fall back to "" so notification texts never show "null".
        String listName = listRepository.findById(listId)
                .map(ShoppingList::getName)
                .orElse("");

        log.info("[Sync] pushOps listId={} user={} applied={} conflicts={}",
                listId, user != null ? user.getId() : "null", result.applied().size(), result.conflicts().size());

        notifyItemCreated(listId, listName, user, result);
        notifyConflicts(listId, listName, user, result);
    }

    /** Sends a "new item" web push to every other user on the list if an item was created. */
    private void notifyItemCreated(UUID listId, String listName, User user, SyncEngine.SyncResult result) {
        var firstCreate = result.applied().stream()
                .filter(op -> ITEM_CREATE.equals(op.getOperationType()))
                .findFirst();
        if (firstCreate.isEmpty()) {
            return;
        }

        // Client-supplied payload: tolerate a missing, null or non-string "name" instead of failing
        // the whole (already applied) sync with a ClassCastException / NullPointerException.
        String firstItemName = firstCreate
                .map(op -> op.getPayload() != null ? op.getPayload().get("name") : null)
                .filter(String.class::isInstance)
                .map(String.class::cast)
                .orElse("");

        var participants = listDeviceRepository.findByListId(listId);
        log.info("[Sync] ITEM_CREATE detected — {} participant device(s) on list", participants.size());
        for (var ld : participants) {
            User participant = ld.getDevice().getUser();
            log.info("[Sync] participant device={} user={}",
                    ld.getDevice().getId(),
                    participant != null ? participant.getId() : "null");
        }

        var senderId = user != null ? user.getId() : null;
        String title = "Neuer Artikel in \u201e" + listName + "\u201c";
        String body = firstItemName.isEmpty() ? "Artikel hinzugefügt" : firstItemName;

        // One notification per distinct user, excluding the user who pushed the ops.
        participants.stream()
                .map(ld -> ld.getDevice().getUser())
                .filter(u -> u != null && !u.getId().equals(senderId))
                .map(User::getId)
                .distinct()
                .forEach(uid -> webPushService.sendToUser(uid, title, body, "/" + uid));
    }

    /** Tells the pushing user (WebSocket + web push) that conflicts were resolved automatically. */
    private void notifyConflicts(UUID listId, String listName, User user, SyncEngine.SyncResult result) {
        if (result.conflicts().isEmpty()) {
            return;
        }
        if (user == null) {
            // No user is bound to the device, so there is no per-user topic or push target.
            log.warn("[Sync] {} conflict(s) on listId={} but pushing device has no user; skipping notification",
                    result.conflicts().size(), listId);
            return;
        }

        Map<String, Object> notification = new HashMap<>();
        notification.put("type", "CONFLICT_DETECTED");
        notification.put("listId", listId.toString());
        notification.put("listName", listName);
        notification.put("conflictCount", result.conflicts().size());

        messaging.convertAndSend("/topic/user/" + user.getId(), (Object) notification);
        webPushService.sendToUser(user.getId(),
                "Konflikt automatisch gel\u00f6st",
                "In \u201e" + listName + "\u201c wurden gleichzeitige \u00c4nderungen zusammengef\u00fchrt.",
                "/" + user.getId());
    }

    /**
     * Converts the raw query-parameter map into a genuine {@code Map<String, Long>}.
     *
     * <p>The parameter is typed {@code Map<String, ?>} on purpose: values are read as
     * {@code Object} so that String values placed in the map by the framework do not trigger
     * a {@code ClassCastException}.
     *
     * @param rawClock map as bound by the framework; may be {@code null}
     * @return an empty immutable map for {@code null}, otherwise a new map containing only the
     *         entries whose value is a number or a string parseable as a long
     */
    private static Map<String, Long> normalizeClock(Map<String, ?> rawClock) {
        if (rawClock == null) {
            return Map.of();
        }
        Map<String, Long> clock = new HashMap<>();
        for (Map.Entry<String, ?> entry : rawClock.entrySet()) {
            Object value = entry.getValue();
            if (value instanceof Number number) {
                clock.put(entry.getKey(), number.longValue());
            } else if (value instanceof String text) {
                try {
                    clock.put(entry.getKey(), Long.parseLong(text.strip()));
                } catch (NumberFormatException ignored) {
                    // Unrelated or malformed query parameter: leaving it out only means the
                    // client clock has no entry for that key.
                    log.warn("[Sync] ignoring non-numeric clock entry for key={}", entry.getKey());
                }
            }
        }
        return clock;
    }

    /**
     * Ensures the device is linked to the list.
     *
     * @throws ResponseStatusException 403 if the device is {@code null} or not a participant
     */
    private void requireAccess(UUID listId, Device device) {
        if (device == null || !listDeviceRepository.existsByListIdAndDeviceId(listId, device.getId())) {
            throw new ResponseStatusException(HttpStatus.FORBIDDEN, "Not a participant of this list");
        }
    }
}