flink DataStream API使用及原理

傳統的大數據處理方式一般是批處理式的,也就是說,今天所收集的數據,我們明天再把今天收集到的數據算出來,以供大家使用,但是在很多情況下,數據的時效性對於業務的成敗是非常關鍵的。

Spark 和 Flink 都是通用的開源大規模處理引擎,目標是在一個系統中支持所有的數據處理以帶來效能的提升。兩者都有相對比較成熟的生態系統。是下一代大數據引擎最有力的競爭者。

Spark 的生態總體更完善一些,在機器學習的集成和易用性上暫時領先。

Flink 在流計算上有明顯優勢,核心架構和模型也更透徹和靈活一些。

本文主要通過實例來分析flink的流式處理過程,並通過源碼的方式來介紹流式處理的內部機制。

DataStream整體概述

主要分5部分,下面我們來分別介紹:

 1.運行環境StreamExecutionEnvironment

StreamExecutionEnvironment是個抽象類,是流式處理的容器,實現類有兩個,分別是

LocalStreamEnvironment:
RemoteStreamEnvironment:
/**
 * The StreamExecutionEnvironment is the context in which a streaming program is executed. A
 * {@link LocalStreamEnvironment} will cause execution in the current JVM, a
 * {@link RemoteStreamEnvironment} will cause execution on a remote setup.
 *
 * <p>The environment provides methods to control the job execution (such as setting the parallelism
 * or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).
 *
 * @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment
 * @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
 */

2.數據源DataSource數據輸入

包含了輸入格式InputFormat

    /**
     * Creates a new data source.
     *
     * @param context The environment in which the data source gets executed.
     * @param inputFormat The input format that the data source executes.
     * @param type The type of the elements produced by this input format.
     */
    public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {
        super(context, type);

        this.dataSourceLocationName = dataSourceLocationName;

        if (inputFormat == null) {
            throw new IllegalArgumentException("The input format may not be null.");
        }

        this.inputFormat = inputFormat;

        if (inputFormat instanceof NonParallelInput) {
            this.parallelism = 1;
        }
    }

 flink將數據源主要分為內置數據源和第三方數據源,內置數據源有 文件,網絡socket端口及集合類型數據;第三方數據源實用Connector的方式來連接如kafka Connector,es connector等,自己定義的話,可以實現SourceFunction,封裝成Connector來做。

 

3.DataStream轉換

DataStream:同一個類型的流元素,DataStream可以通過transformation轉換成另外的DataStream,示例如下

@link DataStream#map

@link DataStream#filter

 StreamOperator:流式算子的基本接口,三個實現類

AbstractStreamOperator:

OneInputStreamOperator:

TwoInputStreamOperator:

/**
 * Basic interface for stream operators. Implementers would implement one of
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
 * {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators
 * that process elements.
 *
 * <p>The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}
 * offers default implementation for the lifecycle and properties methods.
 *
 * <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using
 * the timer service, timer callbacks are also guaranteed not to be called concurrently with
 * methods on {@code StreamOperator}.
 *
 * @param <OUT> The output type of the operator
 */

 4.DataStreamSink輸出

    /**
     * Adds the given sink to this DataStream. Only streams with sinks added
     * will be executed once the {@link StreamExecutionEnvironment#execute()}
     * method is called.
     *
     * @param sinkFunction
     *            The object containing the sink's invoke function.
     * @return The closed DataStream.
     */
    public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }

5.執行

/**
     * Executes the JobGraph of the on a mini cluster of ClusterUtil with a user
     * specified name.
     *
     * @param jobName
     *            name of the job
     * @return The result of the job execution, containing elapsed time and accumulators.
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

        // add (and override) the settings with what the user defined
        configuration.addAll(this.configuration);

        if (!configuration.contains(RestOptions.BIND_PORT)) {
            configuration.setString(RestOptions.BIND_PORT, "0");
        }

        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }

        MiniCluster miniCluster = new MiniCluster(cfg);

        try {
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            transformations.clear();
            miniCluster.close();
        }
    }

6.總結

  Flink的執行方式類似於管道,它借鑒了數據庫的一些執行原理,實現了自己獨特的執行方式。

7.展望

Stream涉及的內容還包括Watermark,window等概念,因篇幅限制,這篇僅介紹flink DataStream API使用及原理。

下篇將介紹Watermark,下下篇是windows窗口計算。

參考資料

【1】https://baijiahao.baidu.com/s?id=1625545704285534730&wfr=spider&for=pc

【2】https://blog.51cto.com/13654660/2087705

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

Spring IoC容器與應用上下文的設計與實現

一、前言

  寫這篇博文的主要目的如下:

  • 通過相關類和接口分析IoC容器到底長什麼樣。
  • 闡述筆者對Spring上下文和容器的理解。
  • 介紹重要的類輔助理解SpringBoot的啟動流程。

 

二、Spring IoC容器的設計

  看看下面這張圖(摘自《Spring技術內幕》),IoC容器的設計分為兩條線,

  1.   BeanFactory ==> HierarchicalBeanFactory ==>ConfigurableBeanFactory ,這條線可以理解成IoC容器的設計路線。
  2.   BeanFactory ==> ListableBeanFactory ==> ApplicationContext ==> ConfigurableApplicationContext ,這條可以成為Spring應用上下文的設計路線。

  為什麼這樣要分兩條線呢,主要是將容器和上下文區分開來。因為在在Spring項目中,上下文對容器不僅是擴展的關係,更重要的是持有的關係,上下文是以屬性的形式持有了容器,開發者可以通過上下文對象獲取到容器。筆者十分傾向於將二者分開來理解。當然也可以將應用上下文理解成容器的高級表現形式。

 

2.1,IoC容器的設計線路

  BeanFactory定義了IoC容器的基本規範,包括getBean()按類型和按名稱的獲取Bean的方法。

   

  HierarchicalBeanFactory 在BeanFactory的基礎上增加了getParentBeanFactory()方法,使BeanFactory具備了雙親IoC容器管理的功能。

  ConfigurableBeanFactory接口提供了配置BeanFactory的各種方法。比如setParentBeanFactory()方法,配置上面提到的雙親IoC容器,addBeanPostProcessor()方法,配置Bean後置處理器等。

  到這裏先埋個包袱:到ConfigurableBeanFactory接口為止,IoC容器還沒有具備作為“容器”最基本的功能,那就是能裝東西。

 

2.2、應用上下文設計路線

  上面說了應用上下文是IoC容器的高級表現形式,ListableBeanFactory具備了操作BeanDefinition 的能力,比如getBeanDefinitionCount()方法,可以獲取Bean的總數等。

  ApplicationContext 類那就厲害了,如下代碼所示,實現了一大堆接口

1 public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory,
2         MessageSource, ApplicationEventPublisher, ResourcePatternResolver
  • MessageSource,支持不同的信息源。具備支持國際化的實現,為開發多語言版本的應用提供服務。
  • ResourcePatternResolver,訪問數據源。具備了從不同地方得到Bean定義資源的能力,比如:xml,java config,註解等等。
  • ApplicationEventPublisher,發布事件。使應用上下文具備了事件機制。事件機製為Bean聲明周期的管理提供了便利。

  WebApplicationContext擴展了對web應用的支持。

  ConfigurableApplicationContext就更重要了,看過Spring源碼的都知道一個重要的方法叫refresh,沒錯就是在這個接口中定義的。最重要的是擴展了配置上下文的功能,和控制上下文生命周期的能力等等。

 

三、IoC容器的具體實現類 DefaultListableBeanFactory(重點)

  首先證明一點,為什麼說DefaultListableBeanFactory類是具體的實現類呢?

  隨便啟動一個SpringBoot項目找到第25行代碼(在SpringBoot的啟動流程系列博文中有介紹)

 1 public ConfigurableApplicationContext run(String... args) {
 2     //記錄程序運行時間
 3     StopWatch stopWatch = new StopWatch();
 4     stopWatch.start();
 5     // ConfigurableApplicationContext Spring 的上下文
 6     ConfigurableApplicationContext context = null;
 7     Collection<SpringBootExceptionReporter> exceptionReporters = new ArrayList<>();
 8     configureHeadlessProperty();
 9     //從META-INF/spring.factories中獲取監聽器
10     //1、獲取並啟動監聽器
11     SpringApplicationRunListeners listeners = getRunListeners(args);
12     listeners.starting();
13     try {
14         ApplicationArguments applicationArguments = new DefaultApplicationArguments(
15                 args);
16         //2、構造容器環境
17         ConfigurableEnvironment environment = prepareEnvironment(listeners, applicationArguments);
18         //處理需要忽略的Bean
19         configureIgnoreBeanInfo(environment);
20         //打印banner
21         Banner printedBanner = printBanner(environment);
22         ///3、初始化容器
23         context = createApplicationContext();
24         //實例化SpringBootExceptionReporter.class,用來支持報告關於啟動的錯誤
25         exceptionReporters = getSpringFactoriesInstances(
26                 SpringBootExceptionReporter.class,
27                 new Class[]{ConfigurableApplicationContext.class}, context);
28         //4、刷新容器前的準備階段
29         prepareContext(context, environment, listeners, applicationArguments, printedBanner);
30         //5、刷新容器
31         refreshContext(context);
32         //刷新容器后的擴展接口
33         afterRefresh(context, applicationArguments);
34         stopWatch.stop();
35         if (this.logStartupInfo) {
36             new StartupInfoLogger(this.mainApplicationClass)
37                     .logStarted(getApplicationLog(), stopWatch);
38         }
39         listeners.started(context);
40         callRunners(context, applicationArguments);
41     } catch (Throwable ex) {
42         handleRunFailure(context, ex, exceptionReporters, listeners);
43         throw new IllegalStateException(ex);
44     }
45 
46     try {
47         listeners.running(context);
48     } catch (Throwable ex) {
49         handleRunFailure(context, ex, exceptionReporters, null);
50         throw new IllegalStateException(ex);
51     }
52     return context;
53 }

  debug

  如2標註點所示IoC容器的真實面孔就是這個DefaultListableBeanFactory類了。當然他還有一個子類XmlBeanFactory,不過都已經被標註為棄用了(@Deprecated)在《Spring技術內幕》這本書裏面也是着重的講的這個類,可能當時作者是以SpringMVC項目來講解的吧。XmlBeanFactory顧名思義就是提供了對xml配置方式的支持。呃。。。又勾起了用SpringMVC的痛苦回憶。

  言歸正傳,

  如下圖,看看他的繼承關係

   章節二中提到了很多的IoC容器系列,這樣總結一下吧,俗話說一流企業做標準,二流企業做產品,章節二中的那一坨就是IoC容器的實現標準,本章節我們要總結的類DefaultListableBeanFactory就是IoC容器的具體產品。

  看見上圖中那一堆接口和類是不是有點懵,沒關係,咱們慢慢梳理一下。

 

3.1,作為IoC容器的基礎設計路線

  這條線路在上一章節中已經梳理過了。只是多出了ConfigurableListableBeanFactory接口,ConfigurableListableBeanFactory接口主要是增加指定忽略類型和接口等

 

3.2、作為IoC容器的高級設計路線

  這條設計路線乍一看還是挺複雜的,的確是這樣。

  1, BeanFactory ==> AutowireCapableBeanFactory ==> AbstractAutowireCapableBeanFactory ==> DefaultListableBeanFactory 

  在這條線路中,AutowireCapableBeanFactory接口定義了自動注入bean(autowireBean()),創建bean(createBean()),初始化bean(initializeBean())方法等。那麼真正實現這些方法的類便是AbstractAutowireCapableBeanFactory。

  AbstractAutowireCapableBeanFactory抽象類中實現了AutowireCapableBeanFactory接口定義的方法。在此基礎上通過繼承AbstractBeanFactory具備了操作Bean的能力。

  2, SingletonBeanRegistry ==> DefaultSingletonBeanRegistry ==> FactoryBeanRegistrySupport ==> AbstractBeanFactory ==> AutowireCapableBeanFactory ==> DefaultListableBeanFactory 

  這條關係鏈有點長,在這條鏈中我們要關心的是SingletonBeanRegistry接口,顧名思義,這個接口是單例Bean的註冊接口。當然也不止註冊這麼簡單。如下圖中所示,除了註冊單例之外,還定義獲取單例的方法。

  注意:為什麼只有singleton的註冊中心,而沒有prototype類型的Bean的註冊中心呢?因為單例Bean(singleton)是Spring幫我們創建的並維護的,原型Bean(prototype)是每次獲取都會創建出來一個實例。本質是不同的。

  3, AliasRegistry ==> SimpleAliasRegistry ==> DefaultSingletonBeanRegistry ==> FactoryBeanRegistrySupport ==> AbstractBeanFactory ==> AutowireCapableBeanFactory ==> DefaultListableBeanFactory 

   這條路線呢,主要是提供管理別稱的能力。因為不是重點,在此就不詳細分析了。

  4, AliasRegistry ==> BeanDefinitionRegistry ==> DefaultListableBeanFactory 

  BeanDefinitionRegistry接口要重點說一下,該接口是BeanDefinition的註冊中心。使DefaultListableBeanFactory具備操作BeanDefinition的能力。看一下它有什麼方法。

  包括了註冊,刪除,獲取BeanDefinition的方法。當然這隻是個接口,這些方法的具體實現在DefaultListableBeanFactory中。

 

3.3、DefaultListableBeanFactory幾個重要的父類和接口

3.3.1, AbstractBeanFactory 抽象類

  如上圖所示,AbstractBeanFactory中實現了BeanFactory中定義的幾個重要的方法。常用的註解 @Autowired @Resource(name = “xxx”) 大家都知道一個是按類查找,一個是按名獲取。具體實現這兩個註解的方法就是上圖中圈出來的幾個方法。幾個getBean()方法最終都進入了doGetBean()方法。doGetBean()方法是實際獲得Bean的地方,也是觸發依賴注入發生的地方。在SpringBoot啟動流程總會對這個方法進行詳細的介紹。

3.3.2, AbstractAutowireCapableBeanFactory 抽象類

  AbstractBeanFactory中實現了getBean()方法,AbstractAutowireCapableBeanFactory中實現了Bean的創建方法。

  當我們需要定義一個Bean通常會有這樣寫 @Bean(name = “test”, initMethod = “init”, destroyMethod = “destroy”) 。AbstractAutowireCapableBeanFactory中完成了一個Bean從 create(createBean()) ==> createInstance(createBeanInstance()) ==> init(invokeInitMethods()) 的所有工作。所以這個抽象類的作用不言而喻。具體的創建過程,會在SpringBoot的啟動流程中詳細介紹。

3.3.3, DefaultSingletonBeanRegistry 讓IoC容器擁有作為“容器”的能力

  其實我們經常說的Spring 容器,這個容器其實更多的是BeanFactory所代表的意義:Bean生產工廠。是滴,通常我們理解的容器應該是能裝東西的,但是spring 容器不是代表代表水桶一樣的東西,而是像富士康一樣,生產東西的地方。比如我們需要一個Bean,我們只需要告訴spring,spring就會給我們。所以到目前為止我們還沒有看到IoC作為“容器”的能力。以上言論純屬自己理解,不喜勿噴。

  DefaultSingletonBeanRegistry屬性先貼出來

 1 public class DefaultSingletonBeanRegistry extends SimpleAliasRegistry implements SingletonBeanRegistry {
 2     /**
 3      * Cache of singleton objects: bean name --> bean instance
 4      * 緩存 單例對象
 5      */
 6     private final Map<String, Object> singletonObjects = new ConcurrentHashMap<>(256);//好習慣,創建ConcurrentHashMap,指定初始化因子,縱觀Spring源碼,創建HashMap,都有初始化因子。get
 7 
 8     /**
 9      * Cache of singleton factories: bean name --> ObjectFactory
10      * 緩存 單例工廠
11      */
12     private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap<>(16);
13 
14     /**
15      * Cache of early singleton objects: bean name --> bean instance
16      * 緩存 提前暴露的對象
17      */
18     private final Map<String, Object> earlySingletonObjects = new HashMap<>(16);
19 
20     /**
21      * Set of registered singletons, containing the bean names in registration order
22      * 已經註冊的單例對象集合,按照註冊順序排序的,並且是不可重複的。
23      */
24     private final Set<String> registeredSingletons = new LinkedHashSet<>(256);
25 
26     /**
27      * Names of beans that are currently in creation
28      *
29      */
30     private final Set<String> singletonsCurrentlyInCreation =
31             Collections.newSetFromMap(new ConcurrentHashMap<>(16));
32 
33     /**
34      * Names of beans currently excluded from in creation checks
35      */
36     private final Set<String> inCreationCheckExclusions =
37             Collections.newSetFromMap(new ConcurrentHashMap<>(16));
38 
39     /**
40      * List of suppressed Exceptions, available for associating related causes
41      */
42     @Nullable
43     private Set<Exception> suppressedExceptions;
44 
45     /**
46      * Flag that indicates whether we're currently within destroySingletons
47      */
48     private boolean singletonsCurrentlyInDestruction = false;
49 
50     /**
51      * Disposable bean instances: bean name --> disposable instance
52      * spring是作為一個註冊中心的樣子,在容器shutdown的時候,直接從這裏面找到需要執行destory鈎子的Bean
53      */
54     private final Map<String, Object> disposableBeans = new LinkedHashMap<>();
55 
56     /**
57      * Map between containing bean names: bean name --> Set of bean names that the bean contains
58      * 名稱為name的bean,所持有的beans 的映射關係
59      */
60     private final Map<String, Set<String>> containedBeanMap = new ConcurrentHashMap<>(16);
61 
62     /**
63      * Map between dependent bean names: bean name --> Set of dependent bean names
64      * 名稱為name的bean與其所依賴的bean的映射關係
65      */
66     private final Map<String, Set<String>> dependentBeanMap = new ConcurrentHashMap<>(64);
67 
68     /**
69      * Map between depending bean names: bean name --> Set of bean names for the bean's dependencies
70      */
71     private final Map<String, Set<String>> dependenciesForBeanMap = new ConcurrentHashMap<>(64);
72 }

 

  屬性singletonObjects ,沒錯,就是這個東東,最終存儲單例(singleton)Bean的地方,在SpringBoot啟動流程中,會詳細介紹存取的過程。上面說了原型(prototype)Bean是不需要緩存的,不解釋了。到這裏我們初步看到了IoC作為“容器”該有的樣子。

  DefaultSingletonBeanRegistry上面提到的SingletonBeanRegistry接口的相關方法,並且增加了很多對單例的操作的方法。

