Trong các khối hệ thống kinh doanh, dữ liệu thường đề nghị được cập nhật trong nhiều thành phần lưu lại trữ. Ví dụ, một hệ thống đặt hàng có thể kết thúc công việc bằng cách ghi tài liệu vào đại lý dữ liệu, một bạn dạng sao dữ liệu nữa sẽ tiến hành ghi vào Elasticsearch để hỗ trợ team BI phân tích dữ liệu
Tổng quan phía xử lý· CDC Data từ database lên kafka
· cách xử trí message và, đẩy tài liệu lên các vùng tài liệu khác
· trong trường hòa hợp phát sinh lỗi trong quy trình init dữ liệu, message được giữ giàng vào bảng log thực hiện gửi lại message sau đó 1 khoảng thời hạn quy định
· trong trường vừa lòng lưu log bị lỗi, message sẽ được đẩy ngược trở lại vào kafka nhằm chạy lại quy trình ban đầu
Chi tiết phía xử lýGiai đoạn 1: CDC Data tự database lên kafka
Giai đoạn 2: Xử lý tài liệu và đẩy tài liệu lên những vùng dữ liệu khác
Kafka cho phép các producer công bố các messages cơ mà consumers rất có thể đọc. Nó chỉ dễ dàng và đơn giản là một tập hợp những topic được phân thành một hoặc nhiều phân vùng (partitions). Từng phân vùng là một trong chuỗi các message được bố trí theo máy tự tuyến tính, trong đó mỗi thông báo được xác minh bằng chỉ mục của chúng. Những message cho được viết sinh sống cuối một phân vùng và những tin nhắn được consumers đọc tuần tự.
Bạn đang xem: Xử lý dữ liệu thời gian thực với Apache Kafka (Real-time Data Processing with Apache Kafka)
Flink là 1 trong những framework xử trí luồng. Phong cách thiết kế với tốc độ xử lý cao, độ trễ phải chăng của Flink rất tương xứng cho những yếu tố về thời gian và thời gian thực của sản phẩm. Luồng sự kiện không xẩy ra giới hạn hoàn toàn có thể được cách xử lý ở quy mô lớn bằng phương pháp sử dụng các hàm xử trí hoặc toán tử tiếp tục được tiến hành trong Java cùng được thân tặng Flink.
CHI TIẾT VỀ CÔNG NGHỆKafka và hệ thống xử lý dữ liệu hướng sự kiện (Event — Driven Systems)Lợi ích của giải pháp xử lý luồng và những trường hợp áp dụng Apache Kafka
Sự di chuyển sang các khối hệ thống hướng sự khiếu nại (EDS).Khi nhưng facebook, instagram cập nhật thông tin của bạn bè từng giây, từng phút hay những nền tảng gốc rễ tin tức như Twitter vẫn mải mê push notification cho bọn họ mỗi khi bao gồm tin tức mới đâu kia trên cố gắng giới. Đằng kế tiếp là những hệ thống hướng sự khiếu nại (Event — Driven System)
VD: cùng với 1 khối hệ thống xử lý dữ liệu nhiều nguồn, quy mô truyền thống thì áp dụng cần liên kết đến từng nguồn để đưa dữ liệu về xử lý, Kafka hôm nay đóng sứ mệnh trung gian làm việc giữa, từ bây giờ dữ liệu tức những nguồn không giống nhau sẽ đẩy vào kafka, ứng dụng chỉ việc kết nối đến Kafka để mang dữ liệu.
Kafka là 1 khối hệ thống messaging queue phân tán, cho phép gửi và nhận (publish & subcribe) các bạn dạng ghi theo luồng
Kafka có những tính năng khá nổi bật như:
Khả năng mở rộng: có không ít khía cạnh trong kĩ năng mở rộng của Kafka, hoàn toàn có thể kể cho như việc cung ứng mở rộng cụm kafka theo hướng ngang (tương từ như giải pháp mà Hdfs mở rộng), hay không ngừng mở rộng các thành phần thúc đẩy với các như producer, consumer… Kafka được thiết kế với để không ngừng mở rộng rất tốt.Tính chịu cài đặt và sự ổn định: Kafka hoàn toàn có thể làm việc với 1 lượng không hề nhỏ dữ liệu, nhờ cơ chế mở rộng và partitioning, một hệ thống chỉ 7–8 nodes rất có thể xử lý tới hàng chục tỉ phiên bản ghi từng ngày với tính bất biến cao.Tin cậy dữ liệu: dữ liệu đẩy vào kafka được tàng trữ phân tán, có replication, và được flush xuống ổ cứng trên từng node cùng rất cơ chế thống trị từng bạn dạng ghi thông quan tiền offset nên việc mất dữ liệu hay xử lý thiếu, sót, trùng lặp là khó xảy ra. Trong cả khi bao gồm một hoặc vài ba node trên cụm bị dừng hoạt động, những ứng dụng shop với Kafka đều phải có thể vận động bình thường.Zero-downtime: Bất kì khối hệ thống nào kể cả phân tán hay không thì vấn đề này đều cực kì quan trọng. Một hệ thống Kafka được thông số kỹ thuật tốt rất có thể vận hành với thời gian chết (downtime) = 0. Mọi chuyển động từ vấn đề thêm nguồn dữ liệu, không ngừng mở rộng cụm hay thậm chí còn nâng cấp cấu hình đều có thể thực hiện nhưng mà không làm ảnh hưởng tới dịch vụ.Một vài ba use-case thực tế:
Phát hiện gian lậu hoặc phi lý trong giao dịch thanh toán thẻ tín dụng: tin tức giao dịch của khách hàng được xử lý thông qua các mô hình Machine — Learning nhằm phát hiện không bình thường hoặc gian lận, từ đó dừng những giao dịch nghi ngờ.Hệ thống xe tự lái: các thông tin từ cảm biến được xử lý thời hạn thực để lấy ra hành vi lái xe.Gợi ý sản phẩm dựa trên hành động khách hàng: Các thao tác làm việc của người tiêu dùng trên khối hệ thống được ghi nhấn thành những sự kiện và xử trí realtime, tiếp nối phân tích và gửi ra những sản phẩm, thương mại & dịch vụ tương ứng cùng với hành vi của người tiêu dùng (Netflix sẽ sử dụng mô hình này để mang ra recommend phim thời gian thực dựa vào tìm kiếm với hành vi xem nội dung).Kiến trúc và lý lẽ cơ bản của Apache Kafka.Các mối cung cấp dữ liệu
Như đã nói ở bài trước, hệ thống sự kiện — driven chuyển phiên quanh những sự khiếu nại (event), một giao dịch mới được đặt lên trang thương mại dịch vụ điện tử, một kiện mặt hàng được xuất kho, một thanh toán tài bao gồm online vừa thực hiện hay dễ dàng hơn, chúng ta vừa post 1 tấm ảnh lên Facebook. Đó là những sự kiện, thực tế những sự kiện luôn diễn ra quanh ta. Tuy nhiên, khi gồm Big Data với các hệ thống xử lý đầy đủ lớn, chúng ta mới dần thân mật hơn đến các sự kiện, tra cứu cách thu thập và đối chiếu chúng để sở hữu được số đông giá trị mới.
Nhìn ví dụ hơn 1 chút, các nghiệp vụ liên quan đến mặt hàng của trang thương mại điện tử cơ phiên bản có những các loại sự kiện nào?
Một món hàng new được chuyển vào giao dịch, điều chỉnh tăng giá cho thành phầm A, tiết kiệm chi phí với chính sách giảm giá sản phẩm B. User đã tải hết sản phẩm trong kho và sản phẩm C phải đem về trạng thái hết hàng… Đó còn chưa kể đến các sự kiện tương quan đến khách hàng như User tìm một sản phẩm mà không có kết quả, user click vào một mặt hàng được recommend trên trang chủ hay câu hỏi thanh toán, cấp dưỡng giỏ hàng …
Những sự khiếu nại ấy diễn ra liên tục, rời rạc cơ mà lại rất có giá trị để đảm bảo an toàn hệ thống cung cấp trải nghiệm tốt nhất cho tất cả những người dùng. Đứng làm việc phía Backend, đó không chỉ là là đa dạng và phong phú về khía cạnh dữ liệu, về thời gian đáp ứng nhu cầu mà hơn không còn là số lượng sự kiện sẽ rất to lớn với rất nhiều tổ chức, công ty có không ít khách hàng. Vậy làm sao để quản ngại lý, tổ chức triển khai và sử dụng công dụng những dữ liệu ấy, làm sao để 1 sự kiện ra đời được tích lũy và cách xử lý kịp thời?
Hệ thống messaging queue phân tán như Kafka là một lựa lựa chọn rất phù hợp cho các bước này.
Như đang trình bày ở phần trước, Producer chịu trọng trách đẩy tài liệu từ nguồn phía bên ngoài vào cụm. Một đơn vị chức năng dữ liệu được nhờ cất hộ đi là Producer
Record. Bao gồmTopic (bắt buộc): thương hiệu topic nhận dữ liệuPartition (optional): Partition vào topicKey (optional): Key của phiên bản ghiValue (bắt buộc): dữ liệu của bạn dạng ghi
Trên tế bào hình, hoàn toàn có thể thấy rằng trước lúc được đẩy vào topic, 1 bạn dạng ghi vẫn trải qua 2 quá trình xử lý:
Serializer: Là câu hỏi chuyển tài liệu trên bộ nhớ Heap thành mảng byte nhằm truyền qua mạng.Partitioner: kế hoạch chọn partition cho dữ phiên bản ghi, theo mang định sẽ là roud — robin lần lượt phần nhiều trên toàn bộ partitions.Ngoài ra Kafka cung ứng sẵn qui định retry khi có lỗi trong quá trình gửi dữ liệu (Kafka cung cấp 1 vài ba option cho việc này)
Có 2 phần nên lưu ý:Khung color đỏ: Cấu hinh các kafka, bao hàm cả trình serializer cho Key cùng Value
Khung color blue: Đẩy dữ liệu vào cụm thông qua đối tượng người dùng Producer Record.
Producer Acks:
Producer vào kafka cung cấp tính năng Acks, áp dụng để bảo đảm an toàn dữ liệu từ nguồn được đẩy vào cụm theo những mode:
Acks = 0: Producer chỉ đẩy dữ liệu, không buộc phải chờ phản hồi từ Cluster, dữ liệu lỗi trong quy trình đẩy sẽ ảnh hưởng mất.Acks = 1: Producer đẩy dữ liệu xong xuôi khi Leader nhận được bản ghi không cần chờ các follower replicate dữ liệu. (là cơ chế mặc định)Acks = -1: Producer đẩy dữ liệu ngừng khi Leader dìm được phiên bản ghi và toàn thể các follower replicate đủ số lượng thông số kỹ thuật của topic.Có thể thấy rằng Acks = 0 sẽ đẩy tài liệu nhanh hơn cả do ko phải chờ đợi và xử lý các case lỗi tuy nhiên rất có thể mất mát phiên bản ghi. Mode này rất tương xứng với các dữ liệu tất cả lượng béo nhưng không thực sự quan trọng.
Acks = -1 lại là 1 trong những sự bảo đảm an toàn nhất mang đến các phiên bản ghi lúc dữ liệu sau thời điểm nhận còn bảo vệ cả số bạn dạng sao lưu giữ dự phòng, tuy nhiên mode này sẽ lừ đừ hơn những do câu hỏi sao lưu cùng kiểm tra tài liệu mất thêm thời gian, với các dữ liệu đặc biệt như thanh toán khách hàng, phải đánh đổi thời hạn để đảm bảo chất lượng dữ liệu.
Acks = 1 cân bằng giữa 0 và -1, nó đủ bình an trong số đông các trường thích hợp và nhanh hơn Acks = -1. Tuy vậy vẫn có khả năng mất mát tài liệu khi Follower còn chưa kịp sau giữ thì Leader bị down.
Kafka Brokers
Một khối hệ thống messaging queue phân tán như Kafka hay không được setup đơn lẻ. Thực tế khi tiến hành cụm Kafka Productions chứa từ vài ba đến hàng trăm server cài đặt Kafka và được thông số kỹ thuật với nhau để thành 1 cụm. Đó gọi là một trong Kafka Cluster.
Brokers là nguyên tố giữa, chứa dữ liệu từ Produces và hỗ trợ dữ liệu đến Consumers
Sâu xa rộng 1 chút, phần đông “thứ” chạy trên những server trong nhiều là những tiến trình JVM của Kafka, hay còn gọi là Broker, một Kafka Cluster bao gồm nhiều Brokers.
Dữ liệu nhận thấy được lưu trữ trên cụm Kafka được tàng trữ ở ổ đĩa nhưng Broker được cấu hình, sẽ tự động xóa sau 1 khoảng thời hạn retention time (mặc đinh là một trong tuần).
Consumers
Để đọc dữ liệu ra ship hàng xử lý, tổng vừa lòng … kafka cung cấp Kafka Consumer. Tương tự như Producer, ta cũng hoàn toàn có thể đọc dữ liệu trực tiếp thông qua console-consumer. Kafka cluster không đẩy tài liệu cho consumer, quá trình consumer sẽ dữ thế chủ động lấy dữ liệu từ hệ thống ra (pull) chứ kafka cluster không đẩy tài liệu cho consumer.
Nếu cấu hình Apache Ni
Fi đọc tài liệu từ Kafka và đẩy tài liệu sang 1 hệ thống khác như Hdfs, hôm nay Ni
Fi lại đóng vai trò là Kafka Consumer.
Consumers lấy dữ liệu từ Kafka, giao hàng xử lý, phân tích…Sự bóc tách biệt của Producers với Consumers
Kiến trúc thuở đầu của Kafka bao gồm sự bóc biệt rõ ràng giữa phần đẩy dữ liệu (producers) với phần đọc dữ liệu (consumers). Vấn đề này đem lại những ích lợi như sau:
Nếu Consumer chậm/ lỗi trọn vẹn không tác động tới Producers, vì chúng giao tiếp với nhau thông qua Kafka Cluster, từ bây giờ Kafka như một “bộ đệm” điều phối hoạt động ở hai phía.Tương tự, vì đã có bộ đệm, việc thêm 1 vài Consumer cho các nhu cầu mới cũng không ảnh hưởng đến Producers.Ngoài ra, giả dụ Consumer bị lỗi, dữ liệu vẫn an toàn trên Kafka cluser và sẵn sàng chuẩn bị được xử trí tiếp khi phục hồi lại Consumer. Tránh được việc mất mát tài liệu quan trọng.Topic, Partitions với Segment.Topic
Như trên điện thoại thông minh hay sản phẩm tính, ta cũng biến thành có các thư mục dữ liệu tàng trữ nhạc, hình ảnh, video đơn nhất đúng không? Đó là giải pháp cơ bản chúng ta tổ chức các dữ liệu có tương quan đến nhau. Nếu như Kafka cluster như một ổ cứng thì những thư mục nhạc, hình ảnh hay video đó là kafka Topic.
Đó là giải pháp nói để mọi fan dễ hình dung, thực tiễn Kafka cung cấp các tài liệu theo luồng liên tục, topic tại chỗ này được khái niệm là Streams of “related” Messages in Kafka.
Thêm 1 ví dụ ví dụ nữa nhé:
Ta tạo thành 1 topic mang tên là topic_don_hang chứa các phiên bản ghi là thông tin giao dịch được tạo nên mới. Producer vẫn đẩy tài liệu vào topic này (hay nói một cách khác là Producer publish message vào topic), ý muốn đọc dữ liệu deals ra, consumer sẽ subsribe vào topic nhằm pull dữ liệu. Với loại dữ liệu khác, ví dụ như dữ liệu thanh toán, ta sẽ nên 1 topic khác để chứa loại tài liệu này.
Kafka Cluster hoàn toàn có thể tạo không giới hạn số lượng topic.
Tương tác giữa Producer, Consumer với Topic trong khối hệ thống Kafka
Partitions với segments
Về mặt xúc tích và ngắn gọn là vậy, topic là nơi tài liệu đến cùng đi, thường cho các dữ liệu thuộc loại, tất nhiên chúng ta cũng có thể push các loại dữ liệu khác nhau vào cùng 1 topic nhưng bởi vậy sẽ không hữu dụng cho comsumer lúc ngoài vấn đề xử lý còn cần tách bóc loại message này, loại message kia…
Tuy nhiên, topic bạn dạng thân nó được cấu thành từ không ít partitions, có nghĩa là chia tài liệu thành phần nhiều độc lập. Khi tìm hiểu về Big Data, các các bạn sẽ thấy có mang partition xuất hiện không hề ít vì chia bé dại dữ liệu là cách cực tốt để tối ưu được bài toán đọc và ghi tuy vậy song.
Quay trở lại ví dụ, giả sử topic_don_hang có 2 partition là p1 và p2, nếu bạn dùng 1 producer publish phiên bản ghi bắt đầu vào, bạn dạng ghi này rất có thể nằm nghỉ ngơi p1 hoặc p2, tùy bí quyết producer đẩy (mặc định là round-robin, tức là bạn dạng ghi trước vẫn vào p1 thì phiên bản ghi sau đã vào p2 mang lại đều). Để tăng tốc, ta có thể viết 2 producer, mỗi producer đưa ra đẩy tài liệu vào 1 partition, do vậy về logic vận tốc ghi đã tiếp tục tăng gấp 2. Tương tự như với việc đọc tài liệu bằng consumer cũng trở thành tăng khi ta tăng số lượng comsumer.
Topic là khái niệm về mặt logic để tổ chức dữ liệu, kafka tất nhiên cần phải lưu trữ các bản ghi vào topic này trên những server download kafka (kafka broker), partitions là việc tổ chức dữ liệu trên các server này, mỗi partition là 1 trong thư mục đồ dùng lý, mỗi partition gồm nhiều file khác nhau, các file này đó là các file chứa dữ liệu.
Chưa dừng lại ở đó, kafka tổ chức triển khai dữ liệu bên phía trong các tệp tin này thành các đơn vị nhỏ dại hơn call là segment.
Vậy là, 1 Kafka cluster có nhiều Topic, từng topic bao gồm nhiều partition, mỗi partition là tập hợp các segment. Hơi phức hợp nhỉ, hãy coi mô trong khi sau:
Cluster, Topics, Partitions, Segments trong Kafka
Mỗi segment là 1 trong log tệp tin trên ổ đĩa, mặc định dung lượng 1 segment là 1Gb, có nghĩa là hết 1Gb dữ liệu, Kafka đang ngắt file đó và chế tác 1 file mới để cho dữ liệu mới, vớ nhiên số lượng này hoàn toàn có thể điều chỉnh được tùy thuộc vào từng hệ thống.
Giả sử 3 bản ghi màu xanh da trời đạt max dung tích 1 segment, bạn dạng ghi đồ vật 3 đang ghi vào segment bắt đầu và đó là một file chủ quyền với file lưu 3 bạn dạng ghi màu xanh.
Tổ chức tài liệu trong SegmentApache Flink
Apache Flink là một khối hệ thống xử lý phân tán với xử lý tài liệu khối (batch processing). Thành phần chủ quản của Flink cung ứng tính năng cung cấp dữ liệu, liên kết và tăng tài năng chịu lỗi cho các máy chủ trong khối hệ thống cluster.
DataStream API
Stream Processing
Stream processing có thể hiểu dễ dàng và đơn giản stream processing là xử lý dữ liệu ngay lúc nó xuất hiện thêm trong hệ thống, mở rộng hơn thì stream processing có tác dụng làm câu hỏi với luồng dữ liệu vô hạn, quá trình đo lường và tính toán xảy ra liên tục.
Như hình vẽ bên trên: từng vòng tròn đại diện cho thông tin hoặc sự kiện xảy ra trong 1 thời điểm, số lượng sự kiện là không giới hạn và dịch chuyển liên tục từ bỏ trái qua phải.
Data
Stream
Hệ thống xử lý chính của bản vẽ xây dựng Flink rất cần phải nói đến chính là Data
Streams. Những nguồn tài liệu mà Flink gồm thể chào đón đến từ bỏ một khối hệ thống Kafka, Twitter, cùng Zero
MQ. Nguồn dữ liệu sau khi xử lý được ghi vào tập tin dữ liệu hoặc ghi thông qua giao thức socket. Việc buổi giao lưu của Flink thực hiện qua phương pháp JVM trên máy toàn thể hoặc cách xử trí theo kiến trúc cluster. Vượt trình biến đổi dữ liệu được Flink cung cấp trên Data
Streams gồm những: Map, Flat
Map, Filter, Reduce, Fold, Aggregations, Window, Window
All, Window Reduce, Window Fold, Window Join, Window Co
Group, Split,Union và một trong những dạng khác.
Ứng dụng Data
Stream xử lý liên tục và trong thời gian dài, Flink cung cấp cơ chế sút thiểu khả năng lỗi hệ thống thông qua qui định Lightweight Distributed Snapshot; dựa theo phương thức hoạt động của Chandy-Lamport distributed snapshot. Flink vẫn rất có thể vẫn thường xuyên các thừa trình thống kê giám sát dữ liệu vào khi triển khai các duy trì hệ thống. Flink tiến hành kiểm tra trạng thái tài liệu để bảo đảm an toàn nó có thể phục hồi khi mở ra hư lỗi trong dữ liệu.
Data
Stream API cung ứng chức năng biến hóa (transformation) trên những luồng dữ liệu thông qua cơ chế cửa sổ (window). Fan dùng rất có thể tùy biến kích cỡ window, tần suất chào đón dữ liệu hoặc các phương thức hotline dữ liệu. Window bao gồm thể hoạt động dựa trên nhiều cơ chế điều khiển như count, time, cùng delta.
Data
Stream operation
Flink Data
Stream operation overview.
Các Operation trên Data
Stream tất cả bốn loại.
· Loại thứ nhất là thao tác bản ghi solo lẻ, ví dụ như lọc ra các bạn dạng ghi không hề mong muốn (Thao tác lọc) hoặc chuyển đổi từng bạn dạng ghi (Thao tác bản đồ).
· một số loại thứ nhị là multiple records operation. Ví dụ: nhằm đếm tổng khối lượng đơn đặt đơn hàng trong vòng ngực giờ, hãy thêm tất cả các bạn dạng ghi đơn mua hàng trong vòng ngực giờ. Để hỗ trợ loại Operation này, fan ta cần nối các phiên bản ghi quan trọng thông qua Windown để xử lý.
· loại thứ tía là vận hành nhiều luồng và chuyển bọn chúng thành một luồng duy nhất. Ví dụ: các luồng có thể được thích hợp nhất trải qua các Operation, chẳng hạn như Union, Join, Connect. Các Operation này hòa hợp nhất các luồng bằng cách sử dụng những lôgic khác nhau, nhưng sau cuối chúng tạo thành một luồng thống độc nhất vô nhị mới, bởi đó chất nhận được một số Operation xuyên luồng.
· loại thứ tứ là split operations, được cung cấp bởi Data
Stream cùng trái ngược cùng với Merge operations. Những Operation này phân chia một luồng thành những luồng theo luật lệ với từng luồng phân tách bóc là một tập hợp con của luồng trước đó, để các luồng khác biệt được xử lý khác nhau.
Ngữ cảnh của ứng dụng dựa trên buổi giao lưu của một trang dịch vụ thương mại điện tử. Khi người sử dụng thực hiện thanh toán dữ liệu hóa đơn sẽ được lưu trữ trên database (Mysql) của webapp. Khối hệ thống sẽ thu thập và xử trí dữ liệu những hóa đơn một phương pháp realtime. Dữ liệu phân tích sẽ được lưu vào 2 nơi:
Data Warehouse (Postgres): cung cấp các dữ liệu đã được xử lý tương xứng cho các đội marketing hoặc Business Inteligent xem báo cáo cũng như làm những phân tích solo giản.Data Lake (Delta Lake): tàng trữ dữ liệu để ship hàng cho đội so với dữ liệu của khách hàng (DS, DA, DE) cải tiến và phát triển các mã sản phẩm Machine Learning, Deep Learning... Ship hàng cho doanh nghiệp1.1 công dụng mang lại
Giúp đội các đội không có kiến thức những về IT (Marketing, Sale...) hoàn toàn có thể tiếp cận được với dữ liệu của công ty. Từ đó hoàn toàn có thể tự xây dụng được các report hoặc làm những phân tích sâu rộng về dữ liệu của công tyĐảm bảo tài liệu phân tích realtime mà lại không chiếm tài nguyên của DBLưu trữ và cai quản dữ liệu để cung cấp cho nhu yếu phát triển các ứng dụng có áp dụng đến tài liệu lớn
Có thể xử lý tài liệu với dung tích lớn nhờ tính scalable của Spark và Kafka.
1.2 tài liệu đầu vào
Dữ liệu đầu vào với cấu trúc như sau:
Orders:

