Dalam latihan ini, Anda membuat aplikasi Kinesis Data Analytics Python yang mengalirkan data ke sink Amazon Simple Storage Service.
Untuk menyiapkan prasyarat yang diperlukan untuk latihan ini, selesaikan latihan Memulai [Python] terlebih dulu.
Buat Sumber Daya Dependen
Sebelum membuat aplikasi Kinesis Data Analytics untuk latihan ini, Anda membuat sumber daya dependen berikut:
Kinesis data stream [
ExampleInputStream
]Bucket Amazon S3 untuk menyimpan kode dan output aplikasi [
ka-app-code-
]
Anda dapat membuat aliran Kinesis dan bucket Amazon S3 menggunakan konsol. Untuk petunjuk membuat sumber daya ini, lihat topik berikut:
Membuat dan Memperbarui Aliran Data di Panduan Developer Amazon Kinesis Data Streams. Beri nama aliran data
ExampleInputStream
Anda.Bagaimana Cara Membuat Bucket S3? di Panduan Developer Amazon Simple Storage Service. Beri bucket Amazon S3 nama yang unik secara global dengan menambahkan nama login Anda, seperti
ka-app-code-
.
Tulis Catatan Sampel ke Aliran Input
Di bagian ini, Anda menggunakan script Python untuk menulis catatan sampel ke aliran untuk diproses aplikasi.
Skrip Python di bagian ini menggunakan AWS CLI. Anda harus mengonfigurasi AWS CLI untuk menggunakan kredensial akun dan wilayah default Anda. Untuk mengonfigurasi AWS CLI Anda, masukkan berikut ini:
aws configure
Buat file bernama
stock.py
dengan konten berikut:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data[]: return { 'EVENT_TIME': datetime.datetime.now[].isoformat[], 'TICKER': random.choice[['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']], 'PRICE': round[random.random[] * 100, 2]} def generate[stream_name, kinesis_client]: while True: data = get_data[] print[data] kinesis_client.put_record[ StreamName=stream_name, Data=json.dumps[data], PartitionKey="partitionkey"] if __name__ == '__main__': generate[STREAM_NAME, boto3.client['kinesis']]
Jalankan skrip
stock.py
.$ python stock.py
Biarkan skrip tetap berjalan saat menyelesaikan sisa tutorial.
Unduh dan Periksa Kode Aplikasi
Kode aplikasi Python untuk contoh ini tersedia dari GitHub. Untuk mengunduh kode aplikasi, lakukan hal berikut:
Instal klien Git jika Anda belum menginstalnya. Untuk informasi selengkapnya, lihat Menginstal Git.
Klon repositori jarak jauh dengan perintah berikut:
git clone //github.com/aws-samples/>amazon-kinesis-data-analytics-java-examples
Buka direktori
amazon-kinesis-data-analytics-java-examples/python/S3Sink
.
Kode aplikasi terletak di file streaming-file-sink.py
. Perhatikan hal tentang kode aplikasi berikut:
Aplikasi menggunakan sumber tabel Kinesis untuk membaca dari aliran sumber. Cuplikan berikut memanggil fungsi
create_table
untuk membuat sumber tabel Kinesis:table_env.execute_sql[ create_table[input_table_name, input_stream, input_region, stream_initpos] ]
Fungsi
create_table
menggunakan perintah SQL untuk membuat tabel yang didukung oleh sumber streaming:def create_table[table_name, stream_name, region, stream_initpos]: return """ CREATE TABLE {0} [ ticker VARCHAR[6], price DOUBLE, event_time TIMESTAMP[3], WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ] PARTITIONED BY [ticker] WITH [
'connector' = 'kinesis',
'stream' = '{1}', 'aws.region' = '{2}', 'scan.stream.initpos' = '{3}', 'sink.partitioner-field-delimiter' = ';', 'sink.producer.collection-max-count' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ] """.format[ table_name, stream_name, region, stream_initpos ]Aplikasi menggunakan konektor
filesystem
untuk mengirim catatan ke bucket Amazon S3:def create_sink_table[table_name, bucket_name]: return """ CREATE TABLE {0} [ ticker VARCHAR[6], price DOUBLE, event_time TIMESTAMP[3], WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ] PARTITIONED BY [ticker] WITH [
'connector'='filesystem'
, 'path'='s3a://{1}/', 'format'='csv', 'sink.partition-commit.policy.kind'='success-file', 'sink.partition-commit.delay' = '1 min' ] """.format[ table_name, bucket_name]Aplikasi ini menggunakan konektor Flink Kinesis, dari file
amazon-kinesis-connector-flink-2.0.0.jar
.
Kompres dan Unggah Kode Python Streaming Apache Flink
Di bagian ini, Anda mengunggah kode aplikasi ke bucket Amazon S3 yang Anda buat di bagian Buat Sumber Daya Dependen.
Gunakan aplikasi kompresi pilihan Anda untuk mengompresi file
streaming-file-sink.py
danamazon-kinesis-connector-flink-2.0.0.jar
. Beri nama arsipmyapp.zip
.Di konsol Amazon S3, pilih bucket ka-app-code-
, dan pilih Upload [Unggah].
Di langkah Pilih file, pilih Add files [Tambahkan berkas]. Navigasikan ke file
myapp.zip
yang Anda buat di langkah sebelumnya.Anda tidak perlu mengubah pengaturan apa pun untuk objek, jadi pilih Upload [Unggah].
Kode aplikasi Anda sekarang disimpan di bucket Amazon S3 yang dapat diakses aplikasi Anda.
Buat dan Jalankan Aplikasi Kinesis Data Analytics
Ikuti langkah-langkah ini untuk membuat, mengonfigurasi, memperbarui, dan menjalankan aplikasi menggunakan konsol.
Buat Aplikasi
Buka konsol Kinesis Data Analytics di //console.aws.amazon.com/kinesisanalytics.
Di dasbor Kinesis Data Analytics, pilih Create analytics application [Buat aplikasi analitik].
Di halaman Kinesis Analytics - Buat aplikasi, masukkan detail aplikasi sebagai berikut:
Untuk Application name [Nama aplikasi], masukkan
MyApplication
.Untuk Runtime, pilih Apache Flink.
Kinesis Data Analytics menggunakan Apache Flink versi 1.11.1.
Biarkan menu tarik turun versi sebagai Apache Flink 1.11 [Versi yang Direkomendasikan].
Untuk Access permissions [Izin akses], pilih Create / update IAM role
kinesis-analytics-MyApplication-us-west-2
[Buat/perbarui IAM role ].Pilih Create application [Buat aplikasi].
Saat membuat aplikasi Kinesis Data Analytics menggunakan konsol, Anda memiliki opsi untuk memiliki IAM role dan kebijakan IAM yang dibuat untuk aplikasi Anda. Aplikasi Anda menggunakan peran dan kebijakan ini untuk mengakses sumber daya dependen. Sumber daya IAM ini diberi nama menggunakan nama aplikasi dan Wilayah sebagai berikut:
Kebijakan:
kinesis-analytics-service-
MyApplication
-us-west-2
Peran:
kinesis-analytics-
MyApplication
-us-west-2
Konfigurasikan Aplikasi
Di halaman MyApplication, pilih Configure [Konfigurasikan].
Di halaman Konfigurasikan aplikasi, berikan Code location [Lokasi kode]:
Untuk Bucket Amazon S3, masukkan
ka-app-code-
.Untuk Jalur ke objek Amazon S3, masukkan
myapp.zip
.
Di bawah Akses ke sumber daya aplikasi, untuk Access permissions [Izin akses], pilih Create / update IAM role
kinesis-analytics-MyApplication-us-west-2
[Pilih/perbarui IAM role ].Di bawah Properties [Properti], pilih Add group [Tambahkan grup]. Untuk ID Grup, masukkan
consumer.config.0
.Masukkan properti dan nilai aplikasi berikut:
KunciNilai input.stream.name
ExampleInputStream
aws.region
us-west-2
flink.stream.initpos
LATEST
Pilih Save [Simpan].
Di bawah Properties [Properti], pilih Add group [Tambahkan grup] lagi. Untuk ID Grup, masukkan
kinesis.analytics.flink.run.options
. Grup properti khusus ini memberi tahu aplikasi Anda tempat untuk menemukan sumber daya kodenya. Untuk informasi selengkapnya, lihat Menentukan File Kode Anda.Masukkan properti dan nilai aplikasi berikut:
KunciNilai python
streaming-file-sink.py
jarfile
amazon-kinesis-connector-flink-2.0.0.jar
Di bawah Pemantauan, pastikan Memantau tingkat metrik diatur ke Aplikasi.
Untuk Pencatatan CloudWatch, pilih kotak centang Enable [Aktifkan].
Pilih Update [Perbarui].
Ketika Anda memilih untuk mengaktifkan pencatatan CloudWatch, Kinesis Data Analytics akan membuat grup log dan aliran log untuk Anda. Nama-nama sumber daya ini adalah sebagai berikut:
Grup log:
/aws/kinesis-analytics/MyApplication
Aliran log:
kinesis-analytics-log-stream
Aliran log ini digunakan untuk memantau aplikasi. Ini bukan aliran log yang sama dengan yang digunakan aplikasi untuk mengirim hasil.
Edit Kebijakan IAM
Edit kebijakan IAM untuk menambahkan izin mengakses Kinesis data streams.
Buka konsol IAM di //console.aws.amazon.com/iam/.
Pilih Policies [Kebijakan]. Pilih kebijakan
kinesis-analytics-service-MyApplication-us-west-2
yang dibuat konsol untuk Anda di bagian sebelumnya.Di halaman Ringkasan, pilih Edit policy [Edit kebijakan]. Pilih tab JSON.
Tambahkan bagian yang disorot dari contoh kebijakan berikut ke kebijakan. Ganti ID akun sampel [
012345678901
] dengan ID akun Anda.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-
/myapp.zip" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }
, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteObjects", "Effect": "Allow", "Action": [ "s3:Abort*", "s3:DeleteObject*", "s3:GetObject*", "s3:GetBucket*", "s3:List*", "s3:ListBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::ka-app-", "arn:aws:s3:::ka-app-/*" ] }
] }
Jalankan Aplikasi
Di halaman MyApplication, pilih Run [Jalankan]. Biarkan opsi Run without snapshot [Jalankan tanpa snapshot] dipilih, dan konfirmasikan tindakan.
Ketika aplikasi berjalan, refresh halaman. Konsol menunjukkan Grafik aplikasi.
Anda dapat memeriksa metrik Kinesis Data Analytics di konsol CloudWatch untuk memastikan aplikasi berfungsi.
Pembersihan Sumber Daya AWS
Bagian ini mencakup prosedur untuk membersihkan sumber daya AWS yang dibuat dalam tutorial Jendela Geser.
Hapus aplikasi Kinesis Data Analytics Anda
Buka konsol Kinesis Data Analytics di //console.aws.amazon.com/kinesisanalytics.
Di panel Kinesis Data Analytics, pilih MyApplication.
Di halaman aplikasi, pilih Delete [Hapus], lalu konfirmasikan penghapusan.
Hapus Kinesis Data Stream Anda
Buka konsol Kinesis di //console.aws.amazon.com/kinesis.
Di panel Kinesis Data Streams, pilih ExampleInputStream.
Di halaman ExampleInputStream, pilih Delete Kinesis Stream [Hapus Aliran Kinesis], lalu konfirmasikan penghapusan.
Hapus Objek dan Bucket Amazon S3 Anda
Buka konsol Amazon S3 di //console.aws.amazon.com/s3/.
Pilih bucket ka-app-code-
.
Pilih Delete [Hapus], lalu masukkan nama bucket untuk mengonfirmasi penghapusan.
Hapus Sumber Daya IAM Anda
Buka konsol IAM di //console.aws.amazon.com/iam/.
Di bilah navigasi, pilih Policies [Kebijakan].
Di kontrol filter, masukkan kinesis.
Pilih kebijakan kinesis-analytics-service-MyApplication-
.
Pilih Policy Actions [Tindakan Kebijakan], lalu pilih Delete [Hapus].
Di bilah navigasi, pilih Roles [Peran].
Pilih peran kinesis-analytics-MyApplication-
.
Pilih Delete role [Hapus peran], lalu konfirmasi penghapusan.
Hapus Sumber Daya CloudWatch Anda
Buka konsol CloudWatch di //console.aws.amazon.com/cloudwatch/.
Di bilah navigasi, pilih Logs.
Pilih grup log /aws/kinesis-analytics/MyApplication.
Pilih Delete Log Group [Hapus Grup Log], lalu konfirmasi penghapusan.