آپاچی کافکا (Apache Kafka) یک کارگزار پیام توزیعشده محبوب است که برای مدیریت حجم زیادی از دادههای بیدرنگ طراحی شده است. یک خوشه (Cluster) کافکا بسیار مقیاسپذیر و مقاوم به خطا است. همچنین در مقایسه با سایر کارگزاران پیام مانند ActiveMQ و RabbitMQ دارای توان عملیاتی بسیار بالاتری است. اگرچه پس از نصب آپاچی کافکا از این ابزار بهعنوان یک سیستم پیامرسانی انتشار/اشتراک استفاده میشود، بسیاری از سازمانها نیز از آن برای تجمیع گزارش استفاده میکنند، زیرا ذخیرهسازی دائمی را برای پیامهای منتشرشده ارائه میدهد.
یک سیستم پیامرسانی انتشار/اشتراک به یک یا چند تولیدکننده اجازه میدهد تا پیامها را بدون در نظر گرفتن تعداد مصرفکنندگان یا نحوه پردازش پیامها منتشر کنند. مشتریان مشترک بهطور خودکار در مورد بهروزرسانیها و ایجاد پیامهای جدید مطلع میشوند. این سیستم کارآمدتر و مقیاسپذیرتر از سیستمهایی است که مشتریان بهصورت دورهای نظرسنجی میکنند تا مشخص شود آیا پیامهای جدید در دسترس هستند یا خیر. در این آموزش، نحوه نصب آپاچی کافکا 2.8.2 را روی لینوکس ابونتو 20.04 بررسی خواهیم کرد.
پیشنیازها
برای دنبال کردن مراحل آموزش شما به موارد زیر نیاز خواهید داشت:
- یک سرور مجازی ابونتو 20.04 با حداقل چهار گیگابایت رم و یک کاربر غیر روت با امتیازات sudo. اگر آپاچی کافکا را در سرورهایی با رم کمتر از چهار گیگابایت نصب کنید ممکن است سرویس کافکا از کار بیفتد.
- OpenJDK 11 باید روی سرور شما نصب شده باشد. کافکا به زبان جاوا نوشته شده است و بنابراین، به JVM نیاز دارد.
مرحله اول: ایجاد یک کاربر برای کافکا
از آنجایی که کافکا میتواند درخواست را از طریق شبکه انجام دهد، اولین قدم شما ایجاد یک کاربر اختصاصی برای این سرویس است. درصورتیکه شخصی سرور کافکا را به خطر بیندازد، احتمال آسیب رسیدن به سرور ابونتو شما به حداقل میرسد. در این مرحله با نحوه ایجاد یک کاربر اختصاصی کافکا آشنا میشوید.
بهعنوان کاربر sudo غیر روت وارد سرور مجازی لینوکس خود شوید، سپس کاربری به نام kafka ایجاد کنید:
sudo adduser kafka
برای تنظیم رمزعبور و ایجاد کاربر kafka، دستورات زیر را دنبال کنید:
سپس کاربر kafka را با دستور adduser به گروه sudo اضافه کنید. برای نصب وابستگیهای کافکا به این امتیازات نیاز دارید:
sudo adduser kafka sudo
اکنون کاربر kafka شما ایجاد شده است. با استفاده از دستور su به حساب کاربری kafka وارد شوید:
su -l kafka
اکنون که یک کاربر kafka را ایجاد کردهاید، آماده دانلود و استخراج باینریهای کافکا هستید.
مرحله دوم: دانلود و استخراج باینریهای کافکا
در این مرحله، باینریهای کافکا را دانلود و در پوشههای اختصاصی در دایرکتوری Home کاربر kafka خود استخراج میکنید.
برای شروع، یک دایرکتوری در مسیر home/kafka/ تحت عنوان Downloads بسازید تا دانلودهای شما در آنجا ذخیره شود:
mkdir ~/Downloads
برای دانلود باینری کافکا از دستور curl استفاده کنید:
curl "https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz" -o ~/Downloads/kafka.tgz
یک دایرکتوری به نام kafka ایجاد کنید و به این دایرکتوری بروید. شما از این دایرکتوری بهعنوان دایرکتوری پایه برای نصب کافکا استفاده میکنید:
mkdir ~/kafka && cd ~/kafka
فایل فشردهای را که دانلود کردهاید با استفاده از دستور tar استخراج کنید:
tar -xvzf ~/Downloads/kafka.tgz --strip 1
برای اینکه مطمئن شوید محتوای فایل فشرده دقیقا در مسیر /kafka/~ استخراج میشود و نه در یک دایرکتوری دیگر (مانند /kafka/kafka_2.13-2.8.2/~) که در آن مسیر قرار دارد، از فلگ «–strip 1» استفاده میکنید.
پس از نصب Apache Kafka میتوانید پیکربندی سرور کافکا خود را شروع کنید.
مرحله سوم: پیکربندی سرور کافکا
یک تاپیک (Topic) کافکا به دسته، گروه یا نام فیلدی که پیامها را میتوان برای آن منتشر کرد اشاره دارد. بااینحال، کافکا بهطور پیشفرض امکان حذف یک تاپیک را به شما ارائه نمیدهد. برای اصلاح این مورد باید فایل پیکربندی را ویرایش کنید که در این مرحله نحوه انجام آن را توضیح میدهیم.
گزینههای پیکربندی کافکا در server.properties مشخص شده است. این فایل را با nano یا ویرایشگر متن محبوب خود باز کنید:
nano ~/kafka/config/server.properties
ابتدا، تنظیماتی را اضافه کنید که به شما اجازه میدهد تاپیک های کافکا را حذف کنید. خط زیر را به پایین فایل اضافه کنید:
delete.topic.enable = true
حالا با تغییر خصوصیت log.dirs میتوانید دایرکتوری که لاگهای کافکا در آن ذخیره میشود را تغییر دهید. خصوصیت log.dirs را پیدا کرده و مسیر موجود را با مسیر مشخصشده جایگزین کنید:
log.dirs=/home/kafka/logs
فایل را ذخیره کنید و ببندید.
اکنون که کافکا را پیکربندی کردهاید، میتوانید فایلهای یونیت system را برای سرویس کافکا ایجاد کنید. این فایلها به شما کمک میکنند تا اقدامات معمول سرویس مانند راهاندازی، توقف و ریستارت شدن کافکا را به روشی مطابق با سایر سرویسهای لینوکس انجام دهید.
کافکا برای مدیریت وضعیت کلاستر و تنظیمات خود از Zookeeper استفاده میکند. این ابزار در بسیاری از سیستمهای توزیعشده بهکار میرود و میتوانید در اسناد رسمی Zookeeper اطلاعات بیشتری را درباره این ابزار مطالعه کنید. شما از Zookeeper بهعنوان یک سرویس در کنار فایلهای یونیت استفاده خواهید کرد.
فایل یونیت را برای Zookeeper ایجاد کنید:
sudo nano /etc/systemd/system/zookeeper.service
فایل یونیت را بهاینصورت تعریف کنید:
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
بخش [Unit] مشخص میکنید که Zookeeper برای شروع به کار به شبکه و سیستم فایل نیاز دارد.
بخش [Service] مشخص میکند که فایل یونیت system برای شروع به کار و توقف سرویس باید از فایلهای شل zookeeper-server-start.sh و zookeeper-server-stop.sh استفاده کند. این بخش همچنین مشخص میکند که Zookeeper در صورت توقف غیرعادی باید ریستارت شود.
پس از افزودن این محتوا، فایل را ذخیره کنید و ببندید.
در مرحله بعدی باید فایل سرویس system را برای کافکا ایجاد کنید:
sudo nano /etc/systemd/system/kafka.service
یونیت را بهاینصورت تعریف کنید:
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1' ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
بخش [Unit] مشخص میکند که این فایل یونیت به zookeeper.service وابسته است. این بخش تضمین میکند که وقتی سرویس کافکا اجرا میشود، zookeeper نیز بهصورت خودکار شروع به کار میکند.
بخش [Service] مشخص میکند که system برای شروع به کار و توقف سرویس باید از فایلهای شل kafka-server-start.sh و kafka-server-stop.sh استفاده کند. این بخش همچنین مشخص میکند که کافکا در صورت توقف غیر عادی باید حتما ریستارت شود.
فایل را ذخیره کنید و ببندید.
در این مرحله از نصب آپاچی کافکا که یونیتها را تعریف کردید، باید با استفاده از دستور زیر کافکا را اجرا کنید:
sudo systemctl start kafka
فایل را ذخیره کنید و ببندید.
اکنون که پیکربندی کافکا را انجام دادهاید، میتوانید فایلهای یونیت system را برای اجرا و فعال کردن سرور کافکا در Startup ایجاد کنید.
مرحله چهارم: ایجاد فایلهای یونیت system و راهاندازی سرور کافکا
در این قسمت فایلهای یونیت system را برای سرویس کافکا ایجاد میکنید. این فایلها به شما کمک میکنند تا اقدامات معمول سرویس مانند راهاندازی، توقف و ریستارت شدن کافکا را به روشی مطابق با سایر سرویسهای لینوکس در یک سرور مجازی لینوکس انجام دهید.
کافکا برای مدیریت وضعیت کلاستر و پیکربندی خود از Zookeeper استفاده میکند. این ابزار در بسیاری از سیستمهای توزیعشده استفاده میشود و میتوانید در اسناد رسمی Zookeeper اطلاعات بیشتری درباره این ابزار بخوانید. شما از Zookeeper بهعنوان یک سرویس در کنار این فایلهای یونیت استفاده میکنید.
فایل یونیت را برای zookeeper ایجاد کنید:
sudo nano /etc/systemd/system/zookeeper.service
فایل یونیت را بهصورت زیر تعریف کنید:
[Unit] Requires=network.target remote-fs.target After=network.target remote-fs.target [Service] Type=simple User=kafka ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
در بخش [Unit] مشخص شده است که systemd به منظور شروع و توقف سرویس باید از فایلهای شل zookeeper-server-start.sh و zookeeper-server-stop.sh استفاده کند. همچنین مشخص شده است که Zookeeper در صورت توقف غیرعادی باید ریستارت شود.
پس از افزودن این چند خط، فایل را ذخیره کنید و ببندید.
حالا باید فایل سرویس systemd را برای کافکا ایجاد کنید:
sudo nano /etc/systemd/system/kafka.service
فایل یونیت را بهصورت زیر تعریف کنید:
[Unit] Requires=zookeeper.service After=zookeeper.service [Service] Type=simple User=kafka ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1' ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
بخش [Unit] مشخص میکنید که این فایل یونیت به zookeeper.service وابسته است. بهاینترتیب تضمین میشود که zookeeper هر زمان که سرویس کافکا شروع به کار میکند، اجرا خواهد شد.
بخش [Service] مشخص میکند که systemd بهمنظور شروع و توقف سرویس باید از فایلهای شل kafka-server-start.sh و kafka-server-stop.sh استفاده کند. این بخش همچنین مشخص میکند که کافکا در صورت توقف غیرعادی حتما باید ریستارت شود.
فایل را ذخیره کنید و ببندید.
حالا که فایلهای یونیت را ایجاد کردید، با دستور زیر کافکا را اجرا کنید:
sudo systemctl start kafka
برای اطمینان از شروع موفقیتآمیز کافکا، لاگهای journal موجود برای یونیت کافکا را بررسی کنید:
sudo systemctl status kafka
با اجرای این دستور، خروجی مشابه زیر را دریافت میکنید:
Output
-
kafka.service
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset> Active: active (running) since Wed 2023-02-01 23:44:12 UTC; 4s ago Main PID: 17770 (sh) Tasks: 69 (limit: 4677) Memory: 321.9M CGroup: /system.slice/kafka.service ├─17770 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /ho> └─17793 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMill>
اکنون سرور کافکا بر روی پورت 9092 فهرست شده که پورت پیشفرضی است که سرور کافکا از آن استفاده میکند.
سرویس کافکا باید اجرا شده باشد. اما اگر سرور خود را ریستارت کنید، کافکا بهصورت خودکار ریستارت نمیشود. برای فعال کردن مجدد سرویس کافکا پس از بوت، از دستور زیر استفاده کنید:
sudo systemctl enable zookeeper
با اجرای دستور بالا، پاسخی مشابه زیر که توسط symlink ایجاد شده است دریافت میکنید:
Output Created symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.
اکنون دستور زیر را اجرا کنید:
sudo systemctl enable kafka
با اجرای دستور بالا، پاسخی مشابه زیر که توسط symlink ایجاد شده است دریافت میکنید:
Output Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.
در این مرحله، شما سرویسهای kafka و zookeeper را اجرا کردید. در مرحله بعدی، شما نصب آپاچی کافکا را بررسی میکنید.
مرحله پنجم: آزمایش نصب آپاچی کافکا
در این مرحله شما موفقیتآمیز بودن نصب Apache Kafka را آزمایش میکنید. شما یک پیام Hello World را منتشر و مصرف میکنید تا مطمئن شوید سرور کافکا عملکرد درست و مطابق انتظاری دارد.
منتشر کردن پیامها در کافکا به موارد زیر نیاز دارد:
- یک تولیدکننده (Producer) که رکوردها و داده را در تاپیکها منتشر میکند.
- یک مصرفکننده (Consumer) که پیامها و داده را از تاپیکها میخواند.
برای شروع، یک تاپیک با نام TutorialTopic ایجاد کنید:
~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic
شما میتوانید با استفاده از اسکریپت «kafka-console-producer.sh»، یک تولیدکننده را از طریق خط فرمان ایجاد کنید. این اسکریپت از شما hostname سرور کافکا، یک پورت و تاپیک موردنظر را بهعنوان آرگومان درخواست میکند.
پس از اجرای اسکریپت پاسخی مشابه زیر دریافت میکنید که نشان میدهد تاپیک ایجاد شده است.
Created topic TutorialTopic
حالا استرینگ “Hello, World” را در تاپیک TutorialTopic منتشر کنید:
echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
سپس باید با استفاده از اسکریپت «kafka-console-consumer.sh» یک مصرفکننده کافکا ایجاد کنید. این اسکریپت از شما آرگومانهای hostname و پورت سرور Zookeeper را به همراه نام تاپیک درخواست میکند. دستور زیر پیامهای منتشرشده در تاپیک TutorialTopic را مصرف میکند. به شیوه استفاده از فلگ «–from-beginning» توجه کنید؛ این فلگ اجازه میدهد پیامهایی که قبل از ایجاد مصرفکننده منتشر شدهاند نیز قابل مصرف باشد.
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning
اگر هیچ مشکلی با پیکربندی وجود نداشته باشد، شما پیام « Hello, World» را بهعنوان پاسخ در پنجره ترمینال خود دریافت میکنید:
Output Hello, World
اسکریپت در حال اجرا باقی مانده و منتظر میماند تا پیامهای بیشتری منتشر شود. برای اینکه از صحت این موضوع مطمئن شوید، یک پنجره ترمینال جدید باز کرده و در سرور خود لاگین کنید. یادتان باشد بهعنوان کاربر kafka لاگین کنید:
su -l kafka
در این ترمینال جدید، یک تولیدکننده را اجرا کنید تا پیامهای جدیدی را منتشر کند:
echo "Hello World from Sammy at Mobinhost!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null
این پیام در خروجی مصرفکننده در ترمینال اصلی شما فراخوانی میشود:
Output Hello, World Hello World from Sammy at Mobinhost!
پس از اتمام تست، دکمههای CTRL+C را فشار دهید تا اسکریپت مصرفکننده در ترمینال اصلی متوقف شود.
اکنون شما سرور کافکا را با موفقیت بر روی یک سرور مجازی لینوکس با ابونتو 20.04 نصب و پیکربندی کردهاید. در گام بعدی، شما با انجام چند کار دیگر، امنیت سرور کافکا را تقویت میکنید.
مرحله ششم) تقویت سرور کافکا
پس از اتمام نصب آپاچی کافکا شما میتوانید امتیازات ادمینی کاربر kafka را حذف کرده و امنیت سرور کافکا خود را تقویت کنید.
قبل از انجام این کار، باید از حساب کاربری kafka خارج شوید و با یک کاربر sudo غیر روت دیگر لاگین کنید. اگر هنوز هم در Session شل یکسان با سشنی که از ابتدای مقاله استفاده کردهاید حضور دارید، دستور exit را تایپ کنید.
کاربر kafka را از گروه sudo حذف کنید:
sudo deluser kafka sudo
به منظور افزایش امنیت سرور کافکا، با استفاده از دستور passwd رمزعبور کاربر kafka را قفل کنید. این کار تضمین میکند تا هیچ کاربری قادر نیست با استفاده از این حساب کاربری در سرور لاگین کند:
sudo passwd kafka -l
فلگ «-l» دستوری که امکان تغییر رمزعبور یک حساب کاربری را فراهم میکند (passwd) را قفل میکند.
در این مرحله، فقط کاربر روت یا یک کاربر sudo میتواند با استفاده از دستور زیر با حساب kafka لاگین کند:
sudo su - kafka
در آینده، اگر خواستید قابلیت تغییر رمزعبور را فعال کنید، از دستور passwd به همراه فلگ «-u» استفاده کنید:
sudo passwd kafka -u
اکنون شما توانستید با موفقیت امتیازات ادمینی کاربر kafka را محدود کنید. از این مرحله به بعد شما آمادهاید تا از کافکا استفاده کنید. همچنین انجام گام زیر اختیاری است و KafkaT را به سیستم شما اضافه میکند.
مرحله هفتم) نصب KafkaT
KafkaT به منظور بهبود توانایی شما برای مشاهده جزئیات کلاستر کافکا و انجام وظایف مدیریتی خاص از طریق خط فرمان توسعه یافته است. KafkaT با زبان Ruby نوشته شده است و به همین دلیل باید پکیج منیجر RubyGems را نصب کنید. شما همچنین به پکیج build-essential نیاز دارید تا سایر وابستگیهای KafkaT را بسازد.
با استفاده از دستور apt میتوانید Ruby و پکیج build-essential را نصب کنید:
sudo apt install ruby ruby-dev build-essential
اکنون شما میتوانید با دستور gem ابزار KafkaT را نصب کنید:
sudo CFLAGS=-Wno-error=format-overflow gem install kafkat
فلگ «Wno-error=format-overflow» برای کنترل هشدارها و خطاهایی که در فرایند نصب kafkat ایجاد میشود ضروری است.
پس از اتمام نصب، شما پاسخی مانند زیر دریافت میکنید که نشان میدهد نصب با موفقیت انجام شده است:
Output ... Done installing documentation for json, colored, retryable, highline, trollop, zookeeper, zk, kafkat after 3 seconds 8 gems installed
KafkaT از «.kafkatcfg» بهعنوان فایل پیکربندی استفاده میکند تا دایرکتوریهای نصب و لاگ سرور کافکا شما را تعیین کند. «.kafkatcfg» همچنین باید دارای یک نقطه ورودی KafkaT به نمونه ZooKeeper شما باشد.
یک فایل جدید تحت عنوان «.kafkatcfg» بسازید:
nano ~/.kafkatcfg
خطوط زیر را به آن اضافه کنید. این خطوط اطلاعات لازم در خصوص سرور کافکا و نمونه Zookeeper شما را مشخص میکند.
{ "kafka_path": "~/kafka", "log_path": "/home/kafka/logs", "zk_path": "localhost:2181" }
فایل را ذخیره کنید و ببندید. حالا شما آمادهاید تا از KafkaT استفاده کنید.
برای مشاهده جزئیات بیشتر در مورد تمام پارتیشنهای Kafka از دستور زیر استفاده کنید:
kafkat partitions
با اجرای این دستور، خروجی زیر را مشاهده میکنید:
Output [DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible. /var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated ... Topic Partition Leader Replicas ISRs TutorialTopic 0 0 [0] [0] __consumer_offsets 0 0 [0] [0] ... ...
این خروجی «TutorialTopic» و «__consumer_offsets» را شامل میشود. «__consumer_offsets» یک تاپیک داخلی است که کافکا از آن برای نگهداری اطلاعات مرتبط با کلاینت استفاده میکند. در صورت تمایل میتوانید خطوط مربوط به «__consumer_offsets» را نادیده بگیرید.
جمعبندی
با دنبال کردن مراحل این آموزش میتوانید نصب آپاچی کافکا را با موفقیت به پایان برسانید. شما حالا میتوانید با استفاده از کلاینتهای کافکا، این ابزار را در زبان برنامهنویسی دلخواهتان ادغام کنید.