package dk.dma.ais.store.importer;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import dk.dma.ais.message.AisMessage;
import dk.dma.ais.packet.AisPacket;
import dk.dma.enav.model.geometry.Position;
import dk.dma.enav.model.geometry.PositionTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * Simple SSTable writer base class that remembers the most recently
 * reported position of each target.
 *
 * @author Jens Tuxen
 * @author Thomas Borg Salling
 */

public abstract class PositionTrackingSSTableWriter extends SSTableWriter {

static final Logger LOG = LoggerFactory.getLogger(PositionTrackingSSTableWriter.class);

public static final long POSITION_TIMEOUT_MS = TimeUnit.MILLISECONDS.convert(20, TimeUnit.MINUTES);

* A position tracker used to keeping an eye on previously received
* messages.

private final Cache<Integer, PositionTime> tracker = CacheBuilder

protected PositionTrackingSSTableWriter(String outputDir, String keyspace, String schemaDefinition, String insertStatement) {
super(outputDir, keyspace, schemaDefinition, insertStatement);

protected Position targetPosition(AisPacket packet) {
Position targetPosition = null;

AisMessage message = packet.tryGetAisMessage();
if (message != null) {
targetPosition = message.getValidPosition();

if (targetPosition == null) {
// Try to find an estimated position
// Use the last received position message unless the position
// has timed out (POSITION_TIMEOUT_MS)
targetPosition = tracker.asMap().getOrDefault(message.getUserId(), null);
} else {
final long ts = packet.getBestTimestamp();
// Update the tracker with latest position
// but only update the tracker IF the new time is better
tracker.asMap().merge(message.getUserId(), targetPosition.withTime(ts), (a, b) -> a.getTime() > b.getTime() ? a : b);

return targetPosition;

protected static boolean isValid(Position position) {
return position != null && Position.isValid(position.getLatitude(), position.getLongitude());