百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

Flink中处理维表关联技术实现路径

bigegpt 2025-06-18 19:15 2 浏览

在 Flink 中处理维表关联大体氛围Table SQL Lookup Join 和 DataStream 算子函数,主要技术实现路径:

I. Flink SQL/Table API 中的 Lookup Join

Flink SQL/Table API 提供了 LOOKUP JOIN 语法,专门用于将流式数据与维表(通常是存储在外部系统中的批数据)进行关联。其核心在于通过异步或同步的方式查询维表,避免阻塞流处理。

1. 同步 Lookup Join

实现路径:

  • JDBC Lookup Function (MySQL, PostgreSQL等关系型数据库): Flink 内部提供了 JDBC connector,可以通过配置来连接关系型数据库。当流数据到来时,Flink 会向数据库发送查询请求,同步等待查询结果。
  • HBase Lookup Function: Flink 也提供了 HBase Connector,可以用于与 HBase 进行同步关联。
  • Redis Lookup Function: 虽然没有内置的 Redis Connector 作为 Lookup Source,但可以通过自定义 TableFunctionAsyncTableFunction 来实现同步或异步与 Redis 的关联。

优点:

  • 实现简单: 对于支持的数据库,配置相对简单,直接使用 SQL 语法即可。
  • 数据一致性高: 每次查询都是从最新维表数据获取,保证了数据的新鲜度(在不考虑数据库同步延迟的情况下)。

缺点:

  • 性能瓶颈: 同步查询会阻塞 Flink 算子,如果维表查询延迟较高,或者查询并发量大,会严重影响流处理的吞吐量和延迟。这在处理高吞吐量流数据时是一个显著的缺点。
  • 外部依赖: 强依赖外部数据库的性能和可用性。
  • 不适合高并发场景: 容易导致数据库连接池耗尽或数据库负载过高。

示例代码:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SyncLookupJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 示例中使用单并行度

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 定义一个订单数据源 (这里使用 VALUES 生成一个模拟的流)
        // 实际应用中可以替换为 Kafka, Pulsar, Kinesis 等数据源
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE orders (\n" +
            "    order_id INT,\n" +
            "    product_id INT,\n" +
            "    amount DOUBLE,\n" +
            "    proctime AS PROCTIME() -- 处理时间字段\n" +
            ") WITH (\n" +
            "    'connector' = 'datagen',\n" + // 使用 datagen 连接器模拟数据
            "    'rows-per-second' = '1',\n" + // 每秒生成一条数据
            "    'fields.order_id.min' = '1',\n" +
            "    'fields.order_id.max' = '1000',\n" +
            "    'fields.product_id.min' = '101',\n" + // 模拟关联 Product ID
            "    'fields.product_id.max' = '202',\n" +
            "    'fields.amount.min' = '10.0',\n" +
            "    'fields.amount.max' = '1000.0'\n" +
            ")"
        );

        // 2. 定义一个维表数据源 (MySQL JDBC Lookup Table)
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE products_dim (\n" +
            "    product_id INT,\n" +
            "    product_name STRING,\n" +
            "    category STRING\n" +
            ") WITH (\n" +
            "    'connector' = 'jdbc',\n" +
            "    'url' = 'jdbc:mysql://localhost:3306/testdb',\n" + // 替换为你的 MySQL 地址和数据库名
            "    'table-name' = 'products',\n" +
            "    'username' = 'root',\n" +     // 替换为你的 MySQL 用户名
            "    'password' = 'your_password', \n" + // 替换为你的 MySQL 密码
            "    'lookup.cache.max-rows' = '1000',\n" + // JDBC Lookup 支持缓存
            "    'lookup.cache.ttl' = '10min',\n" +    // 缓存过期时间
            "    'lookup.max-retries' = '3' \n" +       // 查询失败重试次数
            ")"
        );

        // 3. 执行 Lookup Join 查询
        Table resultTable = tEnv.sqlQuery(
            "SELECT\n" +
            "    o.order_id,\n" +
            "    o.product_id,\n" +
            "    p.product_name,\n" +
            "    p.category,\n" +
            "    o.amount\n" +
            "FROM\n" +
            "    orders AS o\n" +
            "JOIN products_dim AS p ON o.product_id = p.product_id" // 简单的等值 JOIN 即可
        );

        // 4. 将结果打印到控制台 (或输出到其他 Sink)
        resultTable.execute().print();

        // Flink Job 会持续运行,直到手动停止
        // env.execute("Sync Lookup Join Example"); // 对于execute().print()不需要再execute()
    }
}

