Wrzucam kod mojego cache:
K1 - typ zgloszenia (jest ich. ok 2)
K2 - nazwa okna czasowego (np. minutowe, godzinne itd. jest ich ok. 8)
@RequiredArgsConstructor
public class OpenedWindowCache<K1, K2, V> implements WindowCache<K1, K2, V> {
private final ConcurrentMap<K1, ConcurrentMap<K2, OpenedWindowCache.Values<V>>> cache = new ConcurrentHashMap<>();
private final ToLongFunction<V> timestampExtractor;
private final Clock clock;
@Override
public boolean updateCache(K1 k1, K2 k2, V v) {
long vTimestamp = timestampExtractor.applyAsLong(v);
long currentTimestamp = clock.millis();
if (vTimestamp < currentTimestamp) {
return false;
}
ConcurrentMap<K2, OpenedWindowCache.Values<V>> cachedWindows = cache.getOrDefault(k1, new ConcurrentHashMap<>());
OpenedWindowCache.Values<V> cachedValues = cachedWindows.computeIfAbsent(k2, key -> new Values<>(timestampExtractor));
cachedValues.add(v);
cachedWindows.put(k2, cachedValues);
cache.put(k1, cachedWindows);
return true;
}
@Override
public Map<K2, V> getCachedValue(K1 k) {
long currentTimestamp = clock.millis();
return cache.get(k).entrySet().stream()
.collect(Collectors.toMap(key -> key.getKey(),
v -> v.getValue().getValue(currentTimestamp),
(a, b) -> b));
}
@Override
public List<Tuple<K1, Map<K2, V>>> getAllCachedValues() {
return cache.keySet().stream()
.map(key -> Tuple.of(key, getCachedValue(key)))
.collect(Collectors.toList());
}
private static class Values<V> {
private ToLongFunction<V> timestampExtractor;
private SortedByTimestampLinkedList<V> values;
public Values(ToLongFunction<V> timestampExtractor) {
this.timestampExtractor = timestampExtractor;
this.values = new SortedByTimestampLinkedList<>(timestampExtractor);
}
synchronized V getValue(long currentTimestamp) {
V result;
SortedByTimestampLinkedList<V> newValues = values.getFiltered(value -> timestampExtractor.applyAsLong(value) >= currentTimestamp);
if (newValues.isEmpty()) {
result = values.get(values.size() - 1);
} else {
result = values.get(0);
this.values = newValues;
}
return result;
}
synchronized boolean add(V v) {
return values.add(v);
}
}
}
public class SortedByTimestampLinkedList<E> {
private final List<E> list;
private final ToLongFunction<E> timestampExtractor;
public SortedByTimestampLinkedList(ToLongFunction<E> timestampExtractor) {
this(timestampExtractor, new LinkedList<>());
}
public SortedByTimestampLinkedList(ToLongFunction<E> timestampExtractor, List<E> list) {
this.timestampExtractor = timestampExtractor;
this.list = list;
}
public E get(int i) {
return list.get(i);
}
public SortedByTimestampLinkedList<E> getFiltered(Predicate<E> predicate) {
List<E> filteredList = list.stream()
.filter(predicate)
.collect(Collectors.toList());
return new SortedByTimestampLinkedList<>(timestampExtractor, filteredList);
}
public boolean add(E e) {
if (list.isEmpty()) {
list.add(e);
}
long vTimestamp = timestampExtractor.applyAsLong(e);
for (int i = 0; i < list.size(); i++) {
E idxValue = list.get(i);
long idxTimestamp = timestampExtractor.applyAsLong(idxValue);
if (vTimestamp < idxTimestamp) {
list.add(i, e);
break;
} else if (vTimestamp == idxTimestamp) {
list.set(i, e);
break;
} else if (i + 1 == list.size()) {
list.add(e);
break;
}
}
return true;
}
public int size() {
return list.size();
}
public boolean isEmpty() {
return list.isEmpty();
}
}
użycie:
class SummaryMsgHandler {
...
private final WindowCache<String, String, SummaryTrade> windowCache;
...
@Override
public void accept(SummaryTrade payload, MessageHeaders headers) {
windowCache.updateCache(payload.getCode(), payload.getWindowDurationName(), payload);
notifyUpdated(payload.getCode(),
headers.getOrDefault(KafkaHeaders.RECEIVED_TIMESTAMP, "").toString());
}
...