۲۹ شهریور ۱۴۰۳

Techboy

اخبار و اطلاعات روز تکنولوژی

برنامه نویسی واکنشی با RxJava

ReactiveX یکی از شناخته شده ترین چارچوب ها برای برنامه نویسی واکنشی است و RxJava پیاده سازی مبتنی بر جاوا آن است. بیایید ببینیم چه کاری می توانیم با RxJava انجام دهیم.

ReactiveX یکی از شناخته شده ترین چارچوب ها برای برنامه نویسی واکنشی است و RxJava پیاده سازی مبتنی بر جاوا آن است. بیایید ببینیم چه کاری می توانیم با RxJava انجام دهیم.

برنامه‌نویسی واکنش‌گرا الگوی عملکردی و لایه‌های برنامه‌نویسی پیچیده را در قابلیت‌های بزرگ می‌گیرد. این قابلیت ها امکان استفاده از معنایی عملکردی-مانند را در معماری برنامه ها فراهم می کند. ReactiveX یکی از قوی‌ترین پروژه‌ها در دنیای واکنش‌گرا است که مجموعه‌ای از مشخصات مشترک را برای پیاده‌کننده‌های زبان ارائه می‌کند. این مقاله یک کاوش عملی از RxJava، پیاده‌سازی جاوا ReactiveX است.

شروع به کار با RxJava

برای آزمایش RxJava، برنامه‌ای در خط فرمان می‌نویسیم که جریان رویداد عمومی را که توسط CoinCap این جریان رویداد یک WebSocket API را ارائه می‌کند، که مانند آتش‌خانه‌ای از رویدادهای با قالب JSON برای هر تراکنش در طیف گسترده‌ای از مبادلات رمزنگاری است. ما به سادگی با گرفتن این رویدادها و چاپ آنها در کنسول شروع می کنیم. سپس برای نشان دادن قابلیت‌های RxJava، هندلینگ پیچیده‌تری را اضافه می‌کنیم.

فهرست ۱ ما را با کهن الگوی شروع سریع Maven شروع می کند، که داربست را برای برنامه آزمایشی ما فراهم می کند.


mvn archetype:generate -DgroupId=com.infoworld -DartifactId=rxjava -DarchetypeArtifactId=maven-archetype-quickstart

اکنون یک داربست پروژه ساده داریم که در فهرست /rxjava ذخیره شده است. ما می‌توانیم pom.xml را تغییر دهیم تا وابستگی‌هایی را که نیاز داریم را شامل شود. ما همچنین نسخه جاوا برنامه را همانطور که در لیست ۲ نشان داده شده است تنظیم کردیم.


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.infoworld</groupId>
  <artifactId>rxjava</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>rxjava</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>16</source>
          <target>16</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
      </dependency>
    <dependency>
        <groupId>io.reactivex.rxjava2</groupId>
        <artifactId>rxjava</artifactId>
        <version>2.2.21</version>
    </dependency>
<dependency>
        <groupId>com.squareup.okhttp3</groupId>
        <artifactId>okhttp</artifactId>
        <version>4.9.1</version>
    </dependency>
    <!-- JSON library for parsing GitHub API response -->
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.9</version>
    </dependency>
  </dependencies>
</project>

برای تأیید اینکه کارها کار می کنند، تایپ کنید: $ mvn clean install exec:java -Dexec.mainClass="com.infoworld.App". این دستور باید به خروجی کلاسیک “Hello World” منجر شود.

اکنون، کد ویژگی اصلی بیرون کشیدن رویدادها از نقطه پایانی WebSocket و نمایش آنها در کنسول را اضافه می کنیم. می توانید این کد را در فهرست ۳ ببینید.


package com.infoworld;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