3.3.4, DefaultListableBeanFactory (重點)

  上面我們從IoC容器的宏觀設計角度闡述了DefaultListableBeanFactory作為IoC容器的具體實現的設計思想。在這裏來分析一下這個類本身的設計。

  首先看看該類的屬性

 1 public class DefaultListableBeanFactory extends AbstractAutowireCapableBeanFactory
 2         implements ConfigurableListableBeanFactory, BeanDefinitionRegistry, Serializable {
 3     /**
 4      * Map from serialized id to factory instance
 5      * 緩存 序列化ID到 DefaultListableBeanFactory 實例的映射
 6      */
 7     private static final Map<String, Reference<DefaultListableBeanFactory>> serializableFactories =
 8             new ConcurrentHashMap<>(8);
 9 
10     /**
11      * Optional id for this factory, for serialization purposes
12      */
13     @Nullable
14     private String serializationId;
15 
16     /**
17      * Whether to allow re-registration of a different definition with the same name
18      */
19     private boolean allowBeanDefinitionOverriding = true;
20 
21     /**
22      * Whether to allow eager class loading even for lazy-init beans
23      */
24     private boolean allowEagerClassLoading = true;
25 
26     /**
27      * Optional OrderComparator for dependency Lists and arrays
28      */
29     @Nullable
30     private Comparator<Object> dependencyComparator;
31 
32     /**
33      * Resolver to use for checking if a bean definition is an autowire candidate
34      * 被用來解決去校驗一個BeanDefinition是不是自動裝載的候選人
35      */
36     private AutowireCandidateResolver autowireCandidateResolver = new SimpleAutowireCandidateResolver();
37 
38     /**
39      * Map from dependency type to corresponding autowired value
40      * 緩存 類型對應的自動裝載的Bean
41      */
42     private final Map<Class<?>, Object> resolvableDependencies = new ConcurrentHashMap<>(16);
43 
44     /**
45      * Map of bean definition objects, keyed by bean name
46      * 緩存 beanName到BeanDefinition的映射關係
47      */
48     private final Map<String, BeanDefinition> beanDefinitionMap = new ConcurrentHashMap<>(256);
49 
50     /**
51      * Map of singleton and non-singleton bean names, keyed by dependency type
52      * 緩存 類型 和 beanName的映射關係
53      */
54     private final Map<Class<?>, String[]> allBeanNamesByType = new ConcurrentHashMap<>(64);
55 
56     /**
57      * Map of singleton-only bean names, keyed by dependency type
58      * 緩存 類型 和 單例Bean names的映射
59      */
60     private final Map<Class<?>, String[]> singletonBeanNamesByType = new ConcurrentHashMap<>(64);
61 
62     /**
63      * List of bean definition names, in registration order
64      * 緩存 beanDefinition name的list
65      */
66     private volatile List<String> beanDefinitionNames = new ArrayList<>(256);
67 
68     /**
69      * List of names of manually registered singletons, in registration order
70      */
71     private volatile Set<String> manualSingletonNames = new LinkedHashSet<>(16);
72 
73     /**
74      * Cached array of bean definition names in case of frozen configuration
75      */
76     @Nullable
77     private volatile String[] frozenBeanDefinitionNames;
78 
79     /**
80      * Whether bean definition metadata may be cached for all beans
81      */
82     private volatile boolean configurationFrozen = false;
83 }

  在Spring中,實際上是把DefaultListableBeanFactory作為一個默認的功能完整的IoC容器來使用。 DefaultListableBeanFactory作為一個功能完整的容器具備了除以上父類所具有功能外,還加入了對BeanDefinition的管理和維護。從上面的代碼可以看到一個重要的屬性:beanDefinitionMap。beanDefinitionMap緩存了Bean name到 BeanDefinition的映射。到這裡是不是發現了IoC容器另外一個作為“容器”的能力。在我的理解範圍內,IoC容器作為“容器”真正裝的兩個最總要的能力算是總結完了,一個是裝單例(Singleton)Bean,一個是裝BeanDefinition。

 

3.3.5, BeanDefinition 

  Spring通過定義BeanDefinition來管理基於Spring的應用中的各種對象以及他們之間的相互依賴關係。BeanDefinition抽象了我們對Bean的定義,是讓容器起作用的主要數據類型。我么都知道在計算機世界里,所有的功能都是建立在通過數據對現實進行抽象的基礎上的。IoC容器是用來管理對象依賴關係的,對IoC容器來說,BeanDefinition就是對依賴反轉模式中管理的對象依賴關係的數據抽象,也是容器實現依賴反轉功能的核心數據結構,依賴反轉功能都是圍繞對這個BeanDefinition的處理來完成的。這些BeanDefinition就像是容器里裝的水,有了這些基本數據,容器才能發揮作用。簡單一句話來說,BeanDefinition就是Bean的元數據,BeanDefinition存放了對Bean的基本描述,包括Bean擁有什麼屬性,方法,Bean的位置等等Bean的各種信息。IoC容器可以通過BeanDefinition生成Bean。

  BeanDefinition究竟長什麼樣呢?

  在同第三章debug的地方一樣,點開beanFactory,然後查看beanDefinitionMap屬性。

  OK,BeanDefinition就是長這樣了。具體怎麼通過它生成Bean,在SpringBoot啟動流程中會詳細介紹。

 

四、SpringBoot web工程中的上下文 AnnotationConfigServletWebServerApplicationContext

  在SpringBoot工程中,應用類型分為三種,如下代碼所示。

 1 public enum WebApplicationType {
 2     /**
 3      * 應用程序不是web應用,也不應該用web服務器去啟動
 4      */
 5     NONE,
 6     /**
 7      * 應用程序應作為基於servlet的web應用程序運行,並應啟動嵌入式servlet web(tomcat)服務器。
 8      */
 9     SERVLET,
10     /**
11      * 應用程序應作為 reactive web應用程序運行,並應啟動嵌入式 reactive web服務器。
12      */
13     REACTIVE
14 }

  對應三種應用類型,SpringBoot項目有三種對應的應用上下文,我們以web工程為例,即其上下文為AnnotationConfigServletWebServerApplicationContext

 1 public static final String DEFAULT_WEB_CONTEXT_CLASS = "org.springframework.boot."
 2         + "web.servlet.context.AnnotationConfigServletWebServerApplicationContext";
 3 public static final String DEFAULT_REACTIVE_WEB_CONTEXT_CLASS = "org.springframework."
 4         + "boot.web.reactive.context.AnnotationConfigReactiveWebServerApplicationContext";
 5 public static final String DEFAULT_CONTEXT_CLASS = "org.springframework.context."
 6         + "annotation.AnnotationConfigApplicationContext";
 7         
 8 protected ConfigurableApplicationContext createApplicationContext() {
 9     Class<?> contextClass = this.applicationContextClass;
10     if (contextClass == null) {
11         try {
12             switch (this.webApplicationType) {
13                 case SERVLET:
14                     contextClass = Class.forName(DEFAULT_WEB_CONTEXT_CLASS);
15                     break;
16                 case REACTIVE:
17                     contextClass = Class.forName(DEFAULT_REACTIVE_WEB_CONTEXT_CLASS);
18                     break;
19                 default:
20                     contextClass = Class.forName(DEFAULT_CONTEXT_CLASS);
21             }
22         } catch (ClassNotFoundException ex) {
23             throw new IllegalStateException(
24                     "Unable create a default ApplicationContext, "
25                             + "please specify an ApplicationContextClass",
26                     ex);
27         }
28     }
29     return (ConfigurableApplicationContext) BeanUtils.instantiateClass(contextClass);
30 }

  我們先看一下AnnotationConfigServletWebServerApplicationContext的設計。 

   在2.2中已經介紹了ApplicationContext的設計

  關於AnnotationConfigServletWebServerApplicationContext詳細的設計路線在這裏就不像DefaultListableBeanFactory容器那麼詳細的去講解了。在第二章說過,應用上下文可以理解成IoC容器的高級表現形式,拿上圖和DefaultListableBeanFactory的繼承關係圖,不難發現,應用上下文確實是在IoC容器的基礎上豐富了一些高級功能。在第二章中,我們還說過應用上下文對IoC容器是持有的關係。繼續看第二章debug的截圖,context就是AnnotationConfigServletWebServerApplicationContext的神秘面孔,他的一個屬性beanFactory就是IoC容器(DefaultListableBeanFactory)。所以他們之間是持有,和擴展的關係。

  接下來看GenericApplicationContext類

1 public class GenericApplicationContext extends AbstractApplicationContext implements BeanDefinitionRegistry {
2     private final DefaultListableBeanFactory beanFactory;
3     ...
4 }

   第一行赫然定義了beanFactory屬性,正是DefaultListableBeanFactory對象。

  關於上下文還有另外一個最重要的方法refresh,上文中說道該方法是在ConfigurableApplicationContext接口中定義的,那麼在哪實現的該方法呢?

  看AbstractApplicationContext類。

 1 @Override
 2 public void refresh() throws BeansException, IllegalStateException {
 3     synchronized (this.startupShutdownMonitor) {
 4         // Prepare this context for refreshing.
 5         //刷新上下文環境
 6         prepareRefresh();
 7 
 8         // Tell the subclass to refresh the internal bean factory.
 9         //這裡是在子類中啟動 refreshBeanFactory() 的地方
10         ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
11 
12         // Prepare the bean factory for use in this context.
13         //準備bean工廠,以便在此上下文中使用
14         prepareBeanFactory(beanFactory);
15 
16         try {
17             // Allows post-processing of the bean factory in context subclasses.
18             //設置 beanFactory 的後置處理
19             postProcessBeanFactory(beanFactory);
20 
21             // Invoke factory processors registered as beans in the context.
22             //調用 BeanFactory 的后處理器,這些處理器是在Bean 定義中向容器註冊的
23             invokeBeanFactoryPostProcessors(beanFactory);
24 
25             // Register bean processors that intercept bean creation.
26             //註冊Bean的后處理器,在Bean創建過程中調用
27             registerBeanPostProcessors(beanFactory);
28 
29             // Initialize message source for this context.
30             //對上下文中的消息源進行初始化
31             initMessageSource();
32 
33             // Initialize event multicaster for this context.
34             //初始化上下文中的事件機制
35             initApplicationEventMulticaster();
36 
37             // Initialize other special beans in specific context subclasses.
38             //初始化其他特殊的Bean
39             onRefresh();
40 
41             // Check for listener beans and register them.
42             //檢查監聽Bean並且將這些監聽Bean向容器註冊
43             registerListeners();
44 
45             // Instantiate all remaining (non-lazy-init) singletons.
46             //實例化所有的(non-lazy-init)單件
47             finishBeanFactoryInitialization(beanFactory);
48 
49             // Last step: publish corresponding event.
50             //發布容器事件,結束Refresh過程
51             finishRefresh();
52         } catch (BeansException ex) {
53             if (logger.isWarnEnabled()) {
54                 logger.warn("Exception encountered during context initialization - " +
55                         "cancelling refresh attempt: " + ex);
56             }
57 
58             // Destroy already created singletons to avoid dangling resources.
59             destroyBeans();
60 
61             // Reset 'active' flag.
62             cancelRefresh(ex);
63 
64             // Propagate exception to caller.
65             throw ex;
66         } finally {
67             // Reset common introspection caches in Spring's core, since we
68             // might not ever need metadata for singleton beans anymore...
69             resetCommonCaches();
70         }
71     }
72 }

   OK,應用上下文就介紹到這裏。

 

 五、IoC容器的初始化過程

   在這裏我們先口述一下IoC容器的初始化過程吧,源碼分析,請移步SpringBoot啟動流程分析。

  簡單來說IoC容器的初始化過程是由前面介紹的refresh()方法啟動的,這個方法標志著IoC容器的正式啟動。具體來說,這個啟動包括三個過程

1 BeanDefinition的Resource定位
2 BeanDefinition的載入
3 向IoC容器註冊BeanDefinition

 

   1、第一個過程:Resource定位

  這個定位指的是BeanDefinition的資源定位,它由ResourceLoader通過統一的Resource接口完成,這個Resource對各種形式的BeanDefinition的使用都提供了統一接口。對於這些BeanDefinition的存在形式,可以是通過像SpringMVC中的xml定義的Bean,也可以是像在類路徑中的Bean定義信息,比如使用@Component等註解定義的。這個過程類似於容器尋找數據的過程,就像用水桶裝水先要把水找到一樣。

  結合SpringBoot說一下這個過程,對於SpringBoot,我們都知道他的包掃描是從主類所在的包開始掃描的,那這個定位的過程在SpringBoot中具體是這樣的,在refresh容器之前(prepareContext()方法中),會先將主類解析成BeanDefinition,然後在refresh方法中並且是掃描Bean之前,解析主類的BeanDefinition獲取basePackage的路徑。這樣就完成了定位的過程。(先不討論SpringBoot中指定掃描包路徑和自動裝配)

  2、第二個過程:BeanDefinition的載入

  這個載入過程是把用戶定義好的Bean表示成IoC容器內部的數據結構,而這個容器內部的數據結構就是BeanDefinition。

  在SpringBoot中,上面我們說到通過主類找到了basePackage,SpringBoot會將該路徑拼接成:classpath*:org/springframework/boot/demo/**/*.class這樣的形式,然後一個叫做PathMatchingResourcePatternResolver的類會將該路徑下所有的.class文件都加載進來,然後遍歷判斷是不是有@Component註解,如果有的話,就是我們要裝載的BeanDefinition。大致過程就是這樣的了。

  注意:@Configuration,@Controller,@Service等註解底層都是@Component註解,只不過包裝了一層罷了。

   3、第三個過程:註冊BeanDefinition

   這個過程通過調用上文提到的BeanDefinitionRegister接口的實現來完成。這個註冊過程把載入過程中解析得到的BeanDefinition向IoC容器進行註冊。通過上文的分析,我們可以看到,在IoC容器中將BeanDefinition注入到一個ConcurrentHashMap中,IoC容器就是通過這個HashMap來持有這些BeanDefinition數據的。比如DefaultListableBeanFactory 中的beanDefinitionMap屬性。

六、IoC容器的依賴注入

  上面對IoC容器的初始化過程進行了詳細的介紹,這個過程完成的主要的工作是在IoC容器中建立BeanDefinition數據映射。在此過程中並沒有看到IoC容器對Bean的依賴關係進行注入。依賴注入是Spring實現“控制反轉”的重要一環。Spring將依賴關係交給IoC容器來完成。

  依賴控制反轉的實現有很多種方式。在Spring中,IoC容器是實現這個模式的載體,它可以在對象生成或者初始化時直接將數據注入到對象中,也可以通過將對象注入到對象數據域中的方式來注入對方法調用的依賴。這種依賴注入是可以遞歸的,對象被逐層注入。

 

 

  原創不易,轉載請註明出處。

  如有錯誤的地方還請留言指正。

 

參考文獻:

  《Spring技術內幕–深入解析Spring框架與設計原理(第二版)》

 

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

Asp.NETCore讓FromServices回來

起因

