feat: 框架代码全面升级webflux,兼容原

This commit is contained in:
wangyu 2021-12-07 18:02:32 +08:00
parent de1350abb3
commit defcf09d91
13 changed files with 169 additions and 49 deletions

View File

@ -16,5 +16,4 @@ public interface BeanAuditor<T extends Domain> {
* @return 结果
*/
void audit(T data);
}

View File

@ -4,6 +4,7 @@ import com.flyfish.framework.domain.base.Domain;
/**
* 存储完bean的后置审查
*
* @author wangyu
*/
public interface BeanPoster<T extends Domain> {

View File

@ -25,11 +25,11 @@ public class OperationAuditor implements BeanAuditor<AuditDomain> {
if (null != currentUser) {
if (StringUtils.isNotBlank(data.getId())) {
data.setModifierId(currentUser.getId());
data.setModifier(currentUser.getModifier());
data.setModifier(currentUser.getName());
} else {
data.setCreatorId(currentUser.getId());
data.setCreator(currentUser.getName());
data.setModifierId(currentUser.getModifierId());
data.setModifierId(currentUser.getId());
}
}
}

View File

@ -0,0 +1,20 @@
package com.flyfish.framework.auditor;
import com.flyfish.framework.domain.base.Domain;
import reactor.core.publisher.Mono;
/**
* 异步的对bean进行审查
*
* @param <T> 泛型
*/
public interface ReactiveBeanAuditor<T extends Domain> {
/**
* 对实体进行审查并补全相关字段
*
* @param data 原数据
* @return 结果
*/
Mono<T> audit(T data);
}

View File

@ -0,0 +1,19 @@
package com.flyfish.framework.auditor;
import com.flyfish.framework.domain.base.Domain;
import reactor.core.publisher.Mono;
/**
* 异步的对bean进行后置审查
*
* @param <T> 泛型
*/
public interface ReactiveBeanPoster<T extends Domain> {
/**
* 对入库的实体进行审查并执行额外功能
*
* @param data 原数据
*/
Mono<T> post(T data);
}

View File

@ -0,0 +1,40 @@
package com.flyfish.framework.auditor;
import com.flyfish.framework.context.UserContext;
import com.flyfish.framework.domain.base.AuditDomain;
import com.flyfish.framework.domain.base.IUser;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.security.core.context.ReactiveSecurityContextHolder;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
@RequiredArgsConstructor
public class ReactiveOperationAuditor implements ReactiveBeanAuditor<AuditDomain> {
private final UserContext userContext;
/**
* 对实体进行审查并补全相关字段
*
* @param data 原数据
* @return 结果
*/
@Override
public Mono<AuditDomain> audit(AuditDomain data) {
return ReactiveSecurityContextHolder.getContext()
.map(ctx -> ((IUser) ctx.getAuthentication().getPrincipal()).toUser())
.map(user -> {
if (StringUtils.isNotBlank(data.getId())) {
data.setModifierId(user.getId());
data.setModifier(user.getName());
} else {
data.setCreatorId(user.getId());
data.setCreator(user.getName());
data.setModifierId(user.getId());
}
return data;
});
}
}

View File

@ -8,6 +8,7 @@ import com.flyfish.framework.enums.BlankEnum;
import com.flyfish.framework.enums.NamedEnum;
import com.flyfish.framework.utils.Assert;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.reflections.Reflections;
@ -16,6 +17,8 @@ import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.lang.reflect.Field;
import java.net.URL;
@ -33,6 +36,7 @@ import java.util.stream.Collectors;
*/
@RequiredArgsConstructor
@Component
@Slf4j
public class DictionaryProcessor implements InitializingBean {
private final DictionaryService dictionaryService;
@ -48,17 +52,19 @@ public class DictionaryProcessor implements InitializingBean {
.setScanners(Scanners.FieldsAnnotated));
Set<Field> fields = reflections.getFieldsAnnotatedWith(DictValue.class);
if (CollectionUtils.isNotEmpty(fields)) {
// 查找是否存在不存在插入存在无视
List<Dictionary> dictionaries = fields.stream().map(field -> field.getAnnotation(DictValue.class))
Flux.fromIterable(fields)
.map(field -> field.getAnnotation(DictValue.class))
.filter(annotation -> null != annotation && StringUtils.isNotBlank(annotation.value()) &&
BlankEnum.class != annotation.enumType())
.map(annotation -> dictionaryService
// 查找是否存在不存在插入存在无视
.flatMap(annotation -> dictionaryService
.getByCode(annotation.value())
.map(dictionary -> applyDictProps(dictionary, annotation))
.orElseGet(() -> applyDictProps(new Dictionary(), annotation)))
.distinct()
.collect(Collectors.toList());
dictionaryService.updateBatch(dictionaries);
.switchIfEmpty(Mono.fromCallable(() -> applyDictProps(new Dictionary(), annotation)))
)
.collectList()
.flatMapMany(dictionaryService::updateBatch)
.subscribe();
}
}

View File

@ -2,6 +2,7 @@ package com.flyfish.framework.dict.repository;
import com.flyfish.framework.dict.domain.Dictionary;
import com.flyfish.framework.repository.DefaultReactiveRepository;
import reactor.core.publisher.Mono;
import java.util.Optional;
@ -18,5 +19,5 @@ public interface DictionaryRepository extends DefaultReactiveRepository<Dictiona
* @param code 编码
* @return 结果
*/
Optional<Dictionary> findByCode(String code);
Mono<Dictionary> findByCode(String code);
}

