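Today I ran into a requirement: our database is MongoDB, and we need to add new data sources at runtime and use them right away. The database names may be identical, but the data sources differ. This post records one way to implement it. The overall idea is to inject our own MongoTemplate and switch data sources through a ThreadLocal.
First, define a @Mongo annotation that marks where a MongoTemplate should be injected.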
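import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.PARAMETER})
public @interface Mongo {
    /**
     * Database name
     */
    String dbName();
}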
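The RUNTIME retention on @Mongo matters because the annotation is later read through reflection. The following is a minimal JDK-only sketch of that point. The names `ClassRetained` and `OrderRepository` are made up for illustration, and `Object` stands in for MongoTemplate so the example has no dependencies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

class MongoAnnotationDemo {

    // Same shape as the @Mongo annotation in the post.
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.FIELD, ElementType.PARAMETER})
    @interface Mongo {
        String dbName();
    }

    // Hypothetical counter-example: without @Retention the policy defaults to CLASS,
    // so the annotation is not available to reflection.
    @Target(ElementType.FIELD)
    @interface ClassRetained {
    }

    // Hypothetical holder class; Object stands in for MongoTemplate.
    static class OrderRepository {
        @Mongo(dbName = "orders")
        @ClassRetained
        private Object template;
    }

    /** Returns the dbName declared on the named field, or null if it carries no @Mongo. */
    static String dbNameOf(Class<?> type, String fieldName) {
        for (Field field : type.getDeclaredFields()) {
            Mongo mongo = field.getAnnotation(Mongo.class);
            if (field.getName().equals(fieldName) && mongo != null) {
                return mongo.dbName();
            }
        }
        return null;
    }

    /** Reports whether reflection can see @ClassRetained on the named field. */
    static boolean seesClassRetained(Class<?> type, String fieldName) {
        for (Field field : type.getDeclaredFields()) {
            if (field.getName().equals(fieldName)) {
                return field.isAnnotationPresent(ClassRetained.class);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(dbNameOf(OrderRepository.class, "template"));          // orders
        System.out.println(seesClassRetained(OrderRepository.class, "template")); // false
    }
}
```

If @Mongo were declared without RUNTIME retention, the field scan done later by the bean post-processor would silently find nothing.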
Next, create a MongoDBTemplateFactory that builds the MongoTemplate instances.
import com.mongodb.MongoClientSettings;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MongoDBTemplateFactory {
    /**
     * MongoTemplate cache (each entry owns its own connection pool)
     * Map<env, Map<databaseName, MongoTemplate>>
     */
    private static final Map<String, Map<String, MongoTemplate>> MONGO_TEMPLATE_MAP = new ConcurrentHashMap<>();
    /**
     * Registered MongoClientSettings
     * Map<env, MongoClientSettings>
     */
    private static final Map<String, MongoClientSettings> MONGO_CLIENT_SETTINGS_MAP = new ConcurrentHashMap<>();

    public static MongoTemplate requireGetMongoTemplate(String env, String database) {
        // ConcurrentHashMap rejects null keys, so fail early with a readable message
        if (env == null) throw new IllegalArgumentException("No MongoDB env is set for the current thread");
        // computeIfAbsent makes "look up or create" atomic, so concurrent callers
        // cannot create two templates (and two pools) for the same env and database
        return MONGO_TEMPLATE_MAP
                .computeIfAbsent(env, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(database, key -> {
                    MongoClientSettings settings = MONGO_CLIENT_SETTINGS_MAP.get(env);
                    if (settings == null) {
                        throw new IllegalArgumentException("No MongoClientSettings registered for env: " + env);
                    }
                    MongoDatabaseFactory databaseFactory = MongoDBUtils.createFactory(settings, key);
                    return new MongoTemplate(databaseFactory);
                });
    }

    public static MongoClientSettings registerMongoSettings(String env, MongoProperties properties) {
        // getPassword() returns null when no password is configured
        char[] password = properties.getPassword();
        String authenticationURL = MongoDBUtils.getAuthenticationURL(
                properties.getUsername(), password == null ? null : new String(password),
                properties.getHost(), properties.getPort(), MongoDBUtils.DEFAULT_DB_NAME
        );
        MongoClientSettings settings = MongoDBUtils.getMongoClientSettings(authenticationURL);
        MONGO_CLIENT_SETTINGS_MAP.put(env, settings);
        return settings;
    }
}
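The factory is essentially a two-level cache keyed by environment and then database name. Here is a dependency-free sketch of that pattern, assuming a hypothetical `TwoLevelCacheDemo` class and a plain `String` in place of MongoTemplate. It counts how often the creator runs, to show that repeated lookups reuse the cached value.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

class TwoLevelCacheDemo<V> {
    // Map<env, Map<databaseName, value>>
    private final Map<String, Map<String, V>> cache = new ConcurrentHashMap<>();
    private final BiFunction<String, String, V> creator;

    TwoLevelCacheDemo(BiFunction<String, String, V> creator) {
        this.creator = creator;
    }

    /** Returns the cached value for (env, database), creating it at most once. */
    V get(String env, String database) {
        return cache
                .computeIfAbsent(env, key -> new ConcurrentHashMap<>())
                .computeIfAbsent(database, key -> creator.apply(env, key));
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        TwoLevelCacheDemo<String> templates = new TwoLevelCacheDemo<>((env, db) -> {
            created.incrementAndGet();
            return env + "/" + db; // stand-in for "new MongoTemplate(...)"
        });

        String first = templates.get("default", "orders");
        String second = templates.get("default", "orders");
        templates.get("staging", "orders");

        System.out.println(first == second); // true
        System.out.println(created.get());   // 2
    }
}
```

The same database name under two environments yields two separate entries, which is what lets one proxy serve several data sources.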
MongoDBUtils is implemented as follows; it only wraps a few commonly used operations.
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClients;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory;
import org.springframework.util.StringUtils;

import java.util.concurrent.TimeUnit;

public class MongoDBUtils {
    /**
     * Default database name
     */
    public static final String DEFAULT_DB_NAME = "test";
    /**
     * Default connection timeout, in milliseconds
     */
    public static final long DEFAULT_CONNECTION_TIMEOUT = 5000;
    /**
     * Default maximum wait time for a pooled connection, in milliseconds
     */
    public static final long DEFAULT_MAX_WAIT_TIME = 5000;
    /**
     * Default connection pool size
     */
    public static final int DEFAULT_POOL_SIZE = 20;

    private MongoDBUtils() {}

    /**
     * Build the connection URL
     * @param host hostname
     * @param port port
     */
    public static String getAuthenticationURL(String host, int port) {
        return getAuthenticationURL(null, null, host, port, DEFAULT_DB_NAME);
    }

    /**
     * Build the connection URL
     * @param host hostname
     * @param port port
     * @param dbName db name
     */
    public static String getAuthenticationURL(String host, int port, String dbName) {
        return getAuthenticationURL(null, null, host, port, dbName);
    }

    /**
     * Build the connection URL
     * @param username username
     * @param password password
     * @param host hostname
     * @param port port
     * @param dbName default db name
     */
    public static String getAuthenticationURL(String username, String password,
                                              String host, int port, String dbName) {
        StringBuilder url = new StringBuilder("mongodb://");
        // Credentials are appended as-is; special characters such as '@' or ':' are not escaped here
        if (StringUtils.hasLength(username) && StringUtils.hasLength(password)) {
            url.append(username).append(":").append(password).append("@");
        }
        url.append(host).append(":").append(port)
                .append("/")
                .append(dbName);
        return url.toString();
    }

    /**
     * Build the MongoDB client settings with the default pool size and timeouts
     * @param url mongodb url
     */
    public static MongoClientSettings getMongoClientSettings(String url) {
        return getMongoClientSettings(url, DEFAULT_POOL_SIZE, DEFAULT_MAX_WAIT_TIME, DEFAULT_CONNECTION_TIMEOUT);
    }

    /**
     * Build the MongoDB client settings
     * @param url mongodb url
     * @param poolSize connection pool size
     * @param maxWaitTime maximum wait time, in milliseconds
     * @param connectionTimeout connection timeout, in milliseconds
     */
    public static MongoClientSettings getMongoClientSettings(String url, int poolSize, long maxWaitTime, long connectionTimeout) {
        ConnectionString connectionString = new ConnectionString(url);
        return MongoClientSettings.builder()
                .applyConnectionString(connectionString)
                // Values are milliseconds; MICROSECONDS would turn 5000 into a 5 ms timeout.
                // The int cast keeps this compiling on driver versions whose connectTimeout takes an int.
                .applyToSocketSettings(builder -> builder.connectTimeout((int) connectionTimeout, TimeUnit.MILLISECONDS))
                .applyToConnectionPoolSettings(builder -> builder.maxSize(poolSize).maxWaitTime(maxWaitTime, TimeUnit.MILLISECONDS))
                .build();
    }

    /**
     * Create a MongoDB database factory.
     * Note: every call opens a new MongoClient with its own connection pool.
     * @param settings mongodb settings
     * @param dbName db name
     */
    public static MongoDatabaseFactory createFactory(MongoClientSettings settings, String dbName) {
        return new SimpleMongoClientDatabaseFactory(MongoClients.create(settings), dbName);
    }
}
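One thing worth guarding against when assembling the URL by hand: MongoDB connection strings expect reserved characters in the username and password (for example `@`, `:` and `/`) to be percent-encoded. The sketch below is one possible way to do that with the JDK only. `MongoUrlDemo`, `encode` and `buildUrl` are illustrative names that are not part of the original utility class, and `URLEncoder.encode(String, Charset)` assumes Java 10 or later.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class MongoUrlDemo {

    /** Percent-encodes one credential component. */
    static String encode(String raw) {
        // URLEncoder implements form encoding and writes a space as '+',
        // so spaces are rewritten to %20 afterwards (a literal '+' is already %2B).
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    /** Same layout as getAuthenticationURL, but with encoded credentials. */
    static String buildUrl(String username, String password, String host, int port, String dbName) {
        StringBuilder url = new StringBuilder("mongodb://");
        boolean hasCredentials = username != null && !username.isEmpty()
                && password != null && !password.isEmpty();
        if (hasCredentials) {
            url.append(encode(username)).append(':').append(encode(password)).append('@');
        }
        return url.append(host).append(':').append(port).append('/').append(dbName).toString();
    }

    public static void main(String[] args) {
        // prints mongodb://app:p%40ss%3Aword%2F1@localhost:27017/test
        System.out.println(buildUrl("app", "p@ss:word/1", "localhost", 27017, "test"));
        // prints mongodb://localhost:27017/test
        System.out.println(buildUrl(null, null, "localhost", 27017, "test"));
    }
}
```

Without the encoding step, a password containing `@` would shift where the parser thinks the host part begins.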
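To avoid creating several connection pools for the same data source and database, MongoDBProxyRegister records the proxies that have already been created and returns the existing one when the same database is requested again.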
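// Assumes Spring's repackaged cglib; the classes in net.sf.cglib.proxy are used the same way
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.cglib.proxy.MethodInterceptor;
import org.springframework.cglib.proxy.MethodProxy;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MongoDBProxyRegister {
    private static final Map<String, MongoTemplate> DATABASE_PROXY_MAP = new ConcurrentHashMap<>();

    private MongoDBProxyRegister() {}

    /**
     * Create the proxy object, or return the one already registered for this database
     * @param database database name
     * @param mongoDatabaseFactory database factory
     * @return proxy object
     */
    public static MongoTemplate createMongoDBProxy(String database, MongoDatabaseFactory mongoDatabaseFactory) {
        // computeIfAbsent avoids the check-then-put race of containsKey/get/put
        return DATABASE_PROXY_MAP.computeIfAbsent(database, key -> {
            Enhancer enhancer = new Enhancer();
            enhancer.setSuperclass(MongoTemplate.class);
            enhancer.setCallback(new MongoDBProxyHandler(key));
            return (MongoTemplate) enhancer.create(
                    new Class[]{MongoDatabaseFactory.class},
                    new Object[]{mongoDatabaseFactory}
            );
        });
    }

    /**
     * Interceptor behind the proxy object
     */
    public static class MongoDBProxyHandler implements MethodInterceptor {
        private final String database;

        public MongoDBProxyHandler(String database) {
            this.database = database;
        }

        @Override
        public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) {
            // Resolve the real template for the env bound to the current thread, then delegate
            String env = MongoDBEnvConverter.getCurrentEnv();
            MongoTemplate mongoTemplate = MongoDBTemplateFactory.requireGetMongoTemplate(env, database);
            return ReflectionUtils.invokeMethod(method, mongoTemplate, objects);
        }
    }
}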
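Because MongoTemplate is a concrete class rather than an interface, a JDK dynamic proxy cannot be used, so the inner class MongoDBProxyHandler relies on a cglib dynamic proxy instead. Whenever a MongoTemplate method is called, it is executed against the template for the current environment and database.
MongoDBEnvConverter is the ThreadLocal holder that switches the environment; here is a simple implementation.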
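To make both claims concrete without pulling in cglib, here is a JDK-only sketch. It swaps in `java.lang.reflect.Proxy` over a made-up `Store` interface, so it is not the cglib setup from the post, but the routing logic inside the handler has the same shape. It also shows that the JDK proxy refuses a concrete class. All names in it are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Map;

class RoutingProxyDemo {
    // Hypothetical stand-in for the operations of MongoTemplate.
    interface Store {
        String describe();
    }

    // Hypothetical concrete class, used only to show what the JDK proxy rejects.
    static class ConcreteStore {
    }

    static final ThreadLocal<String> CURRENT_ENV = new ThreadLocal<>();

    /** Builds a proxy that forwards each call to the target registered for the current env. */
    static Store routingProxy(Map<String, Store> targetsByEnv) {
        InvocationHandler handler = (proxy, method, args) -> {
            String env = CURRENT_ENV.get();
            Store target = env == null ? null : targetsByEnv.get(env);
            if (target == null) {
                throw new IllegalStateException("No target registered for env: " + env);
            }
            return method.invoke(target, args);
        };
        return (Store) Proxy.newProxyInstance(
                Store.class.getClassLoader(), new Class<?>[]{Store.class}, handler);
    }

    /** Returns false when the JDK refuses to proxy the given type. */
    static boolean jdkProxyAccepts(Class<?> type) {
        try {
            Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, (p, m, a) -> null);
            return true;
        } catch (IllegalArgumentException notAnInterface) {
            return false;
        }
    }

    public static void main(String[] args) {
        Store defaultStore = () -> "default-db";
        Store stagingStore = () -> "staging-db";
        Store proxy = routingProxy(Map.of("default", defaultStore, "staging", stagingStore));

        CURRENT_ENV.set("default");
        System.out.println(proxy.describe()); // default-db
        CURRENT_ENV.set("staging");
        System.out.println(proxy.describe()); // staging-db
        CURRENT_ENV.remove();

        System.out.println(jdkProxyAccepts(Store.class));         // true
        System.out.println(jdkProxyAccepts(ConcreteStore.class)); // false
    }
}
```

The caller keeps a single reference while the thread-local value decides which real object does the work, which is the behaviour the cglib handler provides for MongoTemplate.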
public class MongoDBEnvConverter {
    private MongoDBEnvConverter() {}

    private static final ThreadLocal<String> CURRENT_ENV = new ThreadLocal<>();

    public static void setCurrentEnv(String env) {
        CURRENT_ENV.set(env);
    }

    public static String getCurrentEnv() {
        return CURRENT_ENV.get();
    }

    public static void clear() {
        CURRENT_ENV.remove();
    }
}
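Two properties of a ThreadLocal holder like this are easy to overlook: a value set on one thread is invisible to other threads, and on pooled threads it stays around until it is removed. A small JDK-only sketch, with a hypothetical `EnvContextDemo` class mirroring the holder above:

```java
import java.util.concurrent.atomic.AtomicReference;

class EnvContextDemo {
    static final ThreadLocal<String> CURRENT_ENV = new ThreadLocal<>();

    /** Reads the env from a brand-new thread, which does not inherit the caller's value. */
    static String envSeenByOtherThread() {
        AtomicReference<String> seen = new AtomicReference<>("not-run");
        Thread worker = new Thread(() -> seen.set(CURRENT_ENV.get()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    /** Simulates one request: set the env, do the work, always clear in finally. */
    static String envLeftAfterRequest(String env) {
        CURRENT_ENV.set(env);
        try {
            // ... database calls for this request would go here ...
        } finally {
            CURRENT_ENV.remove(); // without this, a pooled thread would keep the old env
        }
        return CURRENT_ENV.get();
    }

    public static void main(String[] args) {
        CURRENT_ENV.set("staging");
        System.out.println(CURRENT_ENV.get());           // staging
        System.out.println(envSeenByOtherThread());      // null
        CURRENT_ENV.remove();
        System.out.println(envLeftAfterRequest("prod")); // null
    }
}
```

In a web application the set and clear calls would typically sit in a filter or interceptor around each request.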
The next step is to scan all beans in the Spring container and replace every MongoTemplate field annotated with @Mongo with our own dynamic proxy. Spring's BeanPostProcessor can be used for this.
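import com.mongodb.MongoClientSettings;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

// javax.* is an assumption; newer Spring Boot versions use the jakarta.annotation package instead
import javax.annotation.Nonnull;
import javax.annotation.Resource;

@Component
public class MongoDBBeanPostProcessor implements BeanPostProcessor {
    @Resource
    private MongoProperties mongoProperties;

    @Override
    public Object postProcessAfterInitialization(Object bean, @Nonnull String beanName) throws BeansException {
        // doWithFields also visits fields declared in superclasses
        ReflectionUtils.doWithFields(bean.getClass(), field -> {
            Mongo mongo = field.getAnnotation(Mongo.class);
            if (mongo == null) return;
            String dbName = mongo.dbName();
            // Register the application's own data source under the "default" env
            MongoClientSettings settings = MongoDBTemplateFactory.registerMongoSettings("default", mongoProperties);
            MongoDatabaseFactory databaseFactory = MongoDBUtils.createFactory(settings, dbName);
            MongoTemplate proxyMongoTemplate = MongoDBProxyRegister.createMongoDBProxy(dbName, databaseFactory);
            ReflectionUtils.makeAccessible(field);
            ReflectionUtils.setField(field, bean, proxyMongoTemplate);
        });
        return bean;
    }
}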
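Stripped of Spring, the post-processor amounts to three steps: walk the bean's fields, pick the ones carrying @Mongo, and assign a shared instance per database name. The sketch below reproduces those steps with plain reflection. `FakeTemplate`, `OrderService` and `AuditService` are hypothetical stand-ins, and the annotation is redeclared locally so the example compiles on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class FieldInjectionDemo {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Mongo {
        String dbName();
    }

    // Hypothetical stand-in for the proxied MongoTemplate.
    static final class FakeTemplate {
        final String dbName;

        FakeTemplate(String dbName) {
            this.dbName = dbName;
        }
    }

    static class OrderService {
        @Mongo(dbName = "orders")
        FakeTemplate template;
    }

    static class AuditService {
        @Mongo(dbName = "orders")
        FakeTemplate template;
        Object untouched; // no annotation, so it must stay null
    }

    // One shared instance per database name, like DATABASE_PROXY_MAP in the post.
    private static final Map<String, FakeTemplate> PROXIES = new ConcurrentHashMap<>();

    /** Injects a shared FakeTemplate into every @Mongo field, including inherited ones. */
    static <T> T inject(T bean) {
        for (Class<?> type = bean.getClass(); type != null && type != Object.class; type = type.getSuperclass()) {
            for (Field field : type.getDeclaredFields()) {
                Mongo mongo = field.getAnnotation(Mongo.class);
                if (mongo == null) continue;
                field.setAccessible(true);
                try {
                    field.set(bean, PROXIES.computeIfAbsent(mongo.dbName(), FakeTemplate::new));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return bean;
    }

    public static void main(String[] args) {
        OrderService orders = inject(new OrderService());
        AuditService audit = inject(new AuditService());
        System.out.println(orders.template.dbName);            // orders
        System.out.println(orders.template == audit.template); // true
        System.out.println(audit.untouched);                   // null
    }
}
```

In the real application a bean would simply declare `@Mongo(dbName = "...") private MongoTemplate template;` and let the post-processor fill it in.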
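As a later extension, AOP plus a determateMongo annotation could be used to specify the current environment, set a default environment, and so on; I won't go into the details here.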
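As a rough idea of what that extension might look like, the sketch below uses a JDK dynamic proxy in place of a Spring AOP aspect so that it stays runnable without dependencies. The `@UseEnv` annotation, the `DEFAULT_ENV` fallback and the `ReportService` interface are all assumptions made for illustration; none of them come from the post. The handler body is what an around advice would do: set the env, invoke, then restore the previous value.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

class EnvAnnotationDemo {
    static final String DEFAULT_ENV = "default"; // assumed fallback
    static final ThreadLocal<String> CURRENT_ENV = new ThreadLocal<>();

    // Hypothetical annotation naming the env a method should run against.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface UseEnv {
        String value();
    }

    interface ReportService {
        @UseEnv("staging")
        String stagingReport();

        String defaultReport();
    }

    // Each method just reports the env it observes while running.
    static class EchoService implements ReportService {
        public String stagingReport() { return currentEnvOrDefault(); }
        public String defaultReport() { return currentEnvOrDefault(); }
    }

    static String currentEnvOrDefault() {
        String env = CURRENT_ENV.get();
        return env != null ? env : DEFAULT_ENV;
    }

    /** Wraps target so that @UseEnv methods run with that env and restore the old one afterwards. */
    static <T> T withEnvSwitching(Class<T> type, T target) {
        Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type},
                (self, method, args) -> {
                    UseEnv useEnv = method.getAnnotation(UseEnv.class);
                    String previous = CURRENT_ENV.get();
                    if (useEnv != null) CURRENT_ENV.set(useEnv.value());
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        throw e.getCause();
                    } finally {
                        // restore instead of blindly clearing, so nested calls behave
                        if (previous == null) CURRENT_ENV.remove(); else CURRENT_ENV.set(previous);
                    }
                });
        return type.cast(proxy);
    }

    public static void main(String[] args) {
        ReportService service = withEnvSwitching(ReportService.class, new EchoService());
        System.out.println(service.stagingReport()); // staging
        System.out.println(service.defaultReport()); // default
        System.out.println(CURRENT_ENV.get());       // null
    }
}
```

With Spring AOP the same logic would live in an aspect around annotated methods, and `currentEnvOrDefault` would replace the raw thread-local read in the proxy handler.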
over...