Cara menggunakan PARTION pada Python

Dalam latihan ini, Anda membuat aplikasi Kinesis Data Analytics Python yang mengalirkan data ke sink Amazon Simple Storage Service.

Untuk menyiapkan prasyarat yang diperlukan untuk latihan ini, selesaikan latihan Memulai (Python) terlebih dulu.

Buat Sumber Daya Dependen

Sebelum membuat aplikasi Kinesis Data Analytics untuk latihan ini, Anda membuat sumber daya dependen berikut:

  • Kinesis data stream (ExampleInputStream)

  • Bucket Amazon S3 untuk menyimpan kode dan output aplikasi (ka-app-code-)

Anda dapat membuat aliran Kinesis dan bucket Amazon S3 menggunakan konsol. Untuk petunjuk membuat sumber daya ini, lihat topik berikut:

  • Membuat dan Memperbarui Aliran Data di Panduan Developer Amazon Kinesis Data Streams. Beri nama aliran data ExampleInputStream Anda.

  • Bagaimana Cara Membuat Bucket S3? di Panduan Developer Amazon Simple Storage Service. Beri bucket Amazon S3 nama yang unik secara global dengan menambahkan nama login Anda, seperti ka-app-code-.

Tulis Catatan Sampel ke Aliran Input

Di bagian ini, Anda menggunakan script Python untuk menulis catatan sampel ke aliran untuk diproses aplikasi.

Skrip Python di bagian ini menggunakan AWS CLI. Anda harus mengonfigurasi AWS CLI untuk menggunakan kredensial akun dan wilayah default Anda. Untuk mengonfigurasi AWS CLI Anda, masukkan berikut ini:

aws configure

  1. Buat file bernama stock.py dengan konten berikut:

     
    import datetime
    import json
    import random
    import boto3
    
    STREAM_NAME = "ExampleInputStream"
    
    
    def get_data():
        return {
            'EVENT_TIME': datetime.datetime.now().isoformat(),
            'TICKER': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
            'PRICE': round(random.random() * 100, 2)}
    
    
    def generate(stream_name, kinesis_client):
        while True:
            data = get_data()
            print(data)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey")
    
    
    if __name__ == '__main__':
        generate(STREAM_NAME, boto3.client('kinesis'))
    
  2. Jalankan skrip stock.py.

    $ python stock.py

    Biarkan skrip tetap berjalan saat menyelesaikan sisa tutorial.

Unduh dan Periksa Kode Aplikasi

Kode aplikasi Python untuk contoh ini tersedia dari GitHub. Untuk mengunduh kode aplikasi, lakukan hal berikut:

  1. Instal klien Git jika Anda belum menginstalnya. Untuk informasi selengkapnya, lihat Menginstal Git.

  2. Klon repositori jarak jauh dengan perintah berikut:

    git clone https://github.com/aws-samples/>amazon-kinesis-data-analytics-java-examples
  3. Buka direktori amazon-kinesis-data-analytics-java-examples/python/S3Sink.

Kode aplikasi terletak di file streaming-file-sink.py. Perhatikan hal tentang kode aplikasi berikut:

  • Aplikasi menggunakan sumber tabel Kinesis untuk membaca dari aliran sumber. Cuplikan berikut memanggil fungsi create_table untuk membuat sumber tabel Kinesis:

    table_env.execute_sql(
            create_table(input_table_name, input_stream, input_region, stream_initpos)
        )

    Fungsi create_table menggunakan perintah SQL untuk membuat tabel yang didukung oleh sumber streaming:

    def create_table(table_name, stream_name, region, stream_initpos):
        return """ CREATE TABLE {0} (
                    ticker VARCHAR(6),
                    price DOUBLE,
                    event_time TIMESTAMP(3),
                    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    
                  )
                  PARTITIONED BY (ticker)
                  WITH (
                    'connector' = 'kinesis',
                    'stream' = '{1}',
                    'aws.region' = '{2}',
                    'scan.stream.initpos' = '{3}',
                    'sink.partitioner-field-delimiter' = ';',
                    'sink.producer.collection-max-count' = '100',
                    'format' = 'json',
                    'json.timestamp-format.standard' = 'ISO-8601'
                  ) """.format(
            table_name, stream_name, region, stream_initpos
        )
    
  • Aplikasi menggunakan konektor filesystem untuk mengirim catatan ke bucket Amazon S3:

     def create_sink_table(table_name, bucket_name):
        return """ CREATE TABLE {0} (
                    ticker VARCHAR(6),
                    price DOUBLE,
                    event_time TIMESTAMP(3),
                    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    
                  )
                  PARTITIONED BY (ticker)
                  WITH (
                      'connector'='filesystem',
                      'path'='s3a://{1}/',
                      'format'='csv',
                      'sink.partition-commit.policy.kind'='success-file',
                      'sink.partition-commit.delay' = '1 min'
                  ) """.format(
            table_name, bucket_name)
    
  • Aplikasi ini menggunakan konektor Flink Kinesis, dari file amazon-kinesis-connector-flink-2.0.0.jar.