這两天,我忽然有點懷念 Asp.NET MVC 5 之前的時代,原因是我看到項目裏面有這麼一段代碼(其實不止一段,幾乎每個 Controller 都是)

    [Route("home")]
    [ApiController]
    public class HomeController : ControllerBase
    {
        private readonly IConfiguration configuration;
        private readonly IHostingEnvironment environment;
        private readonly CarService carService;
        private readonly PostServices postServices;
        private readonly TokenService tokenService;
        private readonly TopicService topicService;
        private readonly UserService userService;

        public HomeController(IConfiguration configuration,
                              IHostingEnvironment environment,
                              CarService carService,
                              PostServices postServices,
                              TokenService tokenService,
                              TopicService topicService,
                              UserService userService)
        {
            this.configuration = configuration;
            this.environment = environment;
            this.carService = carService;
            this.postServices = postServices;
            this.tokenService = tokenService;
            this.topicService = topicService;
            this.userService = userService;
        }

        [HttpGet("index")]
        public ActionResult<string> Index()
        {
            return "Hello world!";
        }
    }

在構造函數裏面聲明了一堆依賴注入的實例,外面還得聲明相應的接收字段,使用代碼克隆掃描,零零散散的充斥在各個 Controller 的構造函數中。在 Asp.NET MVC 5 之前,我們可以把上面的代碼簡化為下面的形式:

    [Route("home")]
    [ApiController]
    public class HomeController : ControllerBase
    {
        [FromServices] public IConfiguration Configuration { get; set; }
        [FromServices] public IHostingEnvironment Environment { get; set; }
        [FromServices] public CarService CarService { get; set; }
        [FromServices] public PostServices PostServices { get; set; }
        [FromServices] public TokenService TokenService { get; set; }
        [FromServices] public TopicService TopicService { get; set; }
        [FromServices] public UserService UserService { get; set; }

        public HomeController()
        {
        }

        [HttpGet("index")]
        public ActionResult<string> Index()
        {
            return "Hello world!";
        }
    }

但是,在 .NETCore 中,上面的這斷代碼是會報錯的,原因就是特性:FromServicesAttribute 只能應用於 AttributeTargets.Parameter,導航到 FromServicesAttribute 查看源碼

namespace Microsoft.AspNetCore.Mvc
{
    /// <summary>
    /// Specifies that an action parameter should be bound using the request services.
    /// </summary>
    /// <example>
    /// In this example an implementation of IProductModelRequestService is registered as a service.
    /// Then in the GetProduct action, the parameter is bound to an instance of IProductModelRequestService
    /// which is resolved from the request services.
    ///
    /// <code>
    /// [HttpGet]
    /// public ProductModel GetProduct([FromServices] IProductModelRequestService productModelRequest)
    /// {
    ///     return productModelRequest.Value;
    /// }
    /// </code>
    /// </example>
    [AttributeUsage(AttributeTargets.Parameter, AllowMultiple = false, Inherited = true)]
    public class FromServicesAttribute : Attribute, IBindingSourceMetadata
    {
        /// <inheritdoc />
        public BindingSource BindingSource => BindingSource.Services;
    }
}

那麼問題來了,AttributeUsage 是什麼時候移除了 AttributeTargets.Property 呢?答案是:2015年11月17日,是一個叫做 Pranav K 的哥們革了 FromServiceAttribute 的命,下面是他的代碼提交記錄

Limit [FromServices] to apply only to parameters
https://github.com/aspnet/Mvc/commit/2a89caed05a1bc9f06d32e15d984cd21598ab6fb

這哥們的 Commit Message 很簡潔:限制 FromServices 僅作用於 parameters 。高手過招,人狠話不多,刀刀致命!從此,廣大 .NETCore 開發者告別了屬性注入。經過我不懈努力的搜索后,發現其實在 Pranav K 提交代碼兩天後,他居然自己開了一個 Issue,你說氣人不?

關於廢除 FromServices 的討論
https://github.com/aspnet/Mvc/issues/3578

在這個貼子裏面,許多開發者表達了自己的不滿,我還看到了有人像我一樣,表達了自己想要一個簡潔的構造函數的這樣樸素的請求;但是,對於屬性注入可能導致濫用的問題也產生了激烈的討論,還有屬性注入要求成員必須標記為 public 這些硬性要求,不得不說,這個帖子成功的引起了人們的注意,但是很明顯,作者不打算修改 FromServices 支持屬性注入。

自己動手,豐衣足食

沒關係,官方沒有自帶的話,我們自己動手做一個也是一樣的效果,在此之前,我們還應該關注另外一種從 service 中獲取實例的方式,就是常見的通過 HttpContext 請求上下文獲取服務實例的方式:

 var obj = HttpContext.RequestServices.GetService(typeof(Type));

上面的這種方式,其實是反模式的,官方也建議盡量避免使用,說完了廢話,就自動動手擼一個屬性注入特性類:PropertyFromServiceAttribute

[AttributeUsage(AttributeTargets.Property, AllowMultiple = false, Inherited = true)]
public class PropertyFromServiceAttribute : Attribute, IBindingSourceMetadata
{
    public BindingSource BindingSource => BindingSource.Services;
}

沒有多餘的代碼,就是標記為 AttributeTargets.Property 即可

應用到類成員
    [Route("home")]
    [ApiController]
    public class HomeController : ControllerBase
    {
        [PropertyFromService] public IConfiguration Configuration { get; set; }
        [PropertyFromService] public IHostingEnvironment Environment { get; set; }
        [PropertyFromService] public CarService CarService { get; set; }
        [PropertyFromService] public PostServices PostServices { get; set; }
        [PropertyFromService] public TokenService TokenService { get; set; }
        [PropertyFromService] public TopicService TopicService { get; set; }
        [PropertyFromService] public UserService UserService { get; set; }

        public HomeController()
        {

        }

        [HttpGet("index")]
        public ActionResult<string> Index()
        {
            return "Hello world!";
        }
    }

請大聲的回答,上面的代碼是不是非常的乾淨整潔!但是,像上面這樣使用屬性注入有一個小問題,在對象未初始化之前,該屬性為 null,意味着在類的構造函數中,該成員變量不可用,不過不要緊,這點小問題完全可用通過在構造函數中注入解決;更重要的是,並非每個實例都需要在構造函數中使用,是吧。

示例代碼

託管在 Github 上了 https://github.com/lianggx/Examples/tree/master/Ron.DI

** 如果你喜歡這篇文章,請給我點贊,讓更多同學可以看到,筆芯~

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

JS數據結構第二篇—鏈表

一、什麼是鏈表 

鏈表是一種鏈式存儲的線性表,是由一組節點組成的集合,每一個節點都存儲了下一個節點的地址;指向另一個節點的引用叫鏈;和數組中的元素內存地址是連續的相比,鏈表中的所有元素的內存地址不一定是連續的。結構模擬如圖:

一般來說,說到鏈表,就要提下數組,一般鏈表都是和數組進行對比。

在很多編程語言中,數組的長度時固定的,所以數組中的增加和刪除比較麻煩,需要頻繁的移動數組中的其他元素。

