ReactiveX یکی از شناخته شده ترین چارچوب ها برای برنامه نویسی واکنشی است و RxJava پیاده سازی مبتنی بر جاوا آن است. بیایید ببینیم چه کاری می توانیم با RxJava انجام دهیم.
برنامهنویسی واکنشگرا الگوی عملکردی و لایههای برنامهنویسی پیچیده را در قابلیتهای بزرگ میگیرد. این قابلیت ها امکان استفاده از معنایی عملکردی-مانند را در معماری برنامه ها فراهم می کند. ReactiveX یکی از قویترین پروژهها در دنیای واکنشگرا است که مجموعهای از مشخصات مشترک را برای پیادهکنندههای زبان ارائه میکند. این مقاله یک کاوش عملی از RxJava، پیادهسازی جاوا ReactiveX است.
شروع به کار با RxJava
برای آزمایش RxJava، برنامهای در خط فرمان مینویسیم که جریان رویداد عمومی را که توسط CoinCap این جریان رویداد یک WebSocket API را ارائه میکند، که مانند آتشخانهای از رویدادهای با قالب JSON برای هر تراکنش در طیف گستردهای از مبادلات رمزنگاری است. ما به سادگی با گرفتن این رویدادها و چاپ آنها در کنسول شروع می کنیم. سپس برای نشان دادن قابلیتهای RxJava، هندلینگ پیچیدهتری را اضافه میکنیم.
فهرست ۱ ما را با کهن الگوی شروع سریع Maven شروع می کند، که داربست را برای برنامه آزمایشی ما فراهم می کند.
mvn archetype:generate -DgroupId=com.infoworld -DartifactId=rxjava -DarchetypeArtifactId=maven-archetype-quickstart
اکنون یک داربست پروژه ساده داریم که در فهرست /rxjava
ذخیره شده است. ما میتوانیم pom.xml
را تغییر دهیم تا وابستگیهایی را که نیاز داریم را شامل شود. ما همچنین نسخه جاوا برنامه را همانطور که در لیست ۲ نشان داده شده است تنظیم کردیم.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.infoworld</groupId>
<artifactId>rxjava</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>rxjava</name>
<url>http://maven.apache.org</url>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>16</source>
<target>16</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.1</version>
</dependency>
<!-- JSON library for parsing GitHub API response -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.9</version>
</dependency>
</dependencies>
</project>
برای تأیید اینکه کارها کار می کنند، تایپ کنید: $ mvn clean install exec:java -Dexec.mainClass="com.infoworld.App"
. این دستور باید به خروجی کلاسیک “Hello World” منجر شود.
اکنون، کد ویژگی اصلی بیرون کشیدن رویدادها از نقطه پایانی WebSocket و نمایش آنها در کنسول را اضافه می کنیم. می توانید این کد را در فهرست ۳ ببینید.
package com.infoworld;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
public class App {
public static void main(String[] args) {
String websocketUrl = "wss://ws.coincap.io/trades/binance";
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(websocketUrl).build();
Observable<String> observable = Observable.create(emitter -> {
WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, okhttp3.Response response) {
// WebSocket connection is open
}
@Override
public void onMessage(WebSocket webSocket, String text) {
emitter.onNext(text); // Emit received message
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
// Handle binary message if needed
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
webSocket.close(code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
emitter.onComplete(); // WebSocket connection is closed
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
emitter.onError(t); // WebSocket connection failure
}
});
// Dispose WebSocket connection when the observer is disposed
emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
});
observable
.subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// No-op
}
@Override
public void onNext(String event) {
// Process each event here
System.out.println(event);
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Completed");
}
});
// Wait indefinitely (or use another mechanism to keep the program running)
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
اگر این برنامه را اجرا کنید، یک خروجی خط به خط از رویدادهای JSON با یک رویداد در هر خط دریافت خواهید کرد. برای از بین بردن آن، Ctrl/Command-c را فشار دهید.
مدلسازی جریانهای رویداد
فهرست ۳ به ما نگاه خوبی به برخی اصول RxJava می دهد. ما با OkHttpClient
به نقطه پایانی فشار binance (wss://ws.coincap.io/trades/binance) اتصال مییابیم که مصرف WebSocket API را آسان میکند. (به اسناد OkHttpClient مراجعه کنید.)
هنگامی که اتصال را باز کردیم، یک Observable
جدید ایجاد می کنیم. قابل مشاهده
نوع اصلی برای انتشار رویدادها است، شیئی که می توان آن را تماشا کرد (یا به آن گوش داد). به عبارت دیگر، یک قابل مشاهده
نوعی منبع رویداد است و می تواند منابع مختلفی را مدل سازی کند. در این مورد، ما یک منبع جدید با روش Observables.create
ایجاد می کنیم، که یک تابع مرتبه بالاتر است که تابعی را با یک آرگومان می پذیرد که آن را emitter
.
شیء emitter
دارای تمام روشهای برگشت تماس ما برای تولید جریان رویداد است. به یک معنا، ما می خواهیم جریان WebSocket را در یک منبع رویداد سفارشی RxJava بپیچیم. برای انجام این کار، ما از WebSocketClient
– به ویژه نسخه String
onMessage
، تماسهای مورد نظر خود را میگیریم و Emitter را فراخوانی میکنیم. روش
را می خواهیم، در این مورد، emitter.onNext(text);
. (همچنین برای رویدادهای چرخه زندگی مانند onClosed
و onError
تماسهای برگشتی وجود دارد.)
آنچه این به ما می دهد یک قابل مشاهده
است که می توان آن را در اختیار هرکسی قرار داد که به آن نیاز دارد تا از آنچه در حال وقوع است مطلع شود. این یک روش استاندارد و قابل حمل برای مدلسازی جریان رویداد است. علاوه بر این، بسیار چکشخوار است، با طیف وسیعی از تغییرات عملکردی، که بهطور لحظهای آن را خواهید دید.
چگونه امیتر را می بندیم:
emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
});.
بستن امیتر به این روش تضمین میکند که پس از اتمام امیتر، اتصال WebSocket را میبندیم.
مشاهده رویدادها
برای مشاهده رویدادهایی که از Observable
می آیند، از روش subscribe
در شی Observable
استفاده می کنیم. ما ابتدا .subscribeOn(Schedulers.io())
را فراخوانی می کنیم، که به RxJava می گوید در یک رشته پس زمینه اجرا شود. این یک راه (بسیار) آسان برای به دست آوردن همزمانی چند رشته ای است. RxJava حتی از یک Thread Pool برای شما استفاده می کند.
کار اصلی مدیریت رویدادها با ارسال یک Observer
به روش subscribe
انجام میشود. کلاس Observer
روی دیگر سکه Observable
است: نوع اساسی هر چیزی که میخواهد رویدادها را تماشا کند. در این مورد، ما یک Observer
ناشناس جدید (پارامتر شده با یک
عمومی) درون خطی در تماس subscribe()
ایجاد می کنیم. کار واقعی نوشتن رویداد در کنسول در روش onNext(String)
در Observer
انجام میشود.
دستکاری جریان رویداد
اکنون اجازه دهید چند عملیات را در جریان انجام دهیم. ابتدا از GSON برای تبدیل String
به یک شی JSON استفاده می کنیم. سپس، از شی استفاده میکنیم تا فقط تراکنشهایی را که در بلاک چین Solana هستند فیلتر کنیم.
برای این کار میتوانیم از روشهای map()
و filter()
در کلاس Observable
استفاده کنیم. با map()
، میتوانیم رشتهها را بر اساس رویداد به رویداد به اشیاء JSON تبدیل کنیم. سپس، ما از JSON در داخل متد filter()
استفاده میکنیم تا فقط آن رویدادها را با “Solana” به عنوان ارز نگه داریم (در مشخصات CoinCap، کریپتو مورد استفاده در قسمت “پایه” است). میتوانید این کد جدید را در فهرست ۴ ببینید.
import com.google.gson.Gson;
import com.google.gson.JsonObject;
//… The rest is the same
observable
.subscribeOn(Schedulers.io())
.map(event -> {
Gson gson = new Gson();
JsonObject jsonObject = gson.fromJson(event, JsonObject.class);
return jsonObject;
})
.filter(jsonObject -> {
String base = jsonObject.get("base").getAsString();
return base.equals("solana");
})
.subscribe(
jsonObject -> System.out.println(jsonObject),
Throwable::printStackTrace,
() -> System.out.println("Completed")
);
خواندن نقشه
و filter
نسبتاً آسان است. map()
جریان String
ما را به جریان JsonObject
تبدیل میکند. filter()
JsonObject
ها را به محض ورود می گیرد. فقط آنهایی را که میدان پایه برابر “solana” دارند، نگه می دارد.
فهرست ۴ همچنین اضافه بار متفاوتی از روش subscribe()
را به ما نشان می دهد. به جای یک نمونه Observer
، این یکی سه آرگومان می گیرد: توابع onNext
، onError
و onComplete
. همین کار را می کند. همچنین یک نسخه تک آرگومان وجود دارد که فقط کنترل کننده onNext
را می گیرد.
همچنین، توجه داشته باشید که map
و filter
همان عملیات سبک عملکردی هستند که ما از جریانهای جاوا و زبانهای دیگر مانند جاوا اسکریپت میشناسیم و دوست داریم. اما اکنون، میتوانیم آنها را در طیف وسیعی از منابع رویداد اعمال کنیم. در واقع، ما میتوانیم این عملیات را برای هر چیزی که بتوان با Observer
s و Observable
s کنترل کرد، اعمال کرد.
نتیجه گیری
برنامه نویسی واکنشی در RxJava قدرت جدی را در یک نحو انعطاف پذیر در دستان شما قرار می دهد. در شرایط مختلف می توان از آن استفاده کرد. همانطور که دیدیم، در برخورد با منابع داده های جریانی مانند CoinCap API بسیار مفید است. توانایی عبور جریان رویدادها به عنوان اشیا یک انتزاع مهم در نرم افزار مدرن است. هر توسعه دهنده ای باید در مورد آن بداند. میتوانید منبع کامل برنامه نمونه را در GitHub پیدا کنید.
پست های مرتبط
برنامه نویسی واکنشی با RxJava
برنامه نویسی واکنشی با RxJava
برنامه نویسی واکنشی با RxJava