Di bagian ini, Anda mengunggah kode aplikasi ke bucket Amazon S3 yang Anda buat di bagian Buat Sumber Daya Dependen.

  1. Gunakan aplikasi kompresi pilihan Anda untuk mengompresi file streaming-file-sink.py dan amazon-kinesis-connector-flink-2.0.0.jar. Beri nama arsip myapp.zip.

  2. Di konsol Amazon S3, pilih bucket ka-app-code-, dan pilih Upload (Unggah).

  3. Di langkah Pilih file, pilih Add files (Tambahkan berkas). Navigasikan ke file myapp.zip yang Anda buat di langkah sebelumnya.

  4. Anda tidak perlu mengubah pengaturan apa pun untuk objek, jadi pilih Upload (Unggah).

Kode aplikasi Anda sekarang disimpan di bucket Amazon S3 yang dapat diakses aplikasi Anda.

Buat dan Jalankan Aplikasi Kinesis Data Analytics

Ikuti langkah-langkah ini untuk membuat, mengonfigurasi, memperbarui, dan menjalankan aplikasi menggunakan konsol.

Buat Aplikasi

  1. Buka konsol Kinesis Data Analytics di https://console.aws.amazon.com/kinesisanalytics.

  2. Di dasbor Kinesis Data Analytics, pilih Create analytics application (Buat aplikasi analitik).

  3. Di halaman Kinesis Analytics - Buat aplikasi, masukkan detail aplikasi sebagai berikut:

    • Untuk Application name (Nama aplikasi), masukkan MyApplication.

    • Untuk Runtime, pilih Apache Flink.

      Kinesis Data Analytics menggunakan Apache Flink versi 1.11.1.

    • Biarkan menu tarik turun versi sebagai Apache Flink 1.11 (Versi yang Direkomendasikan).

  4. Untuk Access permissions (Izin akses), pilih Create / update IAM role kinesis-analytics-MyApplication-us-west-2 (Buat/perbarui IAM role ).

  5. Pilih Create application (Buat aplikasi).

Saat membuat aplikasi Kinesis Data Analytics menggunakan konsol, Anda memiliki opsi untuk memiliki IAM role dan kebijakan IAM yang dibuat untuk aplikasi Anda. Aplikasi Anda menggunakan peran dan kebijakan ini untuk mengakses sumber daya dependen. Sumber daya IAM ini diberi nama menggunakan nama aplikasi dan Wilayah sebagai berikut:

  • Kebijakan: kinesis-analytics-service-MyApplication-us-west-2

  • Peran: kinesis-analytics-MyApplication-us-west-2

Konfigurasikan Aplikasi

  1. Di halaman MyApplication, pilih Configure (Konfigurasikan).

  2. Di halaman Konfigurasikan aplikasi, berikan Code location (Lokasi kode):

    • Untuk Bucket Amazon S3, masukkan ka-app-code-.

    • Untuk Jalur ke objek Amazon S3, masukkan myapp.zip.

  3. Di bawah Akses ke sumber daya aplikasi, untuk Access permissions (Izin akses), pilih Create / update IAM role kinesis-analytics-MyApplication-us-west-2 (Pilih/perbarui IAM role ).

  4. Di bawah Properties (Properti), pilih Add group (Tambahkan grup). Untuk ID Grup, masukkan consumer.config.0.

  5. Masukkan properti dan nilai aplikasi berikut:

    KunciNilai
    input.stream.name ExampleInputStream
    aws.region us-west-2
    flink.stream.initpos LATEST

    Pilih Save (Simpan).

  6. Di bawah Properties (Properti), pilih Add group (Tambahkan grup) lagi. Untuk ID Grup, masukkan kinesis.analytics.flink.run.options. Grup properti khusus ini memberi tahu aplikasi Anda tempat untuk menemukan sumber daya kodenya. Untuk informasi selengkapnya, lihat Menentukan File Kode Anda.

  7. Masukkan properti dan nilai aplikasi berikut:

    KunciNilai
    python streaming-file-sink.py
    jarfile amazon-kinesis-connector-flink-2.0.0.jar

  8. Di bawah Pemantauan, pastikan Memantau tingkat metrik diatur ke Aplikasi.

  9. Untuk Pencatatan CloudWatch, pilih kotak centang Enable (Aktifkan).

  10. Pilih Update (Perbarui).

Ketika Anda memilih untuk mengaktifkan pencatatan CloudWatch, Kinesis Data Analytics akan membuat grup log dan aliran log untuk Anda. Nama-nama sumber daya ini adalah sebagai berikut:

  • Grup log: /aws/kinesis-analytics/MyApplication

  • Aliran log: kinesis-analytics-log-stream