然而,JavaScript中的數組並不存在上述問題,JS中的數組相對其他語言使用上更方便,因為JS中的數組本質是一個類似數組的對象,這就使得JS的數組雖然使用更方便,但比其他語言(C++、Java、C#)的數組效率要低。

所以,在實際應用中如果發現數組很慢,就可以考慮使用鏈表來替代它。除了對數據的隨機訪問,鏈表幾乎可以用在任何可以使用一維數組的情況中。如果需要隨機訪問,數組仍然是更好的選擇。

 

二、鏈表的設計

為了對鏈表更好的使用,我們設計了類LinkedList, 對鏈表中節點的增刪改查方法進行了封裝。結構如圖:

其中size和head為LinkedList構造函數私有屬性,size記錄鏈表中有多少個節點,head指向鏈表的頭結點。

根據需要對外暴露了以下方法(可以根據需要自定義其他方法):

 單向LinkedList完整設計代碼:

/**
 * 自定義鏈表:對外公開的方法有
 * append(element) 在鏈表最後追加節點
 * insert(index, element) 根據索引index, 在索引位置插入節點
 * remove(element)  刪除節點
 * removeAt(index)  刪除指定索引節點
 * removeAll(element) 刪除所有匹配的節點
 * set(index, element) 根據索引,修改對應索引的節點值
 * get(index)  根據索引獲取節點信息
 * indexOf(element) 獲取某個節點的索引位置
 * clear()  清空所有節點
 * length()   返回節點長度
 * print() 打印所有節點信息
 * toString() 打印所有節點信息,同print
 * */
const LinkedList = function(){
    let head = null;
    let size = 0;   //記錄鏈表元素個數

    //Node模型
    function LinkNode(element, next){
        this.element = element;
        this.next = next;
    }

    //元素越界檢查, 越界拋出異常
    function outOfBounds(index){
        if (index < 0 || index >= size){
            throw("抱歉,目標位置不存在!");
        }
    }

    //根據索引,獲取目標對象
    function node(index){
        outOfBounds(index);

        let obj = head;
        for (let i = 0; i < index; i++){
            obj = obj.next;
        }

        return obj;
    }

    //新增一個元素
     function append(element){
        if (size == 0){
            head = new LinkNode(element, null);
        }
        else{
            let obj = node(size-1);
            obj.next = new LinkNode(element, null);
        }
         size++;
    }

    //插入一個元素
     function insert(index, element){
        if (index == 0){
            head = new LinkNode(element, head);
        }
        else{
            let obj = node(index-1);
            obj.next = new LinkNode(element, obj.next);
        }
         size++;
    }

    //修改元素
    function set(index, element){
        let obj = node(index);
        obj.element = element;
    }

    //根據值移除節點元素
    function remove(element){
        if (size < 1) return null;

        if (head.element == element){
            head = head.next;
            size--;
            return element;
        }
        else{
            let temp = head;
            while(temp.next){
                if (temp.next.element == element){
                    temp.next = temp.next.next;
                    size--;
                    return element;
                }
                else{
                    temp = temp.next;
                }
            }
        }
        return null;
    }

    //根據索引移除節點
     function removeAt(index){
         outOfBounds(index);
         let element = null;

         if (index == 0){
             element = head.element;
             head = head.next;
         }
         else{
             let prev = node(index-1);
             element = prev.next.element;
             prev.next = prev.next.next;
         }
         size--;
        return element;
    }

    //移除鏈表裡面的所有匹配值element的元素
     function removeAll(element){

        let virHead = new LinkNode(null, head); //創建一個虛擬頭結點,head為次節點
         let tempNode = virHead, ele = null;

         while(tempNode.next){
             if (tempNode.next.element == element){
                 tempNode.next = tempNode.next.next;
                 size--;
                 ele = element;
             }
             else{
                tempNode = tempNode.next;
             }
         }

         //重新賦值
         head = virHead.next;

        return ele;
    }

    //獲取某個元素
    function get(index){
        return node(index).element;
    }

    //獲取元素索引
    function indexOf(element){
        let obj = head, index = -1;

        for (let i = 0; i < size; i++){
            if (obj.element == element){
                index = i;
                break;
            }
            obj = obj.next;
        }
        return index;
    }

    //清除所有元素
    function clear(){
        head = null;
        size = 0;
    }

    //屬性轉字符串
    function getObjString(obj){

        let str = "";

        if (obj instanceof Array){
            str += "[";
            for (let i = 0; i < obj.length; i++){
                str += getObjString(obj[i]);
            }
            str = str.substring(0, str.length - 2);
            str += "], "
        }
        else if (obj instanceof Object){
            str += "{";
            for (var key in obj){
                let item = obj[key];
                str += "\"" + key + "\": " + getObjString(item);
            }
            str = str.substring(0, str.length-2);
            str += "}, "
        }
        else if (typeof obj == "string"){
            str += "\"" + obj + "\"" + ", ";
        }
        else{
            str += obj + ", ";
        }

        return str;
    }
    function toString(){
        let str = "", obj = head;
        for (let i = 0; i < size; i++){
            str += getObjString(obj.element);
            obj = obj.next;
        }
        if (str.length > 0) str = str.substring(0, str.length -2);
        return str;
    }
    //打印所有元素
    function print(){
        console.log(this.toString())
    }

    //對外公開方法
    this.append = append;
    this.insert = insert;
    this.remove = remove;
    this.removeAt = removeAt;
    this.removeAll = removeAll;
    this.set = set;
    this.get = get;
    this.indexOf = indexOf;
    this.length = function(){
        return size;
    }
    this.clear = clear;
    this.print = print;
    this.toString = toString;
}


////測試
// let obj = new LinkedList();
// let obj1 = { title: "全明星比賽", stores: [{name: "張飛vs岳飛", store: "2:3"}, { name: "關羽vs秦瓊", store: "5:5"}]};
//
// obj.append(99);
// obj.append("hello")
// obj.append(true)
// obj.insert(3, obj1);
// obj.insert(0, [12, false, "Good", 81]);
// obj.print();
// console.log("obj1.index: ", obj.indexOf(obj1));
// obj.remove(0);
// obj.removeAll(obj1);
// obj.print();

////測試2
console.log("\n\n......test2.....")
var obj2 = new LinkedList();
obj2.append(8); obj2.insert(1,99); obj2.append('abc'); obj2.append(8); obj2.append(false);
obj2.append(12); obj2.append(8); obj2.append('123'); obj2.append(8);
obj2.print();
obj2.removeAll(8); //刪除所有8
obj2.print();

View Code

 

另外,可以在LinkedList中增加一個虛擬節點,即在頭結點之前增加一個節點,一直保留,結構如圖:

這裏代碼就不提供了,在上一份鏈表代碼中的removeAll(刪除鏈表中指定值的所有節點)方法中有用到虛擬頭結點, 下面的練習題中也有應用到虛擬頭結點,應用場景還是蠻多的。

 

三、鏈表練習題

推薦一個神奇的網站,可以以動畫的方式演示各種數據結構增刪改查變化,先來張展示鏈表的增刪效果圖看看:

網址:https://visualgo.net/zh

 

接下來做幾個鏈表的練習題,題目來自力扣,可以先自己先做一下,看看自己得分,再對比下官方提供的代碼demo

3.1 刪除排序鏈表中的重複元素_第83題

參考demo:

/**
 * 給定一個排序鏈表,刪除所有重複的元素,使得每個元素只出現一次。
 示例 1:
 輸入: 1->1->2
 輸出: 1->2

 示例 2:
 輸入: 1->1->2->3->3
 輸出: 1->2->3

 力扣得分:
 執行用時 :108 ms, 在所有 JavaScript 提交中擊敗77.12%的用戶
 內存消耗 :37.4 MB, 在所有 JavaScript 提交中擊敗了5.03%的用戶
 */
/**
 * Definition for singly-linked list.
 * function ListNode(val) {
 *     this.val = val;
 *     this.next = null;
 * }
 */

function ListNode(val){
    this.val = val;
    this.next = null;
}

/**
 * @param {ListNode} head
 * @return {ListNode}
 */
var deleteDuplicates = function(head) {

    let virHead = new ListNode(0); //增加一個虛擬節點
    virHead.next = head;
    let temp = virHead, obj = {};

    while(temp.next){
        if (obj[temp.next.val]){ //表示為重複節點,刪除這個節點
            temp.next = temp.next.next;
        }
        else{ //
            obj[temp.next.val] = 1;
            temp = temp.next;
        }
    }
    return virHead.next;
}

//測試
var obj = new ListNode(1);
obj.next = new ListNode(2);
obj.next.next = new ListNode(1);
obj.next.next.next = new ListNode(3);
obj.next.next.next.next = new ListNode(1);
obj.next.next.next.next.next = new ListNode(2);
obj.next.next.next.next.next.next = new ListNode(3);
console.log(obj);
console.log(".>>>>>>刪除重複節點:")
console.log(deleteDuplicates(obj));

View Code

 

3.2 判斷是否環形鏈表_第141題

參考demo:

/**
 * Definition for singly-linked list.
 * function ListNode(val) {
 *     this.val = val;
 *     this.next = null;
 * }
 */

/**
 * @param {ListNode} head
 * @return {boolean}
 */
var hasCycle = function(head) {
    //快慢指針,快指針每次走兩步,慢指針每次走一步
    let obj1 = head, obj2 = head; //obj1快指針,obj2為慢指針

    while(obj2){
      obj2 = obj2.next;

      if (obj1){
          obj1 = obj1.next;
      }

      if (obj1){
          obj1 = obj1.next;
      }

      if (obj2 == obj1 && obj1) return true;
    }
    return false;
};

function ListNode(val){
    this.val = val;
    this.next = null;
}

//測試
console.log(">>>>>>環形鏈表》》測試》》")
let node1 = new ListNode(1);
let node2 = new ListNode(2);
let node3 = new ListNode(3);
let node4 = new ListNode(4);

node1.next = node2;
node2.next = node3;
node3.next = node4;
node4.next = node2;

let res = hasCycle(node1);
console.log("res: ", res);

View Code

 

3.3 移除鏈表中給定值的所有元素_第203題

 

參考demo1:

/**
 刪除鏈表中等於給定值 val 的所有節點。
 示例:
 輸入: 1->2->6->3->4->5->6, val = 6
 輸出: 1->2->3->4->5

 * Definition for singly-linked list.
 * function ListNode(val) {
 *     this.val = val;
 *     this.next = null;
 * }
 */
/**
 * 在力扣中得分:耗時160ms, 打敗Javascript中17.87%; 內存消耗37.5M, 打敗JavaScript中24.79% , 更優化的寫法是?
 * @param {ListNode} head
 * @param {number} val
 * @return {ListNode}
 */
var removeElements = function(head, val) {
    let newHead = null, curNode = null;
    while(head){
        if (head.val != val){
            if (curNode){
                curNode.next = new ListNode(head.val);
                curNode = curNode.next;
            }
            else{
                curNode = new ListNode(head.val);
                newHead = curNode;
            }
        }
        head = head.next;
    }
    return newHead;
}

function ListNode(val){
    this.val = val;
    this.next = null;
}


//測試
console.log(">>>>移除鏈表元素測試》》》")
var node = new ListNode(1);
node.next = new ListNode(2);
// node.next.next = new ListNode(5);
// node.next.next.next = new ListNode(4);
// node.next.next.next.next = new ListNode(6);
// node.next.next.next.next.next = new ListNode(8);
// node.next.next.next.next.next.next = new ListNode(4);

// var newNode = removeElements(node, 6);
// console.log(newNode);

var newNode = removeElements(node, 2);
console.log(newNode);

View Code

參考demo2:

/**
 刪除鏈表中等於給定值 val 的所有節點。
 示例:
 輸入: 1->2->6->3->4->5->6, val = 6
 輸出: 1->2->3->4->5

 * Definition for singly-linked list.
 * function ListNode(val) {
 *     this.val = val;
 *     this.next = null;
 * }
 */
/** 第二種寫法
 * 在力扣中得分:耗時112ms, 打敗Javascript中90.28%; 內存消耗37.5M, 打敗JavaScript中24.79%
 * @param {ListNode} head
 * @param {number} val
 * @return {ListNode}
 */
var removeElements = function(head, val) {
    if (!head) return head;

    let newHead = new ListNode(-1);
    newHead.next = head; //把head作為newHead的下一個
    let tmpNode = newHead;

    while(tmpNode.next){
        if (tmpNode.next.val == val){
            tmpNode.next = tmpNode.next.next;
        }
        else{
            tmpNode = tmpNode.next;
        }
    }
    return newHead.next; //返回newHead的下一個,就是我們想要的結果
}

function ListNode(val){
    this.val = val;
    this.next = null;
}


//測試
console.log(">>>>移除鏈表元素測試》》》")
var node = new ListNode(1);
node.next = new ListNode(2);
// node.next.next = new ListNode(5);
// node.next.next.next = new ListNode(4);
// node.next.next.next.next = new ListNode(6);
// node.next.next.next.next.next = new ListNode(8);
// node.next.next.next.next.next.next = new ListNode(4);

// var newNode = removeElements(node, 6);
// console.log(newNode);

var newNode = removeElements(node, 2);
console.log(newNode);

View Code

 

3.4 反轉鏈表_第206題

 

參考demo1_迭代方式:

/*
 反轉一個單鏈表。使用迭代方式實現
 示例:
 輸入: 1->2->3->4->5->NULL
 輸出: 5->4->3->2->1->NULL

 力扣中測試執行用時 : 76 ms, 在所有 JavaScript 提交中擊敗了97.74%的用戶
 內存消耗 :36 MB, 在所有 JavaScript 提交中擊敗了6.92%的用戶
 * */

function ListNode(val){
    this.val = val;
    this.next = null;
}
/**
 * @param {ListNode} head
 * @return {ListNode}
 */
var reverseList = function(head) {
    let newHead = null;
    while(head){
        let tmpNode= newHead;
        newHead = new ListNode(head.val);
        newHead.next = tmpNode;
        head = head.next;
    }
    return newHead;
}


////測試
var node = new ListNode(9);
node.next = new ListNode(99);
node.next.next = new ListNode(999);
node.next.next.next = new ListNode(33);

console.log("原鏈表:", node);
console.log(".....反轉....")
console.log(reverseList(node))

View Code

參考demo2_遞歸方式:

/*
 反轉一個單鏈表。 使用遞歸方式實現
 示例:
 輸入: 1->2->3->4->5->NULL
 輸出: 5->4->3->2->1->NULL

 力扣測試得分:
 執行用時 :80 ms, 在所有 JavaScript 提交中擊敗了95.56%的用戶
 內存消耗 :36.3 MB, 在所有 JavaScript 提交中擊敗了5.03%的用戶
* */

function ListNode(val){
    this.val = val;
    this.next = null;
}
/**
 * @param {ListNode} head
 * @return {ListNode}
 */
var reverseList = function(head) {
    return getNewNode(head).first;
}

/**
 * 遞歸,好繞啊:
 * 推演:加入2->3->4->5 遞歸:
 * @param node
 */
function getNewNode(node){

    if (!node) return {first: null, cur: null };

    var cur = new ListNode(node.val);

    ////一直遞歸遞歸,拿到原鏈表最後一個元素開始返回
    var res = getNewNode(node.next);

    if (res.first) {
        res.cur.next = cur; //設置

        return {
            first: res.first, //反轉鏈表的第一個元素
            cur: cur
        }
    }

    console.log("666_node.val: ", node.val);
    /**
     * 原鏈表最後一個元素會執行到這裏,最後一個元素作為反轉鏈表的第一個元素返回
     */

    return {
        first: cur, //反轉鏈表的第一個元素
        cur: cur    //每次遞歸返回的一個元素
    };
}

//測試
var node = new ListNode(2);
node.next = new ListNode(3);
node.next.next = new ListNode(4);
node.next.next.next = new ListNode(5);
console.log("\n\n*****原鏈表****")
console.log(node);
console.log("......反轉.....")
console.log(reverseList(node));

View Code

 

3.5 查找鏈表的中間結點_第876題

參考代碼demo1_迭代方式:

/**
 * 給定一個帶有頭結點 head 的非空單鏈表,返回鏈表的中間結點。
 如果有兩个中間結點,則返回第二个中間結點。

 示例 1:
 輸入:[1,2,3,4,5]
 輸出:此列表中的結點 3 (序列化形式:[3,4,5])
 返回的結點值為 3 。 (測評系統對該結點序列化表述是 [3,4,5])。
 注意,我們返回了一個 ListNode 類型的對象 ans,這樣:
 ans.val = 3, ans.next.val = 4, ans.next.next.val = 5, 以及 ans.next.next.next = NULL.

 示例 2:
 輸入:[1,2,3,4,5,6]
 輸出:此列表中的結點 4 (序列化形式:[4,5,6])
 由於該列表有兩个中間結點,值分別為 3 和 4,我們返回第二個結點。
  

 提示:
 給定鏈表的結點數介於 1 和 100 之間。

 力扣得分:
 執行用時 :108 ms, 在所有 JavaScript 提交中擊敗了19.44%的用戶
 內存消耗 :33.6 MB, 在所有 JavaScript 提交中擊敗了74.60%的用戶
 */
/**
 * Definition for singly-linked list.
 * function ListNode(val) {
 *     this.val = val;
 *     this.next = null;
 * }
 */

function ListNode(val){
    this.val = val;
    this.next = null;
}

/**
 * @param {ListNode} head
 * @return {ListNode}
 */
var middleNode = function(head) {

    if (!head) return head;

    let arr = [];
    while(head){
        arr.push(head);
        head = head.next;
    }

    let len = arr.length;
    return len % 2 == 0 ? arr[len/2] : arr[(len-1)/2];
};

//測試
var obj = new ListNode(1), temp = obj;
for (let i = 0; i < 6; i++){
    temp.next = new ListNode(2+i);
    temp = temp.next;
}
console.log(obj);
console.log("獲取中間節點:")
console.log(middleNode(obj));

View Code

參考代碼demo2_快慢指針:

/**
 * 給定一個帶有頭結點 head 的非空單鏈表,返回鏈表的中間結點。
 如果有兩个中間結點,則返回第二个中間結點。

 示例 1:
 輸入:[1,2,3,4,5]
 輸出:此列表中的結點 3 (序列化形式:[3,4,5])
 返回的結點值為 3 。 (測評系統對該結點序列化表述是 [3,4,5])。
 注意,我們返回了一個 ListNode 類型的對象 ans,這樣:
 ans.val = 3, ans.next.val = 4, ans.next.next.val = 5, 以及 ans.next.next.next = NULL.

 示例 2:
 輸入:[1,2,3,4,5,6]
 輸出:此列表中的結點 4 (序列化形式:[4,5,6])
 由於該列表有兩个中間結點,值分別為 3 和 4,我們返回第二個結點。
  

 提示:
 給定鏈表的結點數介於 1 和 100 之間。

 力扣得分:
 執行用時 :120 ms, 在所有 JavaScript 提交中擊敗了12.22%的用戶
 內存消耗 :34.1 MB, 在所有 JavaScript 提交中擊敗了11.11%的用戶

 官方答案,官方這個確實簡潔:
 let slow = fast = head;
 while (fast && fast.next) {
        slow = slow.next;
        fast = fast.next.next;
    }
 return slow;

 官方力扣得分:
 執行用時 :64 ms, 在所有 JavaScript 提交中擊敗了99.44%的用戶
 內存消耗 :34.1 MB, 在所有 JavaScript 提交中擊敗了11.11%的用戶

 */
/**
 * Definition for singly-linked list.
 * function ListNode(val) {
 *     this.val = val;
 *     this.next = null;
 * }
 */

function ListNode(val){
    this.val = val;
    this.next = null;
}

/** 用快慢指針來處理下
 * @param {ListNode} head
 * @return {ListNode}
 */
var middleNode = function(head) {
    // let slow = head, fast = head;
    // while(slow){
    //     if (fast){
    //         fast = fast.next;
    //         if (fast){
    //             fast = fast.next;
    //         }
    //         else{
    //             return slow;
    //         }
    //     }
    //     else{
    //         return slow;
    //     }
    //     slow = slow.next;
    // }
    // return head;

    //官方答案:簡潔明了
    let slow = fast = head;
    while (fast && fast.next) {
        slow = slow.next;
        fast = fast.next.next;
    }
    return slow;
};

//測試
var obj = new ListNode(1), temp = obj;
for (let i = 0; i < 6; i++){
    temp.next = new ListNode(2+i);
    temp = temp.next;
}
console.log(obj);
console.log("獲取中間節點:")
console.log(middleNode(obj));

obj = new ListNode(90), temp = obj;
for (let i = 0; i < 5; i++){
    temp.next = new ListNode(91+i);
    temp = temp.next;
}
console.log(obj);
console.log("獲取中間節點:")
console.log(middleNode(obj));

View Code

 

參考Demo地址:https://github.com/xiaotanit/Tan_DataStruct

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

Spring5源碼深度解析(一)之理解Configuration註解

代碼地址:https://github.com/showkawa/spring-annotation/tree/master/src/main/java/com/brian

1.Spring體繫結構

1.1、Spring Core:主要組件是BeanFactory,創建JavaBean的工廠,使用控制反轉(IOC) 模式  將應用程序的配置和依賴性規範與實際的應用程序代碼分開。

1.2、Spring AOP:集成了面向切面的編程功能(AOP把一個業務流程分成幾部分,例如權限檢查、業務處理、日誌記錄,每個部分單獨處理,然後把它們組裝成完整的業務流程。每個部分被稱為切面),

 可以將聲明性事物管理集成到應用程序中。

1.3、Spring Cntext:一個核心配置文件,為Spring框架提供上下文信息。

1.4、Spring Do:Spring操作數據庫的模塊。

1.5、Spring ORM:Spring集成了各種orm(object relationship mapping 對象關係映射)框架的模塊,集成mybatis

1.6、Spring Web集成各種優秀的web層框架的模塊(Struts、Springmvc)

1.7、Spring web MVC:Spring web層框架

 2.Configuration註解分析內容(@Configuration,@ComponentScan,@Scope,@Lazy)

2.1 @Configuration

 @Configuration用於定義配置類,可替換xml配置文件,被註解的類內部包含有一個或多個被@Bean註解的方法,這些方法將會被AnnotationConfigApplicationContext或AnnotationConfigWebApplicationContext類進行掃描,並用於構建bean定義,初始化Spring容器。

2.1.1 @Configuration標註在類上,相當於把該類作為spring的xml配置文件中的<beans>,作用為:配置spring容器(應用上下文)

@Configuration
public class MainConfigOfLifeCycle { }

//測試方法

public static void main(String[] args) {
ApplicationContext acac =
new AnnotationConfigApplicationContext(MainConfigOfLifeCycle.class);
System.out.println("ioc容器創建成功");

//關閉ioc容器
((AnnotationConfigApplicationContext) acac).close();
}
 

相當於spring的xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:jdbc="http://www.springframework.org/schema/jdbc"  
    xmlns:jee="http://www.springframework.org/schema/jee" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:util="http://www.springframework.org/schema/util" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-5.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-5.0.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-5.0.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-5.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-5.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-5.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-5.0.xsd" default-lazy-init="false">


</beans>

2.2 @ComponentScan用法 

 ComponentScan字面意思就是組件掃描,就是根據定義的掃描路徑,把符合掃描規則的類裝配到spring容器中

  2.2.1 ComponentScan參數說明

/*
* @ComponentScan
* value:只當於掃描的的包
* excludeFilters = 指定掃描的時候按照什麼規則排除哪些組件
* includeFilters = 指定掃描的時候只需要包含哪些組件
* Filter.ANNOTATION:按照註解
* Filter.ASSIGNABLE_TYPE: 按照給定的類型
* */

FilterType 有五種類型

ANNOTATION:註解類型

ASSIGNABLE_TYPE:ANNOTATION:指定的類型

ASPECTJ:按照Aspectj的表達式,基本上不會用到

REGEX:按照正則表達式

CUSTOM:自定義規則

package com.brian.config;

import com.brian.bean.Alan;
import com.brian.bean.Brian;
import com.brian.bean.BrianBeanFactory;
import com.brian.bean.Person;
import com.brian.condition.BrianCondition;
import com.brian.condition.BrianSelector;
import com.brian.service.BookService;
import org.springframework.context.annotation.*;
import org.springframework.stereotype.Controller;

@Configuration //告訴spring這是一個配置類
/*
* @ComponentScan
*   value:只當於掃描的的包
*   excludeFilters = 指定掃描的時候按照什麼規則排除哪些組件
*   includeFilters = 指定掃描的時候只需要包含哪些組件
*   Filter.ANNOTATION:按照註解
*   Filter.ASSIGNABLE_TYPE: 按照給定的類型
* */

@ComponentScans(value = {
        @ComponentScan(value = "com.brian",includeFilters = {
//                @ComponentScan.Filter(type = FilterType.ANNOTATION,classes = {Controller.class}),
//                @ComponentScan.Filter(type=FilterType.ASSIGNABLE_TYPE,classes = {BookService.class}),
                @ComponentScan.Filter(type = FilterType.CUSTOM,classes = {BrianTypeFilter.class})
        },useDefaultFilters = false)
})
@Import({Brian.class,Alan.class,BrianSelector.class})
public class MainConfig {

    @Bean("person") //給容器中註冊一個Bean;類型為返回值的類型;id默認是方法名作為id
    public Person person(){
        return new Person("Alan",18);
    }


    /*
    * @Conditional() 按照條件註冊
    *
    * */
    @Conditional({BrianCondition.class})
    @Bean("person01")
    public Person person01() {
        return new Person("Brian",17);
    }

    @Conditional({BrianCondition.class})
    @Bean("person02")
    public Person person02() {
        return new Person("wenTao",19);
    }

    /*
    *
    *給容器中註冊組件
    * 1,包掃描+ 組件標註註解(@Controller/@Service/@Repository/@Component)[自己寫的方法]
    * 2, @Bean [導入的第三方包裏面的組件]
    * 3,@Import [快速的給容器導入一個組件]
    *       1.@Import(要導入的組件class)
    *       2.ImportSelector:返回需要導入的組件的全類名數組
    *       3.ImportBeanDefinitionRegistrar: 手動註冊bean到容器
    *  4. 使用Spring提供的FactoryBean
    * */
    @Bean
    public BrianBeanFactory brianBeanFactory() {
        return new BrianBeanFactory();
    }

}

2.3 @Scope

默認情況Spring容器是單例的

singleton單例模式:全局有且僅有一個實例。

prototype原型模式:每次獲取Bean的時候都會有一個新的實例。

request

request表示針對每次請求都會產生一個新的Bean對象,並且該Bean對象僅在當前Http請求內有效。

session

session作用域表示煤氣請求都會產生一個新的Bean對象,並且該Bean僅在當前Http session內有效。

測試@Scopeprototype原型模式

Configuration配置類

@Configuration
@ComponentScan("com.brian.bean")
public class MainConfigOfLifeCycle {
    @Scope("prototype")
    @Bean(initMethod = "init", destroyMethod = "destroy")
    public Alan getAlan () {
        return new Alan();
    }
}

測試類

public class MainTest {
    public static void main(String[] args) {
         /*ApplicationContext acac =
                 new AnnotationConfigApplicationContext(MainConfig.class);*/
         ApplicationContext acac =
                 new AnnotationConfigApplicationContext(MainConfigOfLifeCycle.class);
        System.out.println("ioc容器創建成功");
        Alan alan1 =  acac.getBean(Alan.class);
        Alan alan2 =  acac.getBean(Alan.class);
        System.out.println("比較兩個Alan實例: " + (alan1 == alan2));

        //關閉ioc容器
        ((AnnotationConfigApplicationContext) acac).close();
    }
}

2.4 @Lazy

Lazy表示為懶加載,當真正需要引用獲取的時候才會被加載

True 表示為懶加載 false表示為在IOC容器加載的時候被創建。

 

測試@Lazy(false)餓漢模式加載

Configuration配置類

@Configuration
@ComponentScan("com.brian.bean")
public class MainConfigOfLifeCycle {
    //@Scope("prototype")
    @Lazy(false)
    @Bean(initMethod = "init", destroyMethod = "destroy")
    public Alan getAlan () {
        return new Alan();
    }


}

測試類

public class MainTest {
    public static void main(String[] args) {
         /*ApplicationContext acac =
                 new AnnotationConfigApplicationContext(MainConfig.class);*/
         ApplicationContext acac =
                 new AnnotationConfigApplicationContext(MainConfigOfLifeCycle.class);
        System.out.println("ioc容器創建成功");
      //  Alan alan1 =  acac.getBean(Alan.class);
       // Alan alan2 =  acac.getBean(Alan.class);
        //System.out.println("比較兩個Alan實例: " + (alan1 == alan2));

        //關閉ioc容器
        ((AnnotationConfigApplicationContext) acac).close();
    }
}

 看下結果會發現在餓漢模式下,即使沒用使用AnnotationConfigApplicationContext.getBean()獲取對象,對象也被加載進了IOC容器

測試@Lazy默認懶加載

 Configuration配置類

@Configuration
@ComponentScan("com.brian.bean")
public class MainConfigOfLifeCycle {
    //@Scope("prototype")
    @Lazy
    @Bean(initMethod = "init", destroyMethod = "destroy")
    public Alan getAlan () {
        return new Alan();
    }


}

測試類保持不表

測試結果中,沒有輸出Alan這個對象創建和銷毀的打印信息

 

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

大話Spark(8)-源碼之DAGScheduler

DAGScheduler的主要作用有2個:

一、把job劃分成多個Stage(Stage內部并行運行,整個作業按照Stage的順序依次執行)
二、提交任務

以下分別介紹下DAGScheduler是如何做這2件事情的,然後再跟源碼看下DAGScheduler的實現。

一、如何把Job劃分成多個Stage

1) 回顧下寬依賴和窄依賴

