Commit 9742023a authored by 侯力峰's avatar 侯力峰
Browse files

首次开源发布

parent 444abeb0
Pipeline #2755 canceled with stages
package cn.spatiotemporal.core.async.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.core.annotation.AliasFor;
/**
* 异步服务的客户端调用注解
* @author marquis
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncClient {
/**
* 异步服务的组件名称(MQ的主题名称)
*/
@AliasFor("topic")
String value() default "";
/**
* 异步服务的组件名称(MQ的主题名称)
*/
@AliasFor("value")
String topic() default "";
/**
* 通过名称,而非类型来指定实现类
*/
@AliasFor("qualifier")
String name() default "";
/**
* 如果有多个实现类,可以通过<code>@name</code>或<code>@Qualifier</code>的值来区分
*/
@AliasFor("name")
String qualifier() default "";
/**
* 重新指定服务端接口或实现类的类型,如果未指定则采用注解所在的接口的类型
* @return
*/
String type() default "";
}
package cn.spatiotemporal.core.async.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 异步服务的服务端实现注解
* @author marquis
*
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface AsyncService {
}
package cn.spatiotemporal.core.async.client;
import java.lang.reflect.ParameterizedType;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import cn.spatiotemporal.core.async.AsyncConstants;
import cn.spatiotemporal.core.async.AsyncParamMessage;
import cn.spatiotemporal.core.async.AsyncReturnMessage;
import cn.spatiotemporal.core.async.spring.EnableAsyncClient;
import cn.spatiotemporal.core.utils.FastJsonUtils;
import net.jodah.typetools.TypeResolver;
/**
* 异步调用的客户端处理
* @author marquis
*
*/
@Component
@ConditionalOnBean(annotation = EnableAsyncClient.class)
public class AsyncClient {
@Autowired
private String compInstanceId;
@Autowired
private RabbitTemplate template;
/**
* 成功回调实例储存Map
*/
private static Map<String, Consumer<?>> onSuccussCallbacks = new HashMap<String, Consumer<?>>();
/**
* 失败回调实例储存Map
*/
private static Map<String, Consumer<String>> onFailCallbacks = new HashMap<String, Consumer<String>>();
/**
* 回调返回值的类型
*/
private Class resultType;
/**
* 异步调用
* @param compName 远程模块
* @param className 对象类名称
* @param methodName 方法名称
* @param params 参数
*/
public AsyncResult call(String compName, String className, String methodName, Object[] params) {
AsyncResult result = new AsyncResult();
//随机生成异步调用实例ID
String instanceId = String.valueOf(UUID.randomUUID().getMostSignificantBits());
result.setInstanceId(instanceId);
//发送异步调用消息
AsyncParamMessage param = new AsyncParamMessage();
param.setCid(compInstanceId);
param.setInstanceId(instanceId);
param.setTarget(className);
param.setMethod(methodName);
param.setParams(FastJsonUtils.object2Json(params));
template.convertAndSend(AsyncConstants.ASYNC_CALL_MQ + compName,
param);
return result;
}
/**
* 异步调用
* @param compName 远程模块
* @param className 对象类名称
* @param methodName 方法名称
* @param params 参数
*/
public <T> AsyncResult call(String compName, String className, String methodName, Object[] params, Class<T> clazz, Consumer<T> consumer) {
//注册成功回调实例
return call(compName, className, methodName, params).onSuccuss(clazz, consumer);
}
/**
* 异步调用结果处理
* @param returnMsg
*/
@RabbitListener(queues = AsyncConstants.ASYNC_RETURN_MQ + "#{@compInstanceId}")
public void resultHandle(AsyncReturnMessage returnMsg) {
try {
//如果异步调用执行有异常,则调用异常回调处理,否则抛出异常
if (returnMsg.isHasException()) {
//获取回调实例
Consumer<String> onFail = (Consumer<String>) onFailCallbacks.get(returnMsg.getInstanceId());
if (onFail != null) {
onFail.accept(returnMsg.getErrorMessage());
} else {
//TODO throw异常 无法结束
System.out.println(returnMsg.getErrorMessage());
//throw new AsyncProcessException(returnMsg.getErrorMessage());
}
} else {
//获取回调实例
Consumer onSuccuss = onSuccussCallbacks.get(returnMsg.getInstanceId());
if (onSuccuss != null) {
Class<?> clazz = TypeResolver.resolveRawArguments(Consumer.class, onSuccuss.getClass())[0];
onSuccuss.accept(FastJsonUtils.json2Object(returnMsg.getReturnValue(), clazz));
}
}
} catch (Exception e) {
// TODO 记录异步调用异常,记录日志
e.printStackTrace();
}
}
/**
* 注册正常回调处理
* @param instanceId
* @param consumer
*/
protected static <T> void registOnSuccussCallback(String instanceId, Class<T> clazz, Consumer<T> consumer) {
if (consumer != null) {
onSuccussCallbacks.put(instanceId, consumer);
}
}
/**
* 注册异常回调处理
* @param instanceId
* @param consumer
*/
protected static void registOnFailCallback(String instanceId, Consumer<String> consumer) {
if (consumer != null) {
onFailCallbacks.put(instanceId, consumer);
}
}
private Class getResultType() {
if (resultType == null) {
resultType = (Class) ((ParameterizedType)this.getClass().getGenericSuperclass())
.getActualTypeArguments()[0]; //得到泛型类型
}
return resultType;
}
}
package cn.spatiotemporal.core.async.client;
import java.util.function.Consumer;
import lombok.Getter;
import lombok.Setter;
/**
* 异步调用结果处理(添加成功回调或异常回调)
* @author marquis
*
*/
@Setter
@Getter
public class AsyncResult {
/**
* 异步调用实例ID
*/
private String instanceId;
/**
* 成功回调
* @param consumer
* @return
*/
public <T> AsyncResult onSuccuss(Class<T> clazz, Consumer<T> consumer) {
AsyncClient.registOnSuccussCallback(instanceId, clazz, consumer);
return this;
}
/**
* 异常回调
* @param consumer
* @return
*/
public AsyncResult onFail(Consumer<String> consumer) {
AsyncClient.registOnFailCallback(instanceId, consumer);
return this;
}
}
package cn.spatiotemporal.core.async.exception;
/**
* 业务异常类的公共基类
* @author marquis
*
*/
public class AsyncProcessException extends RuntimeException {
/**
*
*/
private static final long serialVersionUID = 4534112171112827815L;
private String code;
public AsyncProcessException(String message) {
super(message);
this.code = "502";
}
public AsyncProcessException(String code, String message) {
super(message);
this.code = code;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
}
package cn.spatiotemporal.core.async.server;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.util.List;
import java.util.Map;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import cn.spatiotemporal.core.async.AsyncConstants;
import cn.spatiotemporal.core.async.AsyncParamMessage;
import cn.spatiotemporal.core.async.AsyncReturnMessage;
import cn.spatiotemporal.core.async.exception.AsyncProcessException;
import cn.spatiotemporal.core.async.spring.EnableAsyncServer;
import cn.spatiotemporal.core.utils.FastJsonUtils;
import cn.spatiotemporal.core.utils.SpringBeanUtils;
@Component
@ConditionalOnBean(annotation = EnableAsyncServer.class)
public class AsyncDispatcher {
@Autowired
private String compInstanceId;
@Autowired
private RabbitTemplate template;
/**
* 异步调用调度处理
* @param param
*/
@RabbitListener(queues = AsyncConstants.ASYNC_CALL_MQ + "#{@compId}")
public void dispath(AsyncParamMessage param){
AsyncReturnMessage returnMsg = new AsyncReturnMessage();
returnMsg.setEid(compInstanceId);
returnMsg.setInstanceId(param.getInstanceId());
returnMsg.setHasException(false);
//查找调用对象及方法,然后执行
try {
Object target = SpringBeanUtils.getBean(param.getTarget());
if (target == null) {
throw new AsyncProcessException("无法找到服务对象["+param.getTarget()+"]");
}
String paramsJson = param.getParams();
JSONArray jarray = JSON.parseArray(paramsJson);
Method method = findMethod(target.getClass(), param.getMethod(), jarray.size());
if (method == null) {
throw new AsyncProcessException("无法匹配到合适的方法["+param.getMethod()+"]");
}
Object[] params = json2Params(jarray, method.getParameterTypes());
Object returnValue = method.invoke(target, params);
returnMsg.setReturnValue(FastJsonUtils.object2Json(returnValue));
} catch (Exception e) {
returnMsg.setHasException(true);
returnMsg.setErrorMessage(e.getMessage());
}
//发送返回消息
template.convertAndSend(AsyncConstants.ASYNC_RETURN_MQ + param.getCid(),
returnMsg);
}
/**
* 查找对象方法
* @param clazz
* @param name
* @param paramSize
* @return
*/
private Method findMethod(Class<?> clazz, String name, int paramSize) {
for (Method method : clazz.getMethods()) {
if (name.equals(method.getName()) && method.getParameterCount() == paramSize) {
return method;
}
}
return null;
}
private Object[] json2Params(JSONArray jarray, Class<?>[] clazzes) {
Object[] params = new Object[jarray.size()];
for (int i = 0; i < jarray.size(); i++) {
Class clazz = clazzes[i];
if (clazz.isArray()) {
params[i] = FastJsonUtils.json2Array(jarray.getJSONArray(i), clazz.getComponentType());
} else if (clazz.isAssignableFrom(List.class)) {
Class type = (Class) ((ParameterizedType)clazz.getGenericSuperclass())
.getActualTypeArguments()[0]; //得到泛型类型
params[i] = FastJsonUtils.json2List(jarray.getJSONArray(i), type);
} else if (clazz.isAssignableFrom(Map.class)) {
Class keyType = (Class) ((ParameterizedType)clazz.getGenericSuperclass())
.getActualTypeArguments()[0]; //得到key的泛型类型
Class valueType = (Class) ((ParameterizedType)clazz.getGenericSuperclass())
.getActualTypeArguments()[1]; //得到value的泛型类型
params[i] = FastJsonUtils.json2Map(jarray.getJSONObject(i), keyType, valueType);
} else if (FastJsonUtils.isSimple(clazz)) {
// 基本类型
params[i] = jarray.getObject(i, clazz);
} else {
// 一般Bean
params[i] = FastJsonUtils.json2Bean(jarray.getJSONObject(i), clazz);
}
}
return params;
}
}
package cn.spatiotemporal.core.async.spring;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.core.env.MapPropertySource;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import cn.spatiotemporal.core.async.agent.AsyncAgent;
import cn.spatiotemporal.core.async.annotation.AsyncClient;
import ch.qos.logback.core.net.server.Client;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
* 异步服务客户端工厂对象
* 利用动态代理生成异步接口调用的实现类
* @author marquis
*
*/
@Data
@EqualsAndHashCode(callSuper = false)
public class AsyncClientFactoryBean implements FactoryBean<Object>, InitializingBean,
DisposableBean, ApplicationContextAware {
public interface Specification {
String getName();
Class<?>[] getConfiguration();
}
private Class<?> type;
private String name;
private String topic;
private Map<String, String> attrs;
private ApplicationContext context;
public AsyncClientFactoryBean(Class<?> type, Map<String, String> attrs) {
this.type = type;
this.attrs = attrs;
this.topic = attrs.get("topic");
this.name = attrs.get("name");
}
@Override
public void afterPropertiesSet() throws Exception {
Assert.hasText(this.topic, "Topic must be set");
}
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.context = context;
}
@Override
public Object getObject() throws Exception {
AsyncAgent builder = AsyncAgent.builder(this.topic);
return builder.build(this.type, this.name, this.attrs);
}
@Override
public Class<?> getObjectType() {
return this.type;
}
@Override
public boolean isSingleton() {
return true;
}
@Override
public void destroy() throws Exception {
// 销毁处理
}
}
package cn.spatiotemporal.core.async.spring;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.ResourceLoaderAware;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.core.type.ClassMetadata;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.core.type.filter.AbstractClassTestingTypeFilter;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.core.type.filter.TypeFilter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
import cn.spatiotemporal.core.async.annotation.AsyncClient;
/**
* 异步服务客户端注册器
* 扫描异步服务客户端注解并注册
* @author marquis
*
*/
public class AsyncClientRegistrar implements ImportBeanDefinitionRegistrar,
ResourceLoaderAware, BeanClassLoaderAware, EnvironmentAware {
private ResourceLoader resourceLoader;
private ClassLoader classLoader;;
private Environment environment;
public AsyncClientRegistrar() {
}
@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
registerDefaultConfiguration(metadata, registry);
registerAsyncClients(metadata, registry);
}
private void registerDefaultConfiguration(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
Map<String, Object> defaultAttrs = metadata
.getAnnotationAttributes(EnableAsyncClient.class.getName(), true);
if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) {
String name;
if (metadata.hasEnclosingClass()) {
name = "default." + metadata.getEnclosingClassName();
}
else {
name = "default." + metadata.getClassName();
}
registerClientConfiguration(registry, name,
defaultAttrs.get("defaultConfiguration"));
}
}
public void registerAsyncClients(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
ClassPathScanningCandidateComponentProvider scanner = getScanner();
scanner.setResourceLoader(this.resourceLoader);
Set<String> basePackages;
Map<String, Object> attrs = metadata
.getAnnotationAttributes(EnableAsyncClient.class.getName());
AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(
AsyncClient.class);
final Class<?>[] clients = attrs == null ? null
: (Class<?>[]) attrs.get("clients");
if (clients == null || clients.length == 0) {
scanner.addIncludeFilter(annotationTypeFilter);
basePackages = getBasePackages(metadata);
}
else {
final Set<String> clientClasses = new HashSet<>();
basePackages = new HashSet<>();
for (Class<?> clazz : clients) {
basePackages.add(ClassUtils.getPackageName(clazz));
clientClasses.add(clazz.getCanonicalName());
}
AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() {
@Override
protected boolean match(ClassMetadata metadata) {
String cleaned = metadata.getClassName().replaceAll("\\$", ".");
return clientClasses.contains(cleaned);
}
};
scanner.addIncludeFilter(
new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter)));
}
for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner
.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents) {
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(),
"@AsyncClient 注解只能指定到接口上。");
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(
AsyncClient.class.getCanonicalName());
String topic = getTopicName(attributes);
registerClientConfiguration(registry, topic,
attributes.get("configuration"));
try {
registerAsyncClient(registry, annotationMetadata, attributes);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
private void registerAsyncClient(BeanDefinitionRegistry registry,
AnnotationMetadata annotationMetadata, Map<String, Object> attributes) throws ClassNotFoundException {
String className = annotationMetadata.getClassName();
BeanDefinitionBuilder definition = BeanDefinitionBuilder
.genericBeanDefinition(AsyncClientFactoryBean.class);
validate(attributes);
String name = getName(attributes);
definition.addPropertyValue("name", name);
definition.addPropertyValue("type", className);
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
String alias = name + "AsyncClient";
if (!StringUtils.hasText(alias)) {
alias = className;
}
AbstractBeanDefinition beanDefinition = definition.getBeanDefinition();
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(Class.forName(className));
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(attributes);
beanDefinition.setPrimary(true); // has a default, won't be null
BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className,
new String[] { name });
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
private void validate(Map<String, Object> attributes) {
AnnotationAttributes annotation = AnnotationAttributes.fromMap(attributes);
// TODO 版本低了? This blows up if an aliased property is overspecified
//annotation.getAliasedString("name", AsyncClient.class, null);
}
protected String getName(Map<String, Object> attributes) {
String name = (String) attributes.get("name");
if (!StringUtils.hasText(name)) {
name = (String) attributes.get("qualifier");
}
name = resolve(name);
if (!StringUtils.hasText(name)) {
return "";
}
return name;
}
private String resolve(String value) {
if (StringUtils.hasText(value)) {
return this.environment.resolvePlaceholders(value);
}
return value;
}
protected ClassPathScanningCandidateComponentProvider getScanner() {
return new ClassPathScanningCandidateComponentProvider(false, this.environment) {
@Override
protected boolean isCandidateComponent(
AnnotatedBeanDefinition beanDefinition) {
if (beanDefinition.getMetadata().isIndependent()) {
// TODO until SPR-11711 will be resolved
if (beanDefinition.getMetadata().isInterface()
&& beanDefinition.getMetadata()
.getInterfaceNames().length == 1
&& Annotation.class.getName().equals(beanDefinition
.getMetadata().getInterfaceNames()[0])) {
try {
Class<?> target = ClassUtils.forName(
beanDefinition.getMetadata().getClassName(),
AsyncClientRegistrar.this.getClass().getClassLoader());
return !target.isAnnotation();
}
catch (Exception ex) {
this.logger.error(
"Could not load target class: "
+ beanDefinition.getMetadata().getClassName(),
ex);
}
}
return true;
}
return false;
}
};
}
protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) {
Map<String, Object> attributes = importingClassMetadata
.getAnnotationAttributes(EnableAsyncClient.class.getCanonicalName());
Set<String> basePackages = new HashSet<>();
for (String pkg : (String[]) attributes.get("value")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
for (String pkg : (String[]) attributes.get("basePackages")) {
if (StringUtils.hasText(pkg)) {
basePackages.add(pkg);
}
}
for (Class<?> clazz : (Class[]) attributes.get("basePackageClasses")) {
basePackages.add(ClassUtils.getPackageName(clazz));
}
if (basePackages.isEmpty()) {
basePackages.add(
ClassUtils.getPackageName(importingClassMetadata.getClassName()));
}
return basePackages;
}
private String getQualifier(Map<String, Object> client) {
return getName(client);
}
private String getTopicName(Map<String, Object> client) {
if (client == null) {
return null;
}
String value = (String) client.get("value");
if (!StringUtils.hasText(value)) {
value = (String) client.get("topic");
}
if (StringUtils.hasText(value)) {
return value;
}
throw new IllegalStateException("在@" + AsyncClient.class.getSimpleName() + "注解上, 'topic' 或 'value' 至少要指定一个。");
}
private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name,
Object configuration) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(AsyncClientSpecification.class);
builder.addConstructorArgValue(name);
builder.addConstructorArgValue(configuration);
registry.registerBeanDefinition(
name + "." + AsyncClientSpecification.class.getSimpleName(),
builder.getBeanDefinition());
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
/**
* Helper class to create a {@link TypeFilter} that matches if all the delegates
* match.
*
* @author Oliver Gierke
*/
private static class AllTypeFilter implements TypeFilter {
private final List<TypeFilter> delegates;
/**
* Creates a new {@link AllTypeFilter} to match if all the given delegates match.
*
* @param delegates must not be {@literal null}.
*/
public AllTypeFilter(List<TypeFilter> delegates) {
Assert.notNull(delegates);
this.delegates = delegates;
}
@Override
public boolean match(MetadataReader metadataReader,
MetadataReaderFactory metadataReaderFactory) throws IOException {
for (TypeFilter filter : this.delegates) {
if (!filter.match(metadataReader, metadataReaderFactory)) {
return false;
}
}
return true;
}
}
}
package cn.spatiotemporal.core.async.spring;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AsyncClientSpecification {
private String name;
private Class<?>[] configuration;
}
package cn.spatiotemporal.core.async.spring;
import org.springframework.context.annotation.Configuration;
/**
* 异步服务调用配置
* 主要指定所使用的消息队列、序列化器等参数
* @author marquis
*
*/
@Configuration
public class AsyncClientsConfiguration {
}
package cn.spatiotemporal.core.async.spring;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* 扫描异步调用客户端
* @author marquis
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
//@Import(AsyncClientRegistrar.class)
public @interface EnableAsyncClient {
/**
* 默认属性,与{@link #basePackages()}是同一个属性,简写的时候等同于指定了 {@link #basePackages()}。
* 例如: 可以用{@code @EnableAsyncClients("org.my.pkg")}来替代
* {@code @EnableAsyncClients(basePackages="org.my.pkg")}.
* @return the array of 'basePackages'.
*/
String[] value() default {};
/**
* Base packages to scan for annotated components.
* <p>
* {@link #value()} is an alias for (and mutually exclusive with) this attribute.
* <p>
* Use {@link #basePackageClasses()} for a type-safe alternative to String-based
* package names.
*
* @return the array of 'basePackages'.
*/
String[] basePackages() default {};
/**
* Type-safe alternative to {@link #basePackages()} for specifying the packages to
* scan for annotated components. The package of each class specified will be scanned.
* <p>
* Consider creating a special no-op marker class or interface in each package that
* serves no purpose other than being referenced by this attribute.
*
* @return the array of 'basePackageClasses'.
*/
Class<?>[] basePackageClasses() default {};
/**
* 异步服务客户端的配置信息.
*
* @see AsyncClientsConfiguration for the defaults
*/
Class<?>[] defaultConfiguration() default {};
/**
* List of classes annotated with @AsyncClient. If not empty, disables classpath scanning.
* @return
*/
Class<?>[] clients() default {};
}
package cn.spatiotemporal.core.async.spring;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.context.annotation.Import;
/**
* 扫描异步调用的服务端
* @author marquis
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface EnableAsyncServer {
/**
* 默认属性,与{@link #basePackages()}是同一个属性,简写的时候等同于指定了 {@link #basePackages()}。
* 例如: 可以用{@code @EnableAsyncClients("org.my.pkg")}来替代
* {@code @EnableAsyncClients(basePackages="org.my.pkg")}.
* @return the array of 'basePackages'.
*/
String[] value() default {};
/**
* Base packages to scan for annotated components.
* <p>
* {@link #value()} is an alias for (and mutually exclusive with) this attribute.
* <p>
* Use {@link #basePackageClasses()} for a type-safe alternative to String-based
* package names.
*
* @return the array of 'basePackages'.
*/
String[] basePackages() default {};
/**
* Type-safe alternative to {@link #basePackages()} for specifying the packages to
* scan for annotated components. The package of each class specified will be scanned.
* <p>
* Consider creating a special no-op marker class or interface in each package that
* serves no purpose other than being referenced by this attribute.
*
* @return the array of 'basePackageClasses'.
*/
Class<?>[] basePackageClasses() default {};
/**
* 异步服务客户端的配置信息.
*
* @see AsyncClientsConfiguration for the defaults
*/
Class<?>[] defaultConfiguration() default {};
/**
* List of classes annotated with @AsyncClient. If not empty, disables classpath scanning.
* @return
*/
Class<?>[] clients() default {};
}
package cn.spatiotemporal.core.config;
import javax.annotation.Resource;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@Configuration
public class DynamicConfig {
@Resource
protected Environment environment;
/**
* 判断是否存在指定的参数
* @param key
* @return
*/
public boolean containsKey(String key) {
return environment.containsProperty(key);
}
/**
* 动态获取配置参数的值(字符串)
* @param key
* @return
*/
public String getValue(String key) {
return environment.getProperty(key, String.class);
}
/**
* 动态获取配置参数的值(对象)
* @param key
* @param clazz
* @return
*/
public <T> T getValue(String key, Class<T> clazz) {
return environment.getProperty(key, clazz);
}
}
package cn.spatiotemporal.core.config.async;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.spatiotemporal.core.async.AsyncConstants;
import cn.spatiotemporal.core.async.spring.EnableAsyncClient;
/**
* 异步调用配置
* @author marquis
*
*/
@Configuration
@ConditionalOnBean(annotation = EnableAsyncClient.class)
public class AsyncClientConfig {
@Autowired
private String compInstanceId;
@Bean
/**
* 异步调用通讯用MQ
* @return
*/
public Queue asyncQueue() {
Queue queue = new Queue(AsyncConstants.ASYNC_RETURN_MQ + compInstanceId);
return queue;
}
// @Bean
// public DirectExchange asyncReturnExchange() {
// DirectExchange exchange = new DirectExchange(AsyncConstants.ASYNC_RABBIT_MQ_EXCHANGE);
// return exchange;
// }
//
// @Bean
// public Binding asyncClientBinding(DirectExchange asyncReturnExchange, Queue asyncQueue, String compInstanceId) {
// //绑定消费规则
// Binding binding = BindingBuilder.bind(asyncQueue).to(asyncReturnExchange).with(AsyncConstants.ASYNC_RETURN_ROUTING_KEY + compInstanceId);
// return binding;
// }
}
package cn.spatiotemporal.core.config.async;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.spatiotemporal.core.async.AsyncConstants;
import cn.spatiotemporal.core.async.spring.EnableAsyncServer;
/**
* 异步调用配置
* @author marquis
*
*/
@Configuration
@ConditionalOnBean(annotation = EnableAsyncServer.class)
public class AsyncServerConfig {
@Value("${spatiotemporal.async.comp-id}")
private String compId;
@Bean
public String compId() {
return compId;
}
@Bean
/**
* 异步调用通讯用MQ
* @return
*/
public Queue asyncQueue() {
Queue queue = new Queue(AsyncConstants.ASYNC_CALL_MQ + compId);
return queue;
}
// @Bean
// public DirectExchange asyncCallExchange() {
// DirectExchange exchange = new DirectExchange(AsyncConstants.ASYNC_RABBIT_MQ_EXCHANGE);
// return exchange;
// }
//
// @Bean
// public Binding asyncServerBinding(DirectExchange asyncCallExchange, Queue asyncQueue, String compId) {
// //绑定消费规则
// Binding binding = BindingBuilder.bind(asyncQueue).to(asyncCallExchange).with(AsyncConstants.ASYNC_CALL_ROUTING_KEY + compId);
// return binding;
// }
}
package cn.spatiotemporal.core.config.ftp;
import javax.annotation.PreDestroy;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import cn.spatiotemporal.core.ftp.FtpClientPooledObjectFactory;
import cn.spatiotemporal.core.ftp.FtpDao;
import cn.spatiotemporal.core.ftp.FtpTemplate;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Configuration
@ConditionalOnClass({GenericObjectPool.class, FTPClient.class})
@ConditionalOnProperty(prefix = "spatiotemporal.ftp", name = "ip", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(FtpProperties.class)
public class FtpConfiguration {
private final FtpProperties ftpProperties;
@Autowired
public FtpConfiguration(FtpProperties ftpProperties) {
this.ftpProperties = ftpProperties;
}
private ObjectPool<FTPClient> pool;
/**
* 预加载FTPClient连接到对象池中
*
* @param initialSize
* @param maxIdle
*/
private void preLoadingFtpClient(Integer initialSize, int maxIdle) {
//如果初始化大小为null或者小于等于0,则不执行逻辑
if (null == initialSize || initialSize <= 0) {
return;
}
int size = Math.min(initialSize, maxIdle);
try {
for (int i = 0; i < size; i++) {
pool.addObject();
}
} catch (Exception e) {
log.error("预加载失败!", (Object) e.getStackTrace());
}
}
/**
* 销毁方法
*/
@PreDestroy
public void destroy() {
if (null != pool) {
pool.close();
log.info("销毁ftp客户端连接池。。。");
}
}
/**
* 判断不存在业务Service时初始化默认Bean到Spring
*/
@Bean
@ConditionalOnMissingBean(FtpDao.class)
public FtpDao ftpDao() {
log.info("没有找到ftpDao,执行创建默认ftpDao");
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setTestOnBorrow(true);
poolConfig.setTestOnReturn(true);
poolConfig.setTestWhileIdle(true);
poolConfig.setMinEvictableIdleTimeMillis(6000);
poolConfig.setSoftMinEvictableIdleTimeMillis(50000);
poolConfig.setTimeBetweenEvictionRunsMillis(30000);
pool = new GenericObjectPool<>(new FtpClientPooledObjectFactory(ftpProperties), poolConfig);
preLoadingFtpClient(ftpProperties.getInitialSize(), poolConfig.getMaxIdle());
FtpTemplate dao = new FtpTemplate(ftpProperties);
dao.setFtpClientPool(pool);
dao.setHasInit(true);
return dao;
}
}
package cn.spatiotemporal.core.config.ftp;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;
@Data
@Component
@ConfigurationProperties(prefix = "spatiotemporal.ftp")
public class FtpProperties {
private String ip;
private String port;
private String username;
private String password;
private Integer initialSize = 0;
private String encoding = "UTF-8";
private Integer bufferSize = 65536;
private Integer retryCount = 3;
}
package cn.spatiotemporal.core.config.redis;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
/**
* Redis工具配置
* @author marquis
*
*/
@Configuration
public class RedisConfiguration {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
//使用fastjson序列化
FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(fastJsonRedisSerializer);
template.setHashValueSerializer(fastJsonRedisSerializer);
// key的序列化采用StringRedisSerializer
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setConnectionFactory(factory);
return template;
}
}
package cn.spatiotemporal.core.config.security;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.UUID;
import org.apache.commons.lang.math.RandomUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 安全配置:模块进程ID
* @author marquis
*
*/
@Configuration
public class InstanceConfig {
private static final String instancePath = "com/config";
private static final String instanceFile = "instance.bin";
/**
* 模块运行进程实例ID
*/
private final static String compInstanceId;
// 初始化进程实例ID
static {
long instanceId = 0;
try {
instanceId = loadInstanceId();
} catch (IOException e) {
instanceId = UUID.randomUUID().getMostSignificantBits();
saveInstanceId(instanceId);
}
compInstanceId = Long.toHexString(instanceId).toUpperCase();
}
@Bean
public String compInstanceId() {
return compInstanceId;
}
private static long loadInstanceId() throws IOException {
File file = new File(instancePath + "/" + instanceFile);
InputStream is = new FileInputStream(file);
byte[] bytes = new byte[8];
is.read(bytes, 0, 8);
is.close();
return bytes2long(bytes);
}
private static void saveInstanceId(long instanceId) {
try {
File dir = new File(instancePath);
if (!dir.exists()) {
dir.mkdirs();
}
File file = new File(instancePath + "/" + instanceFile);
if (!file.exists()) {
file.createNewFile();
}
OutputStream os = new FileOutputStream(file);
os.write(long2bytes(instanceId));
os.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* long型转二进制字节流(低位在前,高位在后)
* @param value
* @return
*/
private static byte[] long2bytes(long value) {
byte[] bytes = new byte[8];
for (int i = 0; i < 8; i++) {
bytes[i] = (byte) ((value >> (i * 8)) & 0xFF);
}
return bytes;
}
/**
* 二进制字节流转long型
* @param bytes
* @return
*/
private static long bytes2long(byte[] bytes) {
long value = 0;
for (int i = 0; i < 8; i++) {
value += bytes[i] << (i * 8);
}
return value;
}
}
package cn.spatiotemporal.core.env;
import java.util.UUID;
/**
* 分布式模块的环境参数
* @author marquis
*
*/
public class Module {
private final static long instanceId = UUID.randomUUID().getMostSignificantBits();
/**
* 禁止实例化
*/
private Module() {
}
/**
* 获取分布式模块各自的实例全局唯一ID
* 每一个正在运行的模块都有自己的实例ID
* @return
*/
public static long getInstanceId() {
return instanceId;
}
public static String getHostname() {
// TODO 未实现
return "";
};
public static String getIp() {
// TODO 未实现
return "";
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment