Sfoglia il codice sorgente

优化Redis Stream消息内存占用;修复敏感数据混合查询异常问题;

Woody 1 mese fa
parent
commit
86995b206a

+ 0 - 43
framework-database/src/main/java/com/chelvc/framework/database/handler/CryptoTypeHandler.java

@@ -1,43 +0,0 @@
-package com.chelvc.framework.database.handler;
-
-import java.sql.CallableStatement;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-
-import com.chelvc.framework.database.context.DatabaseContextHolder;
-import org.apache.ibatis.type.BaseTypeHandler;
-import org.apache.ibatis.type.JdbcType;
-
-/**
- * 加解密字段类型处理实现
- *
- * @author Woody
- * @date 2024/6/7
- */
-public class CryptoTypeHandler extends BaseTypeHandler<String> {
-    @Override
-    public void setNonNullParameter(PreparedStatement ps, int i, String parameter, JdbcType jdbcType)
-            throws SQLException {
-        parameter = DatabaseContextHolder.getDatabaseCipherHandler().encrypt(parameter, true);
-        ps.setString(i, parameter);
-    }
-
-    @Override
-    public String getNullableResult(ResultSet rs, String columnName) throws SQLException {
-        String value = rs.getString(columnName);
-        return DatabaseContextHolder.getDatabaseCipherHandler().decrypt(value, true);
-    }
-
-    @Override
-    public String getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
-        String value = rs.getString(columnIndex);
-        return DatabaseContextHolder.getDatabaseCipherHandler().decrypt(value, true);
-    }
-
-    @Override
-    public String getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
-        String value = cs.getString(columnIndex);
-        return DatabaseContextHolder.getDatabaseCipherHandler().decrypt(value, true);
-    }
-}

+ 127 - 35
framework-database/src/main/java/com/chelvc/framework/database/interceptor/DynamicInvokeInterceptor.java

@@ -1,6 +1,8 @@
 package com.chelvc.framework.database.interceptor;
 
 import java.sql.Connection;
+import java.time.temporal.Temporal;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -11,6 +13,7 @@ import com.baomidou.mybatisplus.core.plugins.InterceptorIgnoreHelper;
 import com.baomidou.mybatisplus.core.toolkit.PluginUtils;
 import com.baomidou.mybatisplus.core.toolkit.StringPool;
 import com.chelvc.framework.common.util.ObjectUtils;
+import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.database.config.DatabaseProperties;
 import com.chelvc.framework.database.context.DatabaseContextHolder;
 import com.chelvc.framework.database.context.TableFieldContext;
@@ -18,7 +21,6 @@ import com.chelvc.framework.database.entity.Creatable;
 import com.chelvc.framework.database.entity.Deletable;
 import com.chelvc.framework.database.entity.Environmental;
 import com.chelvc.framework.database.entity.Updatable;
-import com.chelvc.framework.database.handler.CryptoTypeHandler;
 import com.google.common.collect.Lists;
 import lombok.RequiredArgsConstructor;
 import net.sf.jsqlparser.JSQLParserException;
@@ -30,6 +32,7 @@ import net.sf.jsqlparser.expression.Function;
 import net.sf.jsqlparser.expression.JdbcParameter;
 import net.sf.jsqlparser.expression.NullValue;
 import net.sf.jsqlparser.expression.Parenthesis;
+import net.sf.jsqlparser.expression.StringValue;
 import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
 import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
 import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
@@ -73,8 +76,6 @@ import org.apache.ibatis.plugin.Plugin;
 import org.apache.ibatis.plugin.Signature;
 import org.apache.ibatis.reflection.MetaObject;
 import org.apache.ibatis.reflection.SystemMetaObject;
