今天遇到了一个需求,我们的数据库是MongoDB,需要实时新增数据源并使用,用的库可能相同,但是数据源不同,这里记录下实现方法,大体的思路是自己注入MongoTemplate,通过ThreadLocal来切换

首先定义一个@Mongo注解用于注入MongoTemplate

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
public @interface Mongo {

    /**
     * 数据库名称
     */
    String dbName();

}

创建MongoDBTemplateFactory工厂,用于生成MongoTemplate

public class MongoDBTemplateFactory {

    /**
     * 存放MongoDB练级池
     * Map<env, Map<databaseName, MongoTemplate>>
     */
    private static final Map<String, Map<String, MongoTemplate>> MONGO_TEMPLATE_MAP;

    /**
     * 存放MongoClientSettings
     * Map<env, MongoClientSettings>
     */
    private static final Map<String, MongoClientSettings> MONGO_CLIENT_SETTINGS_MAP;

    static {
        MONGO_TEMPLATE_MAP = new ConcurrentHashMap<>();
        MONGO_CLIENT_SETTINGS_MAP = new ConcurrentHashMap<>();
    }

    public static MongoTemplate requireGetMongoTemplate(String env, String database) {
        Map<String, MongoTemplate> mongoTeamplateMap = MONGO_TEMPLATE_MAP.getOrDefault(env, new HashMap<>());
        if (mongoTeamplateMap.containsKey(database)) {
            return mongoTeamplateMap.get(database);
        }

        MongoClientSettings settings = MONGO_CLIENT_SETTINGS_MAP.get(env);
        if (settings == null) throw new IllegalArgumentException("MongoClientSettings is null");

        MongoDatabaseFactory databaseFactory = MongoDBUtils.createFactory(settings, database);
        MongoTemplate mongoTemplate = new MongoTemplate(databaseFactory);
        mongoTeamplateMap.put(database, mongoTemplate);
        MONGO_TEMPLATE_MAP.put(env, mongoTeamplateMap);

        return mongoTemplate;
    }

    public static MongoClientSettings registerMongoSettings(String env, MongoProperties properties) {
        String authenticationURL = MongoDBUtils.getAuthenticationURL(
                properties.getUsername(), new String(properties.getPassword()),
                properties.getHost(), properties.getPort(), MongoDBUtils.DEFAULT_DB_NAME
        );
        MongoClientSettings settings = MongoDBUtils.getMongoClientSettings(authenticationURL);
        MONGO_CLIENT_SETTINGS_MAP.put(env, settings);

        return settings;
    }

}

其中,MongoDBUtil实现方法如下,都是对常用方法的封装

public class MongoDBUtils {

    /**
     * 默认数据库名
     */
    public static final String DEFAULT_DB_NAME = "test";

    /**
     * 默认连接超时时间
     */
    public static final long DEFAULT_CONNECTION_TIMEOUT = 5000;

    /**
     * 默认最大等待时间
     */
    public static final long DEFAULT_MAX_WAIT_TIME = 5000;

    /**
     * 默认连接池大小
     */
    public static final int DEFAULT_POOL_SIZE = 20;

    private MongoDBUtils() {}

    /**
     * 获取数据库连接地址
     * @param host hostname
     * @param port port
     */
    public static String getAuthenticationURL(String host, int port) {
        return getAuthenticationURL(null, null, host, port, DEFAULT_DB_NAME);
    }

    /**
     * 获取数据库连接地址
     * @param host hostname
     * @param port port
     * @param dbName db name
     */
    public static String getAuthenticationURL(String host, int port, String dbName) {
        return getAuthenticationURL(null, null, host, port, dbName);
    }

    /**
     * 获取数据库连接地址
     * @param username username
     * @param password password
     * @param host hostname
     * @param port port
     * @param dbName default db name
     */
    public static String getAuthenticationURL(String username, String password,
                                              String host, int port, String dbName) {
        StringBuilder url = new StringBuilder("mongodb://");
        if (StringUtils.hasLength(username) && StringUtils.hasLength(password)) {
            url.append(username).append(":").append(password).append("@");
        }
        url.append(host).append(":").append(port)
                .append("/")
                .append(dbName);
        return url.toString();
    }

    /**
     * 获取MongoDB连接配置
     * @param url mongodb url
     */
    public static MongoClientSettings getMongoClientSettings(String url) {
        return getMongoClientSettings(url, DEFAULT_POOL_SIZE, DEFAULT_MAX_WAIT_TIME, DEFAULT_CONNECTION_TIMEOUT);
    }

