Processor 인터페이스에는 별도로 구현해야 하는 메서드는 없지만 코드를 보면 구독자와 퍼블리셔 모두를 상속한다는 것을 알 수 있습니다.
2.4 리액티브 스트림즈 관련 용어 정리
1. Signal
구독자와 발행자간 상호간 주고받는 신호를 말합니다.
2. Demand
Subscriber 가 Publisher 에게 요청하는 데이터를 의미합니다.
Publisher 가 아직 Subscriber 에게 전달하지 않았지만, 구독 요청한 데이터를 의미 합니다.
3. Emit
말그대로 데이터를 방출! 하는 과정입니다.
Subscriber 인터페이스에서 onNext 를 통해 데이터를 전달하는 Signal 을 데이터를 emit 한다고 표현합니다.
4. upstream / downstream
코드로 바로 보는 것이 빠릅니다.
publicstaticvoidmain(String[] args){Flux.just(1,2,3,4,5,6).filter(n -> n %2==0).map(n -> n *2).subscribe(System.out.println);}
데이터 스트림의 관점으로 볼 때, 위의 just 호출을 통해 반환된 Flux 는 filter 메서드 호출을 통해 반환된 Flux 가 더 하위에 있기 때문에 filter Flux 는 Downstream 이 되고, filter 입장에서는 just 호출을 통해 반환된 Flux 가 자신보다 상위에 있기 때문에 Upstream 이 됩니다.
5. Sequence
4번의 코드처럼 다양한 연산으로 데이터의 연속 처리 흐름을 정의한 것이라고 생각하면 됩니다. 개인적으로는 kotlin standard library 의 Sequence 를 참고해도 좋다고 생각합니다.
6. Operator
말이 필요 없습니다.
just, filter, map 같은 메서드들을 Operator 라고 부릅니다.
Stream API 나 코틀린의 map filter 등을 떠올리면 바로 감이 잡힐 것입니다.
7. Source
Data Source, Publisher, Source Flux 등 최초에 가장 먼저 생성된 무언가, 즉 데이터 원본이라고 생각하면 좋습니다.
2.5 리액티브 스트림즈의 구현 규칙
Publisher 구현을 위한 주요 기본 규칙
Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다.
Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다.
Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다.
Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다.
일단 종료 상태 signal을 받으면(onError, onComplete) 더 이상 signal이 발생되지 않아야 한다.
구독이 취소되면 Subscriber는 결국 signal을 받는 것을 중지해야 한다.
Subscriber 구현을 위한 주요 기본 규칙
Subscriber는 Publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n)를 통해 Demand signal을 Publisher에게 보내야 한다.
Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher의 메서드를 호출해서는 안된다.
Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 signal을 수신한 후 구독이 취소된 것으로 간주해야 한다.
구독이 더 이상 필요하지 않은 경우 Subscribersms Subscriber.cancel()을 호출해야 한다.
Subscriber.onSubscribe()는 지정된 Subscriber에 대해 최대 한 번만 호출되어야 한다.
Subscription 구현을 위한 주요 기본 규칙
구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request를 호출하도록 허용해야 한다.
구독이 취소된 후 추가적으로 호출되는 Subscription.request(long n)는 효력이 없어야 한다.
구독이 취소된 후 추가적으로 호출되는 Subscription.cancel()은 효력이 없어야 한다.
구독이 취소되지 않은 동안 Subscription.request(long n)의 매개변수가 0보다 작거나 같으면 java.lang.IllegalArgumentException과 함께 onError signal을 보내야 한다.
구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher가 Subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다.
구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다.
Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것을 허용하지 않는다.
구독은 무제한 수의 request 호출을 지원해야 하고 최대 263-1개의 Demand를 지원해야 한다.