Example 14.1
@Slf4j
public class Example14_1 {
public static void main(String[] args) {
Mono
.justOrEmpty(null)
.subscribe(data -> {},
error -> {},
() -> log.info("# onComplete"));
}
}
Example 14.2
/**
* fromIterable 예제
*/
@Slf4j
public class Example14_2 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.coins)
.subscribe(coin ->
log.info("coin 명: {}, 현재가: {}", coin.getT1(), coin.getT2())
);
}
}
Example 14.3
@Slf4j
public class Example14_3 {
public static void main(String[] args) {
Flux
.fromStream(() -> SampleData.coinNames.stream())
.filter(coin -> coin.equals("BTC") || coin.equals("ETH"))
.subscribe(data -> log.info("{}", data));
}
}
Example 14.4
/**
* range 예제
*/
@Slf4j
public class Example14_4 {
public static void main(String[] args) {
Flux
.range(5, 10)
.subscribe(data -> log.info("{}", data));
}
}
Example 14.5
/**
* range 예제
*/
@Slf4j
public class Example14_5 {
public static void main(String[] args) {
Flux
.range(7, 5)
.map(idx -> SampleData.btcTopPricesPerYear.get(idx))
.subscribe(tuple -> log.info("{}'s {}", tuple.getT1(), tuple.getT2()));
}
}
Example 14.6
/**
* defer 예제
*/
@Slf4j
public class Example14_6 {
public static void main(String[] args) throws InterruptedException {
log.info("# start: {}", LocalDateTime.now());
Mono<LocalDateTime> justMono = Mono.just(LocalDateTime.now());
Mono<LocalDateTime> deferMono = Mono.defer(() ->
Mono.just(LocalDateTime.now()));
Thread.sleep(2000);
justMono.subscribe(data -> log.info("# onNext just1: {}", data));
deferMono.subscribe(data -> log.info("# onNext defer1: {}", data));
Thread.sleep(2000);
justMono.subscribe(data -> log.info("# onNext just2: {}", data));
deferMono.subscribe(data -> log.info("# onNext defer2: {}", data));
}
}
Example 14.7
/**
* defer 예제
*/
@Slf4j
public class Example14_7 {
public static void main(String[] args) throws InterruptedException {
log.info("# start: {}", LocalDateTime.now());
Mono
.just("Hello")
.delayElement(Duration.ofSeconds(3))
.switchIfEmpty(sayDefault())
// .switchIfEmpty(Mono.defer(() -> sayDefault()))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3500);
}
private static Mono<String> sayDefault() {
log.info("# Say Hi");
return Mono.just("Hi");
}
}
Example 14.8
/**
* using 예제
*/
@Slf4j
public class Example14_8 {
public static void main(String[] args) {
Path path = Paths.get("D:\\resources\\using_example.txt");
Flux
.using(() -> Files.lines(path), Flux::fromStream, Stream::close)
.subscribe(log::info);
}
}
Example 14.9
/**
* generate 예제
*/
@Slf4j
public class Example14_9 {
public static void main(String[] args) {
Flux
.generate(() -> 0, (state, sink) -> {
sink.next(state);
if (state == 10)
sink.complete();
return ++state;
})
.subscribe(data -> log.info("# onNext: {}", data));
}
}
Example 14.10
/**
* generate 예제
*/
@Slf4j
public class Example14_10 {
public static void main(String[] args) {
final int dan = 3;
Flux
.generate(() -> Tuples.of(dan, 1), (state, sink) -> {
sink.next(state.getT1() + " * " +
state.getT2() + " = " + state.getT1() * state.getT2());
if (state.getT2() == 9)
sink.complete();
return Tuples.of(state.getT1(), state.getT2() + 1);
}, state -> log.info("# 구구단 {}단 종료!", state.getT1()))
.subscribe(data -> log.info("# onNext: {}", data));
}
}
Example 14.11
/**
* generate 예제
*/
@Slf4j
public class Example14_11 {
public static void main(String[] args) {
Map<Integer, Tuple2<Integer, Long>> map =
SampleData.getBtcTopPricesPerYearMap();
Flux
.generate(() -> 2019, (state, sink) -> {
if (state > 2021) {
sink.complete();
} else {
sink.next(map.get(state));
}
return ++state;
})
.subscribe(data -> log.info("# onNext: {}", data));
}
}
Example 14.12
/**
* create 예제
* - pull 방식
*/
@Slf4j
public class Example14_12 {
static int SIZE = 0;
static int COUNT = -1;
final static List<Integer> DATA_SOURCE = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
public static void main(String[] args) {
log.info("# start");
Flux.create((FluxSink<Integer> sink) -> {
sink.onRequest(n -> {
try {
Thread.sleep(1000L);
for (int i = 0; i < n; i++) {
if (COUNT >= 9) {
sink.complete();
} else {
COUNT++;
sink.next(DATA_SOURCE.get(COUNT));
}
}
} catch (InterruptedException e) {}
});
sink.onDispose(() -> log.info("# clean up"));
}).subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(2);
}
@Override
protected void hookOnNext(Integer value) {
SIZE++;
log.info("# onNext: {}", value);
if (SIZE == 2) {
request(2);
SIZE = 0;
}
}
@Override
protected void hookOnComplete() {
log.info("# onComplete");
}
});
}
}
Example 14.13
/**
* create 예제
* - push 방식
*/
@Slf4j
public class Example14_13 {
public static void main(String[] args) throws InterruptedException {
CryptoCurrencyPriceEmitter priceEmitter = new CryptoCurrencyPriceEmitter();
Flux.create((FluxSink<Integer> sink) ->
priceEmitter.setListener(new CryptoCurrencyPriceListener() {
@Override
public void onPrice(List<Integer> priceList) {
priceList.stream().forEach(price -> {
sink.next(price);
});
}
@Override
public void onComplete() {
sink.complete();
}
}))
.publishOn(Schedulers.parallel())
.subscribe(
data -> log.info("# onNext: {}", data),
error -> {},
() -> log.info("# onComplete"));
Thread.sleep(3000L);
priceEmitter.flowInto();
Thread.sleep(2000L);
priceEmitter.complete();
}
}
Example 14.14
/**
* create 예제
* - Backpressure 전략 적용
*/
@Slf4j
public class Example14_14 {
static int start = 1;
static int end = 4;
public static void main(String[] args) throws InterruptedException {
Flux.create((FluxSink<Integer> emitter) -> {
emitter.onRequest(n -> {
log.info("# requested: " + n);
try {
Thread.sleep(500L);
for (int i = start; i <= end; i++) {
emitter.next(i);
}
start += 4;
end += 4;
} catch (InterruptedException e) {}
});
emitter.onDispose(() -> {
log.info("# clean up");
});
}, FluxSink.OverflowStrategy.DROP)
.subscribeOn(Schedulers.boundedElastic())
.publishOn(Schedulers.parallel(), 2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3000L);
}
}
Example 14.15
/**
* filter 예제
*/
@Slf4j
public class Example14_15 {
public static void main(String[] args) {
Flux
.range(1, 20)
.filter(num -> num % 2 != 0)
.subscribe(data -> log.info("# onNext: {}", data));
}
}
Example 14.16
/**
* filter 예제
*/
@Slf4j
public class Example14_16 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.filter(tuple -> tuple.getT2() > 20_000_000)
.subscribe(data -> log.info(data.getT1() + ":" + data.getT2()));
}
}
Example 14.17
/**
* filterWhen 예제
*/
@Slf4j
public class Example14_17 {
public static void main(String[] args) throws InterruptedException {
Map<CovidVaccine, Tuple2<CovidVaccine, Integer>> vaccineMap =
getCovidVaccines();
Flux
.fromIterable(SampleData.coronaVaccineNames)
.filterWhen(vaccine -> Mono
.just(vaccineMap.get(vaccine).getT2() >= 3_000_000)
.publishOn(Schedulers.parallel()))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(1000);
}
}
Example 14.18
/**
* skip 예제
*/
@Slf4j
public class Example14_18 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.skip(2)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(5500L);
}
}
Example 14.19
/**
* skip 예제
*/
@Slf4j
public class Example14_19 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofMillis(300))
.skip(Duration.ofSeconds(1))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(2000L);
}
}
Example 14.20
/**
* skip 예제
*/
@Slf4j
public class Example14_20 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.filter(tuple -> tuple.getT2() >= 20_000_000)
.skip(2)
.subscribe(tuple -> log.info("{}, {}", tuple.getT1(), tuple.getT2()));
}
}
Example 14.21
/**
* take 예제
*/
@Slf4j
public class Example14_21 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.take(3)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(4000L);
}
}
Example 14.22
/**
* take 예제
*/
@Slf4j
public class Example14_22 {
public static void main(String[] args) throws InterruptedException {
Flux
.interval(Duration.ofSeconds(1))
.take(Duration.ofMillis(2500))
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(3000L);
}
}
Example 14.23
/**
* takeLast 예제
*/
@Slf4j
public class Example14_23 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeLast(2)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
Example 14.24
/**
* takeUntil 예제
*/
@Slf4j
public class Example14_24 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeUntil(tuple -> tuple.getT2() > 20_000_000)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
Example 14.25
/**
* takeWhile 예제
*/
@Slf4j
public class Example14_25 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.takeWhile(tuple -> tuple.getT2() < 20_000_000)
.subscribe(tuple -> log.info("# onNext: {}, {}",
tuple.getT1(), tuple.getT2()));
}
}
Example 14.26
/**
* next 예제
*/
@Slf4j
public class Example14_26 {
public static void main(String[] args) {
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.next()
.subscribe(tuple -> log.info("# onNext: {}, {}", tuple.getT1(), tuple.getT2()));
}
}
Example 14.27
/**
* map 예제
*/
@Slf4j
public class Example14_27 {
public static void main(String[] args) {
Flux
.just("1-Circle", "3-Circle", "5-Circle")
.map(circle -> circle.replace("Circle", "Rectangle"))
.subscribe(data -> log.info("# onNext: {}", data));
}
}
Example 14.28
/**
* map 예제
*/
@Slf4j
public class Example14_28 {
public static void main(String[] args) {
final double buyPrice = 50_000_000;
Flux
.fromIterable(SampleData.btcTopPricesPerYear)
.filter(tuple -> tuple.getT1() == 2021)
.doOnNext(data -> log.info("# doOnNext: {}", data))
.map(tuple -> calculateProfitRate(buyPrice, tuple.getT2()))
.subscribe(data -> log.info("# onNext: {}%", data));
}
private static double calculateProfitRate(final double buyPrice, Long topPrice) {
return (topPrice - buyPrice) / buyPrice * 100;
}
}
Example 14.29
/**
* flatMap 예제
*/
@Slf4j
public class Example14_29 {
public static void main(String[] args) {
Flux
.just("Good", "Bad")
.flatMap(feeling -> Flux
.just("Morning", "Afternoon", "Evening")
.map(time -> feeling + " " + time))
.subscribe(log::info);
}
}
Example 14.30
/**
* flatMap 예제
*/
@Slf4j
public class Example14_30 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(2, 8)
.flatMap(dan -> Flux
.range(1, 9)
.publishOn(Schedulers.parallel())
.map(n -> dan + " * " + n + " = " + dan * n))
.subscribe(log::info);
Thread.sleep(100L);
}
}
Example 14.31
/**
* concat 예제
*/
@Slf4j
public class Example14_31 {
public static void main(String[] args) {
Flux
.concat(Flux.just(1, 2, 3), Flux.just(4, 5))
.subscribe(data -> log.info("# onNext: {}", data));
}
}
Example 14.32
/**
* concat 예제
*/
@Slf4j
public class Example14_32 {
public static void main(String[] args) {
Flux
.concat(
Flux.fromIterable(getViralVector()),
Flux.fromIterable(getMRNA()),
Flux.fromIterable(getSubunit()))
.subscribe(data -> log.info("# onNext: {}", data));
}
private static List<Tuple2<SampleData.CovidVaccine, Integer>> getViralVector() {
return SampleData.viralVectorVaccines;
}
private static List<Tuple2<SampleData.CovidVaccine, Integer>> getMRNA() {
return SampleData.mRNAVaccines;
}
private static List<Tuple2<SampleData.CovidVaccine, Integer>> getSubunit() {
return SampleData.subunitVaccines;
}
}
Example 14.33
/**
* merge 예제
*/
@Slf4j
public class Example14_33 {
public static void main(String[] args) throws InterruptedException {
Flux
.merge(
Flux.just(1, 2, 3, 4).delayElements(Duration.ofMillis(300L)),
Flux.just(5, 6, 7).delayElements(Duration.ofMillis(500L))
)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(2000L);
}
}
Example 14.34
/**
* merge 예제
*/
@Slf4j
public class Example14_34 {
public static void main(String[] args) throws InterruptedException {
String[] usaStates = {
"Ohio", "Michigan", "New Jersey", "Illinois", "New Hampshire",
"Virginia", "Vermont", "North Carolina", "Ontario", "Georgia"
};
Flux
.merge(getMeltDownRecoveryMessage(usaStates))
.subscribe(log::info);
Thread.sleep(2000L);
}
private static List<Mono<String>> getMeltDownRecoveryMessage(String[] usaStates) {
List<Mono<String>> messages = new ArrayList<>();
for (String state : usaStates) {
messages.add(SampleData.nppMap.get(state));
}
return messages;
}
}
Example 14.35
/**
* zip 예제
*/
@Slf4j
public class Example14_35 {
public static void main(String[] args) throws InterruptedException {
Flux
.zip(
Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L))
)
.subscribe(tuple2 -> log.info("# onNext: {}", tuple2));
Thread.sleep(2500L);
}
}
Example 14.36
/**
* zip 예제
*/
@Slf4j
public class Example14_36 {
public static void main(String[] args) throws InterruptedException {
Flux
.zip(
Flux.just(1, 2, 3).delayElements(Duration.ofMillis(300L)),
Flux.just(4, 5, 6).delayElements(Duration.ofMillis(500L)),
(n1, n2) -> n1 * n2
)
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(2500L);
}
}
Example 14.37
/**
* zip 예제
*/
@Slf4j
public class Example14_37 {
public static void main(String[] args) throws InterruptedException {
getInfectedPersonsPerHour(10, 21)
.subscribe(tuples -> {
Tuple3<Tuple2, Tuple2, Tuple2> t3 = (Tuple3) tuples;
int sum = (int) t3.getT1().getT2() +
(int) t3.getT2().getT2() + (int) t3.getT3().getT2();
log.info("# onNext: {}, {}", t3.getT1().getT1(), sum);
});
}
private static Flux getInfectedPersonsPerHour(int start, int end) {
return Flux.zip(
Flux.fromIterable(SampleData.seoulInfected)
.filter(t2 -> t2.getT1() >= start && t2.getT1() <= end),
Flux.fromIterable(SampleData.incheonInfected)
.filter(t2 -> t2.getT1() >= start && t2.getT1() <= end),
Flux.fromIterable(SampleData.suwonInfected)
.filter(t2 -> t2.getT1() >= start && t2.getT1() <= end)
);
}
}
Example 14.38
/**
* and 예제
*/
@Slf4j
public class Example14_38 {
public static void main(String[] args) throws InterruptedException {
Mono
.just("Task 1")
.delayElement(Duration.ofSeconds(1))
.doOnNext(data -> log.info("# Mono doOnNext: {}", data))
.and(
Flux
.just("Task 2", "Task 3")
.delayElements(Duration.ofMillis(600))
.doOnNext(data -> log.info("# Flux doOnNext: {}", data))
)
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error),
() -> log.info("# onComplete")
);
Thread.sleep(5000);
}
}
Example 14.39
/**
* and 예제
*/
@Slf4j
public class Example14_39 {
public static void main(String[] args) throws InterruptedException {
restartApplicationServer()
.and(restartDBServer())
.subscribe(
data -> log.info("# onNext: {}", data),
error -> log.error("# onError:", error),
() -> log.info("# sent an email to Administrator: " +
"All Servers are restarted successfully")
);
Thread.sleep(6000L);
}
private static Mono<String> restartApplicationServer() {
return Mono
.just("Application Server was restarted successfully.")
.delayElement(Duration.ofSeconds(2))
.doOnNext(log::info);
}
private static Publisher<String> restartDBServer() {
return Mono
.just("DB Server was restarted successfully.")
.delayElement(Duration.ofSeconds(4))
.doOnNext(log::info);
}
}
Example 14.40
/**
* collectList 예제
*/
@Slf4j
public class Example14_40 {
public static void main(String[] args) {
Flux
.just("...", "---", "...")
.map(code -> transformMorseCode(code))
.collectList()
.subscribe(list -> log.info(list.stream().collect(Collectors.joining())));
}
public static String transformMorseCode(String morseCode) {
return SampleData.morseCodeMap.get(morseCode);
}
}
Example 14.41
/**
* collectMap 예제
*/
@Slf4j
public class Example14_41 {
public static void main(String[] args) {
Flux
.range(0, 26)
.collectMap(key -> SampleData.morseCodes[key],
value -> transformToLetter(value))
.subscribe(map -> log.info("# onNext: {}", map));
}
private static String transformToLetter(int value) {
return Character.toString((char) ('a' + value));
}
}
Publisher가 구독 중일 때 트리거되는 동작을 추가할 수 있음
Publisher 가 요청을 수신할 때 트리거되는 동작을 추가할 수 있음
Publisher 가 데이터를 emit 할 때 트리거되는 동작을 추가할 수 있음
Publisher 가 성공적으로 완료되었을때 트리거되는 동작을 추가할 수 있음
Publisher 가 에러가 발생한 상태로 종료되었을 때 트리거되는 동작 추가할 수 있음
Publisher 가 취소되었을 때 트리거되는 동작을 추가할 수 있다
Publisher 가 성공적으로 완료되었을 때 또는 에러가 발생한 상태로 종료되었을때 트리거되는 공작을 추가할 수 있다
Publisher 가 데이터를 emit 할 때 성공적으로 완료되었을 때 에러가 발생한 상태가 종료되었을 때 트리거되는 동작을 추가할 수 있다.
Upstream 에 있는 전체 Operator 체인의 동작 중에서 오퍼레이터에 의해 폐기되는 요소를 조건부로 정리할 수 있다.
DownStream 을 성공적으로 완료한 직후 또는 에러가 발생하여 퍼블리셔가 종료된 직후에 트리거되는 동작을 추가할 수 있다.
Publisher 가 구독되기 전에 트리거되는 동작을 추가할 수 있다.
에러를 포함해서 어떤 이유이든 간에 퍼블리셔가 종료된 후 트리거되는 동작을 추가할 수 있다.
Example 14.43
/**
* error 처리 예제
* - error Operator
* - 명시적으로 error 이벤트를 발생시켜야 하는 경우
*/
@Slf4j
public class Example14_43 {
public static void main(String[] args) {
Flux
.range(1, 5)
.flatMap(num -> {
if ((num * 2) % 3 == 0) {
return Flux.error(
new IllegalArgumentException("Not allowed multiple of 3"));
} else {
return Mono.just(num * 2);
}
})
.subscribe(data -> log.info("# onNext: {}", data),
error -> log.error("# onError: ", error));
}
}
Example 14.44
/**
* error 처리 예제
* - error Operator
* - 명시적으로 error 이벤트를 발생시켜야 하는 경우
* - flatMap처럼 Inner Sequence가 존재하는 경우 체크 예외 발생 시 Flux로 래핑해서 onError Signal을 전송할 수 있다.
*/
@Slf4j
public class Example14_44 {
public static void main(String[] args) {
Flux
.just('a', 'b', 'c', '3', 'd')
.flatMap(letter -> {
try {
return convert(letter);
} catch (DataFormatException e) {
return Flux.error(e);
}
})
.subscribe(data -> log.info("# onNext: {}", data),
error -> log.error("# onError: ", error));
}
private static Mono<String> convert(char ch) throws DataFormatException {
if (!Character.isAlphabetic(ch)) {
throw new DataFormatException("Not Alphabetic");
}
return Mono.just("Converted to " + Character.toUpperCase(ch));
}
}
Example 14.45
/**
* error 처리 예제
* - onErrorReturn Operator
* - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우
* - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
*/
@Slf4j
public class Example14_45 {
public static void main(String[] args) {
getBooks()
.map(book -> book.getPenName().toUpperCase())
.onErrorReturn("No pen name")
.subscribe(log::info);
}
public static Flux<Book> getBooks() {
return Flux.fromIterable(SampleData.books);
}
}
Example 14.46
/**
* error 처리 예제
* - onErrorReturn Operator
* - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, default value로 대체해서 emit하고자 할 경우
* - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
*/
@Slf4j
public class Example14_46 {
public static void main(String[] args) {
getBooks()
.map(book -> book.getPenName().toUpperCase())
.onErrorReturn(NullPointerException.class, "no pen name")
.onErrorReturn(IllegalFormatException.class, "Illegal pen name")
.subscribe(log::info);
}
public static Flux<Book> getBooks() {
return Flux.fromIterable(SampleData.books);
}
}
Example 14.47
/**
* error 처리 예제
* - onErrorResume Operator
* - 예외가 발생했을 때, error 이벤트를 발생시키지 않고, 대체 Publisher로 데이터를 emit하고자 할 경우
* - try ~ catch 문의 경우, catch해서 return default value 하는 것과 같다.
*/
@Slf4j
public class Example14_47 {
public static void main(String[] args) {
final String keyword = "DDD";
getBooksFromCache(keyword)
.onErrorResume(error -> getBooksFromDatabase(keyword))
.subscribe(data -> log.info("# onNext: {}", data.getBookName()),
error -> log.error("# onError: ", error));
}
public static Flux<Book> getBooksFromCache(final String keyword) {
return Flux
.fromIterable(SampleData.books)
.filter(book -> book.getBookName().contains(keyword))
.switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
}
public static Flux<Book> getBooksFromDatabase(final String keyword) {
List<Book> books = new ArrayList<>(SampleData.books);
books.add(new Book("DDD: Domain Driven Design",
"Joy", "ddd-man", 35000, 200));
return Flux
.fromIterable(books)
.filter(book -> book.getBookName().contains(keyword))
.switchIfEmpty(Flux.error(new NoSuchBookException("No such Book")));
}
private static class NoSuchBookException extends RuntimeException {
NoSuchBookException(String message) {
super(message);
}
}
}
Example 14.48
/**
* error 처리 예제
* - onErrorContinue Operator
* - 예외가 발생했을 때, 예외를 발생시킨 데이터를 건너뛰고 Upstream에서 emit된 다음 데이터를
* 처리한다.
*/
@Slf4j
public class Example14_48 {
public static void main(String[] args) {
Flux
.just(1, 2, 4, 0, 6, 12)
.map(num -> 12 / num)
.onErrorContinue((error, num) ->
log.error("error: {}, num: {}", error, num))
.subscribe(data -> log.info("# onNext: {}", data),
error -> log.error("# onError: ", error));
}
}
Example 14.49
/**
* error 처리 예제
* - retry Operator
* - 에러가 발생했을 때, 지정된 횟수만큼 Sequence를 다시 구독한다.
*/
@Slf4j
public class Example14_49 {
public static void main(String[] args) throws InterruptedException {
final int[] count = {1};
Flux
.range(1, 3)
.delayElements(Duration.ofSeconds(1))
.map(num -> {
try {
if (num == 3 && count[0] == 1) {
count[0]++;
Thread.sleep(1000);
}
} catch (InterruptedException e) {}
return num;
})
.timeout(Duration.ofMillis(1500))
.retry(1)
.subscribe(data -> log.info("# onNext: {}", data),
(error -> log.error("# onError: ", error)),
() -> log.info("# onComplete"));
Thread.sleep(7000);
}
}
Example 14.50
/**
* error 처리 예제
* - retry Operator
* - 에러가 발생했을 때, 지정된 횟수만큼 Sequence를 다시 구독한다.
*/
@Slf4j
public class Example14_50 {
public static void main(String[] args) throws InterruptedException {
getBooks()
.collect(Collectors.toSet())
.subscribe(bookSet -> bookSet.stream()
.forEach(book -> log.info("book name: {}, price: {}",
book.getBookName(), book.getPrice())));
Thread.sleep(12000);
}
private static Flux<Book> getBooks() {
final int[] count = {0};
return Flux
.fromIterable(SampleData.books)
.delayElements(Duration.ofMillis(500))
.map(book -> {
try {
count[0]++;
if (count[0] == 3) {
Thread.sleep(2000);
}
} catch (InterruptedException e) {
}
return book;
})
.timeout(Duration.ofSeconds(2))
.retry(1)
.doOnNext(book -> log.info("# getBooks > doOnNext: {}, price: {}",
book.getBookName(), book.getPrice()));
}
}
Example 14.51
/**
* 시간 측정 예제
* - elapsed Operator
* - emit된 데이터 사이의 경과된 시간을 측정한다.
* - emit된 첫번째 데이터는 onSubscribe Signal과 첫번째 데이터 사이의 시간을 기준으로 측정한다.
* - 측정된 시간 단위는 milliseconds이다.
*/
@Slf4j
public class Example14_51 {
public static void main(String[] args) throws InterruptedException {
Flux
.range(1, 5)
.delayElements(Duration.ofSeconds(1))
.elapsed()
.subscribe(data -> log.info("# onNext: {}, time: {}",
data.getT2(), data.getT1()));
Thread.sleep(6000);
}
}
Example 14.52
/**
* 시간 측정 예제
* - elapsed Operator
* - emit된 데이터 사이의 경과된 시간을 측정한다.
* - emit된 첫번째 데이터는 onSubscribe Signal과 첫번째 데이터 사이의 시간을 기준으로 측정한다.
* - 측정된 시간 단위는 milliseconds이다.
*/
@Slf4j
public class Example14_52 {
public static void main(String[] args) {
URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
.host("worldtimeapi.org")
.port(80)
.path("/api/timezone/Asia/Seoul")
.build()
.encode()
.toUri();
RestTemplate restTemplate = new RestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
Mono.defer(() -> Mono.just(
restTemplate
.exchange(worldTimeUri,
HttpMethod.GET,
new HttpEntity<String>(headers),
String.class)
)
)
.repeat(4)
.elapsed()
.map(response -> {
DocumentContext jsonContext =
JsonPath.parse(response.getT2().getBody());
String dateTime = jsonContext.read("$.datetime");
return Tuples.of(dateTime, response.getT1());
})
.subscribe(
data -> log.info("now: {}, elapsed time: {}", data.getT1(), data.getT2()),
error -> log.error("# onError:", error),
() -> log.info("# onComplete")
);
}
}
Example 14.53
/**
* split 예제
* - window(maxSize) Operator
* - Upstream에서 emit되는 첫 번째 데이터부터 maxSize의 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할한다.
* - 새롭게 생성되는 Flux를 윈도우(Window)라고 한다.
* - 마지막 윈도우가 포함하는 데이터는 maxSize보다 작거나 같다.
*/
@Slf4j
public class Example14_53 {
public static void main(String[] args) {
Flux.range(1, 11)
.window(3)
.flatMap(flux -> {
log.info("======================");
return flux;
})
.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
protected void hookOnNext(Integer value) {
log.info("# onNext: {}", value);
request(2);
}
});
}
}
Example 14.54
/**
* split 예제
* - window(maxSize) Operator
* - Upstream에서 emit되는 첫 번째 데이터부터 maxSize의 숫자만큼의 데이터를 포함하는 새로운 Flux로 분할한다.
* - 새롭게 생성되는 Flux를 윈도우(Window)라고 한다.
* - 마지막 윈도우가 포함하는 데이터는 maxSize보다 작거나 같다.
*/
@Slf4j
public class Example14_54 {
public static void main(String[] args) {
Flux.fromIterable(SampleData.monthlyBookSales2021)
.window(3)
.flatMap(flux -> MathFlux.sumInt(flux))
.subscribe(new BaseSubscriber<>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(2);
}
@Override
protected void hookOnNext(Integer value) {
log.info("# onNext: {}", value);
request(2);
}
});
}
}
Example 14.55
/**
* split 예제
* - buffer(maxSize) Operator
* - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자만큼의 데이터를 List 버퍼로 한번에 emit한다.
* - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
*/
@Slf4j
public class Example14_55 {
public static void main(String[] args) {
Flux.range(1, 95)
.buffer(10)
.subscribe(buffer -> log.info("# onNext: {}", buffer));
}
}
Example 14.56
/**
* split 예제
* - bufferTimeout(maxSize, maxTime) Operator
* - Upstream에서 emit되는 첫 번째 데이터부터 maxSize 숫자 만큼의 데이터 또는 maxTime 내에 emit된 데이터를 List 버퍼로 한번에 emit한다.
* - maxSize나 maxTime에서 먼저 조건에 부합할때까지 emit된 데이터를 List 버퍼로 emit한다.
* - 마지막 버퍼가 포함하는 데이터는 maxSize보다 작거나 같다.
*/
@Slf4j
public class Example14_56 {
public static void main(String[] args) {
Flux
.range(1, 20)
.map(num -> {
try {
if (num < 10) {
Thread.sleep(100L);
} else {
Thread.sleep(300L);
}
} catch (InterruptedException e) {}
return num;
})
.bufferTimeout(3, Duration.ofMillis(400L))
.subscribe(buffer -> log.info("# onNext: {}", buffer));
}
}
Example 14.57
/**
* split 예제
* - groupBy(keyMapper) Operator
* - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
* - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
*/
@Slf4j
public class Example14_57 {
public static void main(String[] args) {
Flux.fromIterable(SampleData.books)
.groupBy(book -> book.getAuthorName())
.flatMap(groupedFlux ->
groupedFlux
.map(book -> book.getBookName() +
"(" + book.getAuthorName() + ")")
.collectList()
)
.subscribe(bookByAuthor ->
log.info("# book by author: {}", bookByAuthor));
}
}
Example 14.58
/**
* split 예제
* - groupBy(keyMapper, valueMapper) Operator
* - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
* - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
* - valueMapper를 추가로 전달해서 그룹화 되어 emit되는 데이터의 값을 미리 다른 값으로 변경할 수 있다.
*/
@Slf4j
public class Example14_58 {
public static void main(String[] args) {
Flux.fromIterable(SampleData.books)
.groupBy(book ->
book.getAuthorName(),
book -> book.getBookName() + "(" + book.getAuthorName() + ")")
.flatMap(groupedFlux -> groupedFlux.collectList())
.subscribe(bookByAuthor ->
log.info("# book by author: {}", bookByAuthor));
}
}
Example 14.59
/**
* split 예제
* - groupBy() Operator
* - emit되는 데이터를 key를 기준으로 그룹화 한 GroupedFlux를 리턴한다.
* - 그룹화 된 GroupedFlux로 그룹별 작업을 할 수 있다.
* - 저자 명으로 된 도서의 가격
*/
@Slf4j
public class Example14_59 {
public static void main(String[] args) {
Flux.fromIterable(SampleData.books)
.groupBy(book -> book.getAuthorName())
.flatMap(groupedFlux ->
Mono
.just(groupedFlux.key())
.zipWith(
groupedFlux
.map(book ->
(int)(book.getPrice() * book.getStockQuantity() * 0.1))
.reduce((y1, y2) -> y1 + y2),
(authorName, sumRoyalty) ->
authorName + "'s royalty: " + sumRoyalty)
)
.subscribe(log::info);
}
}
Subscriber 가 구독을 하면 Upstream 에서 emit 된 데이터가 모든 구독자에게 멀티캐스팅 됨
Example 14.60
/**
* multicast 예제
* - publish() Operator
* - 다수의 Subscriber와 Flux를 공유한다.
* - 즉, Cold Sequence를 Hot Sequence로 변환한다.
* - connect()가 호출 되기 전까지는 데이터를 emit하지 않는다.
*/
@Slf4j
public class Example14_60 {
public static void main(String[] args) throws InterruptedException {
ConnectableFlux<Integer> flux =
Flux
.range(1, 5)
.delayElements(Duration.ofMillis(300L))
.publish();
Thread.sleep(500L);
flux.subscribe(data -> log.info("# subscriber1: {}", data));
Thread.sleep(200L);
flux.subscribe(data -> log.info("# subscriber2: {}", data));
flux.connect();
Thread.sleep(1000L);
flux.subscribe(data -> log.info("# subscriber3: {}", data));
Thread.sleep(2000L);
}
}
Example 14.61
/**
* multicast 예제
* - publish() Operator
* - 다수의 Subscriber와 Flux를 공유한다.
* - 즉, Cold Sequence를 Hot Sequence로 변환한다.
* - connect()가 호출 되기 전까지는 데이터를 emit하지 않는다.
*/
@Slf4j
public class Example14_61 {
private static ConnectableFlux<String> publisher;
private static int checkedAudience;
static {
publisher =
Flux
.just("Concert part1", "Concert part2", "Concert part3")
.delayElements(Duration.ofMillis(300L))
.publish();
}
public static void main(String[] args) throws InterruptedException {
checkAudience();
Thread.sleep(500L);
publisher.subscribe(data -> log.info("# audience 1 is watching {}", data));
checkedAudience++;
Thread.sleep(500L);
publisher.subscribe(data -> log.info("# audience 2 is watching {}", data));
checkedAudience++;
checkAudience();
Thread.sleep(500L);
publisher.subscribe(data -> log.info("# audience 3 is watching {}", data));
Thread.sleep(1000L);
}
public static void checkAudience() {
if (checkedAudience >= 2) {
publisher.connect();
}
}
}
Example 14.62
/**
* multicast 예제
* - autoConnect() Operator
* - 다수의 Subscriber와 Flux를 공유한다.
* - 즉, Cold Sequence를 Hot Sequence로 변환한다.
* - 파라미터로 입력한 숫자만큼의 구독이 발생하는 시점에 connect()가 자동으로 호출된다.
*/
@Slf4j
public class Example14_62 {
public static void main(String[] args) throws InterruptedException {
Flux<String> publisher =
Flux
.just("Concert part1", "Concert part2", "Concert part3")
.delayElements(Duration.ofMillis(300L))
.publish()
.autoConnect(2);
Thread.sleep(500L);
publisher.subscribe(data -> log.info("# audience 1 is watching {}", data));
Thread.sleep(500L);
publisher.subscribe(data -> log.info("# audience 2 is watching {}", data));
Thread.sleep(500L);
publisher.subscribe(data -> log.info("# audience 3 is watching {}", data));
Thread.sleep(1000L);
}
}
Example 14.63
/**
* multicast 예제
* - refCount() Operator
* - 다수의 Subscriber와 Flux를 공유한다.
* - 즉, Cold Sequence를 Hot Sequence로 변환한다.
* - 파라미터로 입력한 숫자만큼의 구독이 발생하는 시점에 connect()가 자동으로 호출된다.
* - 모든 구독이 취소되면 Upstream 소스와의 연결을 해제한다.
*/
@Slf4j
public class Example14_63 {
public static void main(String[] args) throws InterruptedException {
Flux<Long> publisher =
Flux
.interval(Duration.ofMillis(500))
.publish().autoConnect(1);
// .publish().refCount(1);
Disposable disposable =
publisher.subscribe(data -> log.info("# subscriber 1: {}", data));
Thread.sleep(2100L);
disposable.dispose();
publisher.subscribe(data -> log.info("# subscriber 2: {}", data));
Thread.sleep(2500L);
}
}