    /**
     * 获取MongoDB连接配置
     * @param url mongodb url
     * @param poolSize 连接池大小
     * @param maxWaitTime 最大等待时间
     * @param connectionTimeout 连接超时时间
     */
    public static MongoClientSettings getMongoClientSettings(String url, int poolSize, long maxWaitTime, long connectionTimeout) {
        ConnectionString connectionString = new ConnectionString(url);
        return MongoClientSettings.builder()
                .applyConnectionString(connectionString)
                .applyToSocketSettings(builder -> builder.connectTimeout(connectionTimeout, TimeUnit.MICROSECONDS))
                .applyToConnectionPoolSettings(builder -> builder.maxSize(poolSize).maxWaitTime(maxWaitTime, TimeUnit.MICROSECONDS))
                .build();
    }

    /**
     * 创建MongoDB工厂
     * @param settings mongodb配置
     * @param dbName db name
     */
    public static MongoDatabaseFactory createFactory(MongoClientSettings settings, String dbName) {
        return new SimpleMongoClientDatabaseFactory(MongoClients.create(settings), dbName);
    }

}

为了避免对一个数据源和数据库创建多个连接池,这里用MongoDBProxyRegister来记录创建过的数据源和连接池,并在有相同的时候直接返回即可。

public class MongoDBProxyRegister {

    private static final Map<String, MongoTemplate> DATABASE_PROXY_MAP = new ConcurrentHashMap<>();

    private MongoDBProxyRegister() {}

    /**
     * 创建代理对象
     * @param database 数据库名
     * @param mongoDatabaseFactory 数据库工厂
     * @return 代理对象
     */
    public static MongoTemplate createMongoDBProxy(String database, MongoDatabaseFactory mongoDatabaseFactory) {
        if (DATABASE_PROXY_MAP.containsKey(database)) {
            return DATABASE_PROXY_MAP.get(database);
        }

        Enhancer enhancer = new Enhancer();
        enhancer.setSuperclass(MongoTemplate.class);
        enhancer.setCallback(new MongoDBProxyHandler(database));
        MongoTemplate mongoTemplate = (MongoTemplate) enhancer.create(
            new Class[]{MongoDatabaseFactory.class},
            new Object[]{mongoDatabaseFactory}
        );
        DATABASE_PROXY_MAP.put(database, mongoTemplate);
        return mongoTemplate;
    }

    /**
     * 代理对象拦截器
     */
    public static class MongoDBProxyHandler implements MethodInterceptor {

        private final String database;

        public MongoDBProxyHandler(String database) {
            this.database = database;
        }

        @Override
        public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) {
            String env = MongoDBEnvConverter.getCurrentEnv();
            MongoTemplate mongoTemplate = MongoDBTemplateFactory.requireGetMongoTemplate(env, database);
            return ReflectionUtils.invokeMethod(method, mongoTemplate, objects);
        }

    }



}

因为MongoTemplate为实体类不为接口,因此没法使用JDK代理,因此内部类MongoDBProxyHandler使用cglib动态代理,当调用MongoTemplate方法时,会使用当前实际环境和数据库来执行相关方法

MongoDBEnvConverter是让环境切换的ThreadLocal,简单实现下

public class MongoDBEnvConverter {

    private MongoDBEnvConverter() {}

    private static final ThreadLocal<String> CURRENT_ENV = new ThreadLocal<>();

    public static void setCurrentEnv(String env) {
        CURRENT_ENV.set(env);
    }

    public static String getCurrentEnv() {
        return CURRENT_ENV.get();
    }

    public static void clear() {
        CURRENT_ENV.remove();
    }

}

接下来就是扫描Spring内部的所有Bean,将含有Mongo注解的MongoTemplate替换成我们自己实现的动态代理类,这个功能可以使用Spring提供的BeanpostProcessor实现

@Component
public class MongoDBBeanPostProcessor implements BeanPostProcessor {

    @Resource
    private MongoProperties mongoProperties;

    @Override
    public Object postProcessAfterInitialization(Object bean, @Nonnull String beanName) throws BeansException {
        ReflectionUtils.doWithFields(bean.getClass(), field -> {
            Mongo mongo = field.getAnnotation(Mongo.class);
            if (mongo == null) return;
            String dbName = mongo.dbName();
            MongoClientSettings settings = MongoDBTemplateFactory.registerMongoSettings("default", mongoProperties);
            MongoDatabaseFactory databaseFactory = MongoDBUtils.createFactory(settings, dbName);
            MongoTemplate proxyMongoTemplate = MongoDBProxyRegister.createMongoDBProxy(dbName, databaseFactory);

            ReflectionUtils.makeAccessible(field);
            ReflectionUtils.setField(field, bean, proxyMongoTemplate);
        });
        return bean;
    }
}

后续扩展可以用AOP 加一个determateMongo注解来指定当前环境,设置默认环境等等,这里就不细展开了

over...