Quarkus有几种方法都实现反应式应用程序。这里介绍使用RESTEasy reactive,它是从Quarkus反应式引擎中受益的RESTEasy实现。默认情况下,它调用I/O线程上的HTTP端点。
尽管可以使用传统的RESTEasy,但需要添加quarkus-resteasy-mutiny扩展,并且该方法仍将在辅助线程上调用。因此,尽管它将使用反应式编程,但仍将需要工作线程,这无法达到目的
创建新Quarkus项目的最简单方法是打开终端并运行以下命令:
mvn io.quarkus:quarkus-maven-plugin:1.13.2.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=getting-started-reactive \
-DclassName="org.acme.getting.started.ReactiveGreetingResource" \
-Dpath="/hello" \
-Dextensions="resteasy-reactive"
cd getting-started-reactive
./getting-started-reactive:
- Maven结构
- org.acme.quickstart.ReactiveGreetingResource暴露于的资源/hello
- 相关的单元测试
- http://localhost:8080启动应用程序后可访问的登录页面
- 和中的模式的示例Dockerfile文件nativejvmsrc/main/docker
- 应用程序配置文件
反应式JAX-RS资源
在项目创建过程中src/main/java/org/acme/getting/started/ReactiveGreetingResource.java已创建包含以下内容的文件:
package org.acme.getting.started;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
@Path("/hello")
public class ReactiveGreetingResource {
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "Hello RESTEasy Reactive";
}
}
这是一个非常简单的REST端点,将“Hello RESTEasy Reactive”返回到“/Hello”上的请求。
要让Quarkus在辅助线程上调用此方法,请使用注释对其进行io.smallrye.common.annotation.Blocking注释。可以@Blocking在方法上使用类,或者通过对Application类进行注释来为整个应用程序启用它:
import javax.ws.rs.ApplicationPath;
import javax.ws.rs.core.Application;
import io.smallrye.common.annotation.Blocking;
@ApplicationPath("/")
@Blocking
public class RestBlockingApplication extends Application {
}
创建一个ReactiveGreetingService使用以下内容初始化:
package org.acme.getting.started;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
@ApplicationScoped
public class ReactiveGreetingService {
public Uni<String> greeting(String name) {
return Uni.createFrom().item(name)
.onItem().transform(n -> String.format("hello %s", n));
}
}
然后,编辑ReactiveGreetingResource类以匹配以下内容:
package org.acme.getting.started;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import org.reactivestreams.Publisher;
@Path("/hello")
public class ReactiveGreetingResource {
@Inject
ReactiveGreetingService service;
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/greeting/{name}")
public Uni<String> greeting(String name) {
return service.greeting(name);
}
@GET
@Produces(MediaType.TEXT_PLAIN)
public String hello() {
return "hello";
}
}
ReactiveGreetingService类包含产生一个直接的方法Uni。虽然在此示例中,将立即发出结果项,但可以想象任何异步API都会产生Uni。
现在,使用以下命令启动应用程序:
处理流
用流传输多个项目的方式扩展应用程序。这些流可能来自Kafka或任何其他数据源,这里生成定期的问候消息。
在中ReactiveGreetingService,添加以下方法:
public Multi<String> greetings(int count, String name) {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onItem().transform(n -> String.format("hello %s - %d", name, n))
.transform().byTakingFirstItems(count);
}
它每秒生成一条问候消息,并在count消息之后停止。
在ReactiveGreetingResource添加以下方法:
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/greeting/{count}/{name}")
public Multi<String> greetings(int count, String name) {
return service.greetings(count, name);
}
该端点将项目作为JSON数组流式传输到客户端。消息的名称和数量使用路径参数进行参数化。
因此,调用端点会产生类似以下内容:
$ curl http://localhost:8080/hello/greeting/3/neo
["hello neo - 0", "hello neo - 1", "hello neo - 2"]
还可以通过返回来生成服务器发送的事件响应Multi:
@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
@RestSseElementType(MediaType.TEXT_PLAIN)
@Path("/stream/{count}/{name}")
public Multi<String> greetingsAsStream(int count, String name) {
return service.greetings(count, name);
}
与前一片段的唯一区别是产生的类型和@RestSseElementType指示每个事件类型的注释。正如@Produces注释定义的那样SERVER_SENT_EVENTS,JAX-RS需要它知道每个(嵌套的)事件的内容类型。
使用以下方法查看结果:
$ curl -N http://localhost:8080/hello/stream/5/neo
data: hello neo - 0
data: hello neo - 1
data: hello neo - 2
data: hello neo - 3
data: hello neo - 4
使用反应式API
Quarkus提供了许多使用Mutty模型的反应式API。这里介绍如何使用反应式PostgreSQL驱动程序以非阻塞和被动方式与数据库交互。
使用以下内容创建新项目:
mvn io.quarkus:quarkus-maven-plugin:1.13.2.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=getting-started-reactive-crud \
-DclassName="org.acme.reactive.crud.FruitResource" \
-Dpath="/fruits" \
-Dextensions="resteasy-reactive,resteasy-reactive-jackson,reactive-pg-client"
cd getting-started-reactive-crud
配置数据源src/main/resources/application.properties:
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=quarkus_test
quarkus.datasource.password=quarkus_test
quarkus.datasource.reactive.url=postgresql://localhost:5432/quarkus_test
myapp.schema.create=true
前3行定义数据源,最后一行将在应用程序中使用,以指示在应用程序初始化时是否插入一些项。
创建实体.创建org.acme.reactive.crud.Fruit使用以下内容初始化:
package org.acme.reactive.crud;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.RowSet;
import io.vertx.mutiny.sqlclient.Tuple;
import java.util.stream.StreamSupport;
public class Fruit {
public Long id;
public String name;
public Fruit() {
// default constructor.
}
public Fruit(String name) {
this.name = name;
}
public Fruit(Long id, String name) {
this.id = id;
this.name = name;
}
public static Multi<Fruit> findAll(PgPool client) {
return client.query("SELECT id, name FROM fruits ORDER BY name ASC").execute()
// Create a Multi from the set of rows:
.onItem().transformToMulti(set -> Multi.createFrom().items(() -> StreamSupport.stream(set.spliterator(), false)))
// For each row create a fruit instance
.onItem().transform(Fruit::from);
}
public static Uni<Fruit> findById(PgPool client, Long id) {
return client.preparedQuery("SELECT id, name FROM fruits WHERE id = $1").execute(Tuple.of(id))
.onItem().transform(RowSet::iterator)
.onItem().transform(iterator -> iterator.hasNext() ? from(iterator.next()) : null);
}
public Uni<Long> save(PgPool client) {
return client.preparedQuery("INSERT INTO fruits (name) VALUES ($1) RETURNING (id)").execute(Tuple.of(name))
.onItem().transform(pgRowSet -> pgRowSet.iterator().next().getLong("id"));
}
public Uni<Boolean> update(PgPool client) {
return client.preparedQuery("UPDATE fruits SET name = $1 WHERE id = $2").execute(Tuple.of(name, id))
.onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
}
public static Uni<Boolean> delete(PgPool client, Long id) {
return client.preparedQuery("DELETE FROM fruits WHERE id = $1").execute(Tuple.of(id))
.onItem().transform(pgRowSet -> pgRowSet.rowCount() == 1);
}
private static Fruit from(Row row) {
return new Fruit(row.getLong("id"), row.getString("name"));
}
}
该实体包含一些字段和方法,用于从数据库中查找,更新和删除行。当检索到结果时,这些方法将返回Unis或Multis作为异步发出所产生的项目的方式。请注意,反应式PostgreSQL客户端已经提供了Uni和Multi实例。因此,只需将结果从数据库转换为对业务友好的对象。
为了在应用程序启动时初始化数据库,创建一个名为DBInit,其内容如下的类:
package org.acme.reactive.crud;
import io.quarkus.runtime.StartupEvent;
import io.vertx.mutiny.pgclient.PgPool;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@ApplicationScoped
public class DBInit {
private final PgPool client;
private final boolean schemaCreate;
public DBInit(PgPool client, @ConfigProperty(name = "myapp.schema.create", defaultValue = "true") boolean schemaCreate) {
this.client = client;
this.schemaCreate = schemaCreate;
}
void onStart(@Observes StartupEvent ev) {
if (schemaCreate) {
initdb();
}
}
private void initdb() {
client.query("DROP TABLE IF EXISTS fruits").execute()
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Kiwi')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Durian')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pomelo')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Lychee')").execute())
.await().indefinitely();
}
}
编辑FruitResource类:该资源根据类产生的结果返回Uni和Multi实例Fruit。
package org.acme.reactive.crud;
import java.net.URI;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
@Path("fruits")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class FruitResource {
private final PgPool client;
public FruitResource(PgPool client) {
this.client = client;
}
private void initdb() {
client.query("DROP TABLE IF EXISTS fruits").execute()
.flatMap(r -> client.query("CREATE TABLE fruits (id SERIAL PRIMARY KEY, name TEXT NOT NULL)").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Kiwi')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Durian')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Pomelo')").execute())
.flatMap(r -> client.query("INSERT INTO fruits (name) VALUES ('Lychee')").execute())
.await().indefinitely();
}
@GET
public Multi<Fruit> get() {
return Fruit.findAll(client);
}
@GET
@Path("{id}")
public Uni<Response> getSingle(Long id) {
return Fruit.findById(client, id)
.onItem().transform(fruit -> fruit != null ? Response.ok(fruit) : Response.status(Status.NOT_FOUND))
.onItem().transform(ResponseBuilder::build);
}
@POST
public Uni<Response> create(Fruit fruit) {
return fruit.save(client)
.onItem().transform(id -> URI.create("/fruits/" + id))
.onItem().transform(uri -> Response.created(uri).build());
}
@PUT
@Path("{id}")
public Uni<Response> update(Long id, Fruit fruit) {
return fruit.update(client)
.onItem().transform(updated -> updated ? Status.OK : Status.NOT_FOUND)
.onItem().transform(status -> Response.status(status).build());
}
@DELETE
@Path("{id}")
public Uni<Response> delete(Long id) {
return Fruit.delete(client, id)
.onItem().transform(deleted -> deleted ? Status.NO_CONTENT : Status.NOT_FOUND)
.onItem().transform(status -> Response.status(status).build());
}
}