窄依賴:父RDD的每個分區只被子RDD的一個分區使用。(map,filter,union操作等)
寬依賴:父RDD的分區可能被多個子RDD的分區使用。(reduceByKey,groupByKey等)

如下圖所示,左側的算子為窄依賴, 右側為寬依賴


窄依賴可以支持在同一個集群Executor上,以管道形式順序執行多條命令,例如在執行了map后,緊接着執行filter。分區內的計算收斂,不需要依賴所有分區的數據,可以并行地在不同節點進行計算。所以它的失敗回復也更有效,因為它只需要重新計算丟失的parent partition即可。最重要的是窄依賴沒有shuffle過程,而寬依賴由於父RDD的分區可能被多個子RDD的分區使用,所以一定伴隨着shuffle操作。

2) DAGScheduler 如何把job劃分成多個Stage

DAGScheduler會把job劃分成多個Stage,如下圖sparkui上的截圖所示,job 0 被劃分成了3個stage

DAGScheduler劃分Stage的過程如下:
DAGScheduler會從觸發action操作的那個RDD開始往前倒推,首先會為最後一個RDD創建一個stage,然後往前倒推的時候,如果發現對某個RDD是寬依賴(產生Shuffle),那麼就會將寬依賴的那個RDD創建一個新的stage,那個RDD就是新的stage的最後一個RDD。然後依次類推,繼續往前倒推,根據窄依賴或者寬依賴進行stage的劃分,直到所有的RDD全部遍歷完成為止。

3) wordcount的Stage劃分

在前面大話spark(3)-一圖深入理解WordCount程序在Spark中的執行過程中,我畫過一張wordcount作業的Stage的劃分的圖,如下:

可以看出上圖中,第一個stage的3個task并行執行,遇到reduceByKey這個產生shuffle的操作開始劃分出新的Stage。但是其實這張圖是不準確的。
其實對於每一種有shuffle的操作,比如groupByKey、reduceByKey、countByKey的底層都對應了三個RDD:MapPartitionsRDD、ShuffleRdd、MapPartitionsRDD
(寬依賴shuffle生成的rdd為ShuffleRdd)
其中Shuffle發生在第一個RDD和第二個RDD之間,前面說過如果發現對某個RDD是寬依賴(產生Shuffle),那麼就會將寬依賴的那個RDD創建一個新的stage
所以說上圖中 reduceByKey操作其實對應了3個RDD,其中第一個RDD會被劃分到Stage1中!

4) DAGScheduler劃分Stage源碼

RDD類中所有的action算子觸發計算都會調用sc.runjob方法, 而sc.runjob方法底層都會調用到SparkContext中的dagscheduler對象的runJob方法
例如count這個action操作
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

一直追着runJob方法往底層看最終調用dagScheduler.runJob,傳入調用這個方法的rdd

dagScheduler.runJob內部調用submitJob提交當前的action到scheduler
submitJob內部調用DAGSchedulerEventProcessLoop發送JobSubmitted的信息,
在JobSubmitted內部最終調用dagScheduler的handleJobSubmitted(dagScheduler的核心入口)。

handleJobSubmitted方法如下:

上面代碼中submitStage提交作業,其內代碼如下:

submitStage方法中調用getMissingParentStages方法獲取finalStage的父stage,
如果不存在,則使用submitMissingTasks方法提交執行;
如果存在,則把該stage放到waitingStages中,同時遞歸調用submitStage。通過該算法把存在父stage的stage放入waitingStages中,不存在的作為作業運行的入口。

其中最重要的getMissingParentStages中是stage劃分的核心代碼,如下:

這裏就是前面說到的stage劃分的方式,查看最後一個rdd的依賴,如果是窄依賴,則不創建新的stage,如果是寬依賴,則用getOrCreateShuffledMapStage方法創建新的rdd,依次往前推。

所以Stage的劃分算法最核心的兩個方法為submitStage何getMissingParentStage

二、提交任務

當Stage提交運行后,在DAGScheduler的submitMissingTasks方法中,會根據Stage的Partition個數拆分對應個數任務,這些任務組成一個TaskSet提交到TaskScheduler進行處理。
對於ResultStage(最後一個Stage)生成ResultTask,對於ShuffleMapStage生成ShuffleMapTask。
每一個TaskSet包含對應Stage的所有task,這些Task的處理邏輯完全一樣,不同的是對應處理的數據,而這些數據是對應其數據分片的(Partition)。
submitMissingTasks如下:

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

CQRS之旅——旅程6(我們系統的版本管理)

旅程6:我們系統的版本管理

準備下一站:升級和遷移

“變化是生活的調味品。”威廉·考珀

此階段的最高目標是了解如何升級包含實現CQRS模式和事件源的限界上下文的系統。團隊在這一階段實現的用戶場景包括對代碼的更改和對數據的更改:更改了一些現有的數據模式並添加了新的數據模式。除了升級系統和遷移數據外,團隊還計劃在沒有停機時間的情況下進行升級和遷移,以便在Microsoft Azure中運行實時系統。

本章的工作術語定義:

本章使用了一些術語,我們將在下面進行描述。有關更多細節和可能的替代定義,請參閱參考指南中的“深入CQRS和ES”。

  • Command(命令):命令是要求系統執行更改系統狀態的操作。命令是必須服從(執行)的一種指令,例如:MakeSeatReservation。在這個限界上下文中,命令要麼來自用戶發起請求時的UI,要麼來自流程管理器(當流程管理器指示聚合執行某個操作時)。單個接收方處理一個命令。命令總線(command bus)傳輸命令,然後命令處理程序將這些命令發送到聚合。發送命令是一個沒有返回值的異步操作。

  • 事件(Event):一個事件,比如OrderConfirmed,描述了系統中發生的一些事情,通常是一個命令的結果。領域模型中的聚合引發事件。事件也可以來自其他限界上下文。多個訂閱者可以處理特定的事件。聚合將事件發布到事件總線。處理程序在事件總線上註冊特定類型的事件,然後將事件傳遞給訂閱服務器。在訂單和註冊限界上下文中,訂閱者是流程管理器和讀取模型生成器。

  • 冪等性(Idempotency):冪等性是一個操作的特性,這意味着該操作可以多次應用而不改變結果。例如,“將x的值設置為10”的操作是冪等的,而“將x的值加1”的操作不是冪等的。在消息傳遞環境中,如果消息可以多次傳遞而不改變結果,則消息是冪等的:這可能是因為消息本身的性質,也可能是因為系統處理消息的方式。

用戶故事:

在這個過程的這個階段,團隊實現了下面描述的用戶故事。

不停機升級

V2版本的目標是升級系統,包括任何必要的數據遷移,而不需要把系統停機。如果這在當前實現中不可行,那麼停機時間應該最小化,並且應該修改系統,以便在將來支持零停機時間升級(從V3版本開始)。

Beth(業務經理)發言:

確保我們能夠在不停機的情況下進行升級,這對我們在市場中的信譽至關重要。

显示剩餘座位數量

目前,當註冊者創建一個訂單時,沒有显示每種座位類型的剩餘座位數量。當註冊者選擇購買座位時,UI應該显示此信息。

處理不需要付費的座位

目前,當註冊者選擇不需要付費的座位時,UI流仍然會將註冊者帶到支付頁面,即使不需要支付任何費用。系統應該檢測什麼時候沒有支付,並調整流程,讓註冊者直接進入訂單的確認頁面。

架構

該應用程序旨在部署到Microsoft Azure。在旅程的那個階段,應用程序由兩個角色組成,一個包含ASP.Net MVC Web應用程序的web角色和一個包含消息處理程序和領域對象的工作角色。應用程序在寫端和讀端都使用Azure SQL DataBase實例進行數據存儲。應用程序使用Azure服務總線來提供其消息傳遞基礎設施。下圖展示了這個高級體繫結構。

在研究和測試解決方案時,可以在本地運行它,可以使用Azure compute emulator,也可以直接運行MVC web應用程序,並運行承載消息處理程序和領域域對象的控制台應用程序。在本地運行應用程序時,可以使用本地SQL Server Express數據庫,並使用一個在SQL Server Express數據庫實現的簡單的消息傳遞基礎設施。

有關運行應用程序的選項的更多信息,請參見附錄1“發布說明”。

模式和概念

在旅程的這個階段,團隊處理的大多數關鍵挑戰都與如何最好地執行從V1到V2的遷移有關。本節將介紹其中的一些挑戰。

處理“事件定義發生更改”的情況

當團隊檢查V2的發布需求,很明顯,我們需要改變在訂單和註冊限界上下文中使用的一些事件來適應一些新特性:RegistrationProcessManager將會改變,當訂單有一個不需要付費的座位時系統將提供一個更好的用戶體驗。

訂單和註冊限界上下文使用事件源,因此在遷移到V2之後,事件存儲將包含舊事件,但將開始保存新事件。當系統事件被重放時,系統必須能正確處理所有的舊事件和新事件。

團隊考慮了兩種方法來處理系統中的這類更改。

在基礎設施中進行事件映射或過濾

在基礎設施中映射和過濾事件消息是一種選擇。此方法是對舊的事件消息和消息格式進行處理,在它們到達領域之前在基礎設施的某個位置處理它們。您可以過濾掉不再相關的舊消息,並使用映射將舊格式的消息轉換為新格式。這種方法最初比較複雜,因為它需要對基礎設施進行更改,但是它可以保持領域域的純粹,領域只需要理解當前的新事件集合就可以了。

在聚合中處理多個版本的消息

在聚合中處理多個版本的消息是另一種選擇。在這種方法中,所有消息類型(包括舊消息和新消息)都傳遞到領域,每個聚合必須能夠處理舊消息和新消息。從短期來看,這可能是一個合適的策略,但它最終會導致域模型受到遺留事件處理程序的污染。

團隊為V2版本選擇了這個選項,因為它包含了最少數量的代碼更改。

Jana(軟件架構師)發言:

當前在聚合中處理舊事件和新事件並不妨礙您以後使用第一種選擇:在基礎設施中使用映射/過濾機制。

履行消息冪等性

V2版本中要解決的一個關鍵問題是使系統更加健壯。在V1版本中,在某些場景中,可能會多次處理某些消息,導致系統中的數據不正確或不一致。

Jana(軟件架構師)發言:

消息冪等性在任何使用消息傳遞的系統中都很重要,這不僅僅是在實現CQRS模式或使用事件源的系統中。

在某些場景中,設計冪等消息是可能的,例如:使用“將座位配額設置為500”的消息,而不是“在座位配額中增加100”的消息。您可以安全地多次處理第一個消息,但不能處理第二個消息。

然而,並不總是能夠使用冪等消息,因此團隊決定使用Azure服務總線的重複刪除特性,以確保它只傳遞一次消息。團隊對基礎設施進行了一些更改,以確保Azure服務總線能夠檢測重複消息,並配置Azure服務總線來執行重複消息檢測。

要了解Contoso是如何實現這一點的,請參閱下面的“不讓命令消息重複”一節。此外,我們需要考慮系統中的消息處理程序如何從隊列和Topic檢索消息。當前的方法使用Azure服務總線peek/lock機制。這是一個分成三個階段的過程:

  1. 處理程序從隊列或Topic檢索消息,並在其中留下消息的鎖定副本。其他客戶端無法看到或訪問鎖定的消息。
  2. 處理程序處理消息。
  3. 處理程序從隊列中刪除鎖定的消息。如果鎖定的消息在固定時間后沒有解鎖或刪除,則解鎖該消息並使其可用,以便再次檢索。

如果步驟由於某種原因失敗,這意味着系統可以不止一次地處理消息。

Jana(軟件架構師)發言:

該團隊計劃在旅程的下一階段解決這個問題(步驟失敗的問題)。更多信息,請參見第7章“添加彈性和優化性能”。

阻止多次處理事件

在V1中,在某些場景里,如果在處理事件時發生錯誤,系統可能多次處理事件。為了避免這種情況,團隊修改了體繫結構,以便每個事件處理程序都有自己對Azure Topic的訂閱。下圖显示了兩個不同的模型。

在V1中,可能發生以下行為:

  1. EventProcessor實例從服務總線中的所有訂閱者那裡接收到OrderPlaced事件。
  2. EventProcessor實例有兩個已註冊的處理程序,RegistrationProcessManagerRouterOrderViewModelGenerator處理程序類,所以會在兩個裡都觸發調用Handle方法。
  3. OrderViewModelGenerator類中的Handle方法執行成功。
  4. RegistrationProcessManagerRouter類中的Handle方法拋出異常。
  5. EventProcessor實例捕獲到異常然後拋棄掉事件消息。消息將自動放回訂閱中。
  6. EventProcessor實例第二次從所有訂閱者那裡接收到OrderPlaced事件。
  7. 事件又觸發兩個處理方法,導致RegistrationProcessManagerRouter類和OrderViewModelGenerator第二次處理事件消息。
  8. 每當RegistrationProcessManagerRouter類拋出異常時,OrderViewModelGenerator類都會觸發處理該事件。

在V2模型中,如果處理程序類拋出異常,EventProcessor實例將事件消息放回與該處理程序類關聯的訂閱。重試邏輯現在只會導致EventProcessor實例重試引發異常的處理程序,因此沒有其他處理程序會重新處理消息。

集成事件的持久化

在V1版本中提出的一個問題是,系統如何持久化從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件。這些事件包括關於會議創建和發布的信息,以及座位類型和配額更改的詳細信息。

在V1版本中,訂單和註冊上下文中的ConferenceViewModelGenerator類通過更新視圖模型並向SeatsAvailability聚合發送命令來處理這些事件,以告訴它更改座位配額值。

這種方法意味着訂單和註冊限界上下文不存儲任何歷史記錄,這可能會導致問題。例如,其他視圖從這裏中查找座椅類型描述時,這裏只包含座椅類型描述的最新值。因此,在其他地方重播一組事件可能會重新生成另一個包含不正確座椅類型描述的讀取模型投影。

團隊考慮了以下五個方法來糾正這種情況:

  • 將所有事件保存在原始限界上下文中(會議管理限界上下文中),並使用共享的事件存儲,訂單和註冊限界上下文中可以訪問該存儲來重播這些事件。接收限界上下文可以重放事件流,直到它需要查看的之前的座椅類型描述時為止。
  • 當所有事件到達接收限界上下文(訂單和註冊限界上下文)時保存它們。
  • 讓視圖模型生成器中的命令處理程序保存事件,只選擇它需要的那些。
  • 讓視圖模型生成器中的命令處理程序保存不同的事件,實際上就是為此視圖模型使用事件源。
  • 將來自所有限界上下文的所有命令和事件消息存儲在消息日誌中。