如果你的 MySQL 响应慢,你会发现 Flink 的输出也会变慢,因为它是同步阻塞的

2. 异步 Lookup Join (推荐)

实现路径:

  • 自定义 AsyncTableFunction: 这是实现异步 Lookup Join 的核心和推荐方式。
  • 原理: AsyncTableFunction 允许用户编写异步查询逻辑。当流数据到来时,Flink 会并发地向外部维表发起查询请求,但不会阻塞当前处理线程。查询结果返回后,通过 ResultFuture 回调通知 Flink,将维表数据与流数据进行合并。
  • 适用场景: 适用于任何支持异步客户端的外部存储,例如: Redis: 使用 Jedis 或 Lettuce 等异步客户端。 HBase (Async Client): 使用 HBase 提供的异步客户端。 Cassandra: 使用 DataStax Java Driver。 ClickHouse (异步JDBC或HTTP API): 虽然是关系型数据库,但如果其驱动支持异步,或通过 HTTP API 异步访问,也可以实现。 HTTP API (微服务接口): 调用外部微服务接口获取维表数据。
  • 实现步骤: 实现 AsyncTableFunction 接口。eval 方法中发起异步查询,并传入 ResultFuture 当异步查询返回结果时,调用 ResultFuture.complete() 来通知 Flink。 注册自定义的 AsyncTableFunction 到 Table Environment。 在 SQL 中使用 LATERAL TABLE() 语法进行关联。

优点:

  • 高吞吐量和低延迟: 非阻塞查询,允许多个并发查询同时进行,极大地提高了吞吐量,降低了端到端延迟。
  • 资源利用率高: 充分利用 I/O 资源,避免了线程阻塞。
  • 灵活性强: 可以集成任何支持异步查询的外部存储或服务。
  • 背压能力: Flink 的异步 I/O 组件提供了内置的背压机制,可以根据下游处理能力自动调整并发度,防止外部系统过载。

缺点:

  • 实现复杂度相对较高: 需要编写异步查询逻辑,处理回调和异常。
  • 外部系统支持: 依赖外部系统提供的异步客户端或接口。

示例代码:

首先,我们需要创建一个自定义的 AsyncRedisLookupFunction。

import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Collector;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.Map;

public class AsyncRedisLookupFunction extends AsyncTableFunction<RowData> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword; // 如果有密码
    private transient JedisPool jedisPool; // transient 表示不参与序列化
    private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步

    public AsyncRedisLookupFunction(String redisHost, int redisPort, String redisPassword) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100); // 最大连接数
        poolConfig.setMaxIdle(20);  // 最大空闲连接数
        poolConfig.setMinIdle(5);   // 最小空闲连接数
        poolConfig.setTestOnBorrow(true); // 借用连接时测试
        poolConfig.setTestOnReturn(true); // 归还连接时测试
        poolConfig.setTestWhileIdle(true); // 空闲时测试

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        // 创建一个线程池来模拟异步查询。
        // 实际生产中,更推荐使用 Lettuce 等真正的异步 Redis 客户端。
        this.executorService = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() * 2 // 通常设置为 CPU 核数的两倍
        );
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }

    // eval 方法接收流中的 join key,并返回一个 CompletableFuture
    // Flink 将在 CompletableFuture 完成时收集结果
    public void eval(CompletableFuture<Collector<RowData>> resultFuture, Integer productId) {
        executorService.submit(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + productId;
                Map<String, String> productInfo = jedis.hgetAll(key);

                if (productInfo != null && !productInfo.isEmpty()) {
                    String productName = productInfo.get("name");
                    String category = productInfo.get("category");
                    // 返回一个 RowData,包含维表查询到的字段
                    // 顺序与定义 Table 时声明的字段顺序一致
                    resultFuture.complete(new SimpleCollector<>(
                            Collections.singletonList(GenericRowData.of(
                                    StringData.fromString(productName),
                                    StringData.fromString(category)
                            ))
                    ));
                } else {
                    // 如果未找到,返回空结果
                    resultFuture.complete(new SimpleCollector<>(Collections.emptyList()));
                }
            } catch (Exception e) {
                // 处理异常,可以返回空结果或抛出异常让 Flink 处理
                resultFuture.completeExceptionally(e);
            }
        });
    }

    // 辅助类,用于将 List<RowData> 收集到 Collector 中
    private static class SimpleCollector<T> implements Collector<T> {
        private final java.util.List<T> list;

        public SimpleCollector(java.util.List<T> list) {
            this.list = list;
        }

        @Override
        public void collect(T record) {
            list.add(record); // 实际上这里我们只收集一个结果
        }

        @Override
        public void close() {
            // No-op
        }
    }
}