purchaser: cất ID của công ty mua sản phẩmquantity: số lượng hàng đặtproduct_id: ID của thành phầm bánorder_number: ID của order
Ta sẽ sử dụng vòng lặp nhằm mô phỏng dữ liệu trong thực nạm được insert vào DB liên tục
Customer:

Products:

id: ID của sản phẩmname: thương hiệu sản phẩmdescription: biểu lộ sản phẩmweight: khối lượng của sản phẩmunit_price: đơn giá của sản phẩm
2 Table Products và Customer sẽ tiến hành sử dụng nhằm join cùng với table Orders trong bước xử lý bởi Spark nhằm trích xuất những dữ liệu phải thiết
1.3 Mục tiêu
Có thể vấn đáp được các câu hỏi cơ bản như:Liệt Kê 10 user đưa ra nhiều nhấtTỉ lệ tiền chiếm được của các sản phẩm đang kinh doanh chiếm bao nhiêu %Liệt kê số thành phầm được chào bán nhiều nhất
Sự dịch chuyển về giá chỉ theo ngày
Đội DE, DS, DA rất có thể sử dụng jupyter notebook nhằm đọc dữ liệu streaming và thực hiện phân tích.
2. Triển khai
2.1 Đọc tài liệu streaming

2.1.1 Kafka, Kafka Connect cùng Kafka Connector
Kafka: Kafka là một công nghệ truyền tài liệu phân tán (distributed messaging system) theo tế bào hình truyền thông public-subscribe, bên truyền dữ liệu được hotline là producer mặt subscribe nhận tài liệu theo các topic được call là consumer. Kafkacó tài năng truyền một lượng béo dữ liệu tuy nhiên trong trường thích hợp khi consumer không nhận, tài liệu vẫn được lưu trữ sao lưu lại trên queue và cả trên ổ đĩa bảo đảm an toàn.
Kafka Connect: Kafka Connect là một trong những thành phần của Kafka, dùng làm kết nối Kafka với các khối hệ thống khác như những database, file system, key-value store... Kafka Connect Cluster sẽ bóc tách biệt cùng với Kafka cluster với mục đích để hoàn toàn có thể scale những connector bên trong nó.

Kafka Connector: Kafka Connector có thiết kế để chạy vào Kafka Connect Cluster, yếu tố này sẽ tiến hành sử dụng để đọc tài liệu từ các nguồn khác vào kafka topic hoặc đọc tài liệu từ kafka topic gửi đến các nguồn khác.
2.1.2 Debezium
Debezium: là 1 source connector của Kafka Connect có tác dụng ghi nhận các sự biến hóa của database (Change Data Capture CDC). Cùng với My
SQL database, Debezium đã đọc được các sự đổi khác này trải qua binlog từ đó giảm thiểu download lên server.
Trong văn cảnh này Debezium vẫn được setup để thừa nhận được những thông tin thay đổi từ bảng orders của database inventory nên để sở hữu thể kiểm tra dữ liệu json cảm nhận từ consumer ta có thể dùng giải pháp sau:
Phần dữ liệu đổi khác nhận được trường đoản cú Debezium sẽ tiến hành đặt phía bên trong mục “payload" của chuỗi json trả về và trách nhiệm của ta là giải pháp xử lý chuỗi dữ liệu này bằng Spark Streaming.
2.1.3 Strimzi
Strimzi: chũm vì cài đặt Kafka thẳng qua helm chart và ta sẽ buộc phải tự cai quản về khía cạnh tài nguyên mang lại từng kafka cluster cũng giống như kafka-connect dường như cũng như sẽ chạm mặt nhiều trở ngại khi cài đặt các gói library mang đến kafka-connect. Strimzi là một trong những Custom Operator của Kubernetes sẽ cung ứng ta có thể tạo các component của kafka một cách đơn giản bằng các file yaml đồng thời hỗ trợ cho ta hoàn toàn có thể download các library đến connector mà không cần thiết phải build lại image.
api
Version: kafka.strimzi.io/v1beta2kind: Kafka
Connectmetadata: name: debezium-mysql-connect labels: app: mysql-debezium-strimzi annotations: strimzi.io/use-connector-resources: "true"spec: replicas: 1 bootstrap
Servers: "simple-connect-kafka-kafka-bootstrap:9092" config: group.id: debezium offset.storage.topic: debezium-mysql-offsets config.storage.topic: debezium-mysql-configs status.storage.topic: debezium-mysql-status config.storage.replication.factor: 2 offset.storage.replication.factor: 2 status.storage.replication.factor: 2 key.converter: org.apache.kafka.connect.json.Json
Converter value.converter: org.apache.kafka.connect.json.Json
Converter key.converter.schemas.enable: true value.converter.schemas.enable: true config.providers.file.class: org.apache.kafka.common.config.provider.File
Config
Provider external
Configuration: volumes: - name: connect-config secret: secret
Name: debezium-mysql-credentials build: output: type: docker image: *****/debezium-kafka-connect:v1.0 push
Secret: docker-registry-credential plugins: - name: debezium-connector-mysql artifacts: - type: tgz url: template: pod: image
Pull
Secrets: - name: docker-registry-credential---api
Version: "kafka.strimzi.io/v1beta2"kind: "Kafka
Connector"metadata: name: "inventory-connector" labels: app: mysql-debe-strim strimzi.io/cluster: debezium-mysql-connectspec: class: io.debezium.connector.mysql.My
Sql
Connector tasks
Max: 1 config: database.hostname: "192.168.***.***" database.port: "3306" database.user: "root" database.password: "debezium" database.server.id: "184054" database.server.name: "dbserver1" database.whitelist: "inventory" database.history.kafka.bootstrap.servers: "simple-connect-kafka-kafka-bootstrap:9092" database.history.kafka.topic: "schema-changes.orders" include.schema.changes: "true"
2.2 Xử lý tài liệu với Spark Structure Streaming
2.2.1 Tổng quan về Spark Structure Streaming
Apache Spark là một trong framework mã mối cung cấp mở giám sát cụm. Vận tốc xử lý của Spark đạt được do việc đo lường và thống kê được tiến hành cùng thời gian trên các máy khác nhau. Đồng thời việc đo lường được thực hiện ở bộ nhớ trong (in-memories) giỏi thực hiện hoàn toàn trên RAM.Xem thêm: Tổng hợp những bài viết số 2 lớp 11 đề 3 lớp 11 đề 3, viết bài tập làm văn số 3 lớp 11 đề 3
Về thực chất Spark sẽ không còn xử lý dữ liệu streaming như hình thức của Apache Flink, mà spark vẫn xử lý tài liệu theo từng micro batch cùng ta có thể config interval của từng batch làm thế nào cho phù hợp. Với câu hỏi mỗi micro-batch có thời gian rất nhỏ dại nên việc spark giải pháp xử lý dữ dữ liệu gần như streaming
Như hình bên trên ta hoàn toàn có thể thấy tài liệu streaming đã thêm vào trong 1 bảng không giới hạn và thời gian của từng micro-batch ta tất cả thể thiết lập được. Lấy ví dụ khi thời gian của mỗi micro-batch là 1s ta rất có thể hiểu spark streaming quản lý theo phương pháp sau
Sau khi đã bao gồm được công dụng của query thì Spark sẽ bắt buộc lưu trữ hiệu quả này vào một nơi lưu trữ nào đó theo 1 trong các 3 chính sách sau:
Complete: Spark sẽ cất giữ toàn bộ tác dụng xử lý được xem tới thời điểm gần nhấtUpdate: Spark vẫn chỉ lưu giữ lại các dữ liệu new tính tại thời điểm gần nhất. Trong trường đúng theo không thể đổi khác được tài liệu ở nơi lưu trữ thì các dữ liệu này sẽ tiến hành thêm vào như là 1 dữ liệu mới
Append: Spark vẫn chỉ lưu lại lại những dữ liệu new vào vị trí lưu trữ, tính tại thời điểm gần nhất
2.2.2 Xử lý dữ liệu orders với Spark
Như đã đề cập ở chỗ trên, sau thời điểm dữ liệu được debezium lưu giữ vào Kafka. Gói thông tin sẽ phía bên trong phần “payload” nên ta vẫn tìm giải pháp trích xuất dữ liệu này
{"schema":{"type":"struct","fields":<"type":"int32","optional":false,"field":"purchaser","type":"int32","optional":false,"field":"quantity","type":"int32","optional":false,"field":"product_id",{"type":"string","optional":true,"name":"io.debezium.time.Zoned
orders_schema = Struct
Type(< Struct
Field("purchaser", Integer
Type(), True), Struct
Field("quantity", Integer
Type(), True), Struct
Field("product_id", Integer
Type(), True), Struct
Field("order_time", Timestamp
Type(), True), Struct
Field("order_number", String
Type(), True)>)schema = Struct
Type(< Struct
Field("schema", String
Type()), Struct
Field("payload", orders_schema)>)df = spark.read
Stream \ .format("kafka") \ .option("kafka.bootstrap.servers", "simple-connect-kafka-kafka-bootstrap.nht-ns.svc.cluster.local:9092") \ .option("subscribe", "dbserver1.inventory.orders") \ .option("starting
Offsets", "latest") \ .load() \ .select(func.from_json(func.col("value").cast("string"), schema).alias("parsed_value")) \Sau khi có dữ liệu đầu vào ta bắt đầu thực hiện các xử lý cơ phiên bản như join những bảng để lâ với nhau và group
By
def processing(df, batch
ID):...# tiến hành join 2 bảng Customer với Product với phiên bản Orders với # mang ra các trường thông tin cần thiết join
DF = df.join(customer_table, customer_table.id == df.purchaser, "inner") .join(product_table, product_table.id == df.product_id, "inner") .select
Expr("order_number", "order_time", "email", "purchaser", "name as product_name", "quantity", "unit_price") # Tính tổng số tiền vàng một order cal
DF = join
DF.with
Column( "total_price", join
DF.quantity * join
DF.unit_price) # Nhóm email của người tiêu dùng với số tiền nhưng user này đã đưa ra total_spent_DF = cal
DF.group
By("email") .agg(sum("total_price").alias("total_spent") ) # team tên các sản phẩm theo tổng số lượng đã bán và tổng cộng tiền nhận ra product_DF = cal
DF.group
By("product_name") .agg(sum("quantity").alias("products_selled"), sum("total_price").alias("total_price") ) # Nhóm thời gian và tên sản phẩm theo trung bình 1-1 giá của sản phẩm product_price = cal
DF.group
By("order_time","product_name") .agg(avg("unit_price").alias("ave_unit_price") )Cuối cùng, sau khi đã xong xuôi các công đoạn xử lý dữ liệu Spark vẫn lưu lại hiệu quả xử lý vào trong 1 nơi tàng trữ khác (Postgres và Delta Lake)
...# lưu lại lại tài liệu vào Postgres Databaseproduct_price.write.format("jdbc").mode("append") \ .option("url", "jdbc:postgresql://192.168.***.***:5432/streaming_data") \ .option("driver", "org.postgresql.Driver") \ .option("dbtable", "ave_product_price") \ .option("user", "postgres") \ .option("password", "postgres") \ .save()# Lưu tài liệu vào Delta Lake cal
DF.write.format("delta").mode("append") \ .option("merge
Schema", "true") \ .save("s3a://delta-lake/customer-invoice") ...
2.3 Hiển thị, truy vấn xuất dữ liệu
Sau lúc đã tài liệu đã được xử trí và được lưu trữ tại Data warehouse với Data Lake thì các đội gớm doanh có thể truy xuất và phân tích dữ liệu realtime qua những tool BI như Tableu, Power
BI hoặc Superset
Đối với những nhóm thao tác làm việc với dữ liệu ngoài sử dụng các tool BI thì còn rất có thể sử dụng Jupyter Notebook nhằm truy xuất dữ liệu trực tiếp từ bỏ Delta Lake để mày mò sâu vào dữ liệu