第一種選擇並不總是可行的。在這種特殊情況下,它可以工作,因為同一個團隊同時實現了限界上下文和基礎設施,使得使用共享事件存儲變得很容易。

Gary(CQRS專家)發言:

儘管從純粹主義者的角度來看,第一個選項破壞了限界上下文之間的嚴格隔離,但在某些場景中,它可能是一個可接受的實用解決方案。

第三種選擇可能存在的風險是,所需的事件集合可能在未來發生變化。如果我們現在不保存事件,它們將永遠丟失。

儘管第五個選項存儲了所有命令和事件,其中一些可能永遠都不需要再次引用,但它確實提供了一個完整的日誌,記錄了系統中發生的所有事情。這對於故障診斷很有用,還可以幫助您滿足尚未確定的需求。該團隊選擇了這個選項而不是選項二,因為它提供了一個更通用的機制,可能具有未來的好處。

持久化事件的目的是,當訂單和註冊上下文需要有關當前座位配額的信息時,可以回放這些事件,以便計算剩餘座位的數量。要一致地計算這些数字,必須始終以相同的順序回放事件。這種順序有幾種選擇:

  • 會議管理限界上下文發送事件的順序。
  • 訂單和註冊上下文接收事件的順序。
  • 訂單和註冊上下文處理事件的順序。

大多數情況下,這些順序是相同的。沒有什麼正確的順序。你只需要選擇一個和它保持一致就行了。因此,選擇由簡單性決定。在本例中,最簡單的方法是按照訂單和註冊限界上下文中處理程序接收事件的順序持久化事件(第二個選項)。

Markus(軟件開發人員)發言:

這種選擇通常不會出現在事件源中。每個聚合會都以固定的順序創建事件,這就是系統用於持久存儲事件的順序。在此場景中,集成事件不是由單個聚合創建的。

為這些事件保存時間戳也有類似的問題。如果將來需要查看特定時間剩餘的座位數量,那麼時間戳可能會很有用。這裏的選擇是,當事件在會議管理限界上下文中創建時,還是在訂單和註冊限界上下文中接收時,應該創建時間戳?當會議管理限界上下文創建事件時,訂單和註冊限界上下文可能由於某種原因離線。因此,團隊決定在會議管理有界上下文發布事件時創建時間戳。

消息排序

團隊創建並運行來驗證V1版本的驗收測試,凸顯出了消息排序的一個潛在問題:執行會議管理限界上下文的驗收測試向訂單和註冊限界上下文發送了一系列命令,這些命令有時會出現順序錯誤。

Markus(軟件開發人員)發言:

當人類用戶真實測試系統的這一部分時,不太會注意到這種效果,因為發出命令的時間間隔要長得多,這使得消息不太可能無序地到達。

團隊考慮了兩種方法來確保消息以正確的順序到達。

  • 第一個方法是使用消息會話,這是Azure服務總線的一個特性。如果您使用消息會話,這將確保會話內的消息以與它們發送時相同的順序傳遞。
  • 第二種方法是修改應用程序中的處理程序,通過使用發送消息時添加到消息中的序列號或時間戳來檢測無序消息。如果接收處理程序檢測到一條無序消息,它將拒絕該消息,並在處理了在被拒絕消息之前發送的消息之後,將其放回稍後處理的隊列或Topic。

在這種情況下,首選的解決方案是使用Azure服務總線消息會話,因為這隻需要對現有代碼進行更少的更改。這兩種方法都會給消息傳遞帶來一些額外的延遲,但是團隊並不認為這會對系統的性能產生顯著的影響。

實現細節

本節描述訂單和註冊限界上下文的實現的一些重要功能。您可能會發現擁有一份代碼拷貝很有用,這樣您就可以繼續學習了。您可以從Download center下載一個副本,或者在GitHub上查看存儲庫:https://github.com/mspnp/cqrs-journey-code。您可以從GitHub上的Tags頁面下載V2版本的代碼。

備註:不要期望代碼示例與參考實現中的代碼完全匹配。本章描述了CQRS過程中的一個步驟,隨着我們了解更多並重構代碼,實現可能會發生變化。

**添加對“不需要支付的訂單”的支持

做出這一改變有三個具體的目標,它們都是相關的。我們希望:

  • 修改RegistrationProcessManager類和相關聚合,以處理不需要支付的訂單。
  • 修改UI中的導航,當訂單不需要支付時跳過付款步驟。
  • 確保系統在升級到V2之後能夠正確地工作,包括使用新事件和舊事件。

RegistrationProcessManager類的更改

在此之前,RegistrationProcessManager類在收到來自UI的註冊者已完成支付的通知后發送了一個ConfirmOrderPayment命令。現在,如果有一個不需要支付訂單,UI將直接向訂單聚合發送一個ConfirmOrder命令。如果訂單需要支付,RegistrationProcessManager類在從UI接收到成功支付的通知后,再向訂單聚合發送一個ConfirmOrder命令。

Jana(軟件架構師)發言:

注意,命令的名稱已從ConfirmOrderPayment更改為ConfirmOrder。這反映了訂單不需要知道任何關於付款的信息。它只需要知道訂單已經確認。類似地,現在有一個新的OrderConfirmed事件用於替代舊的OrderPaymentConfirmed事件。

當訂單聚合接收到ConfirmOrder命令時,它將引發一個OrderConfirmed事件。除被持久化外,該事件還由以下對象處理:

  • OrderViewModelGenerator類,它在其中更新讀取模型中的訂單狀態。
  • SeatAssignments聚合,在其中初始化一個新的SeatAssignments實例。
  • RegistrationProcessManager類,它在其中觸發一個提交座位預訂的命令。

UI的更改

UI中的主要更改是在RegistrationController MVC控制器類中的SpecifyRegistrantAndPaymentDetails action里的。之前,此action方法返回InitiateRegistrationWithThirdPartyProcessorPayment(action result)。現在,如果Order對象的新IsFreeOfCharge屬性為true,它將返回一個CompleteRegistrationWithoutPayment(action result)。否則,它返回一個CompleteRegistrationWithThirdPartyProcessorPayment(action result)。

[HttpPost]
public ActionResult SpecifyRegistrantAndPaymentDetails(AssignRegistrantDetails command, string paymentType, int orderVersion)
{
    ...

    var pricedOrder = this.orderDao.FindPricedOrder(orderId);
    if (pricedOrder.IsFreeOfCharge)
    {
        return CompleteRegistrationWithoutPayment(command, orderId);
    }

    switch (paymentType)
    {
        case ThirdPartyProcessorPayment:

            return CompleteRegistrationWithThirdPartyProcessorPayment(command, pricedOrder, orderVersion);

        case InvoicePayment:
            break;

        default:
            break;
    }

    ...
}

CompleteRegistrationWithThirdPartyProcessorPayment將用戶重定向到ThirdPartyProcessorPayment action,CompleteRegistrationWithoutPayment方法將用戶直接重定向到ThankYou action。

數據遷移

會議管理限界上下文在其Azure SQL數據庫實例中的PricedOrders表中存儲來自訂單和註冊限界上下文的訂單信息。以前,會議管理限界上下文接收OrderPaymentConfirmed事件,現在它接收OrderConfirmed事件,該事件包含一個附加的IsFreeOfCharge屬性。這將成為數據庫中的一個新列。

Markus(軟件開發人員)發言:

在遷移過程中,我們不需要修改該表中的現有數據,因為布爾值的默認值為false。所有現有條目都是在系統支持不需要付費的訂單之前創建的。

在遷移過程中,任何正在運行的ConfirmOrderPayment命令都可能丟失,因為它們不再由訂單聚合處理。您應該驗證當前的命令總線沒有這些命令。

Poe(IT運維人員)發言:

我們需要仔細計劃如何部署V2版本,以便確保所有現有的、正在運行的ConfirmOrderPayment命令都由運行V1版本的工作角色實例處理。

系統將RegistrationProcessManager類實例的狀態保存到SQL數據庫表中。這個表的架構沒有變化。遷移后您將看到的惟一更改是StateValue列中的一個新添加值。這反映了RegistrationProcessManager類中的ProcessState枚舉中額外的PaymentConfirmationReceived值,如下面的代碼示例所示:

public enum ProcessState
{
    NotStarted = 0,
    AwaitingReservationConfirmation = 1,
    ReservationConfirmationReceived = 2,
    PaymentConfirmationReceived = 3,
}

在V1版本中,事件源系統為訂單聚合保存的事件包括OrderPaymentConfirmed事件。因此,事件存儲區包含此事件類型的實例。在V2版本中,OrderPaymentConfirmed事件被替換為OrderConfirmed事件。

團隊決定在V2版本中,當反序列化事件時,不在基礎設施級別映射和過濾事件。這意味着,當系統從事件存儲中重播這些事件時,處理程序必須同時理解舊事件和新事件。下面的代碼示例在SeatAssignmentsHandler類中显示了這一點:

static SeatAssignmentsHandler()
{
    Mapper.CreateMap<OrderPaymentConfirmed, OrderConfirmed>();
}

public SeatAssignmentsHandler(IEventSourcedRepository<Order> ordersRepo, IEventSourcedRepository<SeatAssignments> assignmentsRepo)
{
    this.ordersRepo = ordersRepo;
    this.assignmentsRepo = assignmentsRepo;
}

public void Handle(OrderPaymentConfirmed @event)
{
    this.Handle(Mapper.Map<OrderConfirmed>(@event));
}

public void Handle(OrderConfirmed @event)
{
    var order = this.ordersRepo.Get(@event.SourceId);
    var assignments = order.CreateSeatAssignments();
    assignmentsRepo.Save(assignments);
}

您還可以在OrderViewModelGenerator類中看到同樣的技術。

Order類中的方法略有不同,因為這是持久化到事件存儲中的事件之一。下面的代碼示例显示了Order類中受保護構造函數的一部分:

protected Order(Guid id)
    : base(id)
{
    ...
    base.Handles<OrderPaymentConfirmed>(e => this.OnOrderConfirmed(Mapper.Map<OrderConfirmed>(e)));
    base.Handles<OrderConfirmed>(this.OnOrderConfirmed);
    ...
}

Jana(軟件架構師)發言:

以這種方式處理舊事件對於這個場景非常簡單,因為惟一需要更改的是事件的名稱。如果事件的屬性也發生了變化,情況會更加複雜。將來,Contoso將考慮在基礎設施中進行映射,以避免遺留事件污染領域模型。

在UI中显示剩餘座位

做出這一改變有三個具體的目標,它們都是相關的。我們想要:

  • 修改系統,在會議系統的讀模型中包含每個座位類型的剩餘座位數量信息。
  • 修改UI以显示每種座位類型的剩餘座位數量。
  • 確保升級到V2后系統功能正常。

向讀模型添加關於剩餘座位數量的信息

系統要能显示剩餘座位數量的信息來自兩個地方:

  • 當業務客戶創建新的座位類型或修改座位配額時,會議管理限界上下文將引發SeatCreatedSeatUpdated事件。
  • 在訂單和註冊限界上下文中,當註冊者創建一個訂單的時候,可用座位(SeatsAvailability)聚合將引發SeatsReserved、SeatsReservationCancelled和AvailableSeatsChanged事件。

備註:ConferenceViewModelGenerator類不使用SeatCreatedSeatUpdated事件。

訂單和註冊限界上下文中的ConferenceViewModelGenerator類現在處理這些事件,並使用它們來計算和存儲讀模型中的座位類型數量。下面的代碼示例显示了ConferenceViewModelGenerator類中的相關處理程序:

public void Handle(AvailableSeatsChanged @event)
{
    this.UpdateAvailableQuantity(@event, @event.Seats);
}

public void Handle(SeatsReserved @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

public void Handle(SeatsReservationCancelled @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

private void UpdateAvailableQuantity(IVersionedEvent @event, IEnumerable<SeatQuantity> seats)
{
    using (var repository = this.contextFactory.Invoke())
    {
        var dto = repository.Set<Conference>().Include(x => x.Seats).FirstOrDefault(x => x.Id == @event.SourceId);
        if (dto != null)
        {
            if (@event.Version > dto.SeatsAvailabilityVersion)
            {
                foreach (var seat in seats)
                {
                    var seatDto = dto.Seats.FirstOrDefault(x => x.Id == seat.SeatType);
                    if (seatDto != null)
                    {
                        seatDto.AvailableQuantity += seat.Quantity;
                    }
                    else
                    {
                        Trace.TraceError("Failed to locate Seat Type read model being updated with id {0}.", seat.SeatType);
                    }
                }

                dto.SeatsAvailabilityVersion = @event.Version;

                repository.Save(dto);
            }
            else
            {
                Trace.TraceWarning ...
            }
        }
        else
        {
            Trace.TraceError ...
        }
    }
}

UpdateAvailableQuantity方法將事件上的版本與讀模型的當前版本進行比較,以檢測可能的重複消息。

Markus(軟件開發人員)發言:

此檢查僅檢測重複的消息,而不是超出序列的消息。

修改UI以显示剩餘的座位數量

現在,當UI向會議的讀模型查詢座位類型列表時,列表包括當前可用的座位數量。下面的代碼示例显示了RegistrationController MVC控制器如何使用SeatType類的AvailableQuantity

private OrderViewModel CreateViewModel()
{
    var seatTypes = this.ConferenceDao.GetPublishedSeatTypes(this.ConferenceAlias.Id);
    var viewModel =
        new OrderViewModel
        {
            ConferenceId = this.ConferenceAlias.Id,
            ConferenceCode = this.ConferenceAlias.Code,
            ConferenceName = this.ConferenceAlias.Name,
            Items =
                seatTypes.Select(
                    s =>
                        new OrderItemViewModel
                        {
                            SeatType = s,
                            OrderItem = new DraftOrderItem(s.Id, 0),
                            AvailableQuantityForOrder = s.AvailableQuantity,
                            MaxSelectionQuantity = Math.Min(s.AvailableQuantity, 20)
                        }).ToList(),
        };

    return viewModel;
}

數據遷移

保存會議讀模型數據的數據庫有一個新列來保存用於檢查重複事件的版本號,而保存座位類型讀模型數據有一個新列來保存可用的座椅數量。

作為數據遷移的一部分,有必要為每個可用座位(SeatsAvailability)聚合重放事件存儲中的所有事件,以便正確計算可用數量。

不讓命令消息重複

系統目前使用Azure服務總線傳輸消息。當系統從ConferenceProcessor類的啟動代碼初始化Azure服務總線時,它配置Topic來檢測重複的消息,如下面的ServiceBusConfig類的代碼示例所示:

private void CreateTopicIfNotExists() 
{     
    var topicDescription =         
        new TopicDescription(this.topic)         
        {             
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = topic.DuplicateDetectionHistoryTimeWindow,         
        };     
    try     
    {         
        this.namespaceManager.CreateTopic(topicDescription);     
    }     
    catch (MessagingEntityAlreadyExistsException) { } 
} 
備註:您可以在Settings.xml文件中配置DuplicateDetectionHistoryTimeWindow
可以向Topic元素添加這個屬性。默認值是1小時。

但是,為了使重複檢測工作正常,您必須確保每個消息都有一個惟一的ID。下面的代碼示例显示了MarkSeatsAsReserved命令:

public class MarkSeatsAsReserved : ICommand
{
    public MarkSeatsAsReserved()
    {
        this.Id = Guid.NewGuid();
        this.Seats = new List<SeatQuantity>();
    }

    public Guid Id { get; set; }

    public Guid OrderId { get; set; }

    public List<SeatQuantity> Seats { get; set; }

    public DateTime Expiration { get; set; }
}

CommandBus類中的BuildMessage方法使用命令Id創建一個惟一的消息Id, Azure服務總線可以使用這個消息Id來檢測重複:

private BrokeredMessage BuildMessage(Envelope command) 
{ 
    var stream = new MemoryStream(); 
    ...

    var message = new BrokeredMessage(stream, true);
    if (!default(Guid).Equals(command.Body.Id))
    {
        message.MessageId = command.Body.Id.ToString();
    }

...

    return message;
} 

保證消息順序

團隊決定使用Azure服務總線消息會話來保證系統中的消息順序。

系統從ConferenceProcessor類中的OnStart方法配置Azure服務總線Topic和訂閱。Settings.xml配置文件中的配置指定了具體的訂閱使用會話。ServiceBusConfig類中的以下代碼示例显示了系統如何創建和配置訂閱。

private void CreateSubscriptionIfNotExists(NamespaceManager namespaceManager, TopicSettings topic, SubscriptionSettings subscription)
{
    var subscriptionDescription =
        new SubscriptionDescription(topic.Path, subscription.Name)
        {
            RequiresSession = subscription.RequiresSession
        };

    try
    {
        namespaceManager.CreateSubscription(subscriptionDescription);
    }
    catch (MessagingEntityAlreadyExistsException) { }
}

以下來自SessionSubscriptionReceiver類的代碼示例演示了如何使用會話接收消息:

private void ReceiveMessages(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        MessageSession session;
        try
        {
            session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession);
        }
        catch (Exception e)
        {
            ...
        }

        if (session == null)
        {
            Thread.Sleep(100);
            continue;
        }


        while (!cancellationToken.IsCancellationRequested)
        {
            BrokeredMessage message = null;
            try
            {
                try
                {
                    message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero));
                }
                catch (Exception e)
                {
                    ...
                }

                if (message == null)
                {
                    // If we have no more messages for this session, exit and try another.
                    break;
                }

                this.MessageReceived(this, new BrokeredMessageEventArgs(message));
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
        }

        this.receiveRetryPolicy.ExecuteAction(() => session.Close());
    }
}