现在,我们将在 Flink SQL 中注册并使用这个 AsyncRedisLookupFunction。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.logical.utils.LogicalTypeComparators;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.Collections;

import static org.apache.flink.table.api.DataTypes.*;

public class AsyncLookupJoinExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 示例中使用单并行度,但异步 I/O 可以并行

        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // 1. 定义一个订单数据源 (Datagen 生成模拟流)
        tEnv.executeSql(
            "CREATE TEMPORARY TABLE orders (\n" +
            "    order_id INT,\n" +
            "    product_id INT,\n" +
            "    amount DOUBLE,\n" +
            "    proctime AS PROCTIME() -- 处理时间字段\n" +
            ") WITH (\n" +
            "    'connector' = 'datagen',\n" +
            "    'rows-per-second' = '10',\n" + // 提高生成速度,更明显地展示异步优势
            "    'fields.order_id.min' = '1',\n" +
            "    'fields.order_id.max' = '1000',\n" +
            "    'fields.product_id.min' = '101',\n" +
            "    'fields.product_id.max' = '202',\n" + // 确保 product_id 在 Redis 示例范围内
            "    'fields.amount.min' = '10.0',\n" +
            "    'fields.amount.max' = '1000.0'\n" +
            ")"
        );

        // 2. 注册自定义的 AsyncRedisLookupFunction
        // 构造函数参数: redisHost, redisPort, redisPassword
        tEnv.createTemporarySystemFunction(
            "AsyncRedisProducts",
            new AsyncRedisLookupFunction("localhost", 6379, "") // 替换为你的 Redis 地址、端口和密码
        );

        // 3. 执行 Lookup Join 查询
        // 使用 LATERAL TABLE(FunctionName(lookup_keys)) 语法
        // 注意:AsyncRedisProducts 函数的返回类型决定了 JOIN 后的字段名
        // AsyncRedisLookupFunction 返回的是 RowData,包含 (product_name, category)
        Table resultTable = tEnv.sqlQuery(
            "SELECT\n" +
            "    o.order_id,\n" +
            "    o.product_id,\n" +
            "    T.product_name,\n" + // T 是 LATERAL TABLE 的别名
            "    T.category,\n" +
            "    o.amount\n" +
            "FROM\n" +
            "    orders AS o,\n" + // 注意这里是逗号连接,而不是 JOIN
            "    LATERAL TABLE(AsyncRedisProducts(o.product_id)) AS T(product_name, category)" // 指定返回的字段名
        );

        // 4. 将结果打印到控制台
        resultTable.execute().print();
    }
}

查询 Redis 是异步进行的,不会阻塞主线程。

II. DataStream API 中的维表关联

在 DataStream API 中,没有直接的 LOOKUP JOIN 概念,但可以通过自定义函数和状态管理来实现维表关联。

1. 使用 RichFlatMapFunction/RichMapFunction + 缓存

