I am using RxJS 5 (5.0.1) to cache in Angular 2. It works well.
The meat of the caching function is:
const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .publishReplay(1, this.RECACHE_INTERVAL)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));
The actualFn is the this.http.get('/some/resource') call.
As I said, this works perfectly for me. The cached value is returned from the observable for the duration of RECACHE_INTERVAL. If a request is made after that interval, actualFn() is called.
What I am trying to figure out is how to return the last value when RECACHE_INTERVAL expires and actualFn() is called. There is a gap between the moment RECACHE_INTERVAL expires and the moment the actualFn() result is replayed, during which the observable doesn't return a value. I would like to get rid of that gap and always return the last value.
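To make the gap concrete, here is a minimal sketch in plain JavaScript (deliberately not RxJS) of a one-slot buffer with an expiry window, which is roughly how I understand the windowed replay to behave. The names createWindowedSlot and clock are hypothetical and exist only for this illustration; the clock is injected so no real timers are needed.

```javascript
// Minimal model of a one-slot replay buffer with an expiry window.
// `now` is injected so the behaviour can be checked without real timers.
function createWindowedSlot(windowMs, now) {
  let entry = null; // { value, storedAt } or null

  return {
    store(value) {
      entry = { value, storedAt: now() };
    },
    // Returns the value only while it is younger than windowMs.
    read() {
      if (entry !== null && now() - entry.storedAt < windowMs) {
        return { hit: true, value: entry.value };
      }
      return { hit: false, value: undefined };
    },
  };
}

let clock = 0;
const slot = createWindowedSlot(1000, () => clock);

slot.store('response #1');
clock = 500;
const fresh = slot.read();   // still inside the window
clock = 1200;
const expired = slot.read(); // window elapsed: nothing to replay
```

In this model the read at 1200 has nothing to hand out until a new value is stored, which is the gap described above.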
I could use a side effect: store the last good value and call .next(lastValue) while waiting for the HTTP response to return, but this seems naive. I would like to use an "RxJS" way, a pure function solution, if possible.
3 Answers
Answers 1
Almost any complicated logic quickly gets out of control if you use plain rxjs. I would rather implement a custom cache operator from scratch; you can use this gist as an example.
Answers 2
Your example looks exactly the same as an example in SO Documentation on how to make caching with RxJS 5: Caching HTTP responses
If you modify it a little you can simulate the situation that you describe but I don't think it happens as you think:
See this demo: https://jsbin.com/todude/10/edit?js,console
Notice that I'm trying to get cached results at 1200ms, when the cache is invalidated, and then at 1300ms, when the previous request is still pending (it takes 200ms). Both results are received as they should.
This happens because when you subscribe and publishReplay() doesn't contain any valid value, it won't emit anything and won't complete immediately (thanks to take(1)), so it needs to subscribe to its source, which makes the HTTP request (this in fact happens in refCount()).
Then the second subscriber won't receive anything either and will be added to the array of observers in publishReplay(). It won't make another subscription because it's already subscribed to its source (refCount()) and is waiting for the response.
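The sharing behaviour described above can be sketched without RxJS. The following is only an illustration in plain JavaScript, not how publishReplay() or refCount() are implemented; createSharedRequest and the hand-driven fake transport are hypothetical names.

```javascript
// Hypothetical helper: shares one pending request among all callers.
function createSharedRequest(startRequest) {
  let waiting = null; // callbacks waiting for the in-flight request

  return function get(callback) {
    if (waiting !== null) {
      waiting.push(callback); // request already pending: just join it
      return;
    }
    waiting = [callback];
    startRequest(value => {
      const callbacks = waiting;
      waiting = null;
      callbacks.forEach(cb => cb(value));
    });
  };
}

// Fake transport that lets us finish the request by hand.
let requestCount = 0;
let finish = null;
const get = createSharedRequest(done => {
  requestCount += 1;
  finish = done;
});

const received = [];
get(v => received.push(['first', v]));
get(v => received.push(['second', v])); // joins the pending request
finish(42);
```

Both callers get the same value and only one request is started, which mirrors what the two subscribers at 1200ms and 1300ms see in the demo.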
So I don't think the situation you're describing should happen. If it does, please make a demo that demonstrates your problem.
EDIT:
Emitting both invalidated item and fresh items
The following example behaves a little differently from the linked example. If the cached response has been invalidated, it is emitted anyway, followed by the new value. This means the subscriber receives one or two values:
- 1 value: The cached value
- 2 values: The invalidated cached value and then a new, fresh value that'll be cached from now on.
The code could look like the following:
let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap(value => {
    if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) {
      return Observable.from([value.response, null]);
    } else {
      return Observable.of(value.response);
    }
  })
  .takeWhile(value => value);

setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
See live demo: https://jsbin.com/ketemi/2/edit?js,console
This prints to console the following output:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1200: 2
Response 1300: 2
Response 1500: 2
Response 3500: 2
Response 3500: 3
Notice that 1200 and 1300 first received the old cached value 1 immediately and then another value, the fresh 2.
On the other hand, 1500 received only the new value because 2 is already cached and is valid.
The most confusing thing is probably why I am using concatMap().takeWhile(). This is because I need to make sure that the fresh response (not the invalidated one) is the last value before the complete notification is sent, and there's probably no operator for that (neither first() nor takeWhile() is applicable for this use-case).
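To show the sentinel idea in isolation, here is a small sketch on plain arrays rather than observables: Array.prototype.flatMap stands in for concatMap() and a hand-written takeWhileTruthy stands in for takeWhile(value => value). The helper names and the stale flag are assumptions made for this illustration only.

```javascript
// Array stand-in for takeWhile(value => value).
function takeWhileTruthy(values) {
  const out = [];
  for (const v of values) {
    if (!v) break; // the null sentinel ends the sequence
    out.push(v);
  }
  return out;
}

// Valid item: append the sentinel so the sequence ends right after it.
// Invalidated item: no sentinel, so a later fresh item is still let through.
function expand(item, isValid) {
  return isValid(item) ? [item.response, null] : [item.response];
}

const isValid = item => !item.stale;

// Subscriber that finds a valid cached item.
const validOnly = takeWhileTruthy(
  [{ response: 2, stale: false }].flatMap(item => expand(item, isValid))
);

// Subscriber that finds an invalidated item, followed by the fresh one.
const staleThenFresh = takeWhileTruthy(
  [{ response: 1, stale: true }, { response: 2, stale: false }]
    .flatMap(item => expand(item, isValid))
);
```

One consequence of using a truthiness check as the stop condition is that a falsy response (0, an empty string, false) would also end the sequence, so this approach relies on responses being truthy.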
Emitting only the current item without waiting for refresh
Yet another use-case could be when we want to emit only the cached value, without waiting for the fresh response from the HTTP request.
let counter = 1;
const RECACHE_INTERVAL = 1000;

function mockDataFetch() {
  return Observable.of(counter++)
    .delay(200);
}

let source = Observable.defer(() => {
  const now = (new Date()).getTime();

  return mockDataFetch()
    .map(response => {
      return {
        'timestamp': now,
        'response': response,
      };
    });
});

let updateRequest = source
  .publishReplay(1)
  .refCount()
  .concatMap((value, i) => {
    if (i === 0) {
      if (value.timestamp + RECACHE_INTERVAL > (new Date()).getTime()) { // is cached item valid?
        return Observable.from([value.response, null]);
      } else {
        return Observable.of(value.response);
      }
    }
    return Observable.of(null);
  })
  .takeWhile(value => value);

setTimeout(() => updateRequest.subscribe(val => console.log("Response 0:", val)), 0);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 50:", val)), 50);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 200:", val)), 200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1200:", val)), 1200);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1300:", val)), 1300);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 1500:", val)), 1500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3500:", val)), 3500);
setTimeout(() => updateRequest.subscribe(val => console.log("Response 3800:", val)), 3800);
See live demo: https://jsbin.com/kebapu/2/edit?js,console
This example prints to console:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 1
Response 1300: 1
Response 1500: 2
Response 3500: 2
Response 3800: 3
Notice that both 1200 and 1300 receive value 1 because that's the cached value, even though it's invalid now. The first call at 1200 just spawns a new HTTP request without waiting for its response and emits only the cached value. Then at 1500 the fresh value is cached, so it's just re-emitted. The same applies at 3500 and 3800.
Note that the subscriber at 1200 will receive the next notification immediately, but the complete notification will be sent only after the HTTP request has finished. We need to wait because if we sent complete right after next, the chain would dispose its disposables, which would also cancel the HTTP request (which is definitely not what we want).
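The second variant (serve the cached value immediately, refresh in the background) can also be modelled without RxJS. This is a sketch in plain JavaScript under stated assumptions, not a translation of the operator chain: createStaleCache, the injected clock and the hand-driven fake request are all hypothetical names introduced here.

```javascript
// Serve whatever is cached right away; refresh in the background once it is stale.
function createStaleCache(maxAgeMs, now, startRequest) {
  let entry = null;       // { value, storedAt } once something is cached
  let pending = null;     // callbacks waiting for the very first value
  let refreshing = false; // true while a request is in flight

  function refresh() {
    if (refreshing) return; // never start a second concurrent request
    refreshing = true;
    startRequest(value => {
      entry = { value, storedAt: now() };
      refreshing = false;
      const callbacks = pending || [];
      pending = null;
      callbacks.forEach(cb => cb(value));
    });
  }

  return function get(callback) {
    if (entry === null) {
      // Nothing cached yet: the caller has to wait for the first response.
      pending = pending || [];
      pending.push(callback);
      refresh();
      return;
    }
    if (now() - entry.storedAt >= maxAgeMs) refresh(); // stale: refresh in background
    callback(entry.value); // always answer immediately from the cache
  };
}

// Drive it by hand with a fake clock and a fake request.
let clock = 0;
let counter = 1;
let requests = 0;
let finish = null;
const get = createStaleCache(1000, () => clock, done => {
  requests += 1;
  finish = () => done(counter++);
});

const log = [];
get(v => log.push(['t0', v]));
finish();                          // first response (1) arrives at t=0
clock = 1200;
get(v => log.push(['t1200', v])); // stale: gets 1, refresh starts
clock = 1300;
get(v => log.push(['t1300', v])); // refresh still pending: gets 1
clock = 1400;
finish();                          // second response (2) arrives
clock = 1500;
get(v => log.push(['t1500', v])); // fresh value 2 is cached now
```

Unlike the observable version, this sketch keeps its state in mutable variables, so it trades the "pure" style asked for in the question for something easier to follow step by step.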
Answers 3
Updated answer:
If you always want to use the previous value while a new request is being made, you can put another subject in the chain that keeps the most recent value.
You can then repeat the value so it is possible to tell if it came from the cache or not. The subscriber can then filter out the cached values if they are not interested in those.
// Take values while they pass the predicate, then return one more
// i.e also return the first value which returned false
const takeWhileInclusive = predicate => src =>
  src
  .flatMap(v => Observable.from([v, v]))
  .takeWhile((v, index) =>
     index % 2 === 0 ? true : predicate(v, index)
  )
  .filter((v, index) => index % 2 !== 1);

// Source observable will still push its values into the subject
// even after the subscriber unsubscribes
const keepHot = subject => src =>
  Observable.create(subscriber => {
    src.subscribe(subject);
    return subject.subscribe(subscriber);
  });

const cachedRequest = request
   // Subjects below only store the most recent value
   // so make sure most recent is marked as 'fromCache'
  .flatMap(v => Observable.from([
     {fromCache: false, value: v},
     {fromCache: true, value: v}
   ]))
   // Never complete subject
  .concat(Observable.never())
   // backup cache while new request is in progress
  .let(keepHot(new ReplaySubject(1)))
   // main cache with expiry time
  .let(keepHot(new ReplaySubject(1, this.RECACHE_INTERVAL)))
  .publish()
  .refCount()
  .let(takeWhileInclusive(v => v.fromCache));

// Cache will be re-filled by request when there is another subscription after RECACHE_INTERVAL
// Subscribers will get the most recent cached value first then an updated value
https://acutmore.jsbin.com/kekevib/8/edit?js,console
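The effect of takeWhileInclusive is easier to see on a plain array. The sketch below is an array analogue written for illustration, not the operator from the answer: it keeps values while the predicate holds and also keeps the first value that fails it.

```javascript
// Array analogue of takeWhileInclusive: keep values while the predicate
// passes, and also keep the first value that fails it.
function takeWhileInclusive(values, predicate) {
  const out = [];
  for (const v of values) {
    out.push(v);
    if (!predicate(v)) break; // keep the first failing value, then stop
  }
  return out;
}

// What a subscriber might observe: a cached value first, then the fresh one,
// then the fresh one again marked as cached (which should not be delivered).
const events = [
  { fromCache: true, value: 'old' },
  { fromCache: false, value: 'new' },
  { fromCache: true, value: 'new' },
];

const seen = takeWhileInclusive(events, e => e.fromCache);
```

Here seen ends with the first non-cached value, so a subscriber that only cares about fresh data can filter on fromCache === false.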
Original answer:
Instead of setting a window size on the ReplaySubject, you could change the source observable to repeat after a delay.
const observable = Observable.defer(
    () => actualFn().do(() => this.console.log('CACHE MISS', cacheKey))
  )
  .repeatWhen(_ => _.delay(this.RECACHE_INTERVAL))
  .publishReplay(1)
  .refCount()
  .take(1)
  .do(() => this.console.log('CACHE HIT', cacheKey));
The repeatWhen operator requires RxJS 5.0.0-beta.12 or higher: https://github.com/ReactiveX/rxjs/blob/master/CHANGELOG.md#500-beta12-2016-09-09