private MessageSession DoAcceptMessageSession()
{
    try
    {
        return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45));
    }
    catch (TimeoutException)
    {
        return null;
    }
}

Markus(軟件開發人員)發言:

您可能會發現,將使用消息會話的ReceiveMessages方法的這個版本與SubscriptionReceiver類中的原始版本進行比較是很有用的。

您必須確保當你發送消息包含一個會話ID,這樣才能使用消息會話接收一條消息。系統使用事件的SourceID作為會話ID,如下面的代碼示例所示的EventBus類中的BuildMessage方法:

var message = new BrokeredMessage(stream, true);
message.SessionId = @event.SourceId.ToString();

通過這種方式,您可以確保以正確的順序接收來自單個源的所有消息。

Poe(IT運維人員)發言:

在V2版本中,團隊更改了系統創建Azure服務總線Topic和訂閱的方式。之前,SubscriptionReceiver類創建了它們(如果它們還不存在)。現在,系統在應用程序啟動時使用配置數據創建它們。這發生在啟動過程的早期,以避免在系統初始化訂閱之前將消息發送到Topic時丟失消息的風險。

然而,只有當消息按正確的順序傳遞到總線上時,會話才能保證按順序傳遞消息。如果系統異步發送消息,則必須特別注意確保消息以正確的順序放在總線上。在我們的系統中,來自每個單獨聚合實例的事件按順序到達是很重要的,但是我們不關心來自不同聚合實例的事件的順序。因此,儘管系統異步發送事件,EventStoreBusPublisher實例仍然會在發送下一個事件之前等待前一個事件已發送的確認。以下來自TopicSender類的示例說明了這一點:

public void Send(Func<BrokeredMessage> messageFactory)
{
    var resetEvent = new ManualResetEvent(false);
    Exception exception = null;
    this.retryPolicy.ExecuteAction(
        ac =>
        {
            this.DoBeginSendMessage(messageFactory(), ac);
        },
        ar =>
        {
            this.DoEndSendMessage(ar);
        },
        () => resetEvent.Set(),
        ex =>
        {
            Trace.TraceError("An unrecoverable error occurred while trying to send a message:\r\n{0}", ex);
            exception = ex;
            resetEvent.Set();
        });

    resetEvent.WaitOne();
    if (exception != null)
    {
        throw exception;
    }
}

Jana(軟件架構師)發言:

此代碼示例展示了系統如何使用Transient Fault Handling Application Block來讓異步調用可靠。

有關消息排序和Azure服務總線的更多信息,請參見Microsoft Azure Queues and Microsoft Azure Service Bus Queues – Compared and Contrasted

有關異步發送消息和排序的信息,請參閱博客文章Microsoft Azure Service Bus Splitter and Aggregator

從會議管理限界上下文中持久化事件

團隊決定創建一個包含所有發送的命令和事件的消息日誌。這將使訂單和註冊限界上下文能夠從會議管理限界上下文查詢此日誌,以獲取其構建讀模型所需的事件。這不是事件源,因為我們沒有使用這些事件來重建聚合的狀態,儘管我們使用類似的技術來捕獲和持久化這些集成事件。

Gary(CQRS專家)發言:

此消息日誌確保不會丟失任何消息,以便將來能夠滿足其他需求。

向消息添加額外元數據

系統現在將所有消息保存到消息日誌中。為了方便查詢特定命令或事件,系統現在向每個消息添加了更多的元數據。以前,惟一的元數據是事件類型,現在,事件元數據包括事件類型、命名空間、程序集和路徑。系統將元數據添加到EventBus類中的事件和CommandBus類中的命令中。

捕獲消息並將消息持久化到消息日誌中

系統使用Azure服務總線中對會議/命令和會議/事件topic的額外訂閱來接收系統中每條消息的副本。然後,它將消息附加到Azure表存儲中。下面的代碼示例显示了AzureMessageLogWriter類的實例,它用於將消息保存到表中:

public class MessageLogEntity : TableServiceEntity 
{ 
    public string Kind { get; set; }     
    public string CorrelationId { get; set; }     
    public string MessageId { get; set; }     
    public string SourceId { get; set; }     
    public string AssemblyName { get; set; }     
    public string Namespace { get; set; }     
    public string FullName { get; set; }     
    public string TypeName { get; set; }     
    public string SourceType { get; set; }     
    public string CreationDate { get; set; }     
    public string Payload { get; set; } 
} 

Kind屬性指定消息是命令還是事件。MessageId和CorrelationId屬性由消息傳遞基礎設施設置的,其餘屬性是從消息元數據中設置的。

下面的代碼示例显示了這些消息的分區和RowKey的定義:

PartitionKey = message.EnqueuedTimeUtc.ToString("yyyMM"),
RowKey = message.EnqueuedTimeUtc.Ticks.ToString("D20") + "_" + message.MessageId

注意,RowKey保存了消息最初發送的順序,並添加到消息ID上,以確保惟一性,以防兩條消息同時入隊。

Jana(軟件架構師)發言:

這與事件存儲不同,在事件存儲區中,分區鍵標識聚合實例,而RowKey標識聚合的版本號。

數據遷移

當Contoso將系統從V1遷移到V2時,它將使用消息日誌在訂單和註冊限界上下文中重建會議和價格訂單的讀模型。

Gary(CQRS專家)發言:

Contoso可以在需要重建與聚合無關的事件構建的讀模型時來使用消息日誌,例如來自會議管理限界上下文的集成事件。

會議讀模型包含會議的信息,並包含來自會議管理限界上下文的ConferenceCreated、ConferenceUpdated、ConferencePublished、ConferenceUnpublished、SeatCreated和SeatUpdated事件的信息。

價格訂單讀模型持有來自於SeatCreated和SeatUpdated事件的信息,這些事件來自於會議管理限界上下文。

然而,在V1中,這些事件消息沒有被持久化,因此讀模型不能在V2中重新填充。為了解決這個問題,團隊實現了一個數據遷移實用程序,它使用一種最佳方法來生成包含要存儲在消息日誌中的丟失數據的事件。例如,在遷移到V2之後,消息日誌不包含ConferenceCreated事件,因此遷移實用程序在會議管理限界上下文使用的數據庫中找到這些信息,並創建丟失的事件。您可以在MigrationToV2項目的Migrator類中的GeneratePastEventLogMessagesForConferenceManagement方法中看到這是如何完成的。

Markus(軟件開發人員)發言:

您可以在這個類中看到,Contoso還將所有現有的事件源事件複製到消息日誌中。

如下面所示,Migrator類中的RegenerateViewModels方法重新構建讀取的模型。它通過調用Query方法從消息日誌中檢索所有事件,然後使用ConferenceViewModelGeneratorPricedOrderViewModelUpdater類來處理消息。

internal void RegenerateViewModels(AzureEventLogReader logReader, string dbConnectionString)
{
    var commandBus = new NullCommandBus();

    Database.SetInitializer<ConferenceRegistrationDbContext>(null);

    var handlers = new List<IEventHandler>();
    handlers.Add(new ConferenceViewModelGenerator(() => new ConferenceRegistrationDbContext(dbConnectionString), commandBus));
    handlers.Add(new PricedOrderViewModelUpdater(() => new ConferenceRegistrationDbContext(dbConnectionString)));

    using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
    {
        context.UpdateTables();
    }

    try
    {
        var dispatcher = new MessageDispatcher(handlers);
        var events = logReader.Query(new QueryCriteria { });

        dispatcher.DispatchMessages(events);
    }
    catch
    {
        using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
        {
            context.RollbackTablesMigration();
        }

        throw;
    }
}

Jana(軟件架構師)發言:

查詢可能不會很快,因為它將從多個分區檢索實體。

注意這個方法如何使用NullCommandBus實例來接收來自ConferenceViewModelGenerator實例的任何命令,因為我們只是在這裏重新構建讀模型。

以前,PricedOrderViewModelGenerator使用ConferenceDao類來獲取關於座位的信息。現在,它是自治的,並直接處理SeatCreatedSeatUpdated事件來維護這些信息。作為遷移的一部分,必須將此信息添加到讀模型中。在前面的代碼示例中,PricedOrderViewModelUpdater類只處理SeatCreatedSeatUpdated事件,並將缺失的信息添加到價格訂單讀模型中。

從V1遷移到V2

從V1遷移到V2需要更新已部署的應用程序代碼並遷移數據。在生產環境中執行遷移之前,應該始終在測試環境中演練遷移。以下是所需步驟:

  1. 將V2版本部署到Azure的staging環境中。V2版本有一個MaintenanceMode屬性,最初設置為true。在此模式下,應用程序向用戶显示一條消息,說明站點當前正在進行維護,而工作角色將不處理消息。
  2. 準備好之後,將V2版本(仍然處於維護模式,MaintenanceMode為true)切換到Azure生產環境中。
  3. 讓V1版本(現在在staging環境中運行)運行幾分鐘,以確保所有正在運行的消息都完成了它們的處理。
  4. 運行遷移程序來遷移數據(參見下面)。
  5. 成功完成數據遷移后,將每種工作角色的MaintenanceMode屬性更改為false。
  6. V2版本現在運行在Azure中。

Jana(軟件架構師)發言:

團隊考慮使用單獨的應用程序在升級過程中向用戶显示一條消息,告訴他們站點正在進行維護。然而,在V2版本中使用MaintenanceMode屬性提供了一個更簡單的過程,併為應用程序添加了一個潛在有用的新特性。

Poe(IT運維人員)發言:

由於對事件存儲的更改,不可能執行從V1到V2的無停機升級。然而,團隊所做的更改將確保從V2遷移到V3將不需要停機時間。

Markus(軟件開發人員)發言:

團隊對遷移實用程序應用了各種優化,例如批處理操作,以最小化停機時間。

下面幾節總結了從V1到V2的數據遷移。這些步驟中的一些在前面已經討論過,涉及到應用程序的特定更改或增強。

團隊為V2引入的一個更改是,將所有命令和事件消息的副本保存在消息日誌中,以便作為未來的證據,通過捕獲將來可能使用的所有內容來保證應用程序的安全性。遷移過程考慮到了這個新特性。

因為遷移過程複製了大量的數據,所以您應該在Azure工作角色中運行遷移過程,以最小化成本。遷移實用程序是一個控制台應用程序,因此您可以使用Azure和遠程桌面服務。有關如何在Azure角色實例中運行應用程序的信息,請參見Using Remote Desktop with Microsoft Azure Roles。

Poe(IT運維人員)發言:

在一些組織中,安全策略不允許您在Azure生產環境使用遠程桌面服務。但是,您只需要一個在遷移期間承載遠程桌面會話的工作角色,您可以在遷移完成后刪除它。您還可以將遷移代碼作為工作角色而不是控制台應用程序運行,並確保它記錄遷移的狀態,以便您驗證。

為會議管理限界上下文生成過去的日誌消息

遷移過程的一部分是在可能的情況下重新創建V1版本處理后丟棄的消息,然後將它們添加到消息日誌中。在V1版本中,所有從會議管理限界上下文發送到訂單和註冊限界上下文的集成事件都以這種方式丟失了。系統不能重新創建所有丟失的事件,但可以創建表示遷移時系統狀態的事件。

有關更多信息,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

遷移事件源里的事件

在V2版本中,事件存儲為每個事件存儲額外的元數據,以便於查詢事件。遷移過程將所有事件從現有事件存儲複製到具有新模式的新事件存儲。

Jana(軟件架構師)發言:

原始事件不會以任何方式更新,而是被視為不可變的。

同時,系統將所有這些事件的副本添加到V2版本中引入的消息日誌中。

有關更多信息,請參見MigrationToV2項目中Migrator類中的MigrateEventSourcedAndGeneratePastEventLogs

重建讀模型**

V2版本包括對訂單和註冊限界上下文中讀模型定義的幾個更改。MigrationToV2項目在訂單和註冊限界上下文中重新構建會議的讀模型和價格訂單的讀模型。

有關更多信息,請參見本章前面的“從會議管理限界上下文中持久化事件”一節。

對測試的影響

在這個過程的這個階段,測試團隊繼續擴展驗收測試集合。他們還創建了一組測試來驗證數據遷移過程。

再說SpecFlow

之前,這組SpecFlow測試以兩種方式實現:通過自動化web瀏覽器模擬用戶交互,或者直接在MVC控制器上操作。這兩種方法都有各自的優缺點,我們在第4章“擴展和增強訂單和註冊限界上下文”中討論過。

在與另一位專家討論了這些測試之後,團隊還實現了第三種方法。從領域驅動設計(DDD)方法的角度來看,UI不是領域模型的一部分,核心團隊的重點應該是在領域專家的幫助下理解領域,並在領域中實現業務邏輯。UI只是机械部分,用於使用戶能夠與領域進行交互。因此,驗收測試應該包括驗證領域模型是否以領域專家期望的方式工作。因此,團隊使用SpecFlow創建了一組驗收測試,這些測試旨在在不影響系統UI部分的情況下測試領域。

下面的代碼示例显示了SelfRegistrationEndToEndWithDomain.feature文件,該文件在Conference.AcceptanceTests項目中的Features\Domain\Registration文件夾里,注意When和Then子句怎麼使用命令和事件的。

Gary(CQRS專家)發言:

通常,如果您的領域模型只使用聚合,您會期望When子句發送命令,Then子句查看事件或異常。然而,在本例中,領域模型包含一個通過發送命令來響應事件的流程管理器。測試將檢查是否發送了所有預期的命令,並引發了所有預期的事件。

Feature: Self Registrant end to end scenario for making a Registration for a Conference site with Domain Commands and Events
    In order to register for a conference
    As an Attendee
    I want to be able to register for the conference, pay for the Registration Order and associate myself with the paid Order automatically


Scenario: Make a reservation with the selected Order Items
Given the list of the available Order Items for the CQRS summit 2012 conference
    | seat type                 | rate | quota |
    | General admission         | $199 | 100   |
    | CQRS Workshop             | $500 | 100   |
    | Additional cocktail party | $50  | 100   |
And the selected Order Items
    | seat type                 | quantity |
    | General admission         | 1        |
    | Additional cocktail party | 1        |
When the Registrant proceeds to make the Reservation
    # command:RegisterToConference
Then the command to register the selected Order Items is received 
    # event: OrderPlaced
And the event for Order placed is emitted
    # command: MakeSeatReservation
And the command for reserving the selected Seats is received
    # event: SeatsReserved
And the event for reserving the selected Seats is emitted
    # command: MarkSeatsAsReserved
And the command for marking the selected Seats as reserved is received
    # event: OrderReservationCompleted 
And the event for completing the Order reservation is emitted
    # event: OrderTotalsCalculated
And the event for calculating the total of $249 is emitted

下面的代碼示例显示了feature文件的一些步驟實現。這些步驟使用命令總線發送命令。

[When(@"the Registrant proceed to make the Reservation")]
public void WhenTheRegistrantProceedToMakeTheReservation()
{
    registerToConference = ScenarioContext.Current.Get<RegisterToConference>();
    var conferenceAlias = ScenarioContext.Current.Get<ConferenceAlias>();

    registerToConference.ConferenceId = conferenceAlias.Id;
    orderId = registerToConference.OrderId;
    this.commandBus.Send(registerToConference);

    // Wait for event processing
    Thread.Sleep(Constants.WaitTimeout);
}

[Then(@"the command to register the selected Order Items is received")]
public void ThenTheCommandToRegisterTheSelectedOrderItemsIsReceived()
{
    var orderRepo = EventSourceHelper.GetRepository<Registration.Order>();
    Registration.Order order = orderRepo.Find(orderId);

    Assert.NotNull(order);
    Assert.Equal(orderId, order.Id);
}

[Then(@"the event for Order placed is emitted")]
public void ThenTheEventForOrderPlacedIsEmitted()
{
    var orderPlaced = MessageLogHelper.GetEvents<OrderPlaced>(orderId).SingleOrDefault();

    Assert.NotNull(orderPlaced);
    Assert.True(orderPlaced.Seats.All(
        os => registerToConference.Seats.Count(cs => cs.SeatType == os.SeatType && cs.Quantity == os.Quantity) == 1));
}

在遷移過程中發現的bug

當測試團隊在遷移之後在系統上運行測試時,我們發現訂單和註冊限界上下文中座位類型的數量與遷移之前的數量不同。調查揭示了以下原因。