实现路径:

  • 原理:open() 方法中初始化外部维表连接或加载部分维表数据到内存(如果维表数据量不大)。在 map()flatMap() 方法中,当流数据到来时,首先尝试从本地缓存中查询维表数据。如果缓存未命中,则向外部维表发起同步查询,并将结果更新到缓存中。
  • 缓存策略: LRU Cache: 维护一个固定大小的最近最少使用缓存。 TTL Cache: 给缓存中的数据设置过期时间,定期清理过期数据。
  • 适用场景: 维表数据量不大,可以完全或部分加载到内存中。 对数据新鲜度要求不是极高,允许一定的缓存延迟。 对实时性要求较高,但外部维表查询延迟较低。

优点:

  • 性能提升: 命中缓存可以避免与外部存储的交互,显著提升性能。
  • 降低外部系统压力: 减少对外部维表的查询次数。
  • 实现相对简单: 比异步 I/O 相对容易理解和实现。

缺点:

  • 数据新鲜度问题: 缓存可能导致数据不新鲜,特别是维表数据更新频繁时。需要实现合适的缓存失效和更新机制。
  • 内存消耗: 维表数据如果过大,可能导致内存溢出。
  • 并发瓶颈: 如果缓存未命中,仍需进行同步查询,在高并发下仍可能成为瓶颈。
  • 缺乏内置背压: 需要手动管理与外部系统的交互,没有像异步 I/O 那样内置的背压机制。

示例代码:

RichFlatMapFunction 实现

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RedisProductCachedFlatMapFunction extends RichFlatMapFunction<Order, EnrichedOrder> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword;
    private transient JedisPool jedisPool;
    private transient Map<Integer, ProductInfo> cache; // LRU 缓存
    private final int cacheMaxSize;
    private final long cacheTTL; // 缓存过期时间,单位毫秒

    // 内部类,存储产品信息
    private static class ProductInfo {
        String productName;
        String category;
        long timestamp; // 记录缓存时间,用于TTL

        public ProductInfo(String productName, String category) {
            this.productName = productName;
            this.category = category;
            this.timestamp = System.currentTimeMillis();
        }
    }

    public RedisProductCachedFlatMapFunction(String redisHost, int redisPort, String redisPassword, int cacheMaxSize, long cacheTTL) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
        this.cacheMaxSize = cacheMaxSize;
        this.cacheTTL = cacheTTL;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(50); // 连接数可以适当减少,因为有缓存
        poolConfig.setMaxIdle(10);

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        // 实现一个简单的 LRU 缓存,带 TTL
        this.cache = new LinkedHashMap<Integer, ProductInfo>(cacheMaxSize, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, ProductInfo> eldest) {
                // 移除最老或过期的条目
                return size() > cacheMaxSize || (System.currentTimeMillis() - eldest.getValue().timestamp > cacheTTL);
            }
        };
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        super.close();
    }

    @Override
    public void flatMap(Order order, Collector<EnrichedOrder> out) throws Exception {
        ProductInfo productInfo = cache.get(order.productId);
        long currentTime = System.currentTimeMillis();

        // 检查缓存是否存在且未过期
        if (productInfo != null && (currentTime - productInfo.timestamp) <= cacheTTL) {
            // 缓存命中
            out.collect(new EnrichedOrder(order, productInfo.productName, productInfo.category));
        } else {
            // 缓存未命中或已过期,从 Redis 查询
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + order.productId;
                Map<String, String> redisResult = jedis.hgetAll(key);

                if (redisResult != null && !redisResult.isEmpty()) {
                    String productName = redisResult.get("name");
                    String category = redisResult.get("category");
                    productInfo = new ProductInfo(productName, category);
                    cache.put(order.productId, productInfo); // 更新缓存
                    out.collect(new EnrichedOrder(order, productName, category));
                } else {
                    // 未找到维表数据,选择丢弃或使用默认值
                    // System.err.println("Product ID " + order.productId + " not found in Redis.");
                    // out.collect(new EnrichedOrder(order, "UNKNOWN", "UNKNOWN")); // 示例:使用默认值
                }
            } catch (Exception e) {
                System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
                // out.collect(new EnrichedOrder(order, "ERROR_PRODUCT", "ERROR_CATEGORY")); // 错误时使用默认值
            }
        }
    }
}