-import org.apache.ibatis.session.Configuration;
-import org.apache.ibatis.type.StringTypeHandler;
 import org.apache.ibatis.type.TypeHandler;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -89,8 +90,87 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor(onConstructor = @__(@Autowired))
 @Intercepts(@Signature(type = StatementHandler.class, method = "prepare", args = {Connection.class, Integer.class}))
 public class DynamicInvokeInterceptor implements Interceptor {
+    /**
+     * JdbcParameter类型参数查找结果回调接口
+     */
+    private interface JdbcParameterConsumer {
+        /**
+         * 结果信息处理,并返回SQL结构是否变更
+         *
+         * @param table     关联表信息
+         * @param column    关联字段信息
+         * @param parameter 参数表达式
+         * @param condition 条件表达式
+         * @param changing  表达式变更回调函数
+         * @return true/false
+         */
+        boolean consume(Table table, Column column, JdbcParameter parameter, Expression condition,
+                        Consumer<Expression> changing);
+    }
+
     private final DatabaseProperties properties;
 
+    /**
+     * 字符串参数特殊字符转义
+     *
+     * @param value 参数值
+     * @return 参数值
+     */
+    private String escape(String value) {
+        if (StringUtils.isEmpty(value)) {
+            return value;
+        }
+
+        // 判断参数是否需要转义
+        boolean needs = false;
+        int size = value.length();
+        for (int i = 0; i < size; i++) {
+            char c = value.charAt(i);
+            if (c == '\u0000' || c == '\n' || c == '\r' || c == '\u001a' || c == '"' || c == '\'' || c == '\\') {
+                needs = true;
+                break;
+            }
+        }
+        if (!needs) {
+            return value;
+        }
+
+        // 特殊字符转义
+        StringBuilder buffer = new StringBuilder((int) ((double) size * 1.1D));
+        for (int i = 0; i < size; i++) {
+            char c = value.charAt(i);
+            switch (c) {
+                case '\u0000':
+                    buffer.append('\\');
+                    buffer.append('0');
+                    break;
+                case '\n':
+                    buffer.append('\\');
+                    buffer.append('n');
+                    break;
+                case '\r':
+                    buffer.append('\\');
+                    buffer.append('r');
+                    break;
+                case '\u001a':
+                    buffer.append('\\');
+                    buffer.append('Z');
+                    break;
+                case '\'':
+                    buffer.append('\'');
+                    buffer.append('\'');
+                    break;
+                case '\\':
+                    buffer.append('\\');
+                    buffer.append('\\');
+                    break;
+                default:
+                    buffer.append(c);
+            }
+        }
+        return buffer.toString();
+    }
+
     /**
      * 判断字段别名与指定别名是否相同
      *
@@ -106,6 +186,41 @@ public class DynamicInvokeInterceptor implements Interceptor {
         return table == null || Objects.equals(table.getName(), alias.getName());
     }
 
+    /**
+     * 判断是否是元类型参数
+     *
+     * @param parameter 参数
+     * @return true/false
+     */
+    private boolean isMetaParameter(Object parameter) {
+        return parameter == null || parameter instanceof Enum || parameter instanceof Number
+                || parameter instanceof String || parameter instanceof Boolean || parameter instanceof Character
+                || parameter instanceof Date || parameter instanceof Temporal;
+    }
+
+    /**
+     * 获取参数值
+     *
+     * @param bound     绑定SQL
+     * @param parameter 参数值表达式
+     * @return 参数值
+     */
+    private Object getParameterValue(BoundSql bound, JdbcParameter parameter) {
+        Object object = bound.getParameterObject();
+        if (this.isMetaParameter(object)) {
+            return object;
+        }
+        List<ParameterMapping> mappings = bound.getParameterMappings();
+        if (ObjectUtils.isEmpty(mappings)) {
+            return null;
+        }
+        String property = mappings.get(parameter.getIndex() - 1).getProperty();
+        if (bound.hasAdditionalParameter(property)) {
+            return bound.getAdditionalParameter(property);
+        }
+        return SystemMetaObject.forObject(object).getValue(property);
+    }
+
     /**
      * 查找字段的下标位置,如果不存在则返回 -1
      *
@@ -337,24 +452,6 @@ public class DynamicInvokeInterceptor implements Interceptor {
         return null;
     }
 
-    /**
-     * JdbcParameter类型参数查找结果回调接口
-     */
-    private interface JdbcParameterConsumer {
-        /**
-         * 结果信息处理,并返回SQL结构是否变更
-         *
-         * @param table     关联表信息
-         * @param column    关联字段信息
-         * @param parameter 参数表达式
-         * @param condition 条件表达式
-         * @param changing  表达式变更回调函数
-         * @return true/false
-         */
-        boolean consume(Table table, Column column, JdbcParameter parameter, Expression condition,
-                        Consumer<Expression> changing);
-    }
-
     /**
      * 预处理JdbcParameter类型参数,并返回SQL结构是否变更
      *
@@ -811,22 +908,17 @@ public class DynamicInvokeInterceptor implements Interceptor {
             return false;
         }
 
-        // 构建混合模式附加参数
-        TypeHandler<?> handler;
-        if (this.properties.getCrypto().isWritable()) {
-            // 明文参数 Handler
-            handler = DatabaseContextHolder.getTypeHandler(StringTypeHandler.class);
-        } else {
-            // 加密参数 Handler
-            handler = DatabaseContextHolder.getTypeHandler(CryptoTypeHandler.class);
+        // 获取绑定参数值(明文)
+        Object value = this.getParameterValue(bound, parameter);
+        if (!(value instanceof String)) {
+            return false;
         }
-        List<ParameterMapping> mappings = bound.getParameterMappings();
-        String property = mappings.get(parameter.getIndex() - 1).getProperty();
-        Configuration configuration = DatabaseContextHolder.getSessionTemplate().getConfiguration();
-        mappings.add(new ParameterMapping.Builder(configuration, property, handler).build());
-        JdbcParameter extra = new JdbcParameter(mappings.size(), parameter.isUseFixedIndex());
 
-        // 将附附加参数添加到当前SQL
+        // 追加混合查询明文/密文附加参数
+        if (!this.properties.getCrypto().isWritable()) {
+            value = DatabaseContextHolder.getDatabaseCipherHandler().encrypt((String) value, true);
+        }
+        StringValue extra = new StringValue(this.escape((String) value));
         if (condition instanceof InExpression) {
             InExpression in = (InExpression) condition;
             ((ExpressionList) in.getRightItemsList()).getExpressions().add(extra);

+ 6 - 17
framework-kafka/src/main/java/com/chelvc/framework/kafka/context/KafkaContextHolder.java

@@ -32,11 +32,6 @@ import org.springframework.kafka.core.KafkaTemplate;
  * @date 2024/1/30
  */
 public final class KafkaContextHolder {
-    /**
-     * 配置属性
-     */
-    private static volatile KafkaProperties PROPERTIES;
-
     /**
      * 模版实例
      */
@@ -46,19 +41,13 @@ public final class KafkaContextHolder {
     }
 
     /**
-     * 获取配置属性
+     * 获取命名空间
      *
-     * @return 配置属性实例
+     * @return 命名空间
      */
-    private static KafkaProperties getProperties() {
-        if (PROPERTIES == null) {
-            synchronized (KafkaProperties.class) {
-                if (PROPERTIES == null) {
-                    PROPERTIES = ApplicationContextHolder.getBean(KafkaProperties.class);
-                }
-            }
-        }
-        return PROPERTIES;
+    public static String getNamespace() {
+        KafkaProperties properties = ApplicationContextHolder.getBean(KafkaProperties.class, false);
+        return ObjectUtils.ifNull(properties, KafkaProperties::getNamespace);
     }
 
     /**
@@ -87,7 +76,7 @@ public final class KafkaContextHolder {
      * @return 环境隔离标记
      */
     public static String isolate(@NonNull String original) {
-        String namespace = getProperties().getNamespace();
+        String namespace = getNamespace();
         return StringUtils.isEmpty(namespace) ? original : (namespace + original);
     }
 

+ 2 - 2
framework-redis/src/main/java/com/chelvc/framework/redis/config/RedisProperties.java

@@ -40,9 +40,9 @@ public class RedisProperties {
         private int idle = 60;
 
         /**
-         * 消息队列容量
+         * 消息过期时间(秒)
          */
-        private int capacity = 100000;
+        private int expiration = 60 * 60;
     }
 
     /**

+ 20 - 50
framework-redis/src/main/java/com/chelvc/framework/redis/context/RedisStreamHolder.java

@@ -63,14 +63,11 @@ public final class RedisStreamHolder {
     private static final String DELAYING = "delaying";
 
     /**
-     * 新增消息脚本
+     * 新增并移除过期消息脚本
      */
-    private static volatile RedisScript<String> ADD_SCRIPT;
-
-    /**
-     * 消息流配置属性
-     */
-    private static volatile RedisProperties.Stream PROPERTIES;
+    private static final RedisScript<String> XADD_MINID_SCRIPT = new DefaultRedisScript<>(
+            "return redis.call('XADD', KEYS[1], 'MINID', '~', ARGV[1], '*', unpack(ARGV, 2))", String.class
+    );
 
     /**
      * 字符串RedisTemplate实例
@@ -81,44 +78,25 @@ public final class RedisStreamHolder {
     }
 
     /**
-     * 获取新增Stream消息流脚本
+     * 获取消息最小ID
      *
-     * @return Lua脚本
+     * @return 消息ID
      */
-    private static RedisScript<String> getAddScript() {
-        if (ADD_SCRIPT == null) {
-            synchronized (RedisStreamHolder.class) {
-                if (ADD_SCRIPT == null) {
-                    int capacity = getProperties().getCapacity();
-                    if (capacity > 0) {
-                        ADD_SCRIPT = new DefaultRedisScript<>(String.format(
-                                "return redis.call('XADD', KEYS[1], 'MAXLEN', '~', %d, '*', unpack(ARGV))", capacity
-                        ), String.class);
-                    } else {
-                        ADD_SCRIPT = new DefaultRedisScript<>(
-                                "return redis.call('XADD', KEYS[1], '*', unpack(ARGV))", String.class
-                        );
-                    }
-                }
-            }
-        }
-        return ADD_SCRIPT;
+    public static String getMinid() {
+        RedisProperties properties = ApplicationContextHolder.getBean(RedisProperties.class);
+        // 消息过期时间最短1分钟,最长24小时
+        int expiration = Math.min(Math.max(properties.getStream().getExpiration(), 60), 86400);
+        return (System.currentTimeMillis() - expiration * 1000) + "-0";
     }
 
     /**
-     * 获取消息流配置属性
+     * 获取命名空间
      *
-     * @return 配置属性
+     * @return 命名空间
      */
-    private static RedisProperties.Stream getProperties() {
-        if (PROPERTIES == null) {
-            synchronized (RedisProperties.Stream.class) {
-                if (PROPERTIES == null) {
-                    PROPERTIES = ApplicationContextHolder.getBean(RedisProperties.class).getStream();
-                }
-            }
-        }
-        return PROPERTIES;
+    public static String getNamespace() {
+        RedisProperties properties = ApplicationContextHolder.getBean(RedisProperties.class, false);
+        return properties == null ? null : properties.getStream().getNamespace();
     }
 
     /**
@@ -147,15 +125,6 @@ public final class RedisStreamHolder {
         return (RedisTemplate<String, T>) STRING_TEMPLATE;
     }
 
-    /**
-     * 获取MQ消费者空闲时间(秒)
-     *
-     * @return 空闲时间
-     */
-    public static int getIdle() {
-        return getProperties().getIdle();
-    }
-
     /**
      * 环境隔离
      *
@@ -163,7 +132,7 @@ public final class RedisStreamHolder {
      * @return 环境隔离标记
      */
     public static String isolate(@NonNull String original) {
-        String namespace = getProperties().getNamespace();
+        String namespace = getNamespace();
         return StringUtils.isEmpty(namespace) ? original : (namespace + original);
     }
 
@@ -248,6 +217,7 @@ public final class RedisStreamHolder {
      */
     public static void send(@NonNull String topic, @NonNull Object payload, Long delaying) {
         List<String> args = Lists.newLinkedList();
+        args.add(getMinid());
         args.add(PAYLOAD);
         args.add(JacksonUtils.serialize(payload));
         if (delaying != null) {
@@ -260,7 +230,7 @@ public final class RedisStreamHolder {
             args.add(JacksonUtils.serialize(session));
         }
         List<String> keys = Collections.singletonList(isolate(topic));
-        getStreamTemplate().execute(getAddScript(), keys, args.toArray());
+        getStreamTemplate().execute(XADD_MINID_SCRIPT, keys, args.toArray());
     }
 
     /**
@@ -371,7 +341,7 @@ public final class RedisStreamHolder {
      */
     public static void heartbeat(@NonNull String topic) {
         List<String> keys = Collections.singletonList(topic);
-        getStreamTemplate().execute(getAddScript(), keys, PAYLOAD, StringUtils.EMPTY);
+        getStreamTemplate().execute(XADD_MINID_SCRIPT, keys, getMinid(), PAYLOAD, StringUtils.EMPTY);
     }
 
     /**

+ 3 - 1
framework-redis/src/main/java/com/chelvc/framework/redis/queue/DefaultRedisMQListenerContainer.java

@@ -14,6 +14,7 @@ import com.chelvc.framework.base.context.ApplicationContextHolder;
 import com.chelvc.framework.common.util.AssertUtils;
 import com.chelvc.framework.common.util.StringUtils;
 import com.chelvc.framework.redis.annotation.RedisMQConsumer;
+import com.chelvc.framework.redis.config.RedisProperties;
 import com.chelvc.framework.redis.context.RedisStreamHolder;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -91,7 +92,8 @@ public class DefaultRedisMQListenerContainer<T> implements RedisMQListenerContai
         AssertUtils.nonempty(topic, () -> "Consumer topic must not be empty");
         AssertUtils.nonempty(group, () -> "Consumer group must not be empty");
         AssertUtils.check(batch > 0, () -> "Consumer batch must be greater than 0");
-        AssertUtils.check((this.idle = RedisStreamHolder.getIdle()) > 0, () -> "Consumer idle must be greater than 0");
+        int idle = ApplicationContextHolder.getBean(RedisProperties.class).getStream().getIdle();
+        AssertUtils.check((this.idle = idle) > 0, () -> "Consumer idle must be greater than 0");
         Environment environment = ApplicationContextHolder.getEnvironment();
         this.topic = RedisStreamHolder.isolate(environment.resolvePlaceholders(topic));
         this.group = RedisStreamHolder.isolate(environment.resolvePlaceholders(group));

+ 10 - 21
framework-rocketmq/src/main/java/com/chelvc/framework/rocketmq/context/RocketMQContextHolder.java

@@ -45,11 +45,6 @@ public final class RocketMQContextHolder {
      */
     private static volatile Producer PRODUCER;
 
-    /**
-     * 配置属性
-     */
-    private static volatile RocketMQProperties PROPERTIES;
-
     /**
      * 客户端服务提供者
      */
@@ -59,19 +54,13 @@ public final class RocketMQContextHolder {
     }
 
     /**
-     * 获取配置属性
+     * 获取命名空间
      *
-     * @return 配置属性
+     * @return 命名空间
      */
-    private static RocketMQProperties getProperties() {
-        if (PROPERTIES == null) {
-            synchronized (RocketMQProperties.class) {
-                if (PROPERTIES == null) {
-                    PROPERTIES = ApplicationContextHolder.getBean(RocketMQProperties.class);
-                }
-            }
-        }
-        return PROPERTIES;
+    public static String getNamespace() {
+        RocketMQProperties properties = ApplicationContextHolder.getBean(RocketMQProperties.class, false);
+        return ObjectUtils.ifNull(properties, RocketMQProperties::getNamespace);
     }
 
     /**
@@ -113,9 +102,9 @@ public final class RocketMQContextHolder {
      * @return 消息主题
      */
     public static String topic(@NonNull Message message) {
-        String namespace = getProperties().getNamespace();
+        String namespace = getNamespace();
         String topic = message.getTopic(), tag = message.getTag().orElse(null);
-        if (StringUtils.notEmpty(topic) && topic.startsWith(namespace)) {
+        if (StringUtils.notEmpty(namespace) && StringUtils.notEmpty(topic) && topic.startsWith(namespace)) {
             topic = topic.substring(namespace.length());
         }
         return StringUtils.isEmpty(tag) ? topic : (topic + TOPIC_TAG_DELIMITER + tag);
@@ -128,9 +117,9 @@ public final class RocketMQContextHolder {
      * @return 消息主题
      */
     public static String topic(@NonNull MessageView message) {
-        String namespace = getProperties().getNamespace();
+        String namespace = getNamespace();
         String topic = message.getTopic(), tag = message.getTag().orElse(null);
-        if (StringUtils.notEmpty(topic) && topic.startsWith(namespace)) {
+        if (StringUtils.notEmpty(namespace) && StringUtils.notEmpty(topic) && topic.startsWith(namespace)) {
             topic = topic.substring(namespace.length());
         }
         return StringUtils.isEmpty(tag) ? topic : (topic + TOPIC_TAG_DELIMITER + tag);
@@ -163,7 +152,7 @@ public final class RocketMQContextHolder {
      * @return 环境隔离标记
      */
     public static String isolate(@NonNull String original) {
-        String namespace = getProperties().getNamespace();
+        String namespace = getNamespace();
         return StringUtils.isEmpty(namespace) ? original : (namespace + original);
     }