如果會議從未發布過,則會議管理限界上下文允許業務客戶刪除座位類型,但不會引發集成事件向訂單和註冊限界上下文報告這一情況。所以,當業務客戶創建新的座位類型時,訂單和註冊限界上下文從會議管理限界上下文接收事件,而不是當業務客戶刪除座位類型時。

遷移過程的一部分創建一組集成事件,以替換V1版本處理后丟棄的事件。它通過讀取會議管理限界上下文使用的數據庫來創建這些事件。此過程沒有為已刪除的座位類型創建集成事件。

總之,在V1版本中,已刪除的座位類型錯誤地出現在訂單和註冊限界上下文的讀模型中。在遷移到V2版本之後,這些已刪除的座位類型沒有出現在訂單和註冊限界上下文的讀模型中。

Poe(IT運維人員)發言:

測試遷移過程不僅驗證遷移是否按預期運行,而且可能揭示應用程序本身的bug。

總結

在我們旅程的這個階段,我們對系統進行了版本控制,並完成了V2偽生產版本。這個新版本包含了一些額外的功能和特性,比如支持不需要付費的訂單和在UI中显示更多信息。

我們還對基礎設施做了一些改變。例如,我們使更多的消息具有冪等性,現在持久化集成事件。下一章將描述我們旅程的最後階段,我們將繼續增強基礎設施,並在準備發布V3版本時加強系統。

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!

【拆分版】Docker-compose構建Logstash多實例,基於7.1.0

【拆分版】Docker-compose構建Logstash多實例

寫在最前

說起Logstash,這個組件並沒有什麼集群的概念,與其說是集群,不如說是各自去收集日誌分析過濾存儲到Elasticsearch中。這裏做個多實例的Logstash,其實本質上只是為Logstash指定好佔用的端口,輸入輸出的配置的掛載,如是而已。

本文配置為紅框中的部分:Logstash多節點收集的數據,統統輸出數據到es-tribe,讓這個協調節點自己去負載均衡寫入數據。

配置詳見git倉庫 https://github.com/hellxz/docker-logstash-multiple.git
如有疑問或本文寫得有出入的地方,期望評論指定。

目錄結構

├── docker-ls-multiple-down.sh
├── docker-ls-multiple-up.sh
├── logstash-01
│   ├── config
│   │   ├── logstash.conf
│   │   └── logstash.yml
│   ├── docker-compose.yml
│   └── .env
├── logstash-02
│   ├── config
│   │   ├── logstash.conf
│   │   └── logstash.yml
│   ├── docker-compose.yml
│   └── .env
└── logstash-03
    ├── config
    │   ├── logstash.conf
    │   └── logstash.yml
    ├── docker-compose.yml
    └── .env

文件說明

logstash-01舉例說明

.envdocker-compose.yml提供了Logstash配置文件目錄的位置,如果不放置到其他位置,無需更改

# .env file for docker-compose default. please be careful.
# logstash config dir mount set. change inside dir config file to change logstash cluster settings.
# default use relation path. don't change if you don't know what means.
LOGSTASH_CONFIG_DIR=./config

docker-compose.yml 為docker-compose的配置文件,這裏只讀取了.env的配置文件的路徑,並把路徑下的logstash.conf掛載到容器中logstash目錄下pipeline/logstash.conf,掛載logstash.yml到logstash目錄下config/logstash.yml

version: "3"
services:
    logstash-1:
        image: logstash:7.1.0
        container_name: logstash-1
        volumes:
            - ${LOGSTASH_CONFIG_DIR}/logstash.conf:/usr/share/logstash/pipeline/logstash.conf:rw
            - ${LOGSTASH_CONFIG_DIR}/logstash.yml:/usr/share/logstash/config/logstash.yml:rw
        network_mode: "host"

logstash.conf為模板文件,輸入輸出以及配置都可以在這裏修改

input {     #輸入
  kafka {   #使用kafka方式輸入
    bootstrap_servers => "kafka1:9092,kafka2:9093,kafka3:9094" #kafka集群節點列表
    topics => ["all_logs"] #訂閱名為all_logs的topic
    group_id => "logstash" #設置組為logstash
    codec => json #轉換為json
  }
}

filter { #過濾分詞等都在這裏配置,暫時未配置

}

output {     #輸出
  elasticsearch { #輸出到es
    hosts => ["10.2.114.110:9204"] #es的路徑
    index => "all-logs-%{+YYYY.MM.dd}" #輸出到es的索引名稱,這裡是每天一個索引
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }
}

此處設置並不是本文中的重點,有興趣和需要請參考其它文章的相關配置

logstash.yml 為logstash的配置文件,只寫了些與集群相關的,還有更多請參考其它文章.

# set now host ip to http.host
http.host: 10.2.114.110
# set the es-tribe-node host. let logstash monitor the es.
xpack.monitoring.elasticsearch.hosts:
- http://10.2.114.110:9204
# enable or disable the logstash monitoring the es.
xpack.monitoring.enabled: true

這裏沒有指定Logstash啟動時的端口號,Logstash默認端口為9600,多實例在同主機時,會自動分配9600后的端口
另外兩個腳本文件,僅在使用同一台主機時使用,便捷啟動/關閉多節點Logstash

使用說明

  1. 需要確保多台主機均能正常ping通
  2. 確保Zookeeper集群與Kafka集群已經啟動,並且Logstash訂閱的borkers的列表能對得起來
  3. 確保Elasticsearch集群正常啟動
  4. 宿主機/etc/hosts添加kafka1kafka2kafka3映射到對應的kafka所在的宿主機ip
  5. 修改每個Logstash目錄下的config/logstash.conf中的輸出es部分的ip到es-tribe對應的宿主機ip
  6. 修改每個Logstash目錄下的config/logstash.yml中的http.host為當前宿主機ip, 修改xpack.monitoring.elasticsearch.hosts為當前es-tribe宿主機ip與port
  7. 進入每個Logstash目錄執行docker-compose up -d以啟動集群,執行docker-compose down以關閉集群

本文系原創文章,謝絕轉載

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

JavaScript系列–JavaScript數組高階函數reduce()方法詳解及奇淫技巧

一、前言

reduce() 方法接收一個函數作為累加器,數組中的每個值(從左到右)開始縮減,最終計算為一個值。

reduce() 可以作為一個高階函數,用於函數的 compose。

reduce()方法可以搞定的東西,for循環,或者forEach方法有時候也可以搞定,那為啥要用reduce()?這個問題,之前我也想過,要說原因還真找不到,唯一能找到的是:通往成功的道路有很多,但是總有一條路是最捷徑的,亦或許reduce()逼格更高。

 

二、語法

arr.reduce(callback,initialValue)

返回最後一個值,reduce 為數組中的每一個元素依次執行回調函數,不包括數組中被刪除或從未被賦值的元素,接受四個參數:初始值(或者上一次回調函數的返回值),當前元素值,當前索引,調用 reduce 的數組。

 

三、實例解析intialValue參數

1、第一個例子:

var arr = [1, 2, 3, 4]; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; }) console.log(arr, sum);

打印結果:
1 2 1
3 3 2
6 4 3
[1, 2, 3, 4] 10

 

2、第二個例子

var arr = [1, 2, 3, 4]; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; },0) //注意這裏設置了初始值 console.log(arr, sum);

打印結果:
0 1 0
1 2 1
3 3 2
6 4 3
[1, 2, 3, 4] 10

這個例子index是從0開始的,第一次的prev的值是我們設置的初始值0,數組長度是4,reduce函數循環4次。

結論:如果沒有提供initialValue,reduce 會從索引1的地方開始執行 callback 方法,跳過第一個索引。如果提供initialValue,從索引0開始。

 

注意:如果這個數組為空,運用reduce是什麼情況?

var arr = []; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; }) //報錯,"TypeError: Reduce of empty array with no initial value"

但是要是我們設置了初始值就不會報錯,如下:

var arr = []; var sum = arr.reduce(function(prev, cur, index, arr) { console.log(prev, cur, index); return prev + cur; },0) console.log(arr, sum); // [] 0

所以一般來說,提供初始值更加安全。

 

四、reduce簡單用法

當然最簡單的就是我們常用的數組求和,求乘積了。

var arr = [1, 2, 3, 4]; var sum = arr.reduce((x,y)=>x+y) var mul = arr.reduce((x,y)=>x*y) console.log( sum ); //求和,10 console.log( mul ); //求乘積,24

 

五、reduce高級用法

(1)計算數組中每個元素出現的次數

let names = ['Alice', 'Bob', 'Tiff', 'Bruce', 'Alice']; let nameNum = names.reduce((pre,cur)=>{ if(cur in pre){ pre[cur]++ }else{ pre[cur] = 1 } return pre },{}) console.log(nameNum); //{Alice: 2, Bob: 1, Tiff: 1, Bruce: 1}

 

(2)數組去重

let arr = [1,2,3,4,4,1] let newArr = arr.reduce((pre,cur)=>{ if(!pre.includes(cur)){ return pre.concat(cur) }else{ return pre } },[]) console.log(newArr);// [1, 2, 3, 4]

 

(3)將二維數組轉化為一維

let arr = [[0, 1], [2, 3], [4, 5]] let newArr = arr.reduce((pre,cur)=>{ return pre.concat(cur) },[]) console.log(newArr); // [0, 1, 2, 3, 4, 5]

 

(4)將多維數組轉化為一維

let arr = [[0, 1], [2, 3], [4,[5,6,7]]] const newArr = function(arr){ return arr.reduce((pre,cur)=>pre.concat(Array.isArray(cur)?newArr(cur):cur),[]) } console.log(newArr(arr)); //[0, 1, 2, 3, 4, 5, 6, 7]

 

(5)對象里的屬性求和

var result = [ { subject: 'math', score: 10 }, { subject: 'chinese', score: 20 }, { subject: 'english', score: 30 } ]; var sum = result.reduce(function(prev, cur) { return cur.score + prev; }, 0); console.log(sum) //60

 

(6)將[1,3,1,4]轉為数字1314

function addDigitValue(prev,curr,curIndex,array){ var exponent = (array.length -1) -curIndex; var digitValue = curr*Math.pow(10,exponent); return prev + digitValue; } var arr6 = [1,3,1,4]; var result7 = arr6.reduce(addDigitValue,0) console.info('result7',result7)

 

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

『開發技巧』Python音頻操作工具PyAudio上手教程

『開發技巧』Python音頻操作工具PyAudio上手教程

0.引子

當需要使用Python處理音頻數據時,使用python讀取與播放聲音必不可少,下面介紹一個好用的處理音頻PyAudio工具包。

PyAudio是Python開源工具包,由名思義,是提供對語音操作的工具包。提供錄音播放處理等功能,可以視作語音領域的OpenCv。

 

1.簡介

 

PyAudio為跨平台音頻I / O庫PortAudio提供Python 綁定。使用PyAudio,您可以輕鬆地使用Python在各種平台上播放和錄製音頻,例如GNU / Linux,Microsoft Windows和Apple Mac OS X / macOS。

PyAudio的靈感來自:

  • pyPortAudio / fastaudio:PortAudio v18 API的Python綁定。
  • tkSnack:Tcl / Tk和Python的跨平台聲音工具包。

 

2.安裝

 

目前的版本是PyAudio v0.2.11。在大多數平台上使用pip安裝PyAudio。對於v0.2.9之前的版本,PyAudio分發安裝二進制文件,這些文件 存檔在這裏。

 

微軟Windows 

使用pip安裝:

python -m pip install pyaudio

筆記:

  • 如果pip尚未與您的Python安裝捆綁在一起,請在此處獲取 。
  • pip將獲取並安裝PyAudio輪(預先打包的二進制文件)。目前,有車輪兼容Python 2.7,3.4,3.5和3.6 的 官方發行版。對於這些版本,可以使用32位和64位車輪。
  • 這些二進制文件包括使用MinGW構建的PortAudio v19 v190600_20161030。它們僅支持Windows MME API,包括對DirectX,ASIO等的支持。如果需要支持未包含的API,則需要編譯PortAudio和PyAudio。

 

Apple Mac OS X.

使用Homebrew安裝必備的portaudio庫,然後使用pip安裝PyAudio:

brew install portaudio 
pip install pyaudio

筆記:

  • 如果尚未安裝,請下載 Homebrew。
  • pip將下載PyAudio源代碼併為您的Python版本構建它。
  • Homebrew和構建PyAudio還需要安裝Xcode命令行工具(更多信息)。

 

Debian / Ubuntu

使用包管理器安裝PyAudio:

sudo apt-get install python-pyaudio python3-pyaudio

如果沒有最新版本的PyAudio,請使用pip安裝它:

pip install pyaudio

筆記:

  • pip將下載PyAudio源併為您的系統構建它。請務必事先安裝portaudio庫開發包(portaudio19-dev)和python開發包(python-all-dev)。
  • 為了更好地隔離系統包,請考慮在virtualenv中安裝PyAudio 。

 

PyAudio來源

源代碼可從Python Package Index(PyPI)下載:pypi.python.org/pypi/PyAudio。

或克隆git存儲庫:

git clone https://people.csail.mit.edu/hubert/git/pyaudio.git

要從源代碼構建PyAudio,您還需要構建 PortAudio v19。有關為各種平台構建PyAudio的一些說明,請參閱編譯提示。要使用Microsoft Visual Studio構建PyAudio,請查看Sebastian Audet的說明。

 

 

3.示例

1).採集音頻

下面以一段代碼演示如何從計算機麥克風採集一段音頻,採集音頻時長 4s,保存文件 output.wav

使用了tqdm模塊,可以方便显示出來讀取過程,如下:

* recording
100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 172/172 [00:03<00:00, 43.40it/s] 
* done recording

import pyaudio import wave from tqdm import tqdm def record_audio(wave_out_path,record_second): CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 2 RATE = 44100 p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) wf = wave.open(wave_out_path, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) print("* recording") for i in tqdm(range(0, int(RATE / CHUNK * record_second))): data = stream.read(CHUNK) wf.writeframes(data) print("* done recording") stream.stop_stream() stream.close() p.terminate() wf.close() record_audio("output.wav",record_second=4)

要使用PyAudio,首先使用pyaudio.PyAudio()(1)實例化PyAudio ,它設置portaudio系統。

要錄製或播放音頻,請使用pyaudio.PyAudio.open() (2)在所需設備上打開所需音頻參數的流。這設置了pyaudio.Stream播放或錄製音頻。

通過使用流式傳輸pyaudio.Stream.write()音頻數據或使用流式傳輸音頻數據來播放音頻 pyaudio.Stream.read()。(3)

請注意,在“阻止模式”中,每個pyaudio.Stream.write()或 pyaudio.Stream.read()阻止直到所有給定/請求的幀都被播放/記錄。或者,要動態生成音頻數據或立即處理錄製的音頻數據,請使用下面概述的“回調模式”。

使用pyaudio.Stream.stop_stream()暫停播放/錄製,並pyaudio.Stream.close()終止流。(4)

最後,使用pyaudio.PyAudio.terminate()(5)終止portaudio會話

 

2).播放音頻

下面使用播放的功能來播放1)中保存的音頻 output.wav

通過tqdm,显示播放進度條,如下:


100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 172/172 [00:03<00:00, 43.40it/s] 

"""PyAudio Example: Play a WAVE file.""" import pyaudio import wave from tqdm import tqdm def play_audio(wave_path): CHUNK = 1024 wf = wave.open(wave_path, 'rb') # instantiate PyAudio (1) p = pyaudio.PyAudio() # open stream (2) stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True) # read data data = wf.readframes(CHUNK) # play stream (3) datas = [] while len(data) > 0: data = wf.readframes(CHUNK) datas.append(data) for d in tqdm(datas): stream.write(d) # stop stream (4) stream.stop_stream() stream.close() # close PyAudio (5) p.terminate() play_audio("output.wav")

2).以回調方式播放音頻

當需要在執行其他程序時同時播放音頻,可以使用回調的方式播放,示例代碼如下:

"""PyAudio Example: Play a WAVE file.""" import pyaudio import wave from tqdm import tqdm import time def play_audio_callback(wave_path): CHUNK = 1024 wf = wave.open(wave_path, 'rb') # instantiate PyAudio (1) p = pyaudio.PyAudio() def callback(in_data, frame_count, time_info, status): data = wf.readframes(frame_count) return (data, pyaudio.paContinue) # open stream (2) stream = p.open(format=p.get_format_from_width(wf.getsampwidth()), channels=wf.getnchannels(), rate=wf.getframerate(), output=True, stream_callback=callback) # read data stream.start_stream() while stream.is_active(): time.sleep(0.1) # stop stream (4) stream.stop_stream() stream.close() # close PyAudio (5) p.terminate() play_audio_callback("output.wav")

 

Reference:

1.http://people.csail.mit.edu/hubert/pyaudio/

 

 

【精選推薦文章】

如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

想要讓你的商品在網路上成為最夯、最多人討論的話題?

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師"嚨底家"!!