DataStream Job 代码

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class DataStreamCachedLookupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // 可以设置多个并行度,每个并行度会维护自己的缓存

        // 1. 模拟订单数据流
        DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
            private long orderCounter = 0;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    orderCounter++;
                    int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
                    double amount = 10.0 + random.nextDouble() * 990.0;
                    ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
                    Thread.sleep(50); // 每50毫秒生成一个订单
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 2. 使用 RichFlatMapFunction 进行维表关联,带缓存
        DataStream<EnrichedOrder> enrichedStream = orderStream.flatMap(
                new RedisProductCachedFlatMapFunction(
                        "localhost", 6379, "", // 替换为你的Redis地址和密码
                        1000, // 缓存最大条目数
                        5 * 60 * 1000L // 缓存TTL 5分钟 (5 * 60 * 1000 毫秒)
                )
        );

        // 3. 打印结果
        enrichedStream.print();

        env.execute("DataStream Cached Lookup Join Example");
    }
}

观察控制台输出。你会注意到,当缓存生效时,处理速度会非常快。如果产品 ID 命中缓存,则不会访问 Redis。当新的产品 ID 出现或缓存过期时,才会触发对 Redis 的查询。

2. 使用 AsyncDataStream (异步 I/O)

实现路径:

  • 原理: 这是 DataStream API 中实现异步维表关联的推荐方式,与 Table API 的 AsyncTableFunction 原理类似。用户实现 AsyncFunction 接口,在 asyncInvoke() 方法中发起异步查询,并通过 ResultFuture 返回结果。
  • 适用场景: 同步 Lookup Join 的异步版本在 DataStream API 中的实现,适用于对性能、吞吐量和延迟有高要求的场景,并且外部系统支持异步客户端。

优点:

  • 与 Table API 的异步 Lookup Join 类似,具有相同的优点: 高吞吐量、低延迟、高资源利用率、灵活性强、内置背压机制。

缺点:

  • 与 Table API 的异步 Lookup Join 类似,具有相同的缺点: 实现复杂度相对较高,依赖外部系统提供的异步客户端或接口。

示例代码:

AsyncFunction 实现

import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.configuration.Configuration;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RedisProductAsyncFunction extends RichAsyncFunction<Order, EnrichedOrder> {

    private final String redisHost;
    private final int redisPort;
    private final String redisPassword;
    private transient JedisPool jedisPool;
    private transient ExecutorService executorService; // 用于Jedis同步操作的线程池,模拟异步

    public RedisProductAsyncFunction(String redisHost, int redisPort, String redisPassword) {
        this.redisHost = redisHost;
        this.redisPort = redisPort;
        this.redisPassword = redisPassword;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100);
        poolConfig.setMaxIdle(20);
        poolConfig.setMinIdle(5);
        poolConfig.setTestOnBorrow(true);
        poolConfig.setTestOnReturn(true);
        poolConfig.setTestWhileIdle(true);

        if (redisPassword != null && !redisPassword.isEmpty()) {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000, redisPassword);
        } else {
            this.jedisPool = new JedisPool(poolConfig, redisHost, redisPort, 2000);
        }

        this.executorService = Executors.newFixedThreadPool(
                getRuntimeContext().getNumberOfParallelSubtasks() * 2 // 每个并行度至少2个线程
        );
    }

    @Override
    public void close() throws Exception {
        if (jedisPool != null) {
            jedisPool.close();
        }
        if (executorService != null) {
            executorService.shutdown();
        }
        super.close();
    }

    @Override
    public void asyncInvoke(Order order, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
        executorService.submit(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                String key = "product:" + order.productId;
                Map<String, String> productInfo = jedis.hgetAll(key);

                if (productInfo != null && !productInfo.isEmpty()) {
                    String productName = productInfo.get("name");
                    String category = productInfo.get("category");
                    EnrichedOrder enrichedOrder = new EnrichedOrder(order, productName, category);
                    resultFuture.complete(Collections.singletonList(enrichedOrder));
                } else {
                    // 如果未找到维表数据,可以选择丢弃,或者用默认值填充
                    // 这里选择丢弃,也可以返回一个只包含订单信息的 EnrichedOrder
                    resultFuture.complete(Collections.emptyList());
                }
            } catch (Exception e) {
                // 处理异常,例如打印日志,并返回空列表或原始数据
                System.err.println("Error fetching product info for productId " + order.productId + ": " + e.getMessage());
                resultFuture.complete(Collections.emptyList()); // 发生异常时,丢弃该订单
            }
        });
    }

    // 可选:处理超时,当异步请求在指定时间内未完成时调用
    @Override
    public void timeout(Order input, ResultFuture<EnrichedOrder> resultFuture) throws Exception {
        System.err.println("Async lookup timeout for order: " + input.orderId);
        // 超时时可以选择返回原始数据,或丢弃
        resultFuture.complete(Collections.singletonList(new EnrichedOrder(input, "UNKNOWN", "UNKNOWN")));
    }
}