Aliran log ini digunakan untuk memantau aplikasi. Ini bukan aliran log yang sama dengan yang digunakan aplikasi untuk mengirim hasil.

Edit Kebijakan IAM

Edit kebijakan IAM untuk menambahkan izin mengakses Kinesis data streams.

  1. Buka konsol IAM di https://console.aws.amazon.com/iam/.

  2. Pilih Policies (Kebijakan). Pilih kebijakan kinesis-analytics-service-MyApplication-us-west-2 yang dibuat konsol untuk Anda di bagian sebelumnya.

  3. Di halaman Ringkasan, pilih Edit policy (Edit kebijakan). Pilih tab JSON.

  4. Tambahkan bagian yang disorot dari contoh kebijakan berikut ke kebijakan. Ganti ID akun sampel (012345678901) dengan ID akun Anda.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadCode",
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "logs:DescribeLogGroups",
                    "s3:GetObjectVersion"
                ],
                "Resource": [
                    "arn:aws:logs:us-west-2:012345678901:log-group:*",
                    "arn:aws:s3:::ka-app-code-/myapp.zip"
                ]
            },
            {
                "Sid": "DescribeLogStreams",
                "Effect": "Allow",
                "Action": "logs:DescribeLogStreams",
                "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
            },
            {
                "Sid": "PutLogEvents",
                "Effect": "Allow",
                "Action": "logs:PutLogEvents",
                "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
            },
            {
                "Sid": "ListCloudwatchLogGroups",
                "Effect": "Allow",
                "Action": [
                    "logs:DescribeLogGroups"
                ],
                "Resource": [
                    "arn:aws:logs:us-west-2:012345678901:log-group:*"
                ]
            },
            {
                "Sid": "ReadInputStream",
                "Effect": "Allow",
                "Action": "kinesis:*",
                "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
            },
            {
                "Sid": "WriteObjects",
                "Effect": "Allow",
                "Action": [
                    "s3:Abort*",
                    "s3:DeleteObject*",
                    "s3:GetObject*",
                    "s3:GetBucket*",
                    "s3:List*",
                    "s3:ListBucket",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::ka-app-",
                    "arn:aws:s3:::ka-app-/*"
                ]
            }
        ]
    }

Jalankan Aplikasi

  1. Di halaman MyApplication, pilih Run (Jalankan). Biarkan opsi Run without snapshot (Jalankan tanpa snapshot) dipilih, dan konfirmasikan tindakan.

  2. Ketika aplikasi berjalan, refresh halaman. Konsol menunjukkan Grafik aplikasi.

Cara menggunakan PARTION pada Python

Anda dapat memeriksa metrik Kinesis Data Analytics di konsol CloudWatch untuk memastikan aplikasi berfungsi.

Pembersihan Sumber Daya AWS

Bagian ini mencakup prosedur untuk membersihkan sumber daya AWS yang dibuat dalam tutorial Jendela Geser.

Hapus aplikasi Kinesis Data Analytics Anda

  1. Buka konsol Kinesis Data Analytics di https://console.aws.amazon.com/kinesisanalytics.

  2. Di panel Kinesis Data Analytics, pilih MyApplication.

  3. Di halaman aplikasi, pilih Delete (Hapus), lalu konfirmasikan penghapusan.

Hapus Kinesis Data Stream Anda

  1. Buka konsol Kinesis di https://console.aws.amazon.com/kinesis.

  2. Di panel Kinesis Data Streams, pilih ExampleInputStream.

  3. Di halaman ExampleInputStream, pilih Delete Kinesis Stream (Hapus Aliran Kinesis), lalu konfirmasikan penghapusan.

Hapus Objek dan Bucket Amazon S3 Anda

  1. Buka konsol Amazon S3 di https://console.aws.amazon.com/s3/.

  2. Pilih bucket ka-app-code-.

  3. Pilih Delete (Hapus), lalu masukkan nama bucket untuk mengonfirmasi penghapusan.

Hapus Sumber Daya IAM Anda

  1. Buka konsol IAM di https://console.aws.amazon.com/iam/.

  2. Di bilah navigasi, pilih Policies (Kebijakan).

  3. Di kontrol filter, masukkan kinesis.

  4. Pilih kebijakan kinesis-analytics-service-MyApplication-.

  5. Pilih Policy Actions (Tindakan Kebijakan), lalu pilih Delete (Hapus).

  6. Di bilah navigasi, pilih Roles (Peran).

  7. Pilih peran kinesis-analytics-MyApplication-.

  8. Pilih Delete role (Hapus peran), lalu konfirmasi penghapusan.

Hapus Sumber Daya CloudWatch Anda

  1. Buka konsol CloudWatch di https://console.aws.amazon.com/cloudwatch/.

  2. Di bilah navigasi, pilih Logs.

  3. Pilih grup log /aws/kinesis-analytics/MyApplication.

  4. Pilih Delete Log Group (Hapus Grup Log), lalu konfirmasi penghapusan.