diff --git a/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanAuditor.java b/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanAuditor.java index b381493..dec283a 100644 --- a/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanAuditor.java +++ b/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanAuditor.java @@ -16,5 +16,4 @@ public interface BeanAuditor { * @return 结果 */ void audit(T data); - } diff --git a/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanPoster.java b/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanPoster.java index fac42e2..478e2b8 100644 --- a/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanPoster.java +++ b/flyfish-data/src/main/java/com/flyfish/framework/auditor/BeanPoster.java @@ -4,6 +4,7 @@ import com.flyfish.framework.domain.base.Domain; /** * 存储完bean的后置审查 + * * @author wangyu */ public interface BeanPoster { diff --git a/flyfish-data/src/main/java/com/flyfish/framework/auditor/OperationAuditor.java b/flyfish-data/src/main/java/com/flyfish/framework/auditor/OperationAuditor.java index 3912c35..866c598 100644 --- a/flyfish-data/src/main/java/com/flyfish/framework/auditor/OperationAuditor.java +++ b/flyfish-data/src/main/java/com/flyfish/framework/auditor/OperationAuditor.java @@ -25,11 +25,11 @@ public class OperationAuditor implements BeanAuditor { if (null != currentUser) { if (StringUtils.isNotBlank(data.getId())) { data.setModifierId(currentUser.getId()); - data.setModifier(currentUser.getModifier()); + data.setModifier(currentUser.getName()); } else { data.setCreatorId(currentUser.getId()); data.setCreator(currentUser.getName()); - data.setModifierId(currentUser.getModifierId()); + data.setModifierId(currentUser.getId()); } } } diff --git a/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveBeanAuditor.java b/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveBeanAuditor.java new file mode 100644 index 0000000..73d0719 --- /dev/null +++ b/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveBeanAuditor.java @@ -0,0 +1,20 @@ +package com.flyfish.framework.auditor; + +import com.flyfish.framework.domain.base.Domain; +import reactor.core.publisher.Mono; + +/** + * 异步的对bean进行审查 + * + * @param 泛型 + */ +public interface ReactiveBeanAuditor { + + /** + * 对实体进行审查,并补全相关字段 + * + * @param data 原数据 + * @return 结果 + */ + Mono audit(T data); +} diff --git a/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveBeanPoster.java b/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveBeanPoster.java new file mode 100644 index 0000000..6f2dcdc --- /dev/null +++ b/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveBeanPoster.java @@ -0,0 +1,19 @@ +package com.flyfish.framework.auditor; + +import com.flyfish.framework.domain.base.Domain; +import reactor.core.publisher.Mono; + +/** + * 异步的对bean进行后置审查 + * + * @param 泛型 + */ +public interface ReactiveBeanPoster { + + /** + * 对入库的实体进行审查,并执行额外功能 + * + * @param data 原数据 + */ + Mono post(T data); +} diff --git a/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveOperationAuditor.java b/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveOperationAuditor.java new file mode 100644 index 0000000..615be4e --- /dev/null +++ b/flyfish-data/src/main/java/com/flyfish/framework/auditor/ReactiveOperationAuditor.java @@ -0,0 +1,40 @@ +package com.flyfish.framework.auditor; + +import com.flyfish.framework.context.UserContext; +import com.flyfish.framework.domain.base.AuditDomain; +import com.flyfish.framework.domain.base.IUser; +import lombok.RequiredArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.springframework.security.core.context.ReactiveSecurityContextHolder; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +@Component +@RequiredArgsConstructor +public class ReactiveOperationAuditor implements ReactiveBeanAuditor { + + private final UserContext userContext; + + /** + * 对实体进行审查,并补全相关字段 + * + * @param data 原数据 + * @return 结果 + */ + @Override + public Mono audit(AuditDomain data) { + return ReactiveSecurityContextHolder.getContext() + .map(ctx -> ((IUser) ctx.getAuthentication().getPrincipal()).toUser()) + .map(user -> { + if (StringUtils.isNotBlank(data.getId())) { + data.setModifierId(user.getId()); + data.setModifier(user.getName()); + } else { + data.setCreatorId(user.getId()); + data.setCreator(user.getName()); + data.setModifierId(user.getId()); + } + return data; + }); + } +} diff --git a/flyfish-dict/src/main/java/com/flyfish/framework/dict/config/DictionaryProcessor.java b/flyfish-dict/src/main/java/com/flyfish/framework/dict/config/DictionaryProcessor.java index e91b9da..fea6cb0 100644 --- a/flyfish-dict/src/main/java/com/flyfish/framework/dict/config/DictionaryProcessor.java +++ b/flyfish-dict/src/main/java/com/flyfish/framework/dict/config/DictionaryProcessor.java @@ -8,6 +8,7 @@ import com.flyfish.framework.enums.BlankEnum; import com.flyfish.framework.enums.NamedEnum; import com.flyfish.framework.utils.Assert; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.reflections.Reflections; @@ -16,6 +17,8 @@ import org.reflections.util.ClasspathHelper; import org.reflections.util.ConfigurationBuilder; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.lang.reflect.Field; import java.net.URL; @@ -33,6 +36,7 @@ import java.util.stream.Collectors; */ @RequiredArgsConstructor @Component +@Slf4j public class DictionaryProcessor implements InitializingBean { private final DictionaryService dictionaryService; @@ -48,17 +52,19 @@ public class DictionaryProcessor implements InitializingBean { .setScanners(Scanners.FieldsAnnotated)); Set fields = reflections.getFieldsAnnotatedWith(DictValue.class); if (CollectionUtils.isNotEmpty(fields)) { - // 查找是否存在,不存在插入,存在无视 - List dictionaries = fields.stream().map(field -> field.getAnnotation(DictValue.class)) + Flux.fromIterable(fields) + .map(field -> field.getAnnotation(DictValue.class)) .filter(annotation -> null != annotation && StringUtils.isNotBlank(annotation.value()) && BlankEnum.class != annotation.enumType()) - .map(annotation -> dictionaryService + // 查找是否存在,不存在插入,存在无视 + .flatMap(annotation -> dictionaryService .getByCode(annotation.value()) .map(dictionary -> applyDictProps(dictionary, annotation)) - .orElseGet(() -> applyDictProps(new Dictionary(), annotation))) - .distinct() - .collect(Collectors.toList()); - dictionaryService.updateBatch(dictionaries); + .switchIfEmpty(Mono.fromCallable(() -> applyDictProps(new Dictionary(), annotation))) + ) + .collectList() + .flatMapMany(dictionaryService::updateBatch) + .subscribe(); } } diff --git a/flyfish-dict/src/main/java/com/flyfish/framework/dict/repository/DictionaryRepository.java b/flyfish-dict/src/main/java/com/flyfish/framework/dict/repository/DictionaryRepository.java index d201379..7328abd 100644 --- a/flyfish-dict/src/main/java/com/flyfish/framework/dict/repository/DictionaryRepository.java +++ b/flyfish-dict/src/main/java/com/flyfish/framework/dict/repository/DictionaryRepository.java @@ -2,6 +2,7 @@ package com.flyfish.framework.dict.repository; import com.flyfish.framework.dict.domain.Dictionary; import com.flyfish.framework.repository.DefaultReactiveRepository; +import reactor.core.publisher.Mono; import java.util.Optional; @@ -18,5 +19,5 @@ public interface DictionaryRepository extends DefaultReactiveRepository findByCode(String code); + Mono findByCode(String code); } diff --git a/flyfish-dict/src/main/java/com/flyfish/framework/dict/service/DictionaryService.java b/flyfish-dict/src/main/java/com/flyfish/framework/dict/service/DictionaryService.java index c4f208b..073175a 100644 --- a/flyfish-dict/src/main/java/com/flyfish/framework/dict/service/DictionaryService.java +++ b/flyfish-dict/src/main/java/com/flyfish/framework/dict/service/DictionaryService.java @@ -6,6 +6,7 @@ import com.flyfish.framework.dict.repository.DictionaryRepository; import com.flyfish.framework.service.impl.BaseReactiveServiceImpl; import com.flyfish.framework.service.impl.BaseReactiveServiceImpl; import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; import java.util.Optional; @@ -22,7 +23,7 @@ public class DictionaryService extends BaseReactiveServiceImpl { * * @return 结果 */ - public Optional getByCode(String code) { + public Mono getByCode(String code) { DictionaryRepository repository = this.getRepository(); return repository.findByCode(code); } diff --git a/flyfish-logging/src/main/java/com/flyfish/framework/logging/advice/LogAdvice.java b/flyfish-logging/src/main/java/com/flyfish/framework/logging/advice/LogAdvice.java index 002fef9..18e9207 100644 --- a/flyfish-logging/src/main/java/com/flyfish/framework/logging/advice/LogAdvice.java +++ b/flyfish-logging/src/main/java/com/flyfish/framework/logging/advice/LogAdvice.java @@ -7,8 +7,8 @@ import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; - -import javax.annotation.Resource; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * 日志切面 @@ -40,18 +40,47 @@ public class LogAdvice { public Object process(ProceedingJoinPoint joinPoint) throws Throwable { // 构建上下文 LogContext context = LogContext.of(joinPoint); + // 结果信息 + Object result = null; try { - Object result = joinPoint.proceed(joinPoint.getArgs()); - context.setResult(result); - return result; + return result = joinPoint.proceed(joinPoint.getArgs()); } catch (Throwable throwable) { context.setError(throwable); throw throwable; } finally { if (context.isValid()) { - context.end(); + handleResult(result, context); logManager.tryLog(context); } } } + + /** + * 处理结果,支持webflux + * + * @param result 结果 + * @param context 上下文 + */ + private void handleResult(Object result, LogContext context) { + if (null != result) { + // 判断结果是否是publisher + if (result instanceof Mono) { + ((Mono) result).subscribe(data -> { + context.setResult(data); + context.end(); + }); + } else if (result instanceof Flux) { + ((Flux) result).collectList().subscribe(list -> { + context.setResult(list); + context.end(); + }); + } else { + context.setResult(result); + context.end(); + } + } else { + // 为空,直接标记结束 + context.end(); + } + } } diff --git a/flyfish-user/src/main/java/com/flyfish/framework/config/audit/UserBeanAuditor.java b/flyfish-user/src/main/java/com/flyfish/framework/config/audit/UserBeanAuditor.java index 5c0e015..f745cb6 100644 --- a/flyfish-user/src/main/java/com/flyfish/framework/config/audit/UserBeanAuditor.java +++ b/flyfish-user/src/main/java/com/flyfish/framework/config/audit/UserBeanAuditor.java @@ -1,7 +1,7 @@ package com.flyfish.framework.config.audit; -import com.flyfish.framework.auditor.BeanAuditor; -import com.flyfish.framework.auditor.BeanPoster; +import com.flyfish.framework.auditor.ReactiveBeanAuditor; +import com.flyfish.framework.auditor.ReactiveBeanPoster; import com.flyfish.framework.config.constants.UserCacheKeys; import com.flyfish.framework.domain.po.User; import com.flyfish.framework.enums.UserType; @@ -12,10 +12,11 @@ import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.springframework.security.crypto.password.PasswordEncoder; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; @Component @RequiredArgsConstructor -public class UserBeanAuditor implements BeanAuditor, BeanPoster { +public class UserBeanAuditor implements ReactiveBeanAuditor, ReactiveBeanPoster { private final PasswordEncoder passwordEncoder; private final ReactiveRedisOperations reactiveRedisOperations; @@ -26,7 +27,7 @@ public class UserBeanAuditor implements BeanAuditor, BeanPoster { * @param entity 原数据 */ @Override - public void audit(User entity) { + public Mono audit(User entity) { if (null == entity.getId() && StringUtils.isNotBlank(entity.getPassword())) { Assert.isTrue(StrengthUtils.isValid(entity.getPassword()), "密码强度不够,至少应该包含数字、大小写字母、符号组合"); entity.setPassword(passwordEncoder.encode(entity.getPassword())); @@ -43,6 +44,7 @@ public class UserBeanAuditor implements BeanAuditor, BeanPoster { if (null == entity.getCode()) { entity.setCode(entity.getUsername()); } + return Mono.just(entity); } /** @@ -51,8 +53,8 @@ public class UserBeanAuditor implements BeanAuditor, BeanPoster { * @param data 原数据 */ @Override - public void post(User data) { + public Mono post(User data) { // 更新缓存 - reactiveRedisOperations.del(UserCacheKeys.get(data.getUsername())).subscribe(); + return reactiveRedisOperations.del(UserCacheKeys.get(data.getUsername())).then(Mono.just(data)); } } diff --git a/flyfish-web/src/main/java/com/flyfish/framework/configuration/resolver/PageQueryArgumentResolver.java b/flyfish-web/src/main/java/com/flyfish/framework/configuration/resolver/PageQueryArgumentResolver.java index a504ff2..ea705e1 100644 --- a/flyfish-web/src/main/java/com/flyfish/framework/configuration/resolver/PageQueryArgumentResolver.java +++ b/flyfish-web/src/main/java/com/flyfish/framework/configuration/resolver/PageQueryArgumentResolver.java @@ -58,10 +58,12 @@ public class PageQueryArgumentResolver extends HandlerMethodArgumentResolverSupp if (qo instanceof Qo) { Qo query = (Qo) qo; query.setPageable(pageable); - return ReactiveSecurityContextHolder.getContext().map(securityContext -> { - query.setUser(UserUtils.extractUser(securityContext)); - return query; - }).defaultIfEmpty(query); + return ReactiveSecurityContextHolder.getContext() + .map(UserUtils::extractUser) + .map(user -> { + query.setUser(user); + return query; + }).defaultIfEmpty(query); } return Mono.just(qo); }); diff --git a/flyfish-web/src/main/java/com/flyfish/framework/service/impl/BaseReactiveServiceImpl.java b/flyfish-web/src/main/java/com/flyfish/framework/service/impl/BaseReactiveServiceImpl.java index 4b4bd68..4e2a202 100644 --- a/flyfish-web/src/main/java/com/flyfish/framework/service/impl/BaseReactiveServiceImpl.java +++ b/flyfish-web/src/main/java/com/flyfish/framework/service/impl/BaseReactiveServiceImpl.java @@ -1,7 +1,7 @@ package com.flyfish.framework.service.impl; -import com.flyfish.framework.auditor.BeanAuditor; -import com.flyfish.framework.auditor.BeanPoster; +import com.flyfish.framework.auditor.ReactiveBeanAuditor; +import com.flyfish.framework.auditor.ReactiveBeanPoster; import com.flyfish.framework.bean.SyncVo; import com.flyfish.framework.domain.base.AuditDomain; import com.flyfish.framework.domain.base.Domain; @@ -17,6 +17,7 @@ import org.springframework.data.domain.Example; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; +import org.springframework.data.util.CastUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -36,11 +37,11 @@ public class BaseReactiveServiceImpl implements BaseReactiveSe @Autowired protected DefaultReactiveRepository repository; @Autowired(required = false) - protected BeanAuditor auditor; + protected ReactiveBeanAuditor auditor; @Autowired(required = false) - protected BeanPoster poster; + protected ReactiveBeanPoster poster; @Autowired - private BeanAuditor operationAuditor; + private ReactiveBeanAuditor operationAuditor; /** * 查询 @@ -220,7 +221,7 @@ public class BaseReactiveServiceImpl implements BaseReactiveSe */ @Override public Mono create(T entity) { - return repository.insert(audit(entity)).map(this::post); + return audit(entity).flatMap(repository::insert).flatMap(this::post); } /** @@ -230,7 +231,7 @@ public class BaseReactiveServiceImpl implements BaseReactiveSe */ @Override public Mono createSelective(T entity) { - return repository.insert(audit(entity)).map(this::post); + return audit(entity).flatMap(repository::insert).flatMap(this::post); } /** @@ -260,7 +261,7 @@ public class BaseReactiveServiceImpl implements BaseReactiveSe */ @Override public Mono updateById(T entity) { - return repository.save(audit(entity)).map(this::post); + return audit(entity).flatMap(repository::save).flatMap(this::post); } /** @@ -335,13 +336,13 @@ public class BaseReactiveServiceImpl implements BaseReactiveSe .filter(CollectionUtils::isNotEmpty) .flatMapMany(list -> repository.findAllById(list)) .collectMap(Domain::getId, t -> t) - .map(map -> entities.stream() + .flatMap(map -> Flux.fromIterable(entities) // 补全已保存信息 .map(t -> map.containsKey(t.getId()) ? CopyUtils.copyProps(t, map.get(t.getId())) : t) - .map(this::audit) - .collect(Collectors.toList())) + .flatMap(this::audit) + .collectList()) .flatMapMany(list -> repository.saveAll(list)) - .map(this::post); + .flatMap(this::post); } return Flux.fromIterable(entities); } @@ -367,16 +368,16 @@ public class BaseReactiveServiceImpl implements BaseReactiveSe * * @param entity 实体 */ - protected T audit(T entity) { + protected Mono audit(T entity) { // 用户审查 if (entity instanceof AuditDomain) { - operationAuditor.audit((AuditDomain) entity); + return CastUtils.cast(operationAuditor.audit((AuditDomain) entity)); } // 自定义审查 if (auditor != null) { - auditor.audit(entity); + return auditor.audit(entity); } - return entity; + return Mono.just(entity); } /** @@ -385,16 +386,15 @@ public class BaseReactiveServiceImpl implements BaseReactiveSe * @param entity 实体 * @return 结果 */ - protected T post(T entity) { + protected Mono post(T entity) { if (null != poster) { - poster.post(entity); + return poster.post(entity); } - return entity; + return Mono.just(entity); } - @SuppressWarnings("unchecked") public > R getRepository() { - return (R) repository; + return CastUtils.cast(repository); } }