DataStream Job 代码

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;
import java.util.concurrent.TimeUnit;

public class DataStreamAsyncLookupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // 可以设置多个并行度,体验异步I/O的并发性

        // 1. 模拟订单数据流
        DataStream<Order> orderStream = env.addSource(new SourceFunction<Order>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();
            private long orderCounter = 0;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                while (isRunning) {
                    orderCounter++;
                    int productId = random.nextInt(202 - 101 + 1) + 101; // 101到202之间
                    double amount = 10.0 + random.nextDouble() * 990.0;
                    ctx.collect(new Order((int) orderCounter, productId, amount, System.currentTimeMillis()));
                    Thread.sleep(50); // 每50毫秒生成一个订单,模拟高吞吐
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // 2. 使用 AsyncDataStream 进行异步维表关联
        // orderStream: 输入流
        // new RedisProductAsyncFunction(...): 异步函数实例
        // 100: 最大并发异步请求数 (maxConcurrentRequests)
        // 1000: 超时时间 (timeout)
        // TimeUnit.MILLISECONDS: 超时时间单位
        DataStream<EnrichedOrder> enrichedStream = AsyncDataStream.unorderedWait(
                orderStream,
                new RedisProductAsyncFunction("localhost", 6379, ""), // 替换为你的Redis地址和密码
                1000, // 超时时间,例如1000毫秒
                TimeUnit.MILLISECONDS,
                100 // 最大并发请求数,可以调整
        );
        // 也可以使用 orderedWait 保持事件顺序,但通常 unorderedWait 性能更高

        // 3. 打印结果
        enrichedStream.print();

        env.execute("DataStream Async Lookup Join Example");
    }
}

3. 使用 Managed State (Stateful Functions) + 定期更新

实现路径:

  • 原理: 将维表数据作为 Flink Managed State(如 ValueState, MapState)存储在 Flink 内部,通常与 Keyed State 结合使用。通过一个独立的流(或定时任务)定期从外部维表读取最新数据,并更新到 Flink 的 Managed State 中。当主数据流到来时,直接从本地状态中查询维表数据。
  • 更新机制: 全量同步: 定期拉取全量维表数据。 增量同步: 如果维表支持,可以通过 CDC (Change Data Capture) 或消息队列(如 Kafka)订阅维表的变化,将变化数据作为单独的流推送到 Flink,然后更新到 Managed State。
  • 适用场景: 维表数据量大,无法全部加载到每个 TaskManager 的内存中,但仍希望在 Flink 内部实现高性能查询。 对数据新鲜度有一定容忍度,允许维表数据与外部源存在短暂延迟。 需要利用 Flink 的状态管理能力(如状态快照、故障恢复)。