View File

@ -6,6 +6,7 @@ import com.flyfish.framework.dict.repository.DictionaryRepository;
import com.flyfish.framework.service.impl.BaseReactiveServiceImpl;
import com.flyfish.framework.service.impl.BaseReactiveServiceImpl;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;
import java.util.Optional;
@ -22,7 +23,7 @@ public class DictionaryService extends BaseReactiveServiceImpl<Dictionary> {
*
* @return 结果
*/
public Optional<Dictionary> getByCode(String code) {
public Mono<Dictionary> getByCode(String code) {
DictionaryRepository repository = this.getRepository();
return repository.findByCode(code);
}

View File

@ -7,8 +7,8 @@ import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import javax.annotation.Resource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* 日志切面
@ -40,18 +40,47 @@ public class LogAdvice {
public Object process(ProceedingJoinPoint joinPoint) throws Throwable {
// 构建上下文
LogContext context = LogContext.of(joinPoint);
// 结果信息
Object result = null;
try {
Object result = joinPoint.proceed(joinPoint.getArgs());
context.setResult(result);
return result;
return result = joinPoint.proceed(joinPoint.getArgs());
} catch (Throwable throwable) {
context.setError(throwable);
throw throwable;
} finally {
if (context.isValid()) {
context.end();
handleResult(result, context);
logManager.tryLog(context);
}
}
}
/**
* 处理结果支持webflux
*
* @param result 结果
* @param context 上下文
*/
private void handleResult(Object result, LogContext context) {
if (null != result) {
// 判断结果是否是publisher
if (result instanceof Mono) {
((Mono<?>) result).subscribe(data -> {
context.setResult(data);
context.end();
});
} else if (result instanceof Flux) {
((Flux<?>) result).collectList().subscribe(list -> {
context.setResult(list);
context.end();
});
} else {
context.setResult(result);
context.end();
}
} else {
// 为空直接标记结束
context.end();
}
}
}

View File

@ -1,7 +1,7 @@
package com.flyfish.framework.config.audit;
import com.flyfish.framework.auditor.BeanAuditor;
import com.flyfish.framework.auditor.BeanPoster;
import com.flyfish.framework.auditor.ReactiveBeanAuditor;
import com.flyfish.framework.auditor.ReactiveBeanPoster;
import com.flyfish.framework.config.constants.UserCacheKeys;
import com.flyfish.framework.domain.po.User;
import com.flyfish.framework.enums.UserType;
@ -12,10 +12,11 @@ import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
@Component
@RequiredArgsConstructor
public class UserBeanAuditor implements BeanAuditor<User>, BeanPoster<User> {
public class UserBeanAuditor implements ReactiveBeanAuditor<User>, ReactiveBeanPoster<User> {
private final PasswordEncoder passwordEncoder;
private final ReactiveRedisOperations reactiveRedisOperations;
@ -26,7 +27,7 @@ public class UserBeanAuditor implements BeanAuditor<User>, BeanPoster<User> {
* @param entity 原数据
*/
@Override
public void audit(User entity) {
public Mono<User> audit(User entity) {
if (null == entity.getId() && StringUtils.isNotBlank(entity.getPassword())) {
Assert.isTrue(StrengthUtils.isValid(entity.getPassword()), "密码强度不够,至少应该包含数字、大小写字母、符号组合");
entity.setPassword(passwordEncoder.encode(entity.getPassword()));
@ -43,6 +44,7 @@ public class UserBeanAuditor implements BeanAuditor<User>, BeanPoster<User> {
if (null == entity.getCode()) {
entity.setCode(entity.getUsername());
}
return Mono.just(entity);
}
/**
@ -51,8 +53,8 @@ public class UserBeanAuditor implements BeanAuditor<User>, BeanPoster<User> {
* @param data 原数据
*/
@Override
public void post(User data) {
public Mono<User> post(User data) {
// 更新缓存
reactiveRedisOperations.del(UserCacheKeys.get(data.getUsername())).subscribe();
return reactiveRedisOperations.del(UserCacheKeys.get(data.getUsername())).then(Mono.just(data));
}
}

View File

@ -58,10 +58,12 @@ public class PageQueryArgumentResolver extends HandlerMethodArgumentResolverSupp
if (qo instanceof Qo) {
Qo query = (Qo) qo;
query.setPageable(pageable);
return ReactiveSecurityContextHolder.getContext().map(securityContext -> {
query.setUser(UserUtils.extractUser(securityContext));
return query;
}).defaultIfEmpty(query);
return ReactiveSecurityContextHolder.getContext()
.map(UserUtils::extractUser)
.map(user -> {
query.setUser(user);
return query;
}).defaultIfEmpty(query);
}
return Mono.just(qo);
});

View File

@ -1,7 +1,7 @@
package com.flyfish.framework.service.impl;
import com.flyfish.framework.auditor.BeanAuditor;
import com.flyfish.framework.auditor.BeanPoster;
import com.flyfish.framework.auditor.ReactiveBeanAuditor;
import com.flyfish.framework.auditor.ReactiveBeanPoster;
import com.flyfish.framework.bean.SyncVo;
import com.flyfish.framework.domain.base.AuditDomain;
import com.flyfish.framework.domain.base.Domain;
@ -17,6 +17,7 @@ import org.springframework.data.domain.Example;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.util.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -36,11 +37,11 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
@Autowired
protected DefaultReactiveRepository<T> repository;
@Autowired(required = false)
protected BeanAuditor<T> auditor;
protected ReactiveBeanAuditor<T> auditor;
@Autowired(required = false)
protected BeanPoster<T> poster;
protected ReactiveBeanPoster<T> poster;
@Autowired
private BeanAuditor<AuditDomain> operationAuditor;
private ReactiveBeanAuditor<AuditDomain> operationAuditor;
/**
* 查询
@ -220,7 +221,7 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
*/
@Override
public Mono<T> create(T entity) {
return repository.insert(audit(entity)).map(this::post);
return audit(entity).flatMap(repository::insert).flatMap(this::post);
}
/**
@ -230,7 +231,7 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
*/
@Override
public Mono<T> createSelective(T entity) {
return repository.insert(audit(entity)).map(this::post);
return audit(entity).flatMap(repository::insert).flatMap(this::post);
}
/**
@ -260,7 +261,7 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
*/
@Override
public Mono<T> updateById(T entity) {
return repository.save(audit(entity)).map(this::post);
return audit(entity).flatMap(repository::save).flatMap(this::post);
}
/**
@ -335,13 +336,13 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
.filter(CollectionUtils::isNotEmpty)
.flatMapMany(list -> repository.findAllById(list))
.collectMap(Domain::getId, t -> t)
.map(map -> entities.stream()
.flatMap(map -> Flux.fromIterable(entities)
// 补全已保存信息
.map(t -> map.containsKey(t.getId()) ? CopyUtils.copyProps(t, map.get(t.getId())) : t)
.map(this::audit)
.collect(Collectors.toList()))
.flatMap(this::audit)
.collectList())
.flatMapMany(list -> repository.saveAll(list))
.map(this::post);
.flatMap(this::post);
}
return Flux.fromIterable(entities);
}
@ -367,16 +368,16 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
*
* @param entity 实体
*/
protected T audit(T entity) {
protected Mono<T> audit(T entity) {
// 用户审查
if (entity instanceof AuditDomain) {
operationAuditor.audit((AuditDomain) entity);
return CastUtils.cast(operationAuditor.audit((AuditDomain) entity));
}
// 自定义审查
if (auditor != null) {
auditor.audit(entity);
return auditor.audit(entity);
}
return entity;
return Mono.just(entity);
}
/**
@ -385,16 +386,15 @@ public class BaseReactiveServiceImpl<T extends Domain> implements BaseReactiveSe
* @param entity 实体
* @return 结果
*/
protected T post(T entity) {
protected Mono<T> post(T entity) {
if (null != poster) {
poster.post(entity);
return poster.post(entity);
}
return entity;
return Mono.just(entity);
}
@SuppressWarnings("unchecked")
public <R extends DefaultReactiveRepository<T>> R getRepository() {
return (R) repository;
return CastUtils.cast(repository);
}
}