public class App {
  public static void main(String[] args) {
    String websocketUrl = "wss://ws.coincap.io/trades/binance";
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder().url(websocketUrl).build();

    Observable<String> observable = Observable.create(emitter -> {
      WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
        @Override
        public void onOpen(WebSocket webSocket, okhttp3.Response response) {
          // WebSocket connection is open
        }
        @Override
        public void onMessage(WebSocket webSocket, String text) {
          emitter.onNext(text); // Emit received message
        }
        @Override
        public void onMessage(WebSocket webSocket, ByteString bytes)           {
        // Handle binary message if needed
        }
        @Override
        public void onClosing(WebSocket webSocket, int code, String reason) {
          webSocket.close(code, reason);
        }
        @Override
        public void onClosed(WebSocket webSocket, int code, String reason) {
          emitter.onComplete(); // WebSocket connection is closed
        }
        @Override
        public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
          emitter.onError(t); // WebSocket connection failure
        }
      });
    // Dispose WebSocket connection when the observer is disposed
    emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
    });
    observable
      .subscribeOn(Schedulers.io())
      .subscribe(new Observer<String>() {
      @Override
      public void onSubscribe(Disposable d) {
        // No-op
      }
     @Override
     public void onNext(String event) {
       // Process each event here
       System.out.println(event);
     }
     @Override
     public void onError(Throwable e) {
       e.printStackTrace();
     }

     @Override
     public void onComplete() {
       System.out.println("Completed");
     }
   });

   // Wait indefinitely (or use another mechanism to keep the program running)
   try {
      Thread.sleep(Long.MAX_VALUE);
   } catch (InterruptedException e) {
     e.printStackTrace();
   }
  }
}

اگر این برنامه را اجرا کنید، یک خروجی خط به خط از رویدادهای JSON با یک رویداد در هر خط دریافت خواهید کرد. برای از بین بردن آن، Ctrl/Command-c را فشار دهید.

یک دروازه API با استفاده از YARP در ASP.NET Core بسازید

مدل‌سازی جریان‌های رویداد

فهرست ۳ به ما نگاه خوبی به برخی اصول RxJava می دهد. ما با OkHttpClient به نقطه پایانی فشار binance (wss://ws.coincap.io/trades/binance) اتصال می‌یابیم که مصرف WebSocket API را آسان می‌کند. (به اسناد OkHttpClient مراجعه کنید.)

هنگامی که اتصال را باز کردیم، یک Observable جدید ایجاد می کنیم. قابل مشاهده نوع اصلی برای انتشار رویدادها است، شیئی که می توان آن را تماشا کرد (یا به آن گوش داد). به عبارت دیگر، یک قابل مشاهده نوعی منبع رویداد است و می تواند منابع مختلفی را مدل سازی کند. در این مورد، ما یک منبع جدید با روش Observables.create ایجاد می کنیم، که یک تابع مرتبه بالاتر است که تابعی را با یک آرگومان می پذیرد که آن را emitter.

شیء emitter دارای تمام روش‌های برگشت تماس ما برای تولید جریان رویداد است. به یک معنا، ما می خواهیم جریان WebSocket را در یک منبع رویداد سفارشی RxJava بپیچیم. برای انجام این کار، ما از WebSocketClient – به ویژه نسخه String onMessage، تماس‌های مورد نظر خود را می‌گیریم و Emitter را فراخوانی می‌کنیم. روش را می خواهیم، ​​در این مورد، emitter.onNext(text);. (همچنین برای رویدادهای چرخه زندگی مانند onClosed و onError تماس‌های برگشتی وجود دارد.) 

آنچه این به ما می دهد یک قابل مشاهده است که می توان آن را در اختیار هرکسی قرار داد که به آن نیاز دارد تا از آنچه در حال وقوع است مطلع شود. این یک روش استاندارد و قابل حمل برای مدل‌سازی جریان رویداد است. علاوه بر این، بسیار چکش‌خوار است، با طیف وسیعی از تغییرات عملکردی، که به‌طور لحظه‌ای آن را خواهید دید.

چگونه امیتر را می بندیم:


emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1000, "Closing WebSocket")));
});. 

بستن امیتر به این روش تضمین می‌کند که پس از اتمام امیتر، اتصال WebSocket را می‌بندیم.

مشاهده رویدادها

برای مشاهده رویدادهایی که از Observable می آیند، از روش subscribe در شی Observable استفاده می کنیم. ما ابتدا .subscribeOn(Schedulers.io()) را فراخوانی می کنیم، که به RxJava می گوید در یک رشته پس زمینه اجرا شود. این یک راه (بسیار) آسان برای به دست آوردن همزمانی چند رشته ای است. RxJava حتی از یک Thread Pool برای شما استفاده می کند.

کار اصلی مدیریت رویدادها با ارسال یک Observer به روش subscribe انجام می‌شود. کلاس Observer روی دیگر سکه Observable است: نوع اساسی هر چیزی که می‌خواهد رویدادها را تماشا کند. در این مورد، ما یک Observer ناشناس جدید (پارامتر شده با یک عمومی) درون خطی در تماس subscribe() ایجاد می کنیم. کار واقعی نوشتن رویداد در کنسول در روش onNext(String) در Observer انجام می‌شود.

دستکاری جریان رویداد

اکنون اجازه دهید چند عملیات را در جریان انجام دهیم. ابتدا از GSON برای تبدیل String به یک شی JSON استفاده می کنیم. سپس، از شی استفاده می‌کنیم تا فقط تراکنش‌هایی را که در بلاک چین Solana هستند فیلتر کنیم.

برای این کار می‌توانیم از روش‌های map() و filter() در کلاس Observable استفاده کنیم. با map()، می‌توانیم رشته‌ها را بر اساس رویداد به رویداد به اشیاء JSON تبدیل کنیم. سپس، ما از JSON در داخل متد filter() استفاده می‌کنیم تا فقط آن رویدادها را با “Solana” به عنوان ارز نگه داریم (در مشخصات CoinCap، کریپتو مورد استفاده در قسمت “پایه” است). می‌توانید این کد جدید را در فهرست ۴ ببینید.


import com.google.gson.Gson;
import com.google.gson.JsonObject;
//… The rest is the same
observable
  .subscribeOn(Schedulers.io())
  .map(event -> {
    Gson gson = new Gson();
    JsonObject jsonObject = gson.fromJson(event, JsonObject.class);
    return jsonObject;
  })
  .filter(jsonObject -> {
    String base = jsonObject.get("base").getAsString();
    return base.equals("solana");
  })
  .subscribe(
    jsonObject -> System.out.println(jsonObject),
    Throwable::printStackTrace,
    () -> System.out.println("Completed")
  );

خواندن نقشه و filter نسبتاً آسان است. map() جریان String ما را به جریان JsonObject تبدیل می‌کند. filter() JsonObject ها را به محض ورود می گیرد. فقط آنهایی را که میدان پایه برابر “solana” دارند، نگه می دارد.

فهرست ۴ همچنین اضافه بار متفاوتی از روش subscribe() را به ما نشان می دهد. به جای یک نمونه Observer، این یکی سه آرگومان می گیرد: توابع onNext، onError و onComplete. همین کار را می کند. همچنین یک نسخه تک آرگومان وجود دارد که فقط کنترل کننده onNext را می گیرد.

همچنین، توجه داشته باشید که map و filter همان عملیات سبک عملکردی هستند که ما از جریان‌های جاوا و زبان‌های دیگر مانند جاوا اسکریپت می‌شناسیم و دوست داریم. اما اکنون، می‌توانیم آن‌ها را در طیف وسیعی از منابع رویداد اعمال کنیم. در واقع، ما می‌توانیم این عملیات را برای هر چیزی که بتوان با Observers و Observables کنترل کرد، اعمال کرد.

نتیجه گیری

برنامه نویسی واکنشی در RxJava قدرت جدی را در یک نحو انعطاف پذیر در دستان شما قرار می دهد. در شرایط مختلف می توان از آن استفاده کرد. همانطور که دیدیم، در برخورد با منابع داده های جریانی مانند CoinCap API بسیار مفید است. توانایی عبور جریان رویدادها به عنوان اشیا یک انتزاع مهم در نرم افزار مدرن است. هر توسعه دهنده ای باید در مورد آن بداند. می‌توانید منبع کامل برنامه نمونه را در GitHub پیدا کنید.