优点:

  • 极高查询性能: 维表数据在 Flink 内部,查询速度极快,避免了网络 I/O 延迟。
  • 故障恢复: 维表数据作为 Managed State,可以随着 Flink 的状态快照和故障恢复机制得到保护。
  • 独立更新: 维表更新与主数据流处理分离,不会阻塞主数据流。
  • 降低外部系统压力: 维表查询不再直接访问外部系统,而是访问 Flink 内部状态。

缺点:

  • 数据新鲜度: 维表数据与外部源存在更新延迟,取决于更新频率和机制。
  • 实现复杂度高: 需要管理状态的更新逻辑,特别是增量更新。
  • 状态管理开销: 维表数据如果非常大,会增加 Flink State 的存储和快照开销。
  • 扩容和再平衡: 如果维表数据是 Keyed State,当 Flink 集群扩容或任务再平衡时,状态迁移可能会带来开销。

总结与选择建议:

实现路径

适用场景

优点

缺点

SQL/Table API - 同步 Lookup Join

简单验证、低吞吐量、对性能要求不高。

实现简单,易于上手。

性能瓶颈,不适合高并发,强依赖外部数据库性能。

SQL/Table API - 异步 Lookup Join

推荐,对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。

高吞吐量,低延迟,高资源利用率,内置背压。

实现复杂度相对高,依赖外部系统异步客户端。

DataStream - 缓存 (RichFunction)

维表数据量小,对数据新鲜度有一定容忍,且外部维表查询延迟较低。

命中缓存性能高,降低外部系统压力,实现相对简单。

数据新鲜度问题,内存消耗,无内置背压。

DataStream - AsyncDataStream

对性能、吞吐量和延迟有高要求,外部系统支持异步客户端。

同 Table API 异步 Lookup Join 的优点。

同 Table API 异步 Lookup Join 的缺点。

DataStream - Managed State (定期更新)

维表数据量大,但希望高性能查询,对数据新鲜度有一定容忍,需要 Flink 状态管理。

极高查询性能,故障恢复,独立更新,降低外部系统压力。

数据新鲜度有延迟,实现复杂度高,状态管理开销,扩容开销。

选择建议:

  • 首选异步 I/O: 对于大多数生产环境下的维表关联场景,无论是使用 Flink SQL/Table API 的 LOOKUP JOIN 还是 DataStream API 的 AsyncDataStream,都强烈推荐使用异步 I/O。它能最大程度地提高吞吐量,降低延迟,并有效利用资源。
  • 考虑缓存: 如果维表数据量不大,且对数据新鲜度有一定容忍,可以考虑在 RichFunction 中结合缓存策略。
  • 大型维表或高一致性要求: 对于极其庞大或需要极高查询性能和故障恢复能力的维表,可以考虑将维表数据作为 Flink Managed State 来管理,通过增量或全量同步机制定期更新。
  • SQL/Table API vs. DataStream API: SQL/Table API: 如果业务逻辑可以通过 SQL 或 Table API 表达,且需要快速开发和部署,优先考虑。Lookup Join 语法简洁。 DataStream API: 如果业务逻辑复杂,需要更精细的控制,或者需要与 Flink 的其他高级特性(如事件时间处理、状态编程)深度结合,DataStream API 会提供更大的灵活性。

在实际应用中,通常需要根据维表的数据量、更新频率、数据新鲜度要求、以及对性能、延迟和实现复杂度的权衡来选择最合适的实现路径。

相关推荐

悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)

新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...

高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源

凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...

微服务架构实战:商家管理后台与sso设计,SSO客户端设计

SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...

还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑

在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...

Seata源码—6.Seata AT模式的数据源代理二

大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...

30分钟了解K8S(30分钟了解微积分)

微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...

SpringBoot条件化配置(@Conditional)全面解析与实战指南

一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...

一招解决所有依赖冲突(克服依赖)

背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...

你读过Mybatis的源码?说说它用到了几种设计模式

学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...

golang对接阿里云私有Bucket上传图片、授权访问图片

1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...

spring中的资源的加载(spring加载原理)

最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...

Android资源使用(android资源文件)

Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...

如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)

深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...

@Autowired与@Resource原理知识点详解

springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...

java的redis连接工具篇(java redis client)

在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...