カフカイベントストリーミングAIと自動化

カフカイベントストリーミングAIと自動化の魅力' (The allure of Kafka event streaming AI and automation)

Apache Kafkaは、データの移動において企業アーキテクチャで明確なリーダーとして台頭しています(DBトランザクションからイベントストリーミングへ)。Kafkaの動作原理やこのテクノロジースタックの拡張方法(オンプレミスまたはクラウド)について説明するプレゼンテーションが数多く存在します。このプロジェクトの次のフェーズでは、ChatGPTを使用してメッセージを消費し、メッセージを豊かにし、変換し、永続化するマイクロサービスを構築します。この例では、数秒ごとにJSONの温度読み取りを送信するIoTデバイス(RaspberryPi)からの入力を消費します。

メッセージを消費する

Kafkaの各イベントメッセージが生成されると(およびログに記録されると)、それぞれのメッセージを処理するためにKafkaのマイクロサービス消費者が準備されています。ChatGPTにPythonコードの生成を依頼し、特定の「トピック」からのポーリングと読み取りの基本を提供してもらいました。取得したコードは、トピック、キー、およびJSONペイロードを消費するためのスタートポイントとなります。ChatGPTが作成したコードは、SQLAlchemyを使用してこれをデータベースに永続化するためのものです。次に、JSONペイロードを変換し、ソースの温度がある範囲の外側かどうかに基づいて、検証、計算、および新しいメッセージのセットを生成するために、API Logic Server(GitHub上のオープンソースプロジェクト)のルールを使用したかったです。

注意: ChatGPTはConfluent Kafkaライブラリ(およびそれらのDocker Kafkaコンテナの使用)を選択しました-コードを変更して他のPython Kafkaライブラリを使用することもできます。

SQLAlchemyモデル

API Logic Server (ALS: Pythonのオープンソースプラットフォーム)を使用して、MySQLデータベースに接続します。ALSはテーブルを読み込み、SQLAlchemy ORMモデル、react-adminユーザーインターフェース、safrs-JSON Open API(Swagger)、およびそれぞれのORMエンドポイントの実行中のRESTウェブサービスを作成します。新しいTemperatureテーブルには、タイムスタンプ、IoTデバイスID、および温度読み取りが格納されます。ここでは、ALSコマンドラインユーティリティを使用してORMモデルを作成します:

API Logic Serverが生成したクラスは、Temperatureの値を保持するために使用されます。

変更

SQLデータベースにKafka JSON消費者メッセージを再保存する代わりに(およびルールを実行する代わりに)、JSONペイロードをアンラップ(util.row_to_entity)し、JSONペイロードを保存する代わりにTemperature テーブルに挿入します。温度読み取りごとに宣言的なルールが処理を担当します。

コンシューマーがメッセージを受信すると、セッションに追加され、commit_eventルール(以下)がトリガーされます。

宣言的なロジック:メッセージを生成する

API Logic Server(SQLAlchemy、Flask、およびLogicBankスプレッドシートのようなルールエンジン:式、合計、カウント、コピー、制約、イベントなどを使用した自動化フレームワーク)を使用して、ORMエンティティTemperatureに宣言的なcommit_eventルールを追加します。各メッセージがTemperatureテーブルに永続化されると、commit_eventルールが呼び出されます。温度読み取りがMAX_TEMPを超えるかMIN_TEMPより小さい場合、トピック“TempRangeAlert”にKafkaメッセージを送信します。正規の範囲(32132)内でデータを受け取ることを確認する制約も追加します。アラートメッセージを処理する別のイベントコンシューマーに処理を委ねます。

温度読み取りがMAX_TEMPより大きいかMIN_TEMPより小さい場合にのみアラートメッセージを生成します。ルールは常に順不同であり、仕様が変更されると導入される可能性があります。

TDD Behaveテスト

TDD(テスト駆動開発)を使用して、Behaveテストを使ってTemperatureテーブルにレコードを直接挿入し、その戻り値KafkaMessageSentをチェックすることができます。BehaveはFeature/Scenario.featureファイル)から始まります。各シナリオに対して、Behaveデコレータを使用して対応するPythonクラスを作成します。

機能の定義

TDD Pythonクラス

サマリー

KafkaのメッセージコードをConsumerとProducerの両方に生成するためにChatGPTを使用することは、良いスタート地点のようです。KafkaのためにConfluent Dockerをインストールします。API Logic Serverを使用して宣言的なロジックルールを追加し、SQLデータベースへのトランザクションの通常のフローに数式、制約、イベントを追加し、新しいKafkaメッセージを生成(および変換)することは素晴らしい組み合わせです。ChatGPTと宣言的なロジックは、”ペアプログラミング”の次のレベルです。

We will continue to update VoAGI; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

データサイエンス

「Jupyter AIに会おう Jupyterノートブックで人工知能の力を解き放つ」

人工知能(AI)とコーディングの革新的な進歩において、Project Jupyterはそのツールキットに画期的な追加を導入します。それ...

機械学習

AIの脅威:自動化された世界における見えない課題

この記事では、2023年に現れるAIの脅威、AIシステムのセキュリティーの複雑さ、そしてAI駆動の防御、規制、教育の重要性につ...

機械学習

「トランスフォーマーの簡素化:理解できる単語を使った最先端の自然言語処理(NLP)-パート2- 入力」

ドラゴンは卵から孵り、赤ちゃんはおなかから飛び出し、AIに生成されたテキストは入力から始まります私たちはみんなどこかか...

データサイエンス

オープンAIによるこの動きは、AGIへの道を開くだろう

人工知能(AI)の能力向上を目指した画期的な取り組みの一環として、OpenAIはデータパートナーシップイニシアチブを発表しま...

AIニュース

「デリー政府、提案された電子都市にAIハブを建設する計画」

技術の進歩に向けた重要な一歩として、デリー政府は提案された電子都市にAIハブを構築することを計画しています。この都市は...

機械学習

「機械学習モデルからの情報漏洩を分析し、制約するための新しいAIの理論的枠組み」

機械学習アルゴリズムは、複雑で敏感な問題に適用されることから、プライバシーとセキュリティの懸